
Add new Iceberg BatchedWriter that auto handles batches and is durabl… #1184

Merged

bbalser merged 3 commits into main from bbalser/helium-iceberg-batched-writer on May 5, 2026

Conversation

@bbalser bbalser commented May 5, 2026

Add BatchedWriter to helium_iceberg with crash-recovery spooling

Summary

Introduces BatchedWriter<T> — a batching layer over IcebergTable<T> that accumulates records, spools them to disk, and commits them to Iceberg in larger snapshots. Designed for streaming-ingestion call sites that today either commit per record (snapshot churn) or hand-roll their own buffering.

Records are durably spooled in Arrow-IPC stream format on the way in, so a process abort between flushes is recoverable: on next startup the new task replays any leftover spool files for its table before accepting new traffic.

Public API

let (writer, task) = BatchedWriter::new(
    table,
    BatchedWriterConfig::new(spool_dir)
        .with_max_batch_size(10_000)
        .with_batch_timeout(Duration::from_secs(60)),
);

// Register `task` with TaskManager (impl ManagedTask), or spawn `task.run(shutdown)`.

writer.queue(record).await?;          // returns once on disk (kernel page cache)
writer.queue_all(records).await?;     // batch variant
writer.flush().await?;                // forces an Iceberg commit, waits for result
  • queue / queue_all are ack'd: they don't return until the records have been pushed through the spool's BufWriter to the kernel page cache. After they return, the records survive a process abort.
  • The task triggers an Iceberg commit when the spool reaches max_batch_size, when batch_timeout elapses, on explicit flush(), on triggered::Listener shutdown, or on channel close (a run-loop sketch follows this list).
  • Each commit emits an info log: flushed batch to iceberg table="ns.tbl" reason="size" records=10000 duration_ms=842. Reasons: "size", "timeout", "manual", "shutdown", "channel_closed".
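
The loop below is a minimal sketch of those commit triggers, assuming a command channel between the writer handle and the task. Command, run_loop, and commit are illustrative names, not the crate's actual internals; only the trigger reasons, the triggered::Listener shutdown, and the config knobs come from this PR.

use std::time::Duration;
use tokio::{sync::{mpsc, oneshot}, time};

// Hypothetical message type between the writer handle and the task.
enum Command {
    Queue(Vec<String>),         // records destined for the spool
    Flush(oneshot::Sender<()>), // explicit flush, acknowledged once committed
}

async fn run_loop(
    mut commands: mpsc::Receiver<Command>,
    shutdown: triggered::Listener,
    max_batch_size: usize,
    batch_timeout: Duration,
) {
    let mut buffered = 0usize;
    let mut timer = time::interval(batch_timeout);

    loop {
        tokio::select! {
            // Shutdown signal: commit whatever is spooled, then exit.
            _ = shutdown.clone() => {
                commit(&mut buffered, "shutdown").await;
                return;
            }
            // Commit even a small batch once batch_timeout elapses.
            _ = timer.tick() => {
                if buffered > 0 {
                    commit(&mut buffered, "timeout").await;
                }
            }
            cmd = commands.recv() => match cmd {
                Some(Command::Queue(records)) => {
                    buffered += records.len(); // the real task appends to the spool here
                    if buffered >= max_batch_size {
                        commit(&mut buffered, "size").await;
                    }
                }
                Some(Command::Flush(ack)) => {
                    commit(&mut buffered, "manual").await;
                    let _ = ack.send(());
                }
                // All writer handles dropped: final commit, then exit.
                None => {
                    commit(&mut buffered, "channel_closed").await;
                    return;
                }
            },
        }
    }
}

// Stand-in for the real spool flush + IcebergTable commit.
async fn commit(buffered: &mut usize, reason: &str) {
    tracing::info!(reason, records = *buffered, "flushed batch to iceberg");
    *buffered = 0;
}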

How the spool works

  • One {namespace}__{table}__{uuid_v7}.arrow file per task, opened lazily on first append (so a clean shutdown with nothing buffered leaves the dir empty).
  • Append path: T → RecordBatch → StreamWriter::write → BufWriter::flush (kernel page cache). Wrapped in spawn_blocking because arrow-ipc is sync-only.
  • Flush path: StreamWriter::finish → File::sync_data (the only fsync) → read all batches back via StreamReader → IcebergTable::write_record_batches → delete file (a sketch of this lifecycle follows the list).
  • Replay path: scan the dir for files matching the table's prefix, read each, commit, delete. Truncated trailing batches (kill -9 mid-append) are detected and dropped via the IPC stream's length-prefix framing.
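
The snippet below is a rough, self-contained sketch of the append, flush, and read-back steps above using the public arrow-ipc stream APIs. The schema, file handling, and error handling are simplified assumptions rather than the spool module's actual code, and the real append path runs inside spawn_blocking.

use std::{fs::File, io::{BufReader, BufWriter}, path::Path, sync::Arc};

use arrow::{
    array::{ArrayRef, StringArray},
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
    ipc::{reader::StreamReader, writer::StreamWriter},
    record_batch::RecordBatch,
};

fn spool_round_trip(path: &Path) -> Result<Vec<RecordBatch>, ArrowError> {
    // Stand-in schema; the real spool derives it from T's Arrow conversion.
    let schema = Arc::new(Schema::new(vec![Field::new("payload", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef],
    )?;

    // Append path: T -> RecordBatch -> StreamWriter::write, buffered by a BufWriter.
    let mut writer = StreamWriter::try_new(BufWriter::new(File::create(path)?), &schema)?;
    writer.write(&batch)?;

    // Flush path: close the IPC stream; the real code then calls File::sync_data,
    // the single fsync, before reading the file back for the Iceberg commit.
    writer.finish()?;
    drop(writer); // flushes the BufWriter

    // Read-back / replay path: iterate batches; a truncated trailing batch from a
    // kill -9 mid-append surfaces as an Err here and can simply be dropped.
    let reader = StreamReader::try_new(BufReader::new(File::open(path)?), None)?;
    Ok(reader.filter_map(Result::ok).collect())
}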

Files changed

File | Change
helium_iceberg/src/batched_writer/mod.rs | New: BatchedWriter, BatchedWriterConfig, BatchedWriterTask, ManagedTask impl, run loop, log helper.
helium_iceberg/src/batched_writer/spool.rs | New: Spool — Arrow-IPC stream lifecycle, append, flush, replay.
helium_iceberg/src/iceberg_table.rs | write_data_files now accepts Vec<RecordBatch> so one snapshot can absorb many spool batches; new IcebergTable::write_record_batches skips the arrow-json round-trip; records_to_batch promoted to pub(crate).
helium_iceberg/src/lib.rs | Re-export the new types.
helium_iceberg/Cargo.toml | Add arrow-ipc, task-manager, triggered; tempfile as dev-dep.
helium_iceberg/tests/batched_writer.rs | New: 5 integration tests against the Polaris/Trino/S3 harness.
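
For context, the iceberg_table.rs row above in rough signature form. Everything here other than the names write_data_files, write_record_batches, and records_to_batch (the error type, visibility, and bodies) is a placeholder assumption, not the crate's real types.

use arrow::record_batch::RecordBatch;

// Placeholder result type standing in for helium_iceberg's own.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

struct IcebergTable<T>(std::marker::PhantomData<T>);

impl<T> IcebergTable<T> {
    // Previously took a single batch; now a Vec, so one Iceberg snapshot can
    // absorb every batch read back from a spool file.
    async fn write_data_files(&self, _batches: Vec<RecordBatch>) -> Result<()> {
        todo!()
    }

    // New: commit Arrow batches directly, skipping the records -> arrow-json ->
    // RecordBatch round-trip used by the record-oriented write path.
    pub async fn write_record_batches(&self, batches: Vec<RecordBatch>) -> Result<()> {
        self.write_data_files(batches).await
    }
}

// Promoted to pub(crate) so the spool can reuse the T -> RecordBatch conversion.
pub(crate) fn records_to_batch<T>(_records: &[T]) -> Result<RecordBatch> {
    todo!()
}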

Design decisions

  • Non-idempotent only. A batch aggregates records from many submissions; idempotency stays on IcebergTable directly via write_idempotent.
  • Separate API, not DataWriter<T>. write_idempotent doesn't fit batching semantics, so we don't pretend to.
  • ManagedTask integration. Mirrors FileSink — fits the workspace shutdown story.
  • spool_dir is required. No Default impl; the builder's BatchedWriterConfig::new(spool_dir) is the entry point.

@bbalser bbalser requested review from macpie and michaeldjeffrey May 5, 2026 18:55
use crate::{Error, Result};
use spool::Spool;

const DEFAULT_MAX_BATCH_SIZE: usize = 10_000;
Contributor

What unit is this?
glacon-rs uses the bytesize crate that provides some const constructors.

Collaborator Author

just the number of records, not bytesize

@bbalser bbalser marked this pull request as ready for review May 5, 2026 20:15
@bbalser bbalser merged commit 9c3ac8a into main May 5, 2026
29 checks passed
@bbalser bbalser deleted the bbalser/helium-iceberg-batched-writer branch May 5, 2026 20:26