
[FLINK-36472][table] Preserve right-side projections when converting Correlate to physical #28246

Open
Au-Miner wants to merge 1 commit into apache:master from Au-Miner:FLINK-36472


@Au-Miner
Contributor

What is the purpose of the change

When a Correlate has a Calc on its right side that adds non-identity projections (e.g. LATERAL (SELECT s * 2 FROM UNNEST(...) AS T(s))), the physical conversion previously dropped those projections, so the output columns were wrong. This PR preserves them by wrapping the resulting Correlate in a Calc that re-applies the right-side projections.

Brief change log

  • StreamPhysicalCorrelateRule / BatchPhysicalCorrelateRule: split the right-side calc into (scan, projects, condition). When the projects are non-identity, build an inner *PhysicalCorrelate with the natural join row type and wrap it in a *PhysicalCalc. The Calc's program is built by the new buildOuterProgram helper, which shifts right-side projections past the left field count.
  • FlinkCalcMergeRule: add BATCH_PHYSICAL_INSTANCE so the new wrapping calc can merge with adjacent batch calcs (mirrors the existing STREAM_PHYSICAL_INSTANCE).
  • FlinkBatchRuleSets#PHYSICAL_OPT_RULES: register FlinkCalcMergeRule.BATCH_PHYSICAL_INSTANCE.
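
To illustrate the index shift described for buildOuterProgram, here is a minimal toy sketch in plain Java. It is not the helper from this PR and uses no Calcite types: expressions are modeled as strings with `$n` input references, and the names OuterProgramSketch, shiftInputRefs, and outerProjects are hypothetical. It also assumes that the outer Calc passes the left fields through unchanged, which the description above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of the index shift; NOT the PR's actual buildOuterProgram. */
final class OuterProgramSketch {

    // Matches an input reference such as $0 or $12 in the toy expression syntax.
    private static final Pattern INPUT_REF = Pattern.compile("\\$(\\d+)");

    /** Rewrites every $n in a right-side expression to $(n + leftFieldCount). */
    static String shiftInputRefs(String expr, int leftFieldCount) {
        Matcher m = INPUT_REF.matcher(expr);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            int shifted = Integer.parseInt(m.group(1)) + leftFieldCount;
            m.appendReplacement(sb, Matcher.quoteReplacement("$" + shifted));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    /**
     * Builds the outer projection list over the natural join row type:
     * identity references for the left fields (an assumption of this sketch),
     * followed by the right-side projections with shifted references.
     */
    static List<String> outerProjects(int leftFieldCount, List<String> rightProjects) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < leftFieldCount; i++) {
            out.add("$" + i);
        }
        for (String p : rightProjects) {
            out.add(shiftInputRefs(p, leftFieldCount));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input: two left fields, one right-side projection "s * 2"
        // where s is field $0 of the right input.
        System.out.println(outerProjects(2, Arrays.asList("$0 * 2")));
        // prints [$0, $1, $2 * 2]
    }
}
```

In the natural join row type the right-side fields come after the left fields, so a reference to right field 0 becomes a reference to field leftFieldCount in the wrapping Calc.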

Verifying this change

Added testLateralProjectionFromUnnest in UnnestTestBase (exercised by both stream and batch UnnestTest) which asserts that the optimized plan contains a Calc(select=[a, *(b0, 2) AS doubled]) over the Correlate. Added testCorrelateWithRightSideProjection in CorrelateITCase which runs LATERAL (SELECT CONCAT(v, '_x') FROM TABLE(STRING_SPLIT(c, '|')) AS T(v)) and asserts the projected values appear in the sink. Updated UnnestTest.xml plans (stream + batch) accordingly.
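
As a rough model of what the CorrelateITCase assertion checks, the sketch below computes the projected values for one input value of c in plain Java. It assumes STRING_SPLIT simply splits its input on the delimiter; the class name LateralProjectionModel, the method projectedValues, and the sample input are hypothetical and are not taken from the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Toy model of the expected sink values; NOT the actual ITCase. */
final class LateralProjectionModel {

    /**
     * Models LATERAL (SELECT CONCAT(v, '_x') FROM TABLE(STRING_SPLIT(c, '|')) AS T(v))
     * for a single value of c, assuming STRING_SPLIT is a plain split on '|'.
     */
    static List<String> projectedValues(String c) {
        List<String> out = new ArrayList<>();
        // Pattern.quote is needed because '|' is a regex metacharacter.
        for (String v : c.split(Pattern.quote("|"))) {
            out.add(v + "_x"); // the right-side projection CONCAT(v, '_x')
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample row; the real test data may differ.
        System.out.println(projectedValues("a|b"));
        // prints [a_x, b_x]
    }
}
```

If the projection were dropped, as in the behavior described above, the values would not carry the _x suffix, which is what the test is meant to catch.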

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented May 25, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@Au-Miner Au-Miner marked this pull request as ready for review May 25, 2026 03:29