[FLINK-39740][table-runtime] Do not regress highestSqnAndSizeState on row replace in LinkedMultiSetState#28240

Open
jubins wants to merge 2 commits into apache:master from jubins:j-flink-39740-linkedmultisetstate-bug-fix

@jubins jubins commented May 23, 2026

What is the purpose of the change

Fixes FLINK-39740. In LinkedMultiSetState, highestSqnAndSizeState tracks the highest sequence number (SQN) ever assigned to a row in the doubly-linked list. It must grow monotonically, because each new row computes its SQN as highSqn + 1. However, LinkedMultiSetState.add() wrote the computed newSqn back into highestSqnAndSizeState unconditionally, including in the replace branch, where newSqn is set to the existing row's SQN (which is generally less than the current highSqn). The next genuinely new row would then compute newSqn = staleHighSqn + 1 and collide with an existing node in sqnToNodeState, silently overwriting it. The doubly-linked list is then corrupt: iteration returns the wrong rows, and downstream operators reading from this state observe missing or stale data.
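The failure mode described above can be reproduced with a small in-memory stand-in. This is only a sketch under stated assumptions, not the Flink implementation: the class BuggySqnModel, its two maps, and the choice of -1 as the "empty" value of highSqn are all hypothetical and exist purely to trace the SQN arithmetic.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the pre-fix write path. Hypothetical; not the Flink class. */
class BuggySqnModel {
    // Row key -> SQN of the node holding that key (illustrative stand-in).
    final Map<String, Long> keyToSqn = new HashMap<>();
    // SQN -> row value; stands in for sqnToNodeState.
    final Map<Long, String> sqnToValue = new HashMap<>();
    // Stands in for highestSqnAndSizeState; -1 for "empty" is an assumption.
    long highSqn = -1;

    void add(String key, String value) {
        Long rowSqn = keyToSqn.get(key);
        boolean isNewRowKey = (rowSqn == null);
        // New rows take highSqn + 1; a replace reuses the existing row's SQN.
        long newSqn = isNewRowKey ? highSqn + 1 : rowSqn;
        keyToSqn.put(key, newSqn);
        sqnToValue.put(newSqn, value);
        // Pre-fix behavior: written back even when the row was only replaced.
        highSqn = newSqn;
    }

    public static void main(String[] args) {
        BuggySqnModel s = new BuggySqnModel();
        s.add("a", "a1"); // SQN 0
        s.add("b", "b1"); // SQN 1
        s.add("c", "c1"); // SQN 2, highSqn is now 2
        s.add("a", "a2"); // replace: highSqn regresses to 0
        s.add("d", "d1"); // new row computes 0 + 1 = 1 and overwrites b's node
        System.out.println("highSqn=" + s.highSqn + " nodes=" + s.sqnToValue);
    }
}
```

In this model the node for key "b" is lost after the last call, which mirrors the silent overwrite in sqnToNodeState.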

Brief change log

  • In LinkedMultiSetState.add(), moved the highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)) call inside the if (isNewRowKey) { ... } block. The replace branch already enforces newSqn = rowSqn and newSize = oldSize, so the prior unconditional write was at best redundant (when rowSqn = highSqn, the state value did not change) and otherwise destructive (when rowSqn < highSqn, it regressed the monotonic invariant).
  • No state schema, serializer, or public-API surface was modified — the fix is a single-line correction to the write path of an existing internal state primitive.
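The shape of the change can be sketched on a simplified in-memory model. The class GuardedSqnModel and its fields are hypothetical stand-ins (highSqn and size together play the role of the MetaSqnInfo held in highestSqnAndSizeState, and -1 as the initial highSqn is an assumption); the only point is that the metadata write now happens solely for new row keys.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the post-fix write path. Hypothetical; not the Flink class. */
class GuardedSqnModel {
    final Map<String, Long> keyToSqn = new HashMap<>();   // row key -> SQN
    final Map<Long, String> sqnToValue = new HashMap<>(); // stands in for sqnToNodeState
    long highSqn = -1; // assumed "empty" value
    long size = 0;

    void add(String key, String value) {
        Long rowSqn = keyToSqn.get(key);
        boolean isNewRowKey = (rowSqn == null);
        long newSqn = isNewRowKey ? highSqn + 1 : rowSqn;
        // The node is written in both branches (insert or replace in place).
        sqnToValue.put(newSqn, value);
        if (isNewRowKey) {
            keyToSqn.put(key, newSqn);
            // Metadata is updated only when a new SQN was actually allocated,
            // so highSqn can never move backwards.
            highSqn = newSqn;
            size = size + 1;
        }
    }
}
```

With this guard, a replace leaves both highSqn and size untouched, and the next new row receives a fresh SQN.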

Verifying this change

This change is covered by a new regression test in SequencedMultiSetStateTest:

  • testAddAfterReplacingNonHighestRow — adds three rows with distinct keys (assigning SQNs 0, 1, 2), then replaces the first row (add on an existing key), then adds a fourth new row. The test asserts the iterator returns all four rows in insertion order with the replaced value visible. Before the fix, the replace regressed highSqn from 2 to 0, the fourth row reused SQN 1 and overwrote the second row's node in sqnToNodeState, and iteration returned only three rows with the second row missing.
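The scenario this test exercises can be approximated outside Flink as follows. This is a hedged sketch, not the actual test: ReplaceThenAddScenario, the guardWrite flag, and the key/value strings are invented for illustration, and a TreeMap ordered by SQN stands in for traversal of the doubly-linked list, so the pre-fix ordering shown here is not claimed to match what Flink itself would return.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the replace-then-add scenario; not Flink code. */
class ReplaceThenAddScenario {

    /** Runs the scenario; guardWrite = false mimics the pre-fix write path. */
    static List<String> run(boolean guardWrite) {
        Map<String, Long> keyToSqn = new HashMap<>();
        TreeMap<Long, String> sqnToValue = new TreeMap<>(); // iterates in SQN order
        long highSqn = -1; // assumed "empty" value

        String[][] ops = {
            {"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}, // three new rows: SQNs 0, 1, 2
            {"k1", "v1-replaced"},                     // replace the first row
            {"k4", "v4"}                               // fourth new row
        };
        for (String[] op : ops) {
            Long rowSqn = keyToSqn.get(op[0]);
            boolean isNewRowKey = (rowSqn == null);
            long newSqn = isNewRowKey ? highSqn + 1 : rowSqn;
            keyToSqn.put(op[0], newSqn);
            sqnToValue.put(newSqn, op[1]);
            if (isNewRowKey || !guardWrite) {
                highSqn = newSqn; // unguarded, this also runs on replace
            }
        }
        return new ArrayList<>(sqnToValue.values());
    }

    public static void main(String[] args) {
        System.out.println("guarded:   " + run(true));
        System.out.println("unguarded: " + run(false));
    }
}
```

In this model the guarded run yields four rows with the replaced value in first position, while the unguarded run yields three rows and loses the second row's value.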

Existing tests in SequencedMultiSetStateTest (testBasicFlow, testAdd, testAppend, testRemove, etc.) continue to pass, confirming the fix does not regress the happy path or the other state-mutation operations.

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no. LinkedMultiSetState is annotated @Internal (see line 80 of the class). The class is intended for internal use within Flink's table runtime, as its Javadoc explicitly states.
  • The serializers: no — state shape, MetaSqnInfoSerializer, NodeSerializer, RowSqnInfoSerializer, and RowDataKeySerializer are all unchanged. Existing checkpointed state can still be read.
  • The runtime per-record code paths (performance sensitive): no — the change removes one state write in the replace path (a small net positive). The add and append code paths are otherwise unchanged.
  • Anything that affects deployment or recovery (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper): no, with one caveat. Checkpoints/savepoints written by an affected version (≥ 2.3.0) that exercised the buggy replace-then-add sequence may already contain a corrupt linked list. This PR prevents new corruption but does not retroactively repair existing corruption. Repair would require a separate state-migration step and is out of scope here.
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no — this is a correctness fix to existing behavior. No user-visible API or configuration changes.
  • If yes, how is the feature documented? n/a

Was generative AI tooling used to co-author this PR?

  • Yes — Claude was used as a pair-programming assistant for discussing the approach and implementation structure. All code was written, understood, and verified by the author.

Generated-by: Claude Opus 4.7

… row replace in LinkedMultiSetState

LinkedMultiSetState.add() unconditionally wrote the computed newSqn back into highestSqnAndSizeState. In the replace branch, newSqn was set to the existing row's SQN — generally less than the current highSqn — so the write regressed the highest-SQN invariant. The next genuinely new row would then compute newSqn = highSqn + 1, collide with an existing node in sqnToNodeState, and silently corrupt the doubly-linked list.

flinkbot commented May 23, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build


@TestTemplate
public void testAddAfterReplacingNonHighestRow() throws Exception {
    // Regression test for FLINK-39740: replacing an existing row by key must not regress
Contributor

nit: this jira reference is not needed.

Author

removed the JIRA reference.

github-actions bot added the community-reviewed label ("PR has been reviewed by the community.") on May 24, 2026