Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;

/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
Expand Down Expand Up @@ -138,22 +139,29 @@ public void close() {
}
}

@Memoized
public byte[] getTableSchemaHash() {
  // Hash of the destination's current schema; @Memoized caches the result so the
  // hashing work is done at most once per instance.
  TableSchema currentSchema = getTableSchema();
  return TableRowToStorageApiProto.tableSchemaHash(currentSchema);
}

/**
 * Returns {@code true} iff {@code updatedTableSchema} differs from the schema this
 * destination currently holds.
 *
 * <p>Uses value equality rather than the previous {@code hashCode()} comparison:
 * two distinct schemas can share a hash code, so comparing hashes could silently
 * report "unchanged" on a collision and miss a real schema update.
 */
boolean hasSchemaChanged(TableSchema updatedTableSchema) {
  return !updatedTableSchema.equals(getTableSchema());
}

/**
 * Encodes the unknown fields of a {@link TableRow} into a serialized proto message.
 *
 * @param unknown the TableRow containing fields not covered by the known schema
 * @param ignoreUnknownValues whether unknown values should be tolerated during conversion
 * @return the proto-serialized bytes of the converted row
 * @throws TableRowToStorageApiProto.SchemaConversionException if the row cannot be
 *     converted against the current schema
 */
public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
    throws TableRowToStorageApiProto.SchemaConversionException {
  // Perform the conversion exactly once and null-check the result. The previous
  // code called messageFromTableRow twice — once for the assignment and once,
  // discarded, inside the null check — doubling the conversion cost.
  Message msg =
      Preconditions.checkArgumentNotNull(
          TableRowToStorageApiProto.messageFromTableRow(
              getSchemaInformation(),
              getDescriptorIgnoreRequired(),
              unknown,
              ignoreUnknownValues,
              true,
              null,
              null,
              null,
              TableRowToStorageApiProto.ErrorCollector.DONT_COLLECT));
  return msg.toByteString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4202,9 +4202,19 @@ private <DestinationT> WriteResult continueExpandTyped(
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
boolean useSchemaUpdate =
getSchemaUpdateOptions() != null && !getSchemaUpdateOptions().isEmpty();
if (useSchemaUpdate) {
checkArgument(
!getAutoSchemaUpdate(),
"Schema update options are not supported when using auto schema update");
checkArgument(!getIgnoreUnknownValues());
}
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when using Beam schemas");
// This ensures that the Beam rows are directly translated into protos for Storage API
// writes, with no
// need to round trip through JSON TableRow objects.
Expand All @@ -4216,6 +4226,9 @@ private <DestinationT> WriteResult continueExpandTyped(
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing protos");

// We could support both of these by falling back to
// StorageApiDynamicDestinationsTableRow. This
// would defeat the optimization (we would be forced to create a new dynamic proto message
Expand All @@ -4233,13 +4246,18 @@ private <DestinationT> WriteResult continueExpandTyped(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
checkArgument(!useSchemaUpdate);

storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
checkArgument(
!useSchemaUpdate, "SchemaUpdateOptions are not supported when writing avros");

// we can configure the avro to storage write api proto converter for this
// assuming the format function returns an Avro GenericRecord
// and there is a schema defined
Expand All @@ -4248,7 +4266,6 @@ private <DestinationT> WriteResult continueExpandTyped(
|| getDynamicDestinations() != null
|| getSchemaFromView() != null,
"A schema must be provided for avro rows to be used with StorageWrite API.");

RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>
recordWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
Expand All @@ -4275,7 +4292,10 @@ private <DestinationT> WriteResult continueExpandTyped(
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
getAutoSchemaUpdate(),
getSchemaUpdateOptions() == null
? Collections.emptySet()
: getSchemaUpdateOptions());
}

int numShards = getStorageApiNumStreams(bqOptions);
Expand All @@ -4287,6 +4307,7 @@ private <DestinationT> WriteResult continueExpandTyped(
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<>(
destinationCoder,
elementCoder,
storageApiDynamicDestinations,
getRowMutationInformationFn(),
getCreateDisposition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,12 @@ public interface BigQueryOptions
Boolean getGroupFilesFileLoad();

void setGroupFilesFileLoad(Boolean value);

@Hidden
@Description(
    "The number of shards used to parallelize buffering of elements while upgrading table schemas.")
@Default.Integer(50)
Integer getSchemaUpgradeBufferingShards();

void setSchemaUpgradeBufferingShards(Integer value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ <T> long insertAll(
/** Patch BigQuery {@link Table} description. */
Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
throws IOException, InterruptedException;

/** Patch the schema of a BigQuery {@link Table} to {@code newSchema}. */
Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException;
}

/** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,31 @@ public Table patchTableDescription(
ALWAYS_RETRY);
}

@Override
public Table patchTableSchema(
    TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
    throws IOException, InterruptedException {
  // Build a patch payload carrying only the schema so the PATCH request leaves
  // every other table property untouched.
  Table schemaOnlyPatch = new Table();
  schemaOnlyPatch.setSchema(newSchema);

  Tables.Patch patchRequest =
      client
          .tables()
          .patch(
              tableReference.getProjectId(),
              tableReference.getDatasetId(),
              tableReference.getTableId(),
              schemaOnlyPatch);
  String failureMessage =
      String.format(
          "Unable to patch table: %s, aborting after %d retries.",
          tableReference, MAX_RPC_RETRIES);
  // Invalid-argument / failed-precondition responses are deterministic, so the
  // custom predicate stops retries for them instead of retrying pointlessly.
  return executeWithRetries(
      patchRequest,
      failureMessage,
      Sleeper.DEFAULT,
      createDefaultBackoff(),
      DONT_RETRY_INVALID_ARG_OR_PRECONDITION);
}

@Override
public void close() throws Exception {
// Nothing to close
Expand Down Expand Up @@ -1664,6 +1689,11 @@ public void close() throws Exception {
return !errorExtractor.itemNotFound(input);
};

// Retry predicate: retry everything EXCEPT bad-request and precondition-not-met
// errors, which are deterministic and will never succeed on retry.
static final SerializableFunction<IOException, Boolean> DONT_RETRY_INVALID_ARG_OR_PRECONDITION =
    input -> {
      // NOTE(review): extractor is built per call, presumably so the serializable
      // lambda captures no (possibly non-serializable) state — confirm.
      ApiErrorExtractor extractor = new ApiErrorExtractor();
      boolean nonRetryable = extractor.preconditionNotMet(input) || extractor.badRequest(input);
      return !nonRetryable;
    };
static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = input -> true;

@VisibleForTesting
Expand Down
Loading
Loading