Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;

/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
Expand Down Expand Up @@ -138,22 +139,29 @@ public void close() {
}
}

/** Returns a memoized hash of this destination's current table schema. */
@Memoized
public byte[] getTableSchemaHash() {
  TableSchema currentSchema = getTableSchema();
  return TableRowToStorageApiProto.tableSchemaHash(currentSchema);
}

/**
 * Returns true if {@code updatedTableSchema} differs from this destination's current schema.
 *
 * <p>Uses {@code equals()} rather than comparing {@code hashCode()} values: equal hash codes do
 * not guarantee equal schemas, so a hash collision could silently mask a real schema change.
 */
boolean hasSchemaChanged(TableSchema updatedTableSchema) {
  return !updatedTableSchema.equals(getTableSchema());
}

/**
 * Encodes the given row of unknown fields into a serialized proto message.
 *
 * @param unknown a {@link TableRow} holding the unknown fields to encode
 * @param ignoreUnknownValues whether unknown values are ignored during conversion
 * @return the encoded fields as a {@link ByteString}
 * @throws TableRowToStorageApiProto.SchemaConversionException if the row cannot be converted to
 *     the destination schema
 */
public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
    throws TableRowToStorageApiProto.SchemaConversionException {
  // checkArgumentNotNull both validates and narrows the @Nullable return of
  // messageFromTableRow before serialization.
  Message msg =
      Preconditions.checkArgumentNotNull(
          TableRowToStorageApiProto.messageFromTableRow(
              getSchemaInformation(),
              getDescriptorIgnoreRequired(),
              unknown,
              ignoreUnknownValues,
              true,
              null,
              null,
              null,
              TableRowToStorageApiProto.ErrorCollector.DONT_COLLECT));
  return msg.toByteString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3246,8 +3246,12 @@ public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
/**
* Allows the schema of the destination table to be updated as a side effect of the write.
*
* <p>This configuration applies only when writing to BigQuery with {@link Method#FILE_LOADS} as
* <p>This configuration applies only when writing to BigQuery with {@link Method#FILE_LOADS},
 * {@link Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE} as
 * method.
 *
 * <p>If used with the Storage Write API, new fields (except for nested messages) will always be
 * created with type STRING.
 * TODO: Follow-on PR will add support for a user-supplied type mapping.
*/
public Write<T> withSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
checkArgument(schemaUpdateOptions != null, "schemaUpdateOptions can not be null");
Expand Down Expand Up @@ -4202,9 +4206,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
boolean useSchemaUpdate =
getSchemaUpdateOptions() != null && !getSchemaUpdateOptions().isEmpty();
if (useSchemaUpdate) {
checkArgument(
!getAutoSchemaUpdate(),
"Schema update options are not supported when using auto schema update");
checkArgument(!getIgnoreUnknownValues());
}
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when using Beam schemas");
// This ensures that the Beam rows are directly translated into protos for Storage API
// writes, with no
// need to round trip through JSON TableRow objects.
Expand All @@ -4216,6 +4230,9 @@ private <DestinationT> WriteResult continueExpandTyped(
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing protos");

// We could support both of these by falling back to
// StorageApiDynamicDestinationsTableRow. This
// would defeat the optimization (we would be forced to create a new dynamic proto message
Expand All @@ -4233,13 +4250,18 @@ private <DestinationT> WriteResult continueExpandTyped(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
checkArgument(!useSchemaUpdate);

storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing avros");

// we can configure the avro to storage write api proto converter for this
// assuming the format function returns an Avro GenericRecord
// and there is a schema defined
Expand All @@ -4248,7 +4270,6 @@ private <DestinationT> WriteResult continueExpandTyped(
|| getDynamicDestinations() != null
|| getSchemaFromView() != null,
"A schema must be provided for avro rows to be used with StorageWrite API.");

RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>
recordWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
Expand All @@ -4275,7 +4296,10 @@ private <DestinationT> WriteResult continueExpandTyped(
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
getAutoSchemaUpdate(),
getSchemaUpdateOptions() == null
? Collections.emptySet()
: getSchemaUpdateOptions());
}

int numShards = getStorageApiNumStreams(bqOptions);
Expand All @@ -4287,6 +4311,7 @@ private <DestinationT> WriteResult continueExpandTyped(
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<>(
destinationCoder,
elementCoder,
storageApiDynamicDestinations,
getRowMutationInformationFn(),
getCreateDisposition(),
Expand All @@ -4304,7 +4329,8 @@ private <DestinationT> WriteResult continueExpandTyped(
getDefaultMissingValueInterpretation(),
getBigLakeConfiguration(),
getBadRecordRouter(),
getBadRecordErrorHandler());
getBadRecordErrorHandler(),
!getSchemaUpdateOptions().isEmpty());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,12 @@ public interface BigQueryOptions
Boolean getGroupFilesFileLoad();

void setGroupFilesFileLoad(Boolean value);

@Hidden
@Description(
    "The number of shards to use for buffering elements when upgrading table schemas.")
@Default.Integer(50)
Integer getSchemaUpgradeBufferingShards();

void setSchemaUpgradeBufferingShards(Integer value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ <T> long insertAll(
/** Patch BigQuery {@link Table} description. */
Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
throws IOException, InterruptedException;

/**
 * Patches the schema of the table referenced by {@code tableReference} to {@code newSchema}.
 *
 * <p>NOTE(review): the semantics of partial schema updates (e.g. relaxing field modes vs.
 * adding fields) are defined by the BigQuery tables.patch API, not visible here — confirm
 * against the implementation.
 */
Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException;
}

/** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,31 @@ public Table patchTableDescription(
ALWAYS_RETRY);
}

/** Patches the schema of an existing BigQuery table, retrying transient failures. */
@Override
public Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException {
  // Only the schema field is populated, so the patch leaves other table properties untouched.
  Table schemaOnlyTable = new Table().setSchema(newSchema);

  Tables.Patch patchRequest =
      client
          .tables()
          .patch(
              tableReference.getProjectId(),
              tableReference.getDatasetId(),
              tableReference.getTableId(),
              schemaOnlyTable);

  String failureMessage =
      String.format(
          "Unable to patch table: %s, aborting after %d retries.",
          tableReference, MAX_RPC_RETRIES);
  // Invalid-argument / failed-precondition errors are not retried: they cannot succeed on retry.
  return executeWithRetries(
      patchRequest,
      failureMessage,
      Sleeper.DEFAULT,
      createDefaultBackoff(),
      DONT_RETRY_INVALID_ARG_OR_PRECONDITION);
}

@Override
public void close() throws Exception {
// Nothing to close
Expand Down Expand Up @@ -1664,6 +1689,11 @@ public void close() throws Exception {
return !errorExtractor.itemNotFound(input);
};

// Retry policy: retry every error except bad-request (invalid argument) and
// failed-precondition responses, which will not succeed if retried.
static final SerializableFunction<IOException, Boolean> DONT_RETRY_INVALID_ARG_OR_PRECONDITION =
    input -> {
      final ApiErrorExtractor extractor = new ApiErrorExtractor();
      boolean nonRetryable =
          extractor.preconditionNotMet(input) || extractor.badRequest(input);
      return !nonRetryable;
    };
static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = input -> true;

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BigQuerySinkMetrics {

public static final String METRICS_NAMESPACE = "BigQuerySink";

// Status codes
public static final String UNKNOWN = Status.Code.UNKNOWN.toString();
public static final String OK = Status.Code.OK.toString();
/** RPC methods tracked by BigQuery sink metrics. */
public enum RpcMethod {
  STREAMING_INSERTS,
  APPEND_ROWS,
  FLUSH_ROWS,
  FINALIZE_STREAM,
  PATCH_TABLE,
  OPEN_WRITE_STREAM
}

// Status of a BigQuery row from the AppendRows RPC call.
Expand Down
Loading
Loading