Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content.zh/docs/sql/reference/queries/joins.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ o_002 12.51 EUR 1.10 12:06:00
这里的 `INTERVAL` 时间减法用于等待后续事件,以确保 join 满足预期。
请确保 join 两边设置了正确的 watermark 。

**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.

**注意:** 事件时间 temporal join 需要包含主键相等的条件,即:`currency_rates` 表的主键 `currency_rates.currency` 包含在条件 `orders.currency = currency_rates.currency` 中。

与 [regular joins](#regular-joins) 相比,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 的结果也不会被影响。
Expand Down
2 changes: 2 additions & 0 deletions docs/content/docs/sql/reference/queries/joins.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ o_002 12.51 EUR 1.10 12:06:00
The `INTERVAL` time subtraction is used to wait for late events in order to make sure the join will meet the expectation.
Please ensure both sides of the join have set watermark correctly.

**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.

**Note:** The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key `currency_rates.currency` of table `currency_rates` to be constrained in the condition `orders.currency = currency_rates.currency`.

In contrast to [regular joins](#regular-joins), the previous temporal table results will not be affected despite the changes on the build side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class TemporalJoinTestPrograms {
Row.of(3L, "Euro", "2020-10-10 00:00:45"))
.producedAfterRestore(
Row.of(1L, "Euro", "2020-10-10 00:00:58"),
Row.of(1L, "USD", "2020-10-10 00:00:58"))
Row.of(1L, "USD", "2020-10-10 00:00:59"))
.build();

static final SourceTestStep ORDERS_WITH_NESTED_ID =
Expand Down Expand Up @@ -88,7 +88,7 @@ public class TemporalJoinTestPrograms {
1L,
Row.of("usd"),
mapOf("currency", "USD"),
"2020-10-10 00:00:58"))
"2020-10-10 00:00:59"))
.build();

static final SourceTestStep RATES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
Expand Down Expand Up @@ -61,9 +62,14 @@
* idea is that between watermarks we are collecting those elements and once we are sure that there
* will be no updates we emit the correct result and clean up the expired data in state.
*
* <p>Cleaning up the state drops all of the "old" values from the probe side, where "old" is
* defined as older then the current watermark. Build side is also cleaned up in the similar
* fashion, however we always keep at least one record - the latest one - even if it's past the last
* <p>Probe-side records that arrive late (their event time is less than or equal to the current
* watermark) are dropped on arrival and counted via the {@code numLateRecordsDropped} metric; they
* are not joined or emitted (not even as null-padded results for left outer joins), because the
* matching build-side version may already have been cleaned up.
*
* <p>Cleaning up the state drops all the "old" values from the probe side, where "old" is defined
* as older than the current watermark. Build side is also cleaned up in the similar fashion,
* however we always keep at least one record - the latest one - even if it's past the last
* watermark.
*
* <p>One more trick is how the emitting results and cleaning up is triggered. It is achieved by
Expand All @@ -84,6 +90,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private static final String RIGHT_STATE_NAME = "right";
private static final String REGISTERED_TIMER_STATE_NAME = "timer";
private static final String TIMERS_STATE_NAME = "timers";
private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";

private final boolean isLeftOuterJoin;
private final InternalTypeInfo<RowData> leftType;
Expand Down Expand Up @@ -123,6 +130,8 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private transient JoinedRowData outRow;
private transient GenericRowData rightNullRow;

private transient Counter numLateRecordsDropped;

public TemporalRowTimeJoinOperator(
InternalTypeInfo<RowData> leftType,
InternalTypeInfo<RowData> rightType,
Expand Down Expand Up @@ -174,13 +183,23 @@ public void open() throws Exception {
outRow = new JoinedRowData();
rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount());
collector = new TimestampedCollector<>(output);

numLateRecordsDropped =
getRuntimeContext().getMetricGroup().counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
}

@Override
public void processElement1(StreamRecord<RowData> element) throws Exception {
RowData row = element.getValue();
long leftTime = getLeftTime(row);
if (leftTime <= timerService.currentWatermark()) {
// The probe-side record is late. Drop it, because the matching build-side version may
// already have been cleaned up.
numLateRecordsDropped.inc();
return;
}
leftState.put(getNextLeftIndex(), row);
registerSmallestTimer(getLeftTime(row)); // Timer to emit and clean up the state
registerSmallestTimer(leftTime); // Timer to emit and clean up the state

registerProcessingCleanupTimer();
}
Expand Down Expand Up @@ -441,4 +460,9 @@ static String getNextLeftIndexStateName() {
static String getRegisteredTimerStateName() {
return REGISTERED_TIMER_STATE_NAME;
}

@VisibleForTesting
Counter getNumLateRecordsDropped() {
return numLateRecordsDropped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
/** Test rowtime temporal join. */
@Test
void testRowTimeTemporalJoin() throws Exception {
void testRowTimeInnerTemporalJoin() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(new Watermark(0));
expectedOutput.add(new Watermark(2));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
Expand All @@ -54,14 +54,12 @@ void testRowTimeTemporalJoin() throws Exception {
testRowTimeTemporalJoin(false, expectedOutput);
}

/** Test rowtime left temporal join. */
@Test
void testRowTimeLeftTemporalJoin() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(new Watermark(2));
expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
Expand All @@ -84,16 +82,15 @@ private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, List<Object> expec

testHarness.open();

testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));
testHarness.processWatermark1(new Watermark(0));
testHarness.processWatermark2(new Watermark(0));

testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement2(insertRecord(2L, "k1", "1a2"));

testHarness.processWatermark1(new Watermark(2));
testHarness.processWatermark2(new Watermark(2));

testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
testHarness.processElement2(insertRecord(4L, "k2", "2a4"));

Expand Down Expand Up @@ -194,9 +191,9 @@ void testRowTimeTemporalJoinWithStateRetention() throws Exception {
}

@Test
void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
void testRowTimeInnerTemporalJoinOnUpsertSource() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(new Watermark(0));
expectedOutput.add(new Watermark(2));
expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
expectedOutput.add(new Watermark(5));
Expand All @@ -212,7 +209,7 @@ void testRowTimeTemporalJoinOnUpsertSource() throws Exception {
@Test
void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null));
expectedOutput.add(new Watermark(2));
expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
Expand All @@ -237,8 +234,8 @@ private void testRowTimeTemporalJoinOnUpsertSource(

testHarness.open();

testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));
testHarness.processWatermark1(new Watermark(0));
testHarness.processWatermark2(new Watermark(0));

testHarness.processElement1(insertRecord(1L, "k1", "1a1"));
testHarness.processElement2(insertRecord(2L, "k1", "1a2"));
Expand Down Expand Up @@ -270,6 +267,96 @@ private void testRowTimeTemporalJoinOnUpsertSource(
testHarness.close();
}

@Test
void testRowTimeInnerTemporalJoinLateRecords() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
expectedOutput.add(new Watermark(5));
expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
expectedOutput.add(new Watermark(8));
expectedOutput.add(new Watermark(11));
expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
expectedOutput.add(new Watermark(13));
expectedOutput.add(new Watermark(15));

testRowTimeTemporalJoinLateRecords(false, expectedOutput);
}

@Test
void testRowTimeLeftTemporalJoinLateRecords() throws Exception {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(1));
expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2"));
expectedOutput.add(new Watermark(5));
expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2"));
expectedOutput.add(new Watermark(8));
expectedOutput.add(insertRecord(10L, "k2", "1a10", null, null, null));
expectedOutput.add(new Watermark(11));
expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9"));
expectedOutput.add(new Watermark(13));
expectedOutput.add(new Watermark(15));

testRowTimeTemporalJoinLateRecords(true, expectedOutput);
}

/**
* Verifies that probe-side records whose event time is less than or equal to the current
* watermark are dropped on arrival: they are not joined, not emitted (even with a left outer
* join), and are counted in the {@code numLateRecordsDropped} metric.
*/
private void testRowTimeTemporalJoinLateRecords(
boolean isLeftOuter, List<Object> expectedOutput) throws Exception {
TemporalRowTimeJoinOperator joinOperator =
new TemporalRowTimeJoinOperator(
rowType, rowType, joinCondition, 0, 0, 0, 0, isLeftOuter);
KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> testHarness =
createTestHarness(joinOperator);

testHarness.open();

// initialize watermark to 1
testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));

// Establish a build-side version at time 2 and a non-late probe record at time 3.
testHarness.processElement2(insertRecord(2L, "k1", "2a2"));
testHarness.processElement1(insertRecord(3L, "k1", "1a3"));
testHarness.processWatermark1(new Watermark(5));
testHarness.processWatermark2(new Watermark(5));

// After Watermark(5), any probe record with leftTime <= 5 is late and must be dropped.
testHarness.processElement1(insertRecord(5L, "k1", "1a5")); // leftTime == watermark
testHarness.processElement1(insertRecord(4L, "k1", "1a4")); // leftTime < watermark
testHarness.processElement1(insertRecord(1L, "k1", "1a1")); // leftTime << watermark
// A non-late probe record should still be processed.
testHarness.processElement1(insertRecord(7L, "k1", "1a7"));
testHarness.processWatermark1(new Watermark(8));
testHarness.processWatermark2(new Watermark(8));

// A record for late retraction
testHarness.processElement1(insertRecord(10L, "k2", "1a10"));
testHarness.processWatermark1(new Watermark(11));
testHarness.processWatermark2(new Watermark(11));

// Add a late retraction and a late build-side record
testHarness.processElement1(insertRecord(13L, "k2", "1a13"));
testHarness.processElement2(insertRecord(9L, "k2", "2a9"));
testHarness.processElement1(deleteRecord(10L, "k2", "1a10")); // late -> dropped
testHarness.processWatermark1(new Watermark(13));
testHarness.processWatermark2(new Watermark(13));

// Another late retraction
testHarness.processElement1(deleteRecord(13L, "k2", "1a13"));
testHarness.processWatermark1(new Watermark(15));
testHarness.processWatermark2(new Watermark(15));

assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
assertThat(joinOperator.getNumLateRecordsDropped().getCount()).isEqualTo(5L);

testHarness.close();
}

private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData>
createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {

Expand Down