diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java index f63566befce7d..7382a9a9b848a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java @@ -86,6 +86,27 @@ public interface TableSemantics { */ int[] partitionByColumns(); + /** + * Returns the upsert key of the passed table as derived by the planner from primary key + * constraints and the rewritten relational plan. The upsert key uniquely identifies a row + * within the input changelog and survives planner transformations that preserve key semantics + * (e.g. filters, projections that retain key columns). Applies to both table arguments with row + * and set semantics. + * + *

This complements {@link #partitionByColumns()}: a caller is not required to repeat the + * primary key via {@code PARTITION BY} just so a PTF can identify rows - the planner already + * knows the key from the input table's declaration. + * + * @return An array of indexes (0-based) that specify the upsert key columns. Returns an empty + * array if the planner could not derive an upsert key for the input (e.g., append-only + * sources without a declared primary key, or operations that destroyed the key). Returns an + * empty array during the type inference phase as the upsert key is still unknown at that + * point. + */ + default int[] upsertKeyColumns() { + return new int[0]; + } + /** * Returns information about how the passed table is ordered. Applies only to table arguments * with set semantics. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java index fe881f8fd1fc2..ad8cad8379ab9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java @@ -35,9 +35,10 @@ public class TableSemanticsMock implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final ChangelogMode changelogMode; + private final int[] upsertKeyColumns; public TableSemanticsMock(DataType dataType) { - this(dataType, new int[0], new int[0], -1, null); + this(dataType, new int[0], new int[0], -1, null, new int[0]); } public TableSemanticsMock( @@ -46,6 +47,16 @@ public TableSemanticsMock( int[] orderByColumns, int timeColumn, @Nullable ChangelogMode changelogMode) { + this(dataType, partitionByColumns, orderByColumns, timeColumn, changelogMode, new int[0]); + } + + public TableSemanticsMock( + DataType dataType, + int[] partitionByColumns, + int[] orderByColumns, + int timeColumn, + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; @@ -55,6 +66,7 @@ public TableSemanticsMock( } this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -82,6 +94,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.ofNullable(changelogMode); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 2c393adf875ec..59e67eaea94f8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -335,19 +335,20 @@ public boolean hasScalarArgument(String name) { * scalar arguments through the same coercion path as validation. */ public CallContext toCallContext(RexCall call) { - return toCallContext(call, null, null, null); + return toCallContext(call, null, null, null, null); } /** * Variant of {@link #toCallContext(RexCall)} that additionally exposes the call's input time - * columns and changelog modes - needed by the streaming codegen path so PTFs can specialize - * themselves to the exact call. + * columns, changelog modes, and per-input upsert keys - needed by the streaming codegen path so + * PTFs can specialize themselves to the exact call. */ public CallContext toCallContext( RexCall call, @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, - @Nullable ChangelogMode outputChangelogMode) { + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { return new OperatorBindingCallContext( dataTypeFactory, getDefinition(), @@ -355,7 +356,8 @@ public CallContext toCallContext( call.getType(), inputTimeColumns, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index f31406dad196d..47ae23ed482c8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final @Nullable List inputTimeColumns; private final @Nullable List inputChangelogModes; private final @Nullable ChangelogMode outputChangelogMode; + private final @Nullable List inputUpsertKeys; public OperatorBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlOperatorBinding binding, RelDataType returnRelDataType) { - this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null); + this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null, null); } public OperatorBindingCallContext( @@ -80,7 +81,8 @@ public OperatorBindingCallContext( RelDataType returnRelDataType, @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, - @Nullable ChangelogMode outputChangelogMode) { + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { super( dataTypeFactory, definition, @@ -109,6 +111,7 @@ public int size() { this.inputTimeColumns = inputTimeColumns; this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys; } @Override @@ -173,13 +176,18 @@ public Optional getTableSemantics(int pos) { Optional.ofNullable(inputChangelogModes) .map(m -> m.get(tableArgCall.getInputIndex())) .orElse(null); + final int[] upsertKeyColumns = + Optional.ofNullable(inputUpsertKeys) + .map(m -> m.get(tableArgCall.getInputIndex())) + .orElse(new int[0]); return Optional.of( OperatorBindingTableSemantics.create( argumentDataTypes.get(pos), staticArg, tableArgCall, timeColumn, - changelogMode)); + changelogMode, + upsertKeyColumns)); } @Override @@ -283,20 +291,23 @@ private static class OperatorBindingTableSemantics implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final @Nullable ChangelogMode changelogMode; + private final int[] upsertKeyColumns; public static OperatorBindingTableSemantics create( DataType tableDataType, StaticArgument staticArg, RexTableArgCall tableArgCall, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { return new OperatorBindingTableSemantics( createDataType(tableDataType, staticArg), tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()), timeColumn, - changelogMode); + changelogMode, + upsertKeyColumns); } private OperatorBindingTableSemantics( @@ -305,13 +316,15 @@ private OperatorBindingTableSemantics( int[] orderByColumns, SortDirection[] orderByDirections, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; this.orderByDirections = orderByDirections; this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } private static DataType createDataType(DataType tableDataType, StaticArgument staticArg) { @@ -349,6 +362,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.ofNullable(changelogMode); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index 3973329af7484..33de7a7bc8464 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -108,6 +108,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes"; public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = "outputChangelogMode"; + public static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys"; @JsonProperty(FIELD_NAME_UID) private final @Nullable String uid; @@ -121,6 +122,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) private final ChangelogMode outputChangelogMode; + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + private final List inputUpsertKeys; + public StreamExecProcessTableFunction( ReadableConfig tableConfig, List inputProperties, @@ -129,7 +133,8 @@ public StreamExecProcessTableFunction( @Nullable String uid, RexCall invocation, List inputChangelogModes, - ChangelogMode outputChangelogMode) { + ChangelogMode outputChangelogMode, + List inputUpsertKeys) { this( ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecProcessTableFunction.class), @@ -141,7 +146,8 @@ public StreamExecProcessTableFunction( uid, invocation, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @JsonCreator @@ -155,7 +161,8 @@ public StreamExecProcessTableFunction( @JsonProperty(FIELD_NAME_UID) @Nullable String uid, @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) List inputChangelogModes, - @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) { + @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) @Nullable List inputUpsertKeys) { super(id, context, persistedConfig, inputProperties, outputType, description); this.uid = uid; // Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path: @@ -164,6 +171,14 @@ public StreamExecProcessTableFunction( this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation); this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + // Older compiled plans (pre-FLINK-39735) did not persist this field. Default to per-input + // empty arrays so the runtime sees the same behavior as before (no derivable upsert key). + this.inputUpsertKeys = + inputUpsertKeys != null + ? inputUpsertKeys + : IntStream.range(0, inputChangelogModes.size()) + .mapToObj(i -> new int[0]) + .collect(Collectors.toList()); } public @Nullable String getUid() { @@ -202,7 +217,12 @@ protected Transformation translateToPlanInternal( final RexCall udfCall = StreamPhysicalProcessTableFunction.toUdfCall(invocation); final GeneratedRunnerResult generated = ProcessTableRunnerGenerator.generate( - ctx, udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode); + ctx, + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys); final GeneratedProcessTableRunner generatedRunner = generated.runner(); final LinkedHashMap stateInfos = generated.stateInfos(); @@ -309,6 +329,7 @@ private RuntimeTableSemantics createRuntimeTableSemantics( } final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); + final int[] upsertKeyColumns = inputUpsertKeys.get(tableArgCall.getInputIndex()); return new RuntimeTableSemantics( tableArg.getName(), @@ -320,7 +341,8 @@ private RuntimeTableSemantics createRuntimeTableSemantics( consumedChangelogMode, tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH), tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE), - timeColumn); + timeColumn, + upsertKeyColumns); } private Transformation createKeyedTransformation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java index 5ccecf18e71be..6f90b38e431d1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -27,11 +27,13 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils; +import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.types.inference.StaticArgument; @@ -165,6 +167,7 @@ public ExecNode translateToExecNode() { verifyTimeAttributes(getInputs(), call, inputChangelogModes, outputChangelogMode); final List> providedInputArgs = getProvidedInputArgs(call); verifyPassThroughColumnsForUpdates(providedInputArgs, outputChangelogMode); + final List inputUpsertKeys = deriveInputUpsertKeys(getInputs()); return new StreamExecProcessTableFunction( unwrapTableConfig(this), getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()), @@ -173,7 +176,26 @@ public ExecNode translateToExecNode() { uid, call, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); + } + + /** + * Derives an upsert key (collapsed to one candidate via {@link UpsertKeyUtil#smallestKey}) for + * each input. Returns an empty array entry for inputs without a derivable upsert key + * (append-only sources without a declared primary key, or operations that destroyed the key). + * Surfaces as {@link org.apache.flink.table.functions.TableSemantics#upsertKeyColumns()} so + * PTFs can identify rows without requiring callers to repeat the key via PARTITION BY. + */ + private static List deriveInputUpsertKeys(List inputs) { + final List perInput = new ArrayList<>(inputs.size()); + for (RelNode input : inputs) { + final FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery()); + final Set upsertKeys = fmq.getUpsertKeys(input); + perInput.add(UpsertKeyUtil.smallestKey(upsertKeys).orElse(new int[0])); + } + return perInput; } @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala index 52df803d5c8f8..402157aa0fdb6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala @@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator { udfCall: RexCall, inputTimeColumns: java.util.List[Integer], inputChangelogModes: java.util.List[ChangelogMode], - outputChangelogMode: ChangelogMode): GeneratedRunnerResult = { + outputChangelogMode: ChangelogMode, + inputUpsertKeys: java.util.List[Array[Int]]): GeneratedRunnerResult = { val function: BridgingSqlFunction = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val definition: FunctionDefinition = function.getDefinition val dataTypeFactory = function.getDataTypeFactory @@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator { // Thus, functions can reconfigure themselves for the exact use case. // Including updating their state layout. val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys) // Create the final UDF for runtime val udf = UserDefinedFunctionHelper.createSpecializedFunction( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index fd646a34148b4..47939aa8dffe4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -1676,7 +1676,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val inputTimeColumns = StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall) val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + null) // Expose a simplified context focused on changelog-relevant inputs: changelog modes, // resolved literal arguments, and table semantics (e.g., partition-by columns). diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index 8d83e51770d59..76187ed679e0b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -456,6 +456,18 @@ public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r, String s) } } + /** + * Testing function for FLINK-39735: surfaces the planner-derived upsert key on {@link + * TableSemantics}. Used by tests to assert that {@code upsertKeyColumns()} reports the + * primary-key columns of the input even when the caller did not write {@code PARTITION BY}. + */ + public static class UpsertKeyContextFunction extends AppendProcessTableFunctionBase { + public void eval(Context ctx, @ArgumentHint(ROW_SEMANTIC_TABLE) Row r) { + final TableSemantics semantics = ctx.tableSemanticsFor("r"); + collectObjects(r, semantics.upsertKeyColumns()); + } + } + /** Testing function. */ public static class PojoStateFunction extends AppendProcessTableFunctionBase { public void eval(@StateHint Score s, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java index cabab4c613143..6719adda32034 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java @@ -44,6 +44,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { private final boolean passColumnsThrough; private final boolean hasSetSemantics; private final int timeColumn; + private final int[] upsertKeyColumns; private transient ChangelogMode changelogMode; @@ -57,7 +58,8 @@ public RuntimeTableSemantics( RuntimeChangelogMode consumedChangelogMode, boolean passColumnsThrough, boolean hasSetSemantics, - int timeColumn) { + int timeColumn, + int[] upsertKeyColumns) { this.argName = argName; this.inputIndex = inputIndex; this.dataType = dataType; @@ -68,6 +70,7 @@ public RuntimeTableSemantics( this.passColumnsThrough = passColumnsThrough; this.hasSetSemantics = hasSetSemantics; this.timeColumn = timeColumn; + this.upsertKeyColumns = upsertKeyColumns; } public String getArgName() { @@ -118,6 +121,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.of(getChangelogMode()); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java index be390ab5f5557..38a0df4811168 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java @@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() { RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), /* passColumnsThrough */ false, /* hasSetSemantics */ true, - /* timeColumn */ 1); + /* timeColumn */ 1, + /* upsertKeyColumns */ new int[0]); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java index fadf21d7dd942..1f81a36ed767a 100644 --- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java +++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java @@ -30,10 +30,16 @@ class TestHarnessTableSemantics implements TableSemantics { private final DataType dataType; private final int[] partitionByColumns; + private final int[] upsertKeyColumns; TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) { + this(dataType, partitionByColumns, new int[0]); + } + + TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns, int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -61,6 +67,11 @@ public int timeColumn() { return -1; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.empty();