Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
8925bf6
Add design spec: DuckDB CPU attribution via dedicated executor
realtonyyoung Jun 27, 2026
de1a916
Address spec review: #5642 wording, dashboard formula, typo
realtonyyoung Jul 1, 2026
f4eab47
Add implementation plan: DuckDB executor integration + /metrics wiring
realtonyyoung Jul 2, 2026
cbace1d
docs(plan): note arm64 local build must not use /p:Platform=x64
realtonyyoung Jul 2, 2026
fdc87d3
chore: consume Kurrent.Quack executor build (local feed placeholder v…
realtonyyoung Jul 2, 2026
67bf821
feat: DuckDB executor lifetime owns DB open; remove per-connection po…
realtonyyoung Jul 2, 2026
eb0c363
feat: kurrentdb.duckdb.cpu.seconds observable counter over executor C…
realtonyyoung Jul 2, 2026
8916e95
feat: index processors run DuckDB work on executor dispatchers (pinne…
realtonyyoung Jul 2, 2026
334b5f1
feat: index readers execute on DuckDB dispatchers
realtonyyoung Jul 2, 2026
17978a1
feat: QueryEngine and stats run on DuckDB dispatchers; Quack owns que…
realtonyyoung Jul 2, 2026
3102d84
test: fixtures build on DuckDB executor
realtonyyoung Jul 2, 2026
1e54151
fix: bridge SchemaRegistry's DuckDBConnectionPool to the executor's s…
realtonyyoung Jul 2, 2026
df31346
docs: correct UiStatsService class comment (dispatcher, not thread-po…
realtonyyoung Jul 2, 2026
8ecf4c3
fix: address final whole-branch review findings
realtonyyoung Jul 2, 2026
1404035
fix: address KurrentDB Codex review (read-lock lifetime, shutdown flu…
realtonyyoung Jul 3, 2026
3961d43
Merge DuckDB executor integration into the design-spec PR for combine…
realtonyyoung Jul 3, 2026
2ff3959
docs(spec): mark PR #5642 closed/superseded (review follow-up)
realtonyyoung Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/server/diagnostics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ kurrentdb_proc_contention_count_total 297 1688147136862
kurrentdb_gc_pause_duration_max_seconds{range="16-20 seconds"} 0.0485873 1688147136862
```

### DuckDB

CPU time consumed by the dedicated DuckDB executor that owns every thread DuckDB runs on (used by secondary indexing and the query engine). Because the executor owns those threads, this is the DuckDB share of the process CPU reported by `kurrentdb_proc_cpu`: compare `sum(rate(kurrentdb_duckdb_cpu_seconds_total[5m]))` (summed across both `role` values) against `kurrentdb_proc_cpu` to see the fraction of KurrentDB's CPU spent inside DuckDB.

| Time Series | Type | Description |
|:------------|:-----|:------------|
| `kurrentdb_duckdb_cpu_seconds_total{role=<worker\|dispatcher>}` | [Counter](#common-types) | Cumulative CPU seconds consumed by DuckDB's executor threads, split by `role`: `worker` (DuckDB's parallel task-scheduler threads) and `dispatcher` (threads that run queries, reads, and index commits). Requires per-thread CPU support (Linux/Windows); reads 0 on macOS. Excludes DuckDB work the schema registry runs on its own threads (low volume; its parallel portions still run on worker threads). |

### Projections

Projection metrics track the statistics for projections.
Expand Down
578 changes: 578 additions & 0 deletions docs/superpowers/plans/2026-07-01-duckdb-executor-integration.md

Large diffs are not rendered by default.

148 changes: 148 additions & 0 deletions docs/superpowers/specs/2026-06-26-duckdb-cpu-attribution-design.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
<PackageVersion Include="Kurrent.Surge.Core" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Surge.DataProtection" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Surge.DuckDB" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.181" />
<PackageVersion Include="Kurrent.Quack.Arrow" Version="0.0.0-alpha.181" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-local.3" />
<PackageVersion Include="Kurrent.Quack.Arrow" Version="0.0.0-local.3" />
<PackageVersion Include="KurrentDB.Client" Version="1.0.0" />
<PackageVersion Include="librdkafka.redist" Version="2.5.0" />
<PackageVersion Include="LruCacheNet" Version="1.2.0" />
Expand Down
6 changes: 0 additions & 6 deletions src/KurrentDB.Core.Testing/Helpers/MiniNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
using KurrentDB.Core.Bus;
using KurrentDB.Core.Certificates;
using KurrentDB.Core.Configuration.Sources;
using KurrentDB.Core.DuckDB;
using KurrentDB.Core.Messages;
using KurrentDB.Core.Services.Monitoring;
using KurrentDB.Core.Services.Storage;
Expand Down Expand Up @@ -270,11 +269,6 @@ public MiniNode(string pathname,
builder.Services.AddSerilog();
Node.Startup.ConfigureServices(builder.Services);
_webHost = builder.Build();
_webHost.Use(async (ctx, next) => {
var factory = ctx.RequestServices.GetRequiredService<DuckDBConnectionPoolLifetime>();
ctx.Features.Set<ConnectionScopedDuckDBConnectionPool>(new(factory));
await next();
});
Node.Startup.Configure(_webHost);
_started = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_adminUserCreated = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ IAsyncEnumerator<ReadResponse> GetEnumerator() {
user: SystemAccounts.System,
requiresLeader: false,
expiryStrategy: DefaultExpiryStrategy.Instance,
pool: null,
cancellationToken: cancellationToken
)
: new Enumerator.ReadIndexBackwards(
Expand All @@ -266,7 +265,6 @@ IAsyncEnumerator<ReadResponse> GetEnumerator() {
user: SystemAccounts.System,
requiresLeader: false,
expiryStrategy: DefaultExpiryStrategy.Instance,
pool: null,
cancellationToken: cancellationToken
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static async IAsyncEnumerable<ReadResponse> SubscribeToIndex(this IPublis
checkpoint: position,
user: SystemAccounts.System,
requiresLeader: false,
pool: null,
cancellationToken: cancellationToken
);

Expand Down
8 changes: 4 additions & 4 deletions src/KurrentDB.Core/ClusterVNodeStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ public void Configure(WebApplication app) {
app.UseAuthorization();
app.UseAntiforgery();

// provides a lazy DuckDB connection pool unique to the connection
app.UseDuckDb();

// allow all subsystems to register their legacy controllers before calling MapLegacyHttp
foreach (var component in _plugableComponents)
component.ConfigureApplication(app, _configuration);
Expand Down Expand Up @@ -314,7 +311,10 @@ public void ConfigureServices(IServiceCollection services) {
services.AddGrpcReflection();
#endif

services.AddDuckDb();
services.AddDuckDb(
_metricsConfiguration.ServiceName,
workerCount: _configuration.GetValue("KurrentDB:DuckDB:WorkerThreads", Math.Clamp(Environment.ProcessorCount / 2, 2, 16)),
dispatcherCount: _configuration.GetValue("KurrentDB:DuckDB:DispatcherThreads", Math.Clamp(Environment.ProcessorCount / 2, 2, 8)));

// Ask the node itself to add DI registrations
_configureNodeServices(services);
Expand Down
134 changes: 0 additions & 134 deletions src/KurrentDB.Core/DuckDB/DuckDBConnectionPoolLifetime.cs

This file was deleted.

39 changes: 39 additions & 0 deletions src/KurrentDB.Core/DuckDB/DuckDBCpuMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

#nullable enable

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using Kurrent.Quack;

namespace KurrentDB.Core.DuckDB;

// Publishes the DuckDB executor's cumulative CPU time as an observable counter with one measurement per
// thread role ("worker" and "dispatcher"). Every sample returned by the supplied sampleCpu delegate is folded
// into one of those two totals each time the instrument is observed.
public class DuckDBCpuMetrics {
	public const string MeterName = "KurrentDB.DuckDB";

	// Keeps the meter, and therefore the observable counter registered on it, reachable for as long as this
	// object is (DuckDBExecutorLifetime creates and holds this object).
	private readonly Meter _meter;

	// meter:       the meter the counter is registered on.
	// serviceName: prefix of the instrument name, giving "<serviceName>.duckdb.cpu.seconds".
	// sampleCpu:   invoked on every observation to obtain the current per-thread CPU samples.
	public DuckDBCpuMetrics(Meter meter, string serviceName, Func<IReadOnlyList<DuckDBExecutor.CpuSample>> sampleCpu) {
		const string description =
			"CPU time consumed by DuckDB's executor worker and dispatcher threads, in seconds " +
			"(excludes DuckDB work SchemaRegistry runs on its own threads via a shared connection pool; " +
			"that work's parallel portions still run on the executor's worker threads).";
		var instrumentName = $"{serviceName}.duckdb.cpu.seconds";

		_meter = meter;
		meter.CreateObservableCounter<double>(instrumentName, CollectByRole, description: description);

		// Sums the samples per role; any sample whose role is not "worker" is counted as dispatcher time.
		IEnumerable<Measurement<double>> CollectByRole() {
			var workerSeconds = 0.0;
			var dispatcherSeconds = 0.0;

			foreach (var sample in sampleCpu()) {
				if (sample.Role == "worker") {
					workerSeconds += sample.CpuSeconds;
				} else {
					dispatcherSeconds += sample.CpuSeconds;
				}
			}

			yield return new Measurement<double>(workerSeconds, new KeyValuePair<string, object?>("role", "worker"));
			yield return new Measurement<double>(dispatcherSeconds, new KeyValuePair<string, object?>("role", "dispatcher"));
		}
	}
}
111 changes: 111 additions & 0 deletions src/KurrentDB.Core/DuckDB/DuckDBExecutorLifetime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using DotNext;
using Kurrent.Quack;
using KurrentDB.Core.TransactionLog.Chunks;
using KurrentDB.DuckDB;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace KurrentDB.Core.DuckDB;

// Owns the DuckDB executor: database open (with thread + memory settings applied at open),
// one-time schema setups, the CPU meter, the shutdown checkpoint, and executor disposal.
public sealed class DuckDBExecutorLifetime : Disposable, IHostedService {
	private readonly ILogger<DuckDBExecutorLifetime> _log;
	[CanBeNull] private string _tempPath;

	// The meter backing CpuMetrics. It is owned here so it can be disposed together with the executor:
	// its observable counter samples Executor, so it must stop being observed before the executor goes away.
	[CanBeNull] private readonly Meter _meter;

	public DuckDBExecutor Executor { get; }

	// The CPU metric is created here (eagerly, at node startup) rather than via an activated DI singleton so the
	// instrument exists even without a metrics consumer, and so it samples the very executor this lifetime owns.
	public DuckDBCpuMetrics CpuMetrics { get; }

	// Opens the database, starts the executor, registers the CPU metric and runs every IDuckDBSetup once.
	// config:          supplies the database location (config.Path) or requests a temp-file database (config.InMemDb).
	// setups:          one-time setups executed on an executor-owned connection.
	// workerCount:     passed to the executor as its worker count.
	// dispatcherCount: passed to the executor as its dispatcher count.
	// serviceName:     prefix for the CPU metric's instrument name.
	// log:             optional logger; a null logger is substituted when omitted.
	// Any exception thrown during metric creation or setup is rethrown after the resources acquired so far
	// (meter, executor, temp file) have been released.
	public DuckDBExecutorLifetime(
		TFChunkDbConfig config,
		IEnumerable<IDuckDBSetup> setups,
		int workerCount,
		int dispatcherCount,
		string serviceName,
		[CanBeNull] ILogger<DuckDBExecutorLifetime> log) {
		_log = log ?? NullLogger<DuckDBExecutorLifetime>.Instance;

		var path = config.InMemDb ? GetTempPath() : $"{config.Path}/kurrent.ddb";
		var memoryMib = (int)(GC.GetGCMemoryInfo().TotalAvailableMemoryBytes / 1024 / 1024 * 0.25); // 25% of RAM, as before
		var connectionString = $"Data Source={path};memory_limit={memoryMib}MB";

		Executor = new DuckDBExecutor(connectionString, workerCount, dispatcherCount);
		try {
			_meter = new Meter(DuckDBCpuMetrics.MeterName, "1.0.0");
			CpuMetrics = new DuckDBCpuMetrics(_meter, serviceName, Executor.SampleCpu);
			_log.LogInformation("DuckDB executor started at {path}: {workers} workers, {dispatchers} dispatchers, memory_limit {memory}MB",
				path, workerCount, dispatcherCount, memoryMib);

			// One-time setups (IndexingDbSchema, SchemaDbSchema) — effects are database-wide, so running
			// them once on any executor-owned connection preserves today's semantics exactly.
			Executor.Execute(connection => {
				foreach (var setup in setups)
					setup.Execute(connection);
				return 0;
			}, CancellationToken.None).AsTask().GetAwaiter().GetResult();
		} catch {
			// A setup failure (e.g. a schema-migration error) throws before this object exists for anyone to
			// Dispose(), so the meter, the executor's worker/dispatcher threads, the open DB handle and any
			// temp database file would leak. Release them here, meter first so its callback can no longer
			// sample an executor that is being torn down.
			_meter?.Dispose();
			Executor.DisposeAsync().AsTask().GetAwaiter().GetResult();
			DeleteTempFile();
			throw;
		}

		return;

		string GetTempPath() {
			_tempPath = Path.GetTempFileName();
			File.Delete(_tempPath); // DuckDB refuses a pre-existing empty file; recreate at the same path
			return _tempPath;
		}
	}

	// Nothing to start: the executor is already running once the constructor has returned.
	public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

	// Bound on the shutdown checkpoint: it runs on a dispatcher, and if streaming queries have saturated the
	// dispatcher pool the checkpoint could otherwise block shutdown indefinitely. A missed checkpoint is not data
	// loss — DuckDB replays its WAL on next open — so time out and continue rather than hang teardown. (The broader
	// dispatcher-starvation question is tracked for the load/soak: size DispatcherThreads above the expected
	// concurrent-streaming-query count, or split streaming onto its own dispatcher pool.)
	private static readonly TimeSpan CheckpointTimeout = TimeSpan.FromSeconds(30);

	// Requests a checkpoint on the executor and waits for it, for at most CheckpointTimeout.
	// A timeout is logged as a warning and swallowed; cancellation of cancellationToken is not swallowed.
	public async Task StopAsync(CancellationToken cancellationToken) {
		_log.LogDebug("Checkpointing DuckDB");
		var checkpoint = Executor.Execute(connection => {
			connection.Checkpoint();
			return 0;
		}, cancellationToken).AsTask();
		try {
			await checkpoint.WaitAsync(CheckpointTimeout, cancellationToken);
		} catch (TimeoutException) {
			_log.LogWarning("DuckDB shutdown checkpoint did not complete within {Timeout}; skipping it (the WAL is " +
				"replayed on next open). This can happen if streaming queries have saturated the DuckDB dispatcher pool.",
				CheckpointTimeout);
		}
	}

	// Releases the meter, then the executor, then the temp database file (if one was created).
	protected override void Dispose(bool disposing) {
		if (disposing) {
			// Dispose the meter before the executor so the CPU counter is no longer observed once the
			// executor it samples has been disposed.
			_meter?.Dispose();
			Executor.DisposeAsync().AsTask().GetAwaiter().GetResult();
			DeleteTempFile();
		}
		base.Dispose(disposing);
	}

	// Best-effort removal of the temp database file; a failure to delete must never escape a cleanup path.
	private void DeleteTempFile() {
		if (_tempPath == null)
			return;

		try {
			File.Delete(_tempPath);
		} catch (IOException) {
			// let the OS clean it up
		} catch (UnauthorizedAccessException) {
			// let the OS clean it up
		}
	}
}
Loading
Loading