feat: add namespace insert write path#587
Conversation
| byte[] requestData = serializeCurrentBatch(); | ||
| InsertIntoTableRequest request = new InsertIntoTableRequest().id(tableId).mode("append"); | ||
| try { | ||
| namespace.insertIntoTable(request, requestData); |
There was a problem hiding this comment.
This path commits visible data directly from executor tasks via namespace.insertIntoTable(...). If a task inserts one or more batches and Spark later retries or speculatively reruns the same partition because a later batch, executor loss, or close fails, the replacement attempt can insert the same rows again and the Spark job can still succeed with duplicates. Please do not describe the current path as safe or atomic. Either add first-class namespace idempotency with a stable cross-attempt key such as write_id + partition_id + batch_ordinal, gated by namespace capability/version discovery, or move visible writes behind a staged driver commit. Disabling retries/speculation only reduces exposure; it is not a correctness fix.
| rowsInserted += rowCount; | ||
| insertRequests++; | ||
| hasRowsInCurrentBatch = false; | ||
| allocateBatch(); |
There was a problem hiding this comment.
After a successful insert, flush() allocates the next Arrow batch, and the final commit() flush uses the same path. If the last insert has already succeeded but a later allocateBatch() or close() cleanup fails, Spark treats the task as failed and may retry data that was already inserted. Please split normal flush from final flush so the last successful insert does not allocate another batch. Once any insert has happened, cleanup of the sharding evaluator, Arrow batch, blob resolver, and namespace should be best-effort: log failures but do not throw them back to Spark. Close failures before any insert can still fail the task.
| @Override | ||
| public WriterCommitMessage commit() throws IOException { | ||
| if (shardingKeyEvaluator != null) { | ||
| shardingKeyEvaluator.flush(this::writePartitionedRow); |
There was a problem hiding this comment.
This new namespace insert writer calls shardingKeyEvaluator.flush(this::writePartitionedRow) in commit(). Inside ShardingBatchKeyEvaluator.flush(...), it calls the row consumer in the loop, and that consumer may trigger namespace inserts. The no-extra-batch validation and the reset/allocation in finally happen after the consumer has run. If some rows have already been inserted and a later validation or reset fails, Spark can retry and duplicate those rows. Please finish evaluator validation and key extraction before invoking the consumer, including the no-extra-batch check. After the consumer may have forced namespace inserts, only cleanup/close failures should be best-effort.
|
|
||
|
|
||
| @pytest.mark.rest_dir_compatible | ||
| class TestDMLNamespaceInsert: |
There was a problem hiding this comment.
TestDMLNamespaceInsert currently covers basic append and fixed-size-list vector writes, but not the blob-reference resolver path that namespace insert depends on. Please add an integration test that reads from a source path producing blob references, then writes to a namespace-insert target with use_namespace_insert=true through a shuffled or joined DataFrameWriterV2 append, and verifies byte equality in the target table. The test also needs to prove that the namespace insert writer path was selected; byte equality alone can pass through the default writer. If the test is outside TestDMLNamespaceInsert, update NAMESPACE_INSERT_PYTEST_CMD and mark it @pytest.mark.rest_dir_compatible.
| - synchronize | ||
| - ready_for_review | ||
| - reopened | ||
| paths: |
There was a problem hiding this comment.
This workflow is intended to cover namespace insert, but the current paths mostly cover write/options/integration files. Namespace insert also depends on the Arrow IPC writer, blob-reference read/write path, source-context optimizer rule, session-extension injection, sharding utilities, namespace catalog construction, and runtime namespace wiring. Prefer source-area filters instead of enumerating individual dependencies, for example lance-spark-base_2.12/src/main/**, lance-spark-base_2.12/src/test/**, lance-spark-*/src/main/**, and lance-spark-*/src/test/**, so key path changes still run the local/rest-dir namespace insert tests.
| | `batch_size` | Integer | `8192` | Maximum rows per Arrow batch/request before flushing. | | ||
| | `max_batch_bytes` | Long | `268435456` | Maximum approximate bytes per Arrow batch/request before flushing. | | ||
|
|
||
| Namespace insert writes are intended for append ingestion through a namespace implementation, |
There was a problem hiding this comment.
The docs currently say that rows may be visible after failures, but they do not say that namespace insert is currently an at-least-once path, nor that a Spark job can succeed with duplicate rows after task retry or speculation. Please update both user-facing docs to state that replay de-duplication can only be claimed when the connector verifies a first-class namespace idempotency capability with defined replay semantics. Otherwise users should treat this as at-least-once ingestion. Users who need Spark driver-side atomic commit semantics should use the default writer.
Adds an opt-in namespace insert append path for namespace-backed Lance tables.
Expected user experience:
use_namespace_insert=true.namespace_insert_parallelismlets users request the number of writer tasks; sharded tables use the sharding distribution, and unsharded tables repartition by the first output column.Includes local/rest-dir Docker CI coverage and documentation updates.