DuckDB CPU attribution: dedicated executor + KurrentDB integration#5655
DuckDB CPU attribution: dedicated executor + KurrentDB integration#5655realtonyyoung wants to merge 17 commits into
Conversation
Specs the rework of the DuckDB CPU metric to address review feedback on PR #5642 (caller-side measurement misses parallel worker-thread CPU, blocks async, and relies on incidental ref-struct thread-affinity). Approach: own DuckDB's execution threads (worker pool via external_threads + a dispatcher pool) and sum their CPU as an observable counter — correct for parallel work, async- and thread-safe by construction. Two-repo split: Kurrent.Quack first (executor + native task-scheduler interop + cross-thread CPU read), then KurrentDB (call-site migration + metric registration). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Deploying eventstore with
|
| Latest commit: |
2ff3959
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://0424338b.eventstore.pages.dev |
| Branch Preview URL: | https://spec-duckdb-cpu-attribution.eventstore.pages.dev |
Qodo reviews are paused for this user.Troubleshooting steps vary by plan Learn more → On a Teams plan? Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center? |
There was a problem hiding this comment.
Pull request overview
This PR adds an RFC/design spec describing a revised approach for attributing DuckDB CPU usage by executing DuckDB work on dedicated, owned threads (rather than measuring caller-thread CPU deltas), with a two-repository rollout plan across Kurrent.Quack and KurrentDB.
Changes:
- Introduces a
DuckDBExecutordesign with separate worker and dispatcher thread pools to avoid deadlocks and correctly capture parallel DuckDB CPU. - Defines a metric strategy based on summing per-thread CPU time for owned threads and exporting it via OpenTelemetry.
- Outlines call-site migration points in KurrentDB and a staged rollout/deprecation plan for the earlier caller-side metric idea.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- #5642 is open (not merged): "shipped" -> "proposed", especially since the caller-side metric is being pulled per this spec. - kurrentdb_proc_cpu is an ObservableUpDownCounter (gauge), so rate() is wrong; compare the DuckDB CPU-seconds rate against the gauge directly. - "expected-standard" -> "expected to be standard". Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ersion) Version 0.0.0-local.1 is a local-feed placeholder (NuGet.config, git-excluded) pointing at a dotnet-pack of the Kurrent.Quack feat/duckdb-executor branch. Flipped to the published prerelease in the final integration task once the Quack PR is merged and tagged. Kurrent.Quack.Arrow moves in lockstep (ConnectionHelpers lives in core, referenced by Arrow). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ol plumbing Replaces DuckDBConnectionPoolLifetime (shared pool + per-Kestrel-connection READ_ONLY pool via ConnectionInterceptor + RequestServices-swapping middleware) with DuckDBExecutorLifetime, which owns a single Kurrent.Quack DuckDBExecutor that runs all DuckDB work on its own worker/dispatcher threads. This is the foundation for measuring total DuckDB CPU via DuckDBExecutor.SampleCpu(). AddDuckDb now takes (serviceName, workerCount, dispatcherCount) and no longer registers DuckDBConnectionPool or the Kestrel interceptor/middleware. Also removes the Core-side pool plumbing that would otherwise dangle: the Pool property on ReadIndexEventsForward/Backward, the pool ctor params on Enumerator.ReadIndexForwards/Backwards/IndexSubscription, the RequestServices.GetRequiredService<DuckDBConnectionPool>() resolution in Streams.Read.cs, and four additional pool: null / .Pool call sites found by the build gate (PublisherReadExtensions, PublisherSubscribeExtensions, SubscriptionsService's long-poll retry cloning). KurrentDB.Core builds green. KurrentDB.SecondaryIndexing intentionally does not build yet (its readers/QueryEngine/StatsService still reference DuckDBConnectionPool) — migrating them is Tasks 5-7. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…PU samples
Turns the DuckDBCpuMetrics shell into a real ObservableCounter named
"{serviceName}.duckdb.cpu.seconds" that sums DuckDBExecutor.SampleCpu() per
role (worker/dispatcher). The metric is created eagerly by DuckDBExecutorLifetime
(a hosted service, so instantiated at node startup) over the executor it owns, so
the instrument exists even without a consumer. Adds "KurrentDB.DuckDB" to
metricsconfig.json Meters and a TDD unit test asserting the per-role sums.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…d connections) Processors now bind their BufferedView appender to a caller-owned executor.OpenConnection(); every DuckDB call on that connection (the ctor read, CreateUserIndex, Flush/commit, SetCheckpoint) runs via executor.ExecuteOn so it executes on a measured dispatcher thread. TryIndex row-appends stay on the subscription thread (in-memory buffer, intentionally unmeasured). Commit() becomes ValueTask CommitAsync(CancellationToken) and Checkpoint(...) becomes ValueTask CheckpointAsync(...); the subscriptions await them so appender CreateRow (subscription thread) and Flush (dispatcher, via ExecuteOn) never overlap. Dispose blocks on CommitAsync once before unregister/dispose. UserIndex engine/subscription thread the DuckDBExecutor through instead of the pool; the two db.Rent cleanup sites become executor.Execute ops. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
SecondaryIndexReaderBase takes a DuckDBExecutor and rents a connection per read via executor.Execute; the abstract GetDbRecordsForwards/Backwards now receive an open DuckDBAdvancedConnection (instead of a pool) and keep their CaptureSnapshot(connection) + query bodies minus the Rent wrapper. Removes GetPool and the msg.Pool usage deleted from the messages in the executor-lifetime change. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ry cancellation ExecuteAsync moves the whole rent/snapshot/prepare/consume/cleanup pipeline into one executor.Execute op (the dispatcher blocks on the consumer by design; dispatcherCount bounds concurrent streams). QueryEngine sheds its own InterruptQueryOnCancellation registration and its DuckDBException(Interrupt)->OCE mapping — Quack registers the interrupt and maps it internally. GetArrowSchema gets the same Execute wrap (no consume loop) and stays synchronous by blocking on the dispatcher op. PrepareQuery becomes ValueTask<MemoryOwner<byte>> PrepareQueryAsync(ReadOnlyMemory<byte>, ...); the Rewriter's two Rent sites collapse into the single rented connection of one Execute op. StatsService methods become async ValueTask<...>, each wrapping snapshot+query in executor.Execute. Callers updated to await: FlightSqlServer (PlainQuery/Schema go genuinely async; ConnectionState's registry call blocks on the async prepare to keep its span+out mechanism intact), UI QueryService, and UiStatsService (drops its Task.Run offload since the dispatcher already runs the work off the render thread). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
DuckDbIntegrationTest constructs a DuckDBExecutor (workerCount 2, dispatcherCount 2) instead of a raw pool, exposes it as `protected readonly DuckDBExecutor Executor`, and runs schema setup + all query helpers via Executor.Execute. Processor/reader tests build from the executor, and their Commit()/query helpers become async (CommitAsync + Execute). Drops the stale `pool: null` arg from the ReadIndexEventsForward/Backward test message construction (the Pool property was removed from those messages). ReadTests awaits PrepareQueryAsync and CommitAsync. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…hared DB Task 4 removed the shared DuckDBConnectionPool DI registration, but SchemaRegistry reaches DuckDB through Kurrent.Surge.DuckDB's IDuckDBConnectionProvider, which is constructed from a DuckDBConnectionPool - so every node boot failed to resolve it (14 integration tests). The external projector framework hard-requires that provider, so a full reroute to executor.Execute is not possible. Register a DuckDBConnectionPool built from DuckDBExecutor.CreateSharedConnectionPool() (added in Kurrent.Quack): an independent, DI-owned pool that shares the executor's already-open database (same file => same in-process DuckDB instance and task scheduler). SchemaRegistry's query work therefore still runs on the executor's measured worker threads. Bumps the local Quack feed to local.2 for the new API. SecondaryIndexing.Tests 74/74 green; SchemaRegistry.Tests 113 passed + 1 skipped. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ol offload) Review follow-up: the class summary still described the removed Task.Run thread-pool offload; StatsService now runs its DuckDB work on an executor dispatcher thread. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
- DuckDBExecutorLifetime: wrap the one-time-setup execution in try/catch that disposes the executor before rethrow, so a setup failure at startup can't leak the executor's worker/dispatcher threads + open DB handle (Important #2). - DefaultIndexProcessor.CommitAsync: guard on IsDisposed, not IsDisposingOrDisposed, so the dispose-time flush actually runs (DotNext sets Disposing before Dispose(bool)). Matches UserIndexProcessor; removes the shutdown re-index window. Verified safe: Dispose() drains the subscription before Dispose(bool), and _committing guards residual concurrency (Minor #4). - DuckDBCpuMetrics: note in the metric description that SchemaRegistry's own-thread DuckDB work is excluded (its parallel portions still run on worker threads) (Minor #5). - docs/server/diagnostics/metrics.md: document the new DuckDB CPU meter with the kurrentdb_proc_cpu cross-reference for computing DuckDB's CPU share (Important #3). SecondaryIndexing.Tests 74/74 green after the changes. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…sh, meter root, checkpoint bound) - UserIndexEngineSubscription.ReadForwards/ReadBackwards: make async and await the DuckDB read UNDER the read lock. They now queue an async executor.Execute, so returning the ValueTask while releasing the lock let a concurrent stop/delete take the write lock and dispose the processor before the queued read ran (P2). - DefaultIndexProcessor.CommitAsync: revert the dispose-time flush (guard on IsDisposingOrDisposed, not IsDisposed). Handle(BecomeShuttingDown) disposes the processor WITHOUT draining the subscription, so a dispose-time Flush() could race an in-flight TryIndex/CreateRow. Skipping is safe — the index self-heals from its committed position on restart (P2). - DuckDBCpuMetrics: store the Meter in a field so it (and its ObservableCounter) stays rooted for the process lifetime; it was created inline and only kept alive by listener internals (P3). - DuckDBExecutorLifetime.StopAsync: bound the shutdown checkpoint with a timeout so a dispatcher pool saturated by streaming queries can't hang teardown; a missed checkpoint is not data loss (WAL replays on open). The broader dispatcher-starvation fix (pool split vs sizing) is deferred to the load/soak (P1, per decision). SecondaryIndexing.Tests 74/74. (Directory.Packages.props bumped to the reviewed Quack local.3 placeholder.) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
realtonyyoung
left a comment
There was a problem hiding this comment.
Reviewed the C# integration in spec/duckdb-cpu-attribution against origin/master, intentionally ignoring the docs/superpowers spec/plan markdown and the Kurrent.Quack 0.0.0-local.3 placeholder as requested.
Findings
No severity-categorized C# defects found.
Merge verdict: mergeable from this C# integration review, assuming the known draft-only docs/package-placeholder items are resolved before the PR leaves draft.
Prior fixes verified
UserIndexEngineSubscription.ReadForwards/ReadBackwardsare now async and await the reader under the read lock. This keeps the processor alive until the queued executor read captures its snapshot.DefaultIndexProcessor.CommitAsyncnow guards onIsDisposingOrDisposed; the dispose-time flush is skipped as intended, andBecomeShuttingDownstill disposes without draining.DuckDBCpuMetricsstores theMeter, so the observable counter remains rooted for the lifetime of the metrics object.DuckDBExecutorLifetime.StopAsyncbounds the shutdown checkpoint with a 30s timeout. The remaining caveat is limited to a checkpoint that has already started and does not respond promptly to executor disposal/interrupt, which is separate from the dispatcher-starvation case this fix addresses.
Full pass notes
- Appender serialization looks sound: default indexing keeps
TryIndexand batchCommitAsyncon the single subscription processing task; shutdown deliberately skips the derived-state flush. User-index reads/snapshots hold the read lock across the async executor read or snapshot capture. - The async read ripple is consistent through
ISecondaryIndexReader,SecondaryIndexReaderBase, storage reader workers, and transport enumerators. Cancellation/expiry handling is preserved at the storage worker boundary. - The SchemaRegistry bridge is wired through the executor-backed shared
DuckDBConnectionPool. I also checked the external registration shape:AddDuckDBConnectionProviderusesTryAddSingleton, so it should not replace the existing shared pool registration. - Query lifecycle/disposal looks balanced: query execution captures snapshots and releases reader/statement/snapshots in
finally.
Verification
git diff --check origin/master...spec/duckdb-cpu-attributionpassed.- Targeted Release test pass succeeded: 37 tests across
DefaultIndexProcessorTests,DuckDBCpuMetricsTests, and default index reader tests.
What
This PR now carries both the approved design spec (RFC) and the KurrentDB implementation of it: all DuckDB work is routed through the Kurrent.Quack
DuckDBExecutor(which owns DuckDB's worker/dispatcher threads), andkurrentdb.duckdb.cpu.secondsis exposed on/metricsso operators can see DuckDB's share of the process CPU thatkurrentdb_proc_cpureports.docs/superpowers/specs/2026-06-26-duckdb-cpu-attribution-design.mddocs/superpowers/plans/2026-07-01-duckdb-executor-integration.mdThis depends on the executor library in kurrent-io/Kurrent.Quack#52. Until that merges and publishes a prerelease,
src/Directory.Packages.propspins a local-feed placeholder (Kurrent.Quack/Kurrent.Quack.Arrow0.0.0-local.3), so CI will be red here (it can't restore that package). When #52 publishes, we flip to the published version and mark this ready.Reviewable now — the code is complete and green locally: SecondaryIndexing.Tests 74/74, SchemaRegistry.Tests 113 (+1 skipped), the
DuckDBCpuMetricsunit test, and a live/metricssmoke (bothrole="worker"/role="dispatcher"series present).What the integration does
DuckDBExecutorLifetimeowns DB open (threads/external_threads+ the 25%memory_limitheuristic); the old shared-RW + per-Kestrel-connection READ_ONLY pool mechanism (interceptor / middleware /msg.Poolplumbing) is removed end-to-end.BufferedViewappender is bound to it) and run flush/checkpoint viaexecutor.ExecuteOn; readers, queries, and stats run viaexecutor.Execute.TryIndexrow-appends stay on the subscription thread.Executeop (Quack owns cancellation); StatsService is async.DuckDBExecutor.CreateSharedConnectionPool()) that shares the executor's open database, since the externalKurrent.Surge.DuckDBprovider requires aDuckDBConnectionPool.ObservableCounter kurrentdb.duckdb.cpu.seconds(tagrole=worker/dispatcher) overSampleCpu(); meterKurrentDB.DuckDBinmetricsconfig.json; documented indocs/server/diagnostics/metrics.md. (DuckDB's worker/dispatcher CPU split is non-deterministic — sum across roles for total DuckDB CPU.)Supersedes #5642 (the caller-side approach, now closed).
Known follow-up (before default-on)
Streaming FlightSQL queries block a dispatcher, and the dispatcher pool is shared with index commits/reads/checkpoint — enough slow streaming clients can starve it. The shutdown checkpoint is now bounded by a timeout; the steady-state fix (size
DispatcherThreadsvs. split streaming onto its own pool) is deferred to the spec §11 load/soak.🤖 Generated with Claude Code