From 216965ece3ee15370abf836957dafc26f27ca2c5 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 22 May 2026 16:26:02 +0200 Subject: [PATCH] [FLINK-39156][docs] Document ON CONFLICT This closes #27683. --- .../docs/sql/reference/dml/insert.md | 172 ++++++++++++++++++ docs/content/docs/sql/reference/dml/insert.md | 172 ++++++++++++++++++ 2 files changed, 344 insertions(+) diff --git a/docs/content.zh/docs/sql/reference/dml/insert.md b/docs/content.zh/docs/sql/reference/dml/insert.md index 53a3fa74107e3..0d879c1c91a2f 100644 --- a/docs/content.zh/docs/sql/reference/dml/insert.md +++ b/docs/content.zh/docs/sql/reference/dml/insert.md @@ -302,4 +302,176 @@ INSERT INTO students END; ``` +## ON CONFLICT clause + +When a query produces a changelog stream with an upsert key that differs from the sink table's primary key, multiple records with different upsert keys may map to the same primary key. The `ON CONFLICT` clause specifies how to resolve these primary key conflicts at the sink. + +### When is ON CONFLICT required? + +By default, Flink requires an explicit `ON CONFLICT` clause whenever the query's upsert key differs from the sink table's primary key. Without it, the query fails at planning time. This forces you to consider whether your query genuinely has a conflict scenario or whether there is a logic issue (e.g., a missing `GROUP BY`). + +This check is controlled by the configuration option `table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false` restores the legacy behavior where no `ON CONFLICT` clause was required, but may lead to non-deterministic results. + +Alternatively, if you do not need consistency guarantees for conflicting keys, you can disable the sink upsert materializer entirely by setting `table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer operator from the pipeline, so no buffering, compaction, or conflict resolution is performed. Records are passed directly to the sink as they arrive. + +### Syntax + +```sql +[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name + select_statement + ON CONFLICT conflict_action + +conflict_action: + DO NOTHING + | DO ERROR + | DO DEDUPLICATE +``` + +### Strategies + +#### DO ERROR + +Throws an exception at runtime if multiple records with different upsert keys map to the same primary key. Use this when you believe no real conflict exists — for example, the planner could not prove that the upsert key matches the primary key, but you know they are logically equivalent. + +Buffered records are compacted on watermark progression before conflict checking, so transient disorder from changelog reordering does not cause false errors. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO ERROR; +``` + +#### DO NOTHING + +Keeps the first record that arrives for a given primary key and silently discards subsequent conflicting records. Use this when it is acceptable to drop duplicate primary key values from different upsert keys. + +Like `DO ERROR`, this strategy uses watermark-based compaction before applying conflict resolution. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO NOTHING; +``` + +#### DO DEDUPLICATE + +{{< hint warning >}} +`DO DEDUPLICATE` maintains the full history of changes per primary key in state to support rollback on retraction. This results in significantly higher state usage compared to `DO ERROR` and `DO NOTHING`. +{{< /hint >}} + +Maintains the full history of changes per primary key so that retractions can be correctly rolled back. This is the most correct strategy when true multi-source updates to the same primary key occur and correctness cannot be sacrificed. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO DEDUPLICATE; +``` + +### How conflicts happen + +A conflict occurs when the query's upsert key differs from the sink table's primary key. For example, consider a join whose result has an upsert key derived from the join condition, but the target table has a different primary key. Records from different upstream upsert keys can then collide on the same primary key in the sink. + +Because retraction (`-U`) and update (`+U`) messages may travel different paths through the pipeline, they can arrive at the sink out of order. `DO ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent set of changes before applying conflict resolution, preventing false positives from transient reordering. + +### Watermark-based compaction + +Changelog messages produced by operators such as joins can arrive at the sink out of order. A retraction (`-U`) for a row may arrive after a new insert (`+I`) for a different row that shares the same primary key, making it look like two active records exist for that key — a false conflict. + +Watermark-based compaction solves this by buffering incoming records keyed by their primary key and upsert key. When a watermark advances, all buffered records with timestamps up to that watermark are compacted: matching insert and retraction pairs for the same upsert key cancel each other out (for example, `+I` and `-D`, or `-U` and `+U` pairs). + +**Example.** Using the `orders JOIN products` query from above, suppose order 1 changes its product from `Laptop` to `Phone` while order 3 is also for `Laptop`. The join emits these changelog records: + +``` ++I[Laptop, 1] -- upsert key: order_id=1 ++I[Laptop, 3] -- upsert key: order_id=3 +-U[Laptop, 1] -- upsert key: order_id=1 (retraction for order 1's old product) ++U[Phone, 1] -- upsert key: order_id=1 (order 1 now maps to Phone) +``` + +Without compaction, after the first two `+I` records arrive the operator sees two active records for PK `Laptop` with different upsert keys (`order_id=1` and `order_id=3`) — a false conflict. With compaction, the operator waits for the watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop, 1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK `Laptop` — no conflict. + +After compaction, if zero or one record remains per primary key, there is no conflict. If multiple records with different upsert keys still remain, a genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or `DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction; instead, it maintains the full history of changes in state to support correct rollback on retraction. + +### Examples + +```sql +-- Source and dimension tables +CREATE TABLE orders ( + order_id BIGINT, + product_name STRING, + quantity INT, + PRIMARY KEY(order_id) NOT ENFORCED +) WITH (...); + +CREATE TABLE products ( + name STRING, + PRIMARY KEY(name) NOT ENFORCED +) WITH (...); + +-- Sink table +CREATE TABLE product_orders ( + product_name STRING, + last_order_id BIGINT, + PRIMARY KEY(product_name) NOT ENFORCED +) WITH (...); + +-- This join produces an upsert key that may differ from the sink's PK, +-- so ON CONFLICT is required. +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO NOTHING; +``` + +Given the following data in the source tables: + +``` +orders: products: ++----------+--------------+----------+ +--------+ +| order_id | product_name | quantity | | name | ++----------+--------------+----------+ +--------+ +| 1 | Laptop | 2 | | Laptop | +| 2 | Phone | 1 | | Phone | +| 3 | Laptop | 5 | +--------+ ++----------+--------------+----------+ +``` + +The join produces these changelog records for `product_orders`: + +``` ++I[Laptop, 1] -- upsert key: order_id=1 ++I[Phone, 2] -- upsert key: order_id=2 ++I[Laptop, 3] -- upsert key: order_id=3 ← conflicts with order_id=1 on PK 'Laptop' +``` + +Two records with different upsert keys (`order_id=1` and `order_id=3`) target the same +primary key (`product_name='Laptop'`). This is the conflict each strategy resolves differently: + +- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys map to the same primary key. +- **`DO NOTHING`** — keeps the first record and discards the conflict: + + | product_name | last_order_id | + |:-------------|:--------------| + | Laptop | 1 | + | Phone | 2 | + +- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible: + + | product_name | last_order_id | + |:-------------|:--------------| + | Laptop | 3 | + | Phone | 2 | + +**What happens on retraction?** If order 3 is later deleted from the source, the join +emits a retraction `-D[Laptop, 3]`: + +- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was never written. + The Laptop row remains with `last_order_id=1`. +- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back to order 1, + producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state enables this + correct rollback. + {{< top >}} diff --git a/docs/content/docs/sql/reference/dml/insert.md b/docs/content/docs/sql/reference/dml/insert.md index 4213b915479ad..11bc66632feec 100644 --- a/docs/content/docs/sql/reference/dml/insert.md +++ b/docs/content/docs/sql/reference/dml/insert.md @@ -312,5 +312,177 @@ INSERT INTO students END; ``` +## ON CONFLICT clause + +When a query produces an updating table with an upsert key that differs from the sink table's primary key, multiple records with different upsert keys may map to the same primary key. The `ON CONFLICT` clause specifies how to resolve these primary key conflicts at the sink. + +### When is ON CONFLICT required? + +By default, Flink requires an explicit `ON CONFLICT` clause whenever the query's upsert key differs from the sink table's primary key. Without it, the query fails at planning time. This forces you to consider whether your query genuinely has a conflict scenario or whether there is a logic issue (e.g., a missing `GROUP BY`). + +This check is controlled by the configuration option `table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false` restores the legacy behavior where no `ON CONFLICT` clause was required, but may lead to non-deterministic results. + +Alternatively, if you do not need consistency guarantees for conflicting keys, you can disable the sink upsert materializer entirely by setting `table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer operator from the pipeline, so no buffering, compaction, or conflict resolution is performed. Records are passed directly to the sink as they arrive. + +### Syntax + +```sql +[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name + select_statement + ON CONFLICT conflict_action + +conflict_action: + DO NOTHING + | DO ERROR + | DO DEDUPLICATE +``` + +### Strategies + +#### DO ERROR + +Throws an exception at runtime if multiple records with different upsert keys map to the same primary key. Use this when you believe no real conflict exists — for example, the planner could not prove that the upsert key matches the primary key, but you know they are logically equivalent. + +Buffered records are compacted on watermark progression before conflict checking, so transient disorder from changelog reordering does not cause false errors. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO ERROR; +``` + +#### DO NOTHING + +Keeps the first record that arrives for a given primary key and silently discards subsequent conflicting records. Use this when it is acceptable to drop duplicate primary key values from different upsert keys. + +Like `DO ERROR`, this strategy uses watermark-based compaction before applying conflict resolution. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO NOTHING; +``` + +#### DO DEDUPLICATE + +{{< hint warning >}} +`DO DEDUPLICATE` maintains the full history of changes per primary key in state to support rollback on retraction. This results in significantly higher state usage compared to `DO ERROR` and `DO NOTHING`. +{{< /hint >}} + +Maintains the full history of changes per primary key so that retractions can be correctly rolled back. This is the most correct strategy when true multi-source updates to the same primary key occur and correctness cannot be sacrificed. + +```sql +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO DEDUPLICATE; +``` + +### How conflicts happen + +A conflict occurs when the query's upsert key differs from the sink table's primary key. For example, consider a join whose result has an upsert key derived from the join condition, but the target table has a different primary key. Records from different upstream upsert keys can then collide on the same primary key in the sink. + +Because retraction (`-U`) and update (`+U`) messages may travel different paths through the pipeline, they can arrive at the sink out of order. `DO ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent set of changes before applying conflict resolution, preventing false positives from transient reordering. + +### Watermark-based compaction + +Changelog messages produced by operators such as joins can arrive at the sink out of order. A retraction (`-U`) for a row may arrive after a new insert (`+I`) for a different row that shares the same primary key, making it look like two active records exist for that key — a false conflict. + +Watermark-based compaction solves this by buffering incoming records keyed by their primary key and upsert key. When a watermark advances, all buffered records with timestamps up to that watermark are compacted: matching insert and retraction pairs for the same upsert key cancel each other out (for example, `+I` and `-D`, or `-U` and `+U` pairs). + +**Example.** Using the `orders JOIN products` query from above, suppose order 1 changes its product from `Laptop` to `Phone` while order 3 is also for `Laptop`. The join emits these changelog records: + +``` ++I[Laptop, 1] -- upsert key: order_id=1 ++I[Laptop, 3] -- upsert key: order_id=3 +-U[Laptop, 1] -- upsert key: order_id=1 (retraction for order 1's old product) ++U[Phone, 1] -- upsert key: order_id=1 (order 1 now maps to Phone) +``` + +Without compaction, after the first two `+I` records arrive the operator sees two active records for PK `Laptop` with different upsert keys (`order_id=1` and `order_id=3`) — a false conflict. With compaction, the operator waits for the watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop, 1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK `Laptop` — no conflict. + +After compaction, if zero or one record remains per primary key, there is no conflict. If multiple records with different upsert keys still remain, a genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or `DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction; instead, it maintains the full history of changes in state to support correct rollback on retraction. + +### Examples + +```sql +-- Source and dimension tables +CREATE TABLE orders ( + order_id BIGINT, + product_name STRING, + quantity INT, + PRIMARY KEY(order_id) NOT ENFORCED +) WITH (...); + +CREATE TABLE products ( + name STRING, + PRIMARY KEY(name) NOT ENFORCED +) WITH (...); + +-- Sink table +CREATE TABLE product_orders ( + product_name STRING, + last_order_id BIGINT, + PRIMARY KEY(product_name) NOT ENFORCED +) WITH (...); + +-- This join produces an upsert key that may differ from the sink's PK, +-- so ON CONFLICT is required. +INSERT INTO product_orders +SELECT p.name, o.order_id +FROM orders o JOIN products p ON o.product_name = p.name +ON CONFLICT DO NOTHING; +``` + +Given the following data in the source tables: + +``` +orders: products: ++----------+--------------+----------+ +--------+ +| order_id | product_name | quantity | | name | ++----------+--------------+----------+ +--------+ +| 1 | Laptop | 2 | | Laptop | +| 2 | Phone | 1 | | Phone | +| 3 | Laptop | 5 | +--------+ ++----------+--------------+----------+ +``` + +The join produces these changelog records for `product_orders`: + +``` ++I[Laptop, 1] -- upsert key: order_id=1 ++I[Phone, 2] -- upsert key: order_id=2 ++I[Laptop, 3] -- upsert key: order_id=3 ← conflicts with order_id=1 on PK 'Laptop' +``` + +Two records with different upsert keys (`order_id=1` and `order_id=3`) target the same +primary key (`product_name='Laptop'`). This is the conflict each strategy resolves differently: + +- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys map to the same primary key. +- **`DO NOTHING`** — keeps the first record and discards the conflict: + + | product_name | last_order_id | + |:-------------|:--------------| + | Laptop | 1 | + | Phone | 2 | + +- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible: + + | product_name | last_order_id | + |:-------------|:--------------| + | Laptop | 3 | + | Phone | 2 | + +**What happens on retraction?** If order 3 is later deleted from the source, the join +emits a retraction `-D[Laptop, 3]`: + +- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was never written. + The Laptop row remains with `last_order_id=1`. +- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back to order 1, + producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state enables this + correct rollback. + {{< top >}}