Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support for Bigtable materialized view source added (Java) ([#38053](https://github.com/apache/beam/issues/38053)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Row>> {
return tableId != null && tableId.isAccessible() ? tableId.get() : null;
}

/** Returns the materialized view being read from. */
public @Nullable String getMaterializedViewName() {
ValueProvider<String> mvName = getBigtableReadOptions().getMaterializedViewName();
return mvName != null && mvName.isAccessible() ? mvName.get() : null;
}

/**
* Returns the Google Cloud Bigtable instance being read from, and other parameters.
*
Expand All @@ -415,7 +421,6 @@ static Read create() {
.setBigtableConfig(config)
.setBigtableReadOptions(
BigtableReadOptions.builder()
.setTableId(StaticValueProvider.of(""))
.setKeyRanges(
StaticValueProvider.of(Collections.singletonList(ByteKeyRange.ALL_KEYS)))
.build())
Expand Down Expand Up @@ -521,6 +526,30 @@ public Read withAppProfileId(String appProfileId) {
return withAppProfileId(StaticValueProvider.of(appProfileId));
}

/**
* Returns a new {@link BigtableIO.Read} that will read from the specified materialized view.
* Mutually exclusive with {@link #withTableId}.
*
* <p>Does not modify this object.
*/
public Read withMaterializedViewName(ValueProvider<String> materializedViewName) {
BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
return toBuilder()
.setBigtableReadOptions(
bigtableReadOptions.toBuilder().setMaterializedViewName(materializedViewName).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} that will read from the specified materialized view.
* Mutually exclusive with {@link #withTableId}.
*
* <p>Does not modify this object.
*/
public Read withMaterializedViewName(String materializedViewName) {
return withMaterializedViewName(StaticValueProvider.of(materializedViewName));
}

/**
* WARNING: Should be used only to specify additional parameters for connection to the Cloud
* Bigtable, instanceId and projectId should be provided over {@link #withInstanceId} and {@link
Expand Down Expand Up @@ -769,6 +798,11 @@ public final String toString() {
private void validateTableExists(
BigtableConfig config, BigtableReadOptions readOptions, PipelineOptions options) {
if (config.getValidate() && config.isDataAccessible() && readOptions.isDataAccessible()) {
// Skip table existence validation for materialized views since the standard
// readRow-based check only works for tables.
if (readOptions.getMaterializedViewName() != null) {
return;
}
ValueProvider<String> tableIdProvider = checkArgumentNotNull(readOptions.getTableId());
String tableId = checkArgumentNotNull(tableIdProvider.get());
try {
Expand Down Expand Up @@ -1890,16 +1924,24 @@ public void validate() {
}

ValueProvider<String> tableId = readOptions.getTableId();
checkArgument(
tableId != null && tableId.isAccessible() && !tableId.get().isEmpty(),
"tableId was not supplied");
ValueProvider<String> mvName = readOptions.getMaterializedViewName();
boolean hasTableId = tableId != null && tableId.isAccessible() && !tableId.get().isEmpty();
boolean hasMvName = mvName != null && mvName.isAccessible() && !mvName.get().isEmpty();
checkArgument(hasTableId || hasMvName, "tableId or materializedViewName was not supplied");
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.add(DisplayData.item("tableId", readOptions.getTableId()).withLabel("Table ID"));
if (readOptions.getTableId() != null) {
builder.add(DisplayData.item("tableId", readOptions.getTableId()).withLabel("Table ID"));
}
if (readOptions.getMaterializedViewName() != null) {
builder.add(
DisplayData.item("materializedViewName", readOptions.getMaterializedViewName())
.withLabel("Materialized View Name"));
}

if (getRowFilter() != null) {
builder.add(
Expand Down Expand Up @@ -1984,10 +2026,14 @@ public List<ByteKeyRange> getRanges() {
return readOptions.getMaxBufferElementCount();
}

public ValueProvider<String> getTableId() {
public @Nullable ValueProvider<String> getTableId() {
return readOptions.getTableId();
}

public @Nullable ValueProvider<String> getMaterializedViewName() {
return readOptions.getMaterializedViewName();
}

void reportLineageOnce(BigtableService.Reader reader) {
if (!reportedLineage) {
reader.reportLineage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
abstract class BigtableReadOptions implements Serializable {

/** Returns the table id. */
abstract ValueProvider<String> getTableId();
abstract @Nullable ValueProvider<String> getTableId();

/** Returns the materialized view name. */
abstract @Nullable ValueProvider<String> getMaterializedViewName();

/** Returns the row filter to use. */
abstract @Nullable ValueProvider<RowFilter> getRowFilter();
Expand Down Expand Up @@ -72,7 +75,9 @@ static BigtableReadOptions.Builder builder() {
@AutoValue.Builder
abstract static class Builder {

abstract Builder setTableId(ValueProvider<String> tableId);
abstract Builder setTableId(@Nullable ValueProvider<String> tableId);

abstract Builder setMaterializedViewName(@Nullable ValueProvider<String> materializedViewName);

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);

Expand Down Expand Up @@ -108,12 +113,18 @@ BigtableReadOptions withKeyRange(ByteKeyRange keyRange) {
}

boolean isDataAccessible() {
if (getMaterializedViewName() != null) {
return getMaterializedViewName().isAccessible();
}
return getTableId() != null && getTableId().isAccessible();
}

void populateDisplayData(DisplayData.Builder builder) {
builder
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
.addIfNotNull(
DisplayData.item("materializedViewName", getMaterializedViewName())
.withLabel("Bigtable Materialized View Name"))
.addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
.addIfNotNull(
DisplayData.item("rowFilterTextProto", getRowFilterTextProto()).withLabel("Row Filter"))
Expand All @@ -127,9 +138,19 @@ void populateDisplayData(DisplayData.Builder builder) {
}

void validate() {
boolean hasTableId =
getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty());
boolean hasMaterializedViewName =
getMaterializedViewName() != null
&& (!getMaterializedViewName().isAccessible()
|| !getMaterializedViewName().get().isEmpty());

checkArgument(
hasTableId || hasMaterializedViewName,
"Either a Bigtable table id or a materialized view name must be set");
checkArgument(
getTableId() != null && (!getTableId().isAccessible() || !getTableId().get().isEmpty()),
"Could not obtain Bigtable table id");
!(hasTableId && hasMaterializedViewName),
"Only one of Bigtable table id or materialized view name can be set, not both");

if (getRowFilter() != null && getRowFilter().isAccessible()) {
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public String identifier() {

@Override
public String description() {
return "Reads data from a Google Cloud Bigtable table.\n"
+ "The transform requires the project ID, instance ID, and table ID parameters.\n"
+ "Optionally, the output can be flattened or nested rows.\n"
return "Reads data from a Google Cloud Bigtable table or materialized view.\n"
+ "The transform requires the project ID, instance ID, and either table ID or\n"
+ "materialized view name. Optionally, the output can be flattened or nested rows.\n"
+ "Example usage:\n"
+ " - type: ReadFromBigTable\n"
+ " config:\n"
Expand All @@ -116,9 +116,17 @@ public abstract static class BigtableReadSchemaTransformConfiguration implements
public void validate() {
String emptyStringMessage =
"Invalid Bigtable Read configuration: %s should not be a non-empty String";
checkArgument(!this.getTableId().isEmpty(), String.format(emptyStringMessage, "table"));
checkArgument(!this.getInstanceId().isEmpty(), String.format(emptyStringMessage, "instance"));
checkArgument(!this.getProjectId().isEmpty(), String.format(emptyStringMessage, "project"));

String tableId = this.getTableId();
String mvName = this.getMaterializedViewName();
boolean hasTableId = tableId != null && !tableId.isEmpty();
boolean hasMvName = mvName != null && !mvName.isEmpty();
checkArgument(hasTableId || hasMvName, "Either table or materializedViewName must be set");
checkArgument(
!(hasTableId && hasMvName),
"Only one of table or materializedViewName can be set, not both");
}

public static Builder builder() {
Expand All @@ -128,7 +136,11 @@ public static Builder builder() {
}

@SchemaFieldDescription("Bigtable table ID to read from.")
public abstract String getTableId();
public abstract @Nullable String getTableId();

@SchemaFieldDescription(
"Bigtable materialized view name to read from. Mutually exclusive with table.")
public abstract @Nullable String getMaterializedViewName();

@SchemaFieldDescription("Bigtable instance ID to connect to.")
public abstract String getInstanceId();
Expand All @@ -145,6 +157,8 @@ public static Builder builder() {
public abstract static class Builder {
public abstract Builder setTableId(String tableId);

public abstract Builder setMaterializedViewName(String materializedViewName);

public abstract Builder setInstanceId(String instanceId);

public abstract Builder setProjectId(String projectId);
Expand Down Expand Up @@ -176,14 +190,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
String.format(
"Input to %s is expected to be empty, but is not.", getClass().getSimpleName()));

PCollection<com.google.bigtable.v2.Row> bigtableRows =
input
.getPipeline()
.apply(
BigtableIO.read()
.withTableId(configuration.getTableId())
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId()));
BigtableIO.Read read =
BigtableIO.read()
.withInstanceId(configuration.getInstanceId())
.withProjectId(configuration.getProjectId());
String mvName = configuration.getMaterializedViewName();
String tableId = configuration.getTableId();
if (mvName != null && !mvName.isEmpty()) {
read = read.withMaterializedViewName(mvName);
} else if (tableId != null) {
read = read.withTableId(tableId);
}

PCollection<com.google.bigtable.v2.Row> bigtableRows = input.getPipeline().apply(read);

Schema outputSchema =
Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA;
Expand Down
Loading
Loading