fix(dedupjoin): restore parallel probe for REPLACE INTO OldColCapture #24179
Merged
mergify[bot] merged 2 commits into matrixorigin:main on Apr 24, 2026
The REPLACE INTO merged main-table scan path (introduced in matrixorigin#24153) broke parallel DEDUP JOIN probe: each worker kept capturedVecs / captured in container-local state, but the finalize() channel protocol only merged the matched bitmap. Non-merger workers' captures were silently dropped, which matrixorigin#24044 (actually 7cc544d) worked around by forcing Mcpu=1 on probe scopes whenever OldColCapture was active. That workaround wiped out the parallelism gains matrixorigin#24153 was meant to deliver.

Fix by extending the finalize merge protocol:

- Replace chan *bitmap.Bitmap with chan *WorkerJoinMsg carrying {matched, captured, capturedVecs}. Non-merger workers transfer capture ownership to the merger; the merger folds them in via mergeCaptured() using first-wins semantics across workers.
- Remove the Mcpu=1 forcing in compile.go so broadcast DedupJoin with OldColCapture runs parallel again.

Shuffle + capture stays NYI (separate follow-up once cross-CN channel semantics are designed).

Tests: adds targeted unit tests for mergeCaptured (disjoint buckets, first-wins conflict, empty worker msg), channel round-trip with ownership transfer, and context-cancel / channel-close behavior. Existing end-to-end capture tests still pass.

Benchmark (YCSB workload-replace, 100K ops, threads=16, zipfian, single CN, one iteration):

| metric             | main    | fix     | delta  |
|--------------------|---------|---------|--------|
| Throughput (ops/s) | 1384.64 | 1561.67 | +12.8% |
| UPDATE p95 (us)    | 22527   | 18223   | -19.1% |
| UPDATE p99 (us)    | 53471   | 40895   | -23.5% |

Related: matrixorigin#23946

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
aunjgr approved these changes on Apr 24, 2026
Merge Queue Status
This pull request spent 56 minutes 21 seconds in the queue, including 56 minutes 10 seconds running CI.
What type of PR is this?
Which issue(s) this PR fixes:
issue #23946
What this PR does / why we need it:
Background

PR #24153 introduced an `OldColCapture` mechanism in DEDUP JOIN to merge REPLACE INTO's two main-table scans into one. Each worker recorded probe-side old-column values into per-worker `capturedVecs` keyed by build bucket, intended to be emitted alongside the build row at finalize.

That introduced a latent bug: parallel probe workers kept `capturedVecs` and `captured` as container-local state, but the `finalize()` multi-worker protocol only merged the `matched` bitmap through `ap.Channel`. Non-merger workers' captured values were silently dropped, so rows matched by those workers would emit NULL (or stale) in the placeholder slot.

Commit `7cc544d2d` (`fix(replace): disable parallel probe for DEDUP JOIN with OldColCapture`) worked around this by forcing `Mcpu = 1` on every probe scope whenever `OldColCapture` was active (`pkg/sql/compile/compile.go`). Shuffle + capture additionally panics NYI. The REPLACE INTO merged-scan path has thus been running without any probe-side parallelism since.

This PR
Restore parallelism by upgrading the finalize merge protocol.

- A `WorkerJoinMsg` struct in `pkg/sql/colexec/dedupjoin/types.go` carries `{matched, captured, capturedVecs}`. `DedupJoin.Channel` becomes `chan *WorkerJoinMsg`.
- The merger folds in the worker's `captured` bitmap and, for each bucket not already present in its own `captured`, copies per-column values from the worker's `capturedVecs` into its own. First-wins semantics apply across workers: any one captured value for a bucket is semantically equivalent since HashOnUnique gives a 1:1 bucket↔build-row mapping.
- The worker's `capturedVecs` are freed after merging (ownership was transferred).
- Remove the `Mcpu = 1` forcing in `compile.go` so broadcast DedupJoin with OldColCapture runs parallel again. The shuffle + capture NYI panic stays; cross-CN pipeline channel semantics for `capturedVecs` are out of scope here and will be addressed separately.
- The `dupOperator` change (also from `7cc544d2d`) is retained as it's a prerequisite for parallel clones.

Tests
Added unit tests in `pkg/sql/colexec/dedupjoin/join_test.go`:

- `TestMergeCaptured_DisjointBuckets`: parallel workers capture different buckets; after merge, merger owns the union.
- `TestMergeCaptured_FirstWinsOnConflict`: when both workers captured the same bucket, merger keeps its own value.
- `TestMergeCaptured_EmptyWorkerMsg`: worker with empty capture doesn't corrupt merger state.
- `TestWorkerJoinMsg_ChannelRoundTrip`: full channel send/receive + merge + free cycle without leaks.
- `TestReceiveWorkerMsg_ContextCancel` / `_ChannelClose`: receive helper respects context cancellation and closed channel.

Existing end-to-end `TestDedupJoinCapture{,PartialMatch,Reset}` unchanged, all pass. `go test -race` clean. `make static-check` clean.

Benchmark
YCSB `workload-replace` (100K ops, updateproportion=1, zipfian, threads=16, single CN, one iteration):

Tail latency improvements (p99 -23%, max -56%) match the expected effect of restoring parallel probe: reduced queueing at the DEDUP JOIN stage under the high-conflict zipfian workload.
Follow-ups (separate issues / PRs)
`Channel`/`WorkerJoinMsg` transport).

Special notes for your reviewer:
The cross-worker first-wins semantic for probe-side same-key duplicates is equivalent to the existing intra-worker first-wins: the captured value is a property of the build-side bucket (main-table old row), not of which probe row matched first. If reviewers want to eliminate the non-determinism entirely, we'd need to serialize capture writes, which defeats the purpose of parallelism.
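To make the first-wins merge concrete, here is a minimal, self-contained sketch of the hand-off. It is not the code in this PR: `workerJoinMsg`, `mergeCapturedSketch`, and `receiveMsg` are hypothetical names, and plain Go maps keyed by build bucket stand in for the real bitmap and vector types, with one string value per bucket instead of per-column vectors.

```go
package main

import (
	"context"
	"fmt"
)

// workerJoinMsg is a simplified stand-in for the PR's WorkerJoinMsg.
// Assumption: maps keyed by build bucket replace the real bitmap and
// vector types, and each captured value is a single string.
type workerJoinMsg struct {
	matched      map[int]bool   // buckets matched by this worker
	captured     map[int]bool   // buckets for which a value was captured
	capturedVecs map[int]string // captured old-column value per bucket
}

// mergeCapturedSketch (hypothetical helper) folds src into dst.
// matched is unioned; capture is first-wins: a bucket already captured
// by dst keeps dst's value. src gives up ownership of its captures.
func mergeCapturedSketch(dst, src *workerJoinMsg) {
	for b := range src.matched {
		dst.matched[b] = true
	}
	for b := range src.captured {
		if dst.captured[b] {
			continue // first wins: keep the merger's value
		}
		dst.captured[b] = true
		dst.capturedVecs[b] = src.capturedVecs[b]
	}
	src.captured, src.capturedVecs = nil, nil // ownership transferred
}

// receiveMsg (hypothetical helper) waits for one worker message and
// reports false if the context is cancelled or the channel is closed.
func receiveMsg(ctx context.Context, ch <-chan *workerJoinMsg) (*workerJoinMsg, bool) {
	select {
	case <-ctx.Done():
		return nil, false
	case msg, ok := <-ch:
		return msg, ok
	}
}

func main() {
	merger := &workerJoinMsg{
		matched:      map[int]bool{1: true},
		captured:     map[int]bool{1: true},
		capturedVecs: map[int]string{1: "old-from-merger"},
	}
	worker := &workerJoinMsg{
		matched:      map[int]bool{1: true, 2: true},
		captured:     map[int]bool{1: true, 2: true},
		capturedVecs: map[int]string{1: "old-from-worker", 2: "old-row-2"},
	}

	ch := make(chan *workerJoinMsg, 1)
	ch <- worker // the non-merger worker hands its state to the merger
	close(ch)

	for {
		msg, ok := receiveMsg(context.Background(), ch)
		if !ok {
			break // channel drained and closed
		}
		mergeCapturedSketch(merger, msg)
	}
	// Bucket 1 keeps the merger's value; bucket 2 comes from the worker.
	fmt.Println(merger.capturedVecs[1], merger.capturedVecs[2])
}
```

The two values for bucket 1 differ here only to make the first-wins choice visible; under the 1:1 bucket-to-build-row mapping described above, either value would be acceptable in the real operator.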