
feat: add Comet CachedBatchSerializer for native in-memory cache#4569

Draft: andygrove wants to merge 20 commits into apache:main from andygrove:comet-cached-batch-serializer


andygrove (Member) commented Jun 2, 2026

Which issue does this PR close?

Closes #2391

Rationale for this change

When a DataFrame or table is cached (df.cache() / CACHE TABLE), Spark's DefaultCachedBatchSerializer stores each column in Spark's compressed columnar format. Comet does not treat InMemoryTableScanExec as native, so it inserts a CometSparkToColumnarExec above it and pays a JVM-to-Arrow conversion on every read of the cached data:

cached (compressed) -> decompress to Spark ColumnarBatch -> convert to Arrow -> native

That conversion runs on every scan, which undercuts the benefit of caching for native pipelines. This PR lets Comet store the cache as compressed Arrow IPC once, at cache-build time, so repeated scans feed native execution directly with no per-read conversion. This is the same approach used by other columnar Spark accelerators.

What changes are included in this PR?

A new CometCachedBatchSerializer (plugged into Spark's spark.sql.cache.serializer) that:

  • Encodes each cached batch to compressed Arrow IPC (reusing Comet's existing serializeBatches/decodeBatches), storing the bytes plus a Spark-format per-column stats row.
  • Extends Spark's SimpleMetricsCachedBatchSerializer, so batch-level pruning of cached batches (buildFilter) works using the computed per-column min/max/null-count/count stats.
  • Decodes back to CometVector-backed ColumnarBatch on read, with column pruning and an InternalRow fallback for non-Comet consumers.
  • Delegates transparently to Spark's DefaultCachedBatchSerializer for schemas it does not support (nested/complex types), so it is a safe drop-in.
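To illustrate why storing the stats row pays off, here is a minimal sketch in plain Scala (no Spark dependency) of batch-level pruning for an equality filter on a long column. Stats are computed once per batch at build time. At scan time, a batch is skipped when the literal falls outside its [min, max] range. `ColumnStats`, `computeStats`, `mightContain` and `prune` are hypothetical names. Spark's actual `SimpleMetricsCachedBatchSerializer.buildFilter` instead evaluates Catalyst predicates against each batch's stats `InternalRow`.

```scala
// Hypothetical per-column stats for one cached batch (not Spark's stats row).
final case class ColumnStats(lower: Option[Long], upper: Option[Long], nullCount: Int, count: Int)

object BatchPruning {
  // Computed once per batch at cache-build time; None entries represent nulls.
  def computeStats(values: Seq[Option[Long]]): ColumnStats = {
    val present = values.flatten
    ColumnStats(
      lower = if (present.isEmpty) None else Some(present.min),
      upper = if (present.isEmpty) None else Some(present.max),
      nullCount = values.count(_.isEmpty),
      count = values.size
    )
  }

  // A batch can only satisfy `col = v` if v lies within [lower, upper].
  def mightContain(stats: ColumnStats, v: Long): Boolean =
    (stats.lower, stats.upper) match {
      case (Some(lo), Some(hi)) => lo <= v && v <= hi
      case _                    => false // an all-null batch never matches an equality filter
    }

  // At scan time, keep only the indices of batches the stats cannot rule out.
  def prune(stats: Seq[ColumnStats], v: Long): Seq[Int] =
    stats.indices.filter(i => mightContain(stats(i), v))
}
```

Take three batches holding ids 0-99, 100-199 and 200-249. For a filter `id = 150`, only the second batch needs to be decoded.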

Supporting changes:

  • CometSparkToColumnarExec gains a passthrough fast-path: batches whose columns are already CometVector are forwarded without a re-copy (with a numPassthroughBatches metric).
  • CometDriverPlugin installs the serializer at startup when the new spark.comet.cache.serializer.enabled config (default off) is set, respecting any user-provided serializer. The Spark property is a static config, so it must be set before the session is created.
  • New config spark.comet.cache.serializer.enabled (default off).
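The install policy described above can be sketched as follows. The sketch assumes a plain `mutable.Map` stands in for `SparkConf`. It also takes the Comet serializer's fully qualified class name as a parameter, since this description does not spell it out. `CacheSerializerInstall` and `install` are hypothetical; the two config keys and Spark's default serializer class name are the real ones. An unset key, or Spark's own default, is treated as "not user-provided".

```scala
import scala.collection.mutable

// Hypothetical helper; the config keys and Spark's default class name are real.
object CacheSerializerInstall {
  val EnabledKey = "spark.comet.cache.serializer.enabled"
  val SerializerKey = "spark.sql.cache.serializer"
  val SparkDefault = "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer"

  /** Returns true if the Comet serializer was installed into `conf` (a stand-in for SparkConf). */
  def install(conf: mutable.Map[String, String], cometSerializerClass: String): Boolean = {
    val enabled = conf.get(EnabledKey).exists(_.trim.equalsIgnoreCase("true"))
    // Anything other than unset or Spark's own default counts as a user choice.
    val userProvided = conf.get(SerializerKey).exists(_ != SparkDefault)
    if (enabled && !userProvided) {
      conf(SerializerKey) = cometSerializerClass
      true
    } else {
      false
    }
  }
}
```

This check has to run before the SparkSession is created, because spark.sql.cache.serializer is a static config.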

Supported flat types: boolean, integral, floating point, decimal, string, binary, date, timestamp, and timestamp_ntz. Schemas containing nested types delegate to DefaultCachedBatchSerializer. The feature is off by default.
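Below is a minimal sketch of the schema-level delegation decision. It uses hypothetical stand-in types rather than Spark's `DataType` hierarchy so it runs without Spark on the classpath. The supported set mirrors the flat types listed above, with "integral" expanded to byte/short/int/long and "floating point" to float/double. Any nested type makes the whole schema fall back to Spark's default serializer.

```scala
object CacheTypeSupport {
  // Hypothetical stand-ins for Spark SQL types, so the sketch runs without Spark.
  sealed trait SqlType
  final case class Flat(name: String) extends SqlType
  final case class ArrayOf(element: SqlType) extends SqlType
  final case class MapOf(key: SqlType, value: SqlType) extends SqlType
  final case class StructOf(fields: Seq[SqlType]) extends SqlType

  // Flat types the description lists as supported ("integral" and "floating point" expanded).
  private val supportedFlat = Set(
    "boolean", "byte", "short", "int", "long", "float", "double",
    "decimal", "string", "binary", "date", "timestamp", "timestamp_ntz")

  def isSupported(t: SqlType): Boolean = t match {
    case Flat(name) => supportedFlat.contains(name)
    case _          => false // arrays, maps and structs are not encoded natively
  }

  // One unsupported column is enough to hand the whole schema to Spark's serializer.
  def chooseSerializer(schema: Seq[SqlType]): String =
    if (schema.forall(isSupported)) "CometCachedBatchSerializer" else "DefaultCachedBatchSerializer"
}
```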

How are these changes tested?

New tests:

  • CometCachedBatchSerializerSuite: stats-row layout; build path (compressed IPC + stats); decode round-trip; column pruning; the columnar read path (identity and pruned projections); the CometSparkToColumnarExec passthrough metric; a regression test that string min/max stats survive encoding (they are copied off the Arrow buffer); and end-to-end tests for cached-vs-uncached correctness, filtered pruning on numeric and string columns, MEMORY_AND_DISK spill, array-type delegation, and timestamp_ntz value round-tripping.
  • CometPluginsSuite: the driver plugin installs the serializer only when enabled and never overrides a user-provided non-default serializer.

Verified that it compiles and the tests pass on the Spark 3.4, 3.5, and 4.x profiles.

andygrove added 20 commits June 2, 2026 08:13
Add TimestampNTZType alongside TimestampType in isCometType, readValue,
CometCacheColumnStats.ordered, and CometCacheColumnStats.compare so that
timestamp-without-timezone columns are cached natively.

Remove the unnecessary .asInstanceOf[CachedBatch] cast from encode (Iterator
covariance makes it redundant), drop the dead val attrs = schema aliases in
both build methods, and add a lifecycle comment on encodeBytes documenting
that stats must be computed before serialization clears the VectorSchemaRoot.

Strengthen the build test to use coalesce(1) for deterministic batch count
(exactly 3 batches for 250 rows at batch size 100) and assert real stat
values: minimum lowerBound of the id column is 0 and nullCount is 0.
Add a test for convertCachedBatchToColumnarBatch exercising both the
identity-projection passthrough (full columns) and pruned projection.
Improve selectedIndices to throw IllegalStateException with a diagnostic
message instead of a raw map key lookup. Update the isIdentityProjection
comment to state both conditions explicitly.
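Column pruning on read can be sketched as below. The method names echo `selectedIndices` and `isIdentityProjection` from the commit message above, but the signatures and bodies are assumptions. Columns are matched by name here for simplicity. "Identity" is assumed to mean the projection keeps every cached column in its original order.

```scala
// Hypothetical sketch; signatures and identity conditions are assumptions.
object ColumnPruning {
  /** Index of each requested column in the cached schema; fails loudly on a miss. */
  def selectedIndices(cached: Seq[String], requested: Seq[String]): Seq[Int] = {
    val byName = cached.zipWithIndex.toMap
    requested.map { name =>
      byName.getOrElse(name, throw new IllegalStateException(
        s"Column '$name' not found in cached schema [${cached.mkString(", ")}]"))
    }
  }

  /** Identity means (1) every cached column is selected and (2) in the original order. */
  def isIdentityProjection(cached: Seq[String], indices: Seq[Int]): Boolean =
    indices.length == cached.length &&
      indices.zipWithIndex.forall { case (idx, pos) => idx == pos }
}
```

On the identity path the decoded columns can be handed over as-is. Otherwise, only the selected vectors are placed in the output batch.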
…hout copy

When CometSparkToColumnarExec receives batches that are already Arrow
(all columns are CometVector, e.g. from CometCachedBatchSerializer),
skip the columnarBatchToArrowBatchIter re-copy and pass the batch
through directly. Adds a numPassthroughBatches metric to track when
the fast-path fires.
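The passthrough check reduces to one question: are all columns already Arrow-backed? The minimal sketch below uses hypothetical stand-in classes. `CometVectorLike` stands in for `CometVector`, and `toArrow` stands in for the `columnarBatchToArrowBatchIter` copy. It counts fast-path hits the way `numPassthroughBatches` does, but it is not the actual `CometSparkToColumnarExec` code.

```scala
object PassthroughSketch {
  // Hypothetical stand-ins: ColumnVectorLike ~ Spark ColumnVector, CometVectorLike ~ CometVector.
  trait ColumnVectorLike
  final class CometVectorLike extends ColumnVectorLike
  final class OnHeapVectorLike extends ColumnVectorLike

  final case class Batch(columns: Seq[ColumnVectorLike])

  final class ToColumnarExec {
    var numPassthroughBatches: Long = 0L

    // Stands in for the JVM-to-Arrow copy performed by the real exec.
    private def toArrow(batch: Batch): Batch = Batch(batch.columns.map(_ => new CometVectorLike))

    def process(batches: Iterator[Batch]): Iterator[Batch] = batches.map { batch =>
      if (batch.columns.forall(_.isInstanceOf[CometVectorLike])) {
        numPassthroughBatches += 1 // fast path: already Arrow, forward as-is
        batch
      } else {
        toArrow(batch) // slow path: copy into Arrow vectors
      }
    }
  }
}
```

A single non-Arrow column is enough to send the whole batch down the copy path.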
Wire COMET_CACHE_SERIALIZER_ENABLED to CometDriverPlugin.init so the
static spark.sql.cache.serializer config is set on the SparkConf at
SparkContext startup. A user-provided non-default serializer is
respected and not overridden.
…g bounds

In CometCachedBatchSerializer.readValue, the StringType case previously
returned col.getUTF8String(r) directly, which is a view into the Arrow
value buffer (backed by UTF8String.fromAddress). The stats accumulator
stored these views as lowerBound/upperBound, then encodeBytes called
serializeBatches which clears the VectorSchemaRoot and releases those
Arrow buffers. The stored string bounds then dangled, causing
SimpleMetricsCachedBatchSerializer.buildFilter to prune batches using
garbage stats, resulting in missing rows on filtered cached string scans.

Fix by calling .copy() to materialize a heap copy before the buffer is
freed. Add regression tests: one verifying stats survive encode, one
verifying an equality filter on a cached string column returns correct rows.
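The bug class behind this fix can be reproduced without Spark. A stats accumulator that keeps a view into a buffer, rather than a copy, sees garbage once that buffer is released. In this minimal sketch, `ValueBuffer`, `StringView` and `write` are hypothetical stand-ins for the Arrow value buffer and a `UTF8String` built with `fromAddress`. Zero-filling the array simulates the release; real freed off-heap memory is undefined, not zeroed.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object StringStatsSketch {
  // Hypothetical stand-in for an Arrow value buffer released after serialization.
  final class ValueBuffer(capacity: Int) {
    val bytes = new Array[Byte](capacity)
    // Zero-filling simulates release; real freed memory holds arbitrary data.
    def release(): Unit = java.util.Arrays.fill(bytes, 0.toByte)
  }

  // Like a UTF8String created from an address: it reads through to the buffer on every access.
  final class StringView(buf: ValueBuffer, offset: Int, length: Int) {
    override def toString: String = new String(buf.bytes, offset, length, UTF_8)
    // The fix: materialize an independent heap copy before the buffer goes away.
    def copy(): String = toString
  }

  def write(buf: ValueBuffer, offset: Int, s: String): StringView = {
    val src = s.getBytes(UTF_8)
    System.arraycopy(src, 0, buf.bytes, offset, src.length)
    new StringView(buf, offset, src.length)
  }
}
```

The order of operations matches the real fix. The copy is taken while stats are computed, before serializeBatches clears the VectorSchemaRoot.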
pchintar commented Jun 4, 2026

@andygrove oh oh I didn't notice this at all until now 😬

@andygrove andygrove added this to the 1.0.0 milestone Jun 11, 2026
andygrove (Member, Author) commented

Comparison of #4569 and #4591

These two PRs both close #2391 and take the same fundamental approach, so cross-linking a comparison here for visibility.

Shared goal and mechanics

Both solve the same problem: Comet does not treat InMemoryTableScanExec as native, so it inserts a CometSparkToColumnarExec and pays a JVM-to-Arrow conversion on every cached read. Both fix this by storing the cache as compressed Arrow IPC once at build time, so repeated scans feed native execution directly.

Both share the same core building blocks:

  • A custom CachedBatchSerializer that encodes batches with Comet's existing serializeBatches / decodeBatches (compressed Arrow IPC).
  • A CometCachedBatch payload holding the IPC bytes.
  • Installation of the serializer via CometDriverPlugin at startup.
  • A new config, off by default.
  • Decode back to CometVector-backed ColumnarBatch with column pruning, plus an InternalRow fallback for non-Comet consumers.
  • Roughly the same size and the same test layout (a serializer suite plus a plugin/exec suite).

Key differences

  • Serializer base class
      #4569: SimpleMetricsCachedBatchSerializer
      #4591: plain CachedBatchSerializer
  • Batch-level pruning
      #4569: Yes. Stores a Spark-format per-column stats row (min/max/null/count), so buildFilter prunes batches.
      #4591: No. stats = InternalRow.empty, so buildFilter is a pass-through no-op.
  • How the scan stays native
      #4569: No new operator. Reuses CometSparkToColumnarExec with a passthrough fast-path: if columns are already CometVector, the batch is forwarded without a re-copy (adds the numPassthroughBatches metric).
      #4591: New operator CometInMemoryTableScanExec (a CometExec / LeafExecNode) plus a CometOperatorSerde, wired into CometExecRule's nativeExecs map.
  • Unsupported types
      #4569: Delegates to DefaultCachedBatchSerializer (nested/complex types) as an explicit drop-in.
      #4591: Delegates to DefaultCachedBatchSerializer by inspecting the batch class in the convert methods.
  • Serializer install policy
      #4569: Sets spark.sql.cache.serializer only when enabled, and never overrides a user-provided non-default serializer.
      #4591: Always registers the serializer; the serializer decides at runtime whether to use the Comet format or delegate.
  • Config
      #4569: spark.comet.cache.serializer.enabled
      #4591: spark.comet.exec.inMemoryCache.enabled (EXEC category)
  • Plan-rule changes
      #4569: Minimal (works through the existing SparkToColumnar path).
      #4591: Adds an explicit InMemoryTableScanExec case with detailed fallback-reason messages (disabled / wrong-batch-class / empty-buffer).

Architectural distinction

The main difference is the integration strategy:

  • #4569 is serializer-centric. It does the minimum on the plan-rule side and leans on the existing CometSparkToColumnarExec, teaching it to skip the copy when batches are already Arrow. It also invests in stats so that filter pushdown prunes cached batches, and it is careful to respect a user-set serializer.
  • #4591 (feat: Add Native Support for In-Memory Cache) is operator-centric. It introduces a dedicated CometInMemoryTableScanExec node and serde, giving cached scans a first-class place in the Comet operator framework with explicit fallback reasons. It currently skips column statistics, so cached filters do not get batch pruning.

Suggested path forward

A strong combined outcome would pair #4591's dedicated scan operator and explicit fallback reasons with #4569's stats-based buildFilter, passthrough fast-path, and respect-user-serializer install logic.

Successfully merging this pull request may close these issues: Explore options for accelerating InMemoryTableScanExec