diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java index f29ecc548c1c..4bb0f714ce83 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java @@ -64,7 +64,8 @@ public class IntegrationTestEnv extends ExternalResource { private DatabaseAdminClient databaseAdminClient; private DatabaseClient databaseClient; private boolean isPostgres; - private boolean isPlacementTableBasedChangeStream; + private boolean isMutableChangeStream; + private boolean isPlacementTable; public boolean useSeparateMetadataDb; @Override @@ -100,13 +101,18 @@ protected void before() throws Throwable { IntegrationTestEnv() { this.isPostgres = false; - this.isPlacementTableBasedChangeStream = false; + this.isMutableChangeStream = false; + this.isPlacementTable = false; } IntegrationTestEnv( - boolean isPostgres, boolean isPlacementTableBasedChangeStream, Optional host) { + boolean isPostgres, + boolean isMutableChangeStream, + boolean isPlacementTable, + Optional host) { this.isPostgres = isPostgres; - this.isPlacementTableBasedChangeStream = isPlacementTableBasedChangeStream; + this.isMutableChangeStream = isMutableChangeStream; + this.isPlacementTable = isPlacementTable; if (host.isPresent()) { this.host = host.get(); } @@ -209,7 +215,7 @@ String createSingersTable() throws InterruptedException, ExecutionException, Tim } String createGSQLTableDDL(String tableName) { - if (this.isPlacementTableBasedChangeStream) { + if (this.isPlacementTable) { // create a placement table. return "CREATE TABLE " + tableName @@ -240,8 +246,7 @@ String createChangeStreamFor(String tableName) .updateDatabaseDdl( instanceId, databaseId, - Collections.singletonList( - "CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\""), + Collections.singletonList(createPostgresChangeStreamDDL(changeStreamName, tableName)), null) .get(TIMEOUT_MINUTES, TimeUnit.MINUTES); } else { @@ -258,7 +263,7 @@ String createChangeStreamFor(String tableName) } String createGSQLChangeStreamDDL(String changeStreamName, String tableName) { - if (this.isPlacementTableBasedChangeStream) { + if (this.isMutableChangeStream) { // Create a MUTABLE_KEY_RANGE change stream. String statement = "CREATE CHANGE STREAM " @@ -271,6 +276,21 @@ String createGSQLChangeStreamDDL(String changeStreamName, String tableName) { return "CREATE CHANGE STREAM " + changeStreamName + " FOR " + tableName; } + String createPostgresChangeStreamDDL(String changeStreamName, String tableName) { + if (this.isMutableChangeStream) { + // Create a MUTABLE_KEY_RANGE change stream. + String statement = + "CREATE CHANGE STREAM \"" + + changeStreamName + + "\" FOR \"" + + tableName + + "\"" + + " WITH (partition_mode = 'MUTABLE_KEY_RANGE')"; + return statement; + } + return "CREATE CHANGE STREAM \"" + changeStreamName + "\" FOR \"" + tableName + "\""; + } + void createRoleAndGrantPrivileges(String table, String changeStream) throws InterruptedException, ExecutionException, TimeoutException { if (this.isPostgres) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java index 573ac8259101..eb5d151005ad 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPlacementTablePostgresIT.java @@ -65,7 +65,8 @@ public class SpannerChangeStreamPlacementTablePostgresIT { public static final IntegrationTestEnv ENV = new IntegrationTestEnv( /*isPostgres=*/ true, - /*isPlacementTableBasedChangeStream=*/ true, + /*isMutableChangeStream=*/ true, + /*isPlacementTable=*/ true, /*host=*/ Optional.empty()); @Rule public final transient TestPipeline pipeline = TestPipeline.create(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java index 1fb9cd4a506e..5f5f55e46964 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamPostgresIT.java @@ -65,7 +65,8 @@ public class SpannerChangeStreamPostgresIT { public static final IntegrationTestEnv ENV = new IntegrationTestEnv( /*isPostgres=*/ true, - /*isPlacementTableBasedChangeStream=*/ false, + /*isMutableChangeStream=*/ false, + /*isPlacementTable=*/ false, /*host=*/ Optional.empty()); @Rule public final transient TestPipeline pipeline = TestPipeline.create();