feat(pool): QuestDBClient sender connection pool (Java parity)#76

Draft
ideoma wants to merge 28 commits into main from feat/sender-pool

@ideoma ideoma commented Jun 29, 2026

Collaborator

Summary

Adds QuestDBClient, an elastic, thread-safe pool of ISender instances mirroring the Java client's QuestDB handle. Construct once, share across threads; BorrowSender() (+ BorrowSenderAsync) leases a sender that flushes and returns to the pool on dispose (not disconnect). A background housekeeper reaps idle / over-age senders. On net7.0+ the same handle also pools egress query clients (NewQuery() / ExecuteSqlAsync). Also adds lazy_connect for tolerant startup (start with the server down, buffer writes, read once it's up).

await using var client = QuestDBClient.Builder()
    .FromConfig("http::addr=localhost:9000;").SenderPoolMax(8).Build();
using (var s = client.BorrowSender())
    s.Table("trades").Column("px", 1.0).At(DateTime.UtcNow);

Borrow semantics. A borrowed sender is leased for one unit of work and is not thread-safe. Dispose flushes pending rows and returns it to the pool, where another thread may immediately re-borrow it — so do not touch a borrowed sender after disposing it, do not cache it past the using scope, and never share a live one across threads. The handle is use-after-return safe: once disposed it is inert (not aliased), so every ingest member throws ObjectDisposedException and a stale reference can never reach — or corrupt — the entry the pool has since lent to a different borrower; a second dispose is a tolerated no-op. That guard makes accidental misuse fail loudly, but it does not make a live borrowed sender shareable. There is no thread-affine / pinned-sender API — borrow per unit of work.
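
The use-after-return guard described above can be illustrated with a minimal sketch. `PooledEntry` and `BorrowedHandle` are hypothetical stand-ins invented for this example, not the PR's actual types; only the `Active()` gating idea and the tolerated second dispose are taken from the description.

```csharp
#nullable enable
using System;
using System.Threading;

// Hypothetical stand-in for the pooled object a lease points at.
public sealed class PooledEntry
{
    public int RowsBuffered { get; private set; }
    public int Flushes { get; private set; }
    public void AddRow() => RowsBuffered++;
    public void Flush() { RowsBuffered = 0; Flushes++; }
}

// Illustrative lease handle: forwards to the entry while active and
// becomes inert after Dispose, so a stale reference cannot reach the
// entry once the pool may have lent it to someone else.
public sealed class BorrowedHandle : IDisposable
{
    private PooledEntry? _entry;
    private readonly Action<PooledEntry> _giveBack;

    public BorrowedHandle(PooledEntry entry, Action<PooledEntry> giveBack)
    {
        _entry = entry;
        _giveBack = giveBack;
    }

    // Every forwarding member goes through this gate.
    private PooledEntry Active() =>
        _entry ?? throw new ObjectDisposedException(nameof(BorrowedHandle));

    public void AddRow() => Active().AddRow();

    public void Dispose()
    {
        // Detach first; only the first Dispose call sees a non-null entry.
        var entry = Interlocked.Exchange(ref _entry, null);
        if (entry is null) return; // second dispose is a no-op
        entry.Flush();             // flush pending rows
        _giveBack(entry);          // then hand the entry back to the pool
    }
}
```

The guard makes misuse fail loudly after return; it does not make a live handle safe to share across threads.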

Highlights

  • Sender pool: 6 config keys on SenderOptions (sender_pool_min/max, acquire_timeout_ms, idle_timeout_ms, max_lifetime_ms, housekeeper_interval_ms), [JsonIgnore]d so a plain sender round-trips byte-identically. SemaphoreSlim capacity gate, create-outside-lock, elastic min/max, acquire-timeout (PoolExhausted).
  • Query pool (net7.0+): the same handle pools IQwpQueryClients — NewQuery() / ExecuteSqlAsync(...), with the client lease held implicitly per execution (no public borrow/release). Adds query_pool_min/max; the acquire/idle/lifetime/housekeeper knobs are shared with the sender pool and swept by one housekeeper.
  • Tolerant startup (lazy_connect) — a pool-facade connect-string flag (also QuestDBClientBuilder.LazyConnect()) that lets the handle build while the server is down. The ws/wss ingest side connects asynchronously (buffering writes meanwhile) and the read pool defaults query_pool_min=0 so nothing connects eagerly — the read pool stays enabled and a query connects lazily on first NewQuery once the server is up. Build() (ResolveLazyConnect) injects initial_connect_retry=async for the pooled ws senders when unset, and rejects a conflicting blocking-startup knob up front (IngressError/ConfigError): an explicit initial_connect_retry other than async, or an explicit query_pool_min > 0 (connect string or QueryPoolMin(...) builder call, tracked across ingest + separate-query configs). Lives in QwpConnectStringKeys.Shared, so a plain Sender parses and ignores it. Mirrors java-questdb-client lazy_connect.
  • Store-and-forward: per-slot sender_id=<base>-<index>, slot bitmap with lock-release-aware reclaim/retire (unified leak-debt capacity accounting).
  • Two production gaps closed (both absent in C#): QwpSlotLock.IsReleased -> QwpWebSocketSender.IsSlotLockReleased (net-agnostic IPooledSlotSender seam); QwpOrphanScanner exclude-managed-slots filter.
  • Pooled ws::/wss:: senders are castable to IQwpWebSocketSender (full QWP column set, Ping, seqTxn watermarks).
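
For readers unfamiliar with the pattern, the "SemaphoreSlim capacity gate, create-outside-lock, acquire-timeout" combination from the first bullet might look roughly like the sketch below. `TinyPool<T>` is a hypothetical toy written for this illustration; it omits min sizing, reaping, and slot accounting, and it uses `TimeoutException` as a stand-in for the pool's own exhaustion error.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Toy pool, for illustration only.
public sealed class TinyPool<T> where T : class
{
    private readonly SemaphoreSlim _permits;      // capacity gate: one permit per allowed lease
    private readonly Stack<T> _idle = new Stack<T>();
    private readonly object _lock = new object();
    private readonly Func<T> _factory;

    public TinyPool(int max, Func<T> factory)
    {
        _permits = new SemaphoreSlim(max, max);
        _factory = factory;
    }

    public T Borrow(TimeSpan acquireTimeout)
    {
        // Take a permit first; give up once the acquire timeout elapses.
        if (!_permits.Wait(acquireTimeout))
            throw new TimeoutException("pool exhausted");
        try
        {
            lock (_lock)
            {
                if (_idle.Count > 0) return _idle.Pop();
            }
            // Create outside the lock so a slow connect does not block
            // other borrowers and returners.
            return _factory();
        }
        catch
        {
            // A failed create must hand the permit back or capacity leaks.
            _permits.Release();
            throw;
        }
    }

    public void Return(T item)
    {
        lock (_lock) { _idle.Push(item); }
        _permits.Release();
    }
}
```

The `catch` branch is the runtime create-failure path: if the factory throws, the permit goes back to the gate before the exception propagates.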

Out of scope (documented)

  • Proactive housekeeper-driven startup drain of in-range SF slots not currently allocated — those recover lazily on reallocation.

Test plan

  • ~80 new pool unit tests under src/net-questdb-client-tests/Pooling/ (sender + query pool: config, borrow/return, exhaustion, reaping, SF slot allocation/reclaim/retire, error-safety) plus PoolOptionsTests at the test-project root.
  • lazy_connect coverage: LazyConnectTests (flag parsing, accepted-on-every-scheme, ToString round-trip, ResolveLazyConnect defaulting/conflict logic, and facade builds/conflicts through QuestDBClient.Connect / builder) plus LazyConnectRecoveryTests — end-to-end down→up→restart against an in-process QWP server whose lifetime the test controls: start the client before the server (writes buffer), bring it up (buffered writes drain), then restart the server mid-stream on the same port (the engine reconnects to the fresh instance and replays only the un-acked frames). DummyQwpServer gains a fixed-Port option (defaults to random, so existing tests are unchanged).
  • Full non-integration suite (net10.0): 1698 passed, 0 failed, 51 skipped.
  • Compiles on net6.0–net10.0 (lib + examples + benchmarks); SF/WS paths use the net-agnostic seam so the pool builds without the net7-only WS type.

🤖 Generated with Claude Code

Adds an elastic, thread-safe pool of ISender instances mirroring the Java
client's QuestDB handle: QuestDBClient.Connect / Builder, BorrowSender
(+async), thread-affine Sender()/ReleaseSender(), and a housekeeper that
reaps idle / over-age senders. Pooled senders return on dispose (flush +
give back), not disconnect.

- Config keys on SenderOptions (sender_pool_min/max, acquire_timeout_ms,
  idle_timeout_ms, max_lifetime_ms, housekeeper_interval_ms), JsonIgnore'd
  so plain senders round-trip unchanged.
- SemaphoreSlim capacity gate, create-outside-lock, elastic min/max.
- Store-and-forward: per-slot sender_id=<base>-<index>, slot bitmap with
  lock-release-aware reclaim/retire (unified leak-debt capacity accounting).
- Two production gaps closed: QwpSlotLock.IsReleased ->
  QwpWebSocketSender.IsSlotLockReleased (via IPooledSlotSender seam), and
  QwpOrphanScanner exclude-managed-slots filtering.
- Pooled ws:: senders castable to IQwpWebSocketSender for the full QWP set.
- ~80 unit tests; example-sender-pool; CLAUDE.md updated.

Egress query pooling and proactive in-range SF startup recovery are not
included (documented limitations).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

coderabbitai Bot commented Jun 29, 2026


Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


ideoma added 23 commits June 29, 2026 14:36
… connections (and SF flocks) when the query pool fails to construct
…engine defers its lock release past the 5 s teardown budget
…hreaded tests run on HTTP pools, so AllocateSlotIndex/RetireSlotIndex/SettleLeakDebtLocked (the most race-prone code) are never exercised under contention
…conditionally, so the documented capability probe lies for HTTP/TCP pools
ideoma commented Jul 2, 2026

Collaborator Author

Code review — QuestDBClient connection pool (level 2)

Verdict: Approve with minor changes. No critical or data-loss/leak/corruption defect survived verification. The permit/leak-debt accounting, use-after-return guard, and the Cancel-TOCTOU fix are all correct on independent trace. Six review passes (correctness/concurrency, borrowed-handle lifecycle, config/lazy_connect, query pool, tests, adversarial fresh-context) plus a line-by-line read of the core pool files all corroborate a clean core. Findings below are 3 Moderate + Minor.

Moderate

1. lazy_connect conflict check rejects a config the error message itself tells you to write (QuestDBClientBuilder.cs:233,272 + ResolveLazyConnect:325)
queryPoolMinExplicitPositive is a sticky OR that is never lowered when a higher-precedence source overrides query_pool_min back to 0:

QuestDBClient.Builder().FromConfig("ws::addr=x;query_pool_min=3;")
    .LazyConnect().QueryPoolMin(0).Build();   // throws ConfigError

The effective query_pool_min is 0 (the builder call won, line 270), yet the conflict check fires on the stale flag — and the thrown message says "drop query_pool_min or set it to 0", which is exactly what .QueryPoolMin(0) did. This contradicts both the builder's documented "builder knob wins" precedence and its own remediation advice. Test the final effective query_pool_min > 0 together with IsQueryPoolMinExplicit, not a monotonic OR.
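
A minimal sketch of the suggested check, under the assumption that each configuration source can be modelled as an optional value applied in precedence order. `LazyConnectCheck` and its method names are hypothetical and exist only for this illustration; they are not the builder's actual code.

```csharp
// Hypothetical model: sources are listed lowest precedence first, and
// null means "this source did not set query_pool_min".
public static class LazyConnectCheck
{
    // Sticky-OR behaviour described in the finding: any positive value
    // seen along the way trips the flag, even if it was later overridden.
    public static bool StickyConflict(bool lazyConnect, params int?[] sourcesLowToHigh)
    {
        var sawPositive = false;
        foreach (var v in sourcesLowToHigh)
            if (v.HasValue && v.Value > 0) sawPositive = true;
        return lazyConnect && sawPositive;
    }

    // Suggested behaviour: resolve the effective value first, then test
    // "explicitly set AND effective value > 0".
    public static bool EffectiveConflict(bool lazyConnect, params int?[] sourcesLowToHigh)
    {
        var isExplicit = false;
        var effective = 0; // assumed default under lazy_connect
        foreach (var v in sourcesLowToHigh)
        {
            if (!v.HasValue) continue;
            isExplicit = true;
            effective = v.Value; // higher-precedence source wins
        }
        return lazyConnect && isExplicit && effective > 0;
    }
}
```

With the connect string contributing 3 and the builder call contributing 0, the sticky form reports a conflict while the effective-value form does not, which matches the documented "builder knob wins" precedence.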

2. Shared pool timing knobs set only in a separate query config are silently dropped (QuestDBClientBuilder.cs:250-266)
On the two-config path, queryOpts is parsed but only query_pool_min/max + lazy_connect are copied back; acquire_timeout_ms, idle_timeout_ms, max_lifetime_ms, housekeeper_interval_ms from the query string are accepted (they're Shared keys, so no error) and then discarded — QueryClientPool is sized entirely from the ingest-derived poolConfig.

QuestDBClient.Connect("http::addr=x;", "ws::addr=y;acquire_timeout_ms=200;idle_timeout_ms=1000");

The query pool silently uses 5000/60000 (ingest defaults), not 200/1000. Either honor these from the query config or reject them there — accept-and-ignore is silent misconfiguration.

3. Cooperative Query.Cancel() during a failover reconnect throws + discards (QwpQueryWebSocketClient.cs:216,222,232 + Query.cs:142-147)
Documented cooperative contract: "query ends normally and the client is re-pooled." But if Cancel() sets _cancelTargetRid == requestId while a mid-stream failover round is active, the loop throws OperationCanceledException("query cancelled during failover"); the finally then MarkTerminal()s (both suppress-flags false), and Query.ExecuteAsync's catch MarkBroken()s → the client is discarded, and the caller of the graceful API gets an OCE. Narrowly reachable (failover=on multi-addr egress + concurrent transport failure + cancel), and defensible since the connection was rebuilding anyway — but it's a real deviation on both axes. Worth at least a doc note.

Minor

  • NullableColumn null-path skips the disposed guard (ISender.cs:342-501). These default-interface methods aren't overridden by BorrowedSender; on a null value they return this without hitting a gated member, so disposedHandle.NullableColumn("x",(long?)null) silently no-ops instead of throwing ObjectDisposedException. Benign (no aliasing of the reused entry), but violates the class doc's "every ingest member throws after return." Gate them on Active().
  • Blanket MarkBroken() churns a reusable connection (Query.cs:142-147). The inner client deliberately stays non-terminal after a result-handler throw whose post-throw drain succeeded (_drainOkAfterHandlerThrow) so it can be reused, but the unconditional MarkBroken() forces a discard in exactly that case. Redundant for correctness (the IsTerminalOrDisposed seam already discards every genuine failure); consider scoping it to non-handler throws.
  • ValidatePoolOptions() runs on plain http/tcp senders (SenderOptions.cs:613). Sender.New("tcp::addr=x:9009;sender_pool_min=2;sender_pool_max=1;") throws, though pool keys are documented "parsed & ignored" on non-pooled schemes. Over-strict but fails loud; defaults pass.
  • Reaping uses wall-clock DateTime.UtcNow, not a monotonic clock (SenderPool.cs:619,633-634, QueryClientPool.cs:445,459-460). A backward clock step stalls reaping until the clock catches up; a forward step (NTP, VM resume) makes every above-min entry simultaneously over-age, which triggers a mass close/reconnect storm. The PeriodicTimer cadence is already monotonic; only the reap decision is affected. Stopwatch.GetTimestamp() / Environment.TickCount64 would fix it.
  • No-arg pool Flush() always returns false when close_flush_timeout_millis == 0 (SenderPool.cs:108,351,382). The no-arg overload uses close_flush_timeout_millis as its timeout; AwaitAckedFsnAsync(fsn, TimeSpan.Zero) can never observe an ACK, so the pool-wide delivery barrier permanently reports failure. A user who sets 0 to make pooled Send() fast and then relies on no-arg Flush() for confirmation gets a silent permanent false. Callers can pass an explicit Flush(timeout); worth a doc note.
  • IQuestDBClient.Flush drains in-use (borrowed) senders with no borrow synchronization (SenderPool.cs:417-423). Documented quiescence-barrier caveat, but reframed as a public-API foot-gun: "flush everything" overlapping an active borrow is a data race on the non-thread-safe inner buffer. Nothing guards it.
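
As a rough illustration of the monotonic-clock suggestion in the reaping bullet, an idle stamp could be kept in `Stopwatch` ticks instead of `DateTime.UtcNow`. `IdleStamp` is a hypothetical helper written for this sketch, not a type from the PR; it relies only on `Stopwatch.GetTimestamp()` and `Stopwatch.Frequency`, which are available on every target framework the PR lists.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: tracks "last used" on the monotonic Stopwatch clock,
// so wall-clock steps (NTP corrections, VM resume) cannot skew idle age.
public sealed class IdleStamp
{
    private long _lastUsedTicks = Stopwatch.GetTimestamp();

    // Call when the entry is returned to the pool.
    public void Touch() => _lastUsedTicks = Stopwatch.GetTimestamp();

    // nowTicks is passed in so a housekeeper can sample the clock once per sweep.
    public TimeSpan IdleFor(long nowTicks) =>
        TimeSpan.FromSeconds((nowTicks - _lastUsedTicks) / (double)Stopwatch.Frequency);

    public bool ShouldReap(TimeSpan idleTimeout, long nowTicks) =>
        IdleFor(nowTicks) >= idleTimeout;
}
```

A sweep would call `Stopwatch.GetTimestamp()` once and pass the result to `ShouldReap` for each above-min entry; the same approach would apply to the max-lifetime check.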

Test coverage gaps

  • No test forces the injected factory to throw on the Nth borrow — the runtime create-failure permit+slot release (SenderPool.cs:456-466 / QueryClientPool.cs:293-296) is the exact leak the accounting exists to prevent, and only the prewarm-throw path is covered.
  • Zero cancellation-token coverage on Borrow/Flush: the linked-token / Closed()-vs-OCE branches and the Flush AggregateException→rethrow-OCE unwrap are untested.
  • ConcurrentBorrowReturnAndCloseDoNotThrow asserts only "no exception," never permit post-state (the plain-path close-race has no accounting invariant check, unlike the SF path).
  • Flush-returns-false is only exercised via a throwing sender, never a genuine non-throwing timeout (FakeSender.Flush ignores its timeout arg).

Verified clean (dismissed after source verification)

  • Permit / leak-debt accounting — balanced on every path (invariant available + in-use + retired − debt = max); no double-release, permanent leak, index-reuse-while-locked, or lock-ordering deadlock. The only under-lock semaphore op is the non-blocking Wait(0).
  • Use-after-return — every BorrowedSender/BorrowedQwpSender member routes through Active(); double-dispose is a true no-op.
  • Cancel TOCTOU — fix correct and complete: lease+rid read under _cancelGate precedes the clear-before-return, SendCancelAsync re-checks the rid under _sendLock, and _cancelTargetRid uses a never-regress CAS over monotonic per-client rids.
  • Transactional discard (Clear() doesn't reset _hasDeferredMessages, so the owes-a-commit check stays accurate), SF slot identity + IsManagedSlot lookalike rejection (default-03), construction-failure teardown ordering, PoolHousekeeper fault-swallowing / no unobserved task exceptions, SemaphoreSlim dispose-vs-blocked-waiter — all correct.
  • ColumnDecimal64(long, byte) removal — intentional raw-limb removal; the decimal overload remains. Not a regression.

All findings are in-diff. 🤖 Generated with Claude Code
