Tracking issue
Investigate adding a native MERGE strategy in lance-spark that dispatches
directly to lance-core's MergeInsertBuilder instead of going through
Spark's V2 row-level operations / position-delta path. Goal: measure
whether the native path is meaningfully faster on tables with many
fragments, then file upstream against lance-format/lance-spark if the
numbers justify it.
Today (lance-format/lance-spark @ HEAD)
MERGE INTO lance.… SQL routes through:
- LancePositionDeltaDataset implements SupportsRowLevelOperations
- LancePositionDeltaOperation with representUpdateAsDeleteAndInsert = true
- rowId() = [_rowaddr], requiredMetadataAttributes() = [_fragid]
Catalyst's RewriteMergeIntoTable lowers MERGE into a position-delta plan:
    WriteDelta(... Project(deletes+inserts) Join(targetScan, source, ON merge-condition) ...)
The join, optional shuffle, and per-_fragid write are all Spark stages.
LanceScan.planInputPartitions() emits one InputPartition per Lance
fragment, so target-scan and position-delta-write stages are
fragment-aligned. With a 2500-fragment table, several fragment-aligned
stages × 2500 = thousands of Spark tasks per MERGE.
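The task-count arithmetic above can be sketched in a few lines of Java. This is only a back-of-the-envelope model: the class and method names are made up for illustration, and the stage count of 2 used in the note below is an assumption (it counts just the target-scan and position-delta-write stages named above; the real plan may have more).

```java
/** Hypothetical helper: rough task count for a fragment-aligned MERGE plan. */
final class MergeTaskEstimate {
    private MergeTaskEstimate() {}

    /**
     * One Spark task per fragment per fragment-aligned stage.
     * The number of such stages is an input here, not something measured.
     */
    static long fragmentAlignedTasks(int fragments, int fragmentAlignedStages) {
        if (fragments < 0 || fragmentAlignedStages < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return (long) fragments * fragmentAlignedStages;
    }
}
```

With the assumed 2 fragment-aligned stages, `MergeTaskEstimate.fragmentAlignedTasks(2500, 2)` gives 5000 tasks, versus 200 at the 100-fragment scale. Actual counts should come from the Spark UI or listener metrics in the benchmark.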
FilterPushDown.isFilterSupported accepts only basic V2 Filter types
(EqualTo, In, range, null). The MERGE ON condition is evaluated as
a join predicate, not a pushable filter, so pushedFilters is empty
for the target scan, zonemap pruning is skipped, and all fragments are
scanned regardless of indexes.
MergeInsertBuilder from lance-core is not invoked anywhere in
lance-spark today.
Proposal
A planner strategy that intercepts MergeIntoTable nodes targeting Lance
tables and routes them through MergeInsertBuilder:
    dataset.mergeInsertBuilder(onColumns)
        .whenMatchedUpdateAll()          // or: ...UpdateAllIf(condition)
        .whenMatchedDelete()             // or: ...DeleteIf(condition)
        .whenNotMatchedInsertAll()       // or: ...InsertAllIf(condition)
        .whenNotMatchedBySourceDelete()  // or: ...DeleteIf(condition)
        .execute(sourceRecordBatches);
Pros:
- Avoids Spark Join + shuffle + position-delta write (the per-_fragid
rewrite stage).
- Lance-core can use BTREE/Bitmap indexes on the merge ON column
for fragment pruning during the match phase.
- Per-arm predicates (whenMatchedUpdateAllIf etc.) map naturally onto
the SQL WHEN MATCHED ... AND <cond> clauses.
Cons:
- Single-process: MergeInsertBuilder.execute doesn't shard across
Spark executors. Source has to be materialized as RecordBatches on
the driver (or one executor).
- Predicate translation: Spark Expression → Lance SQL string for the
per-arm *If predicates.
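To make the predicate-translation con concrete, here is a minimal sketch of rendering an expression tree to a SQL string. Everything in it is an assumption for illustration: `PredicateSql` and its node types are a hypothetical stand-in for Spark's Expression classes, the operator allow-list is arbitrary, identifier quoting is omitted, and the exact SQL dialect Lance accepts for the *If predicates is not checked here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical mini expression tree; a stand-in for Spark's Expression, not its real API. */
final class PredicateSql {
    private PredicateSql() {}

    interface Expr {
        /** Renders this node as a SQL fragment. */
        String toSql();
    }

    /** Column reference, emitted verbatim (no identifier quoting in this sketch). */
    static final class Col implements Expr {
        private final String name;
        Col(String name) { this.name = Objects.requireNonNull(name); }
        @Override public String toSql() { return name; }
    }

    /** Literal: numbers and booleans print as-is, everything else as a quoted string. */
    static final class Lit implements Expr {
        private final Object value;
        Lit(Object value) { this.value = value; }
        @Override public String toSql() {
            if (value == null) return "NULL";
            if (value instanceof Number || value instanceof Boolean) return value.toString();
            // Escape embedded single quotes by doubling them.
            return "'" + value.toString().replace("'", "''") + "'";
        }
    }

    /** Binary operator restricted to an allow-list; unsupported operators fail at construction. */
    static final class BinOp implements Expr {
        private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("=", "<>", "<", "<=", ">", ">=", "AND", "OR"));
        private final String op;
        private final Expr left;
        private final Expr right;
        BinOp(String op, Expr left, Expr right) {
            if (!SUPPORTED.contains(op)) {
                throw new UnsupportedOperationException("no SQL translation for operator: " + op);
            }
            this.op = op;
            this.left = Objects.requireNonNull(left);
            this.right = Objects.requireNonNull(right);
        }
        @Override public String toSql() {
            // Always parenthesize so operator precedence never needs to be reasoned about.
            return "(" + left.toSql() + " " + op + " " + right.toSql() + ")";
        }
    }
}
```

For example, `new PredicateSql.BinOp("AND", new PredicateSql.BinOp(">", new PredicateSql.Col("source.ts"), new PredicateSql.Col("target.ts")), new PredicateSql.BinOp("=", new PredicateSql.Col("source.op"), new PredicateSql.Lit("U"))).toSql()` renders as `((source.ts > target.ts) AND (source.op = 'U'))`; the column names are illustrative. Failing early on anything outside the allow-list gives a planner strategy a clean signal to fall back to the position-delta path instead of emitting a predicate it cannot vouch for.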
Plan
- Stand up a benchmark in lance-spark-knn_2.12/.../benchmark/ (same
harness pattern as IndexedNearestJoinBenchmark):
  - Build a synthetic Lance table at varying scales (100, 500, 2500 fragments).
  - Run MERGE INTO SQL via the current path.
  - Run an equivalent direct MergeInsertBuilder.execute from Java/Scala.
  - Validate equivalence (oracle check on a sample of merge keys).
  - Time both, report wall-clock + Spark task counts.
- If MergeInsertBuilder wins meaningfully on the
2500-fragment-with-small-source case (the workload shape seen in
prod), file upstream as a feature request with the numbers attached.
- If it doesn't (e.g. one-process source materialization dominates),
document why and close.
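The "time both" and "validate equivalence" steps could look roughly like the sketch below. It is not the IndexedNearestJoinBenchmark harness; `MergeBenchSketch` and its method names are hypothetical, the two merge paths are passed in as plain `Runnable`s, and the merged tables are modelled as in-memory maps from merge key to row so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical timing and oracle-check helpers for comparing two MERGE paths. */
final class MergeBenchSketch {
    private MergeBenchSketch() {}

    /**
     * Times {@code action} over {@code runs} iterations after {@code warmup} untimed ones.
     * {@code setup} runs untimed before every iteration, e.g. to restore the table,
     * since a MERGE mutates its target. Returns the median wall-clock time in nanoseconds.
     */
    static long medianNanos(Runnable setup, Runnable action, int warmup, int runs) {
        if (runs <= 0) throw new IllegalArgumentException("runs must be positive");
        for (int i = 0; i < warmup; i++) {
            setup.run();
            action.run();
        }
        long[] samples = new long[runs];
        for (int i = 0; i < runs; i++) {
            setup.run();
            long start = System.nanoTime();
            action.run();
            samples[i] = System.nanoTime() - start;
        }
        return median(samples);
    }

    /** Median of the samples; for an even count, the midpoint of the two middle values. */
    static long median(long[] samples) {
        if (samples.length == 0) throw new IllegalArgumentException("no samples");
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        if (sorted.length % 2 == 1) return sorted[mid];
        return sorted[mid - 1] + (sorted[mid] - sorted[mid - 1]) / 2;
    }

    /** Oracle check: sampled merge keys whose rows differ between the two results. */
    static <K, V> List<K> mismatchedKeys(Map<K, V> viaSql, Map<K, V> viaNative, Iterable<K> sampleKeys) {
        List<K> mismatches = new ArrayList<>();
        for (K key : sampleKeys) {
            // A key missing from one side but present in the other counts as a mismatch.
            if (!Objects.equals(viaSql.get(key), viaNative.get(key))) {
                mismatches.add(key);
            }
        }
        return mismatches;
    }
}
```

Reporting the median rather than the mean keeps a single slow run (JIT warmup, GC pause) from skewing the comparison. Spark task counts are not covered by this sketch and would have to be collected separately.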
Out of scope for this tracking issue
- Streaming source path (always materialize for the bench).
- Auto-routing heuristic between native and position-delta paths (will
manifest as a config flag in the PoC).
- Full Catalyst MERGE rewrite — the PoC can call MergeInsertBuilder
directly from a benchmark harness, no SQL parser changes needed
for measurement.
Status
- MergeInsertBuilder direct-call run