Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
db6bc29
feat(pool): QuestDBClient sender connection pool (Java parity)
ideoma Jun 29, 2026
048b452
use async local instead of thread local for pinned sender
ideoma Jun 29, 2026
b701b24
query pool
ideoma Jun 29, 2026
27473e0
add query pool files to git
ideoma Jun 29, 2026
832adbd
fix: Partial-construction leak in QuestDBClientImpl ctor leaks ingest…
ideoma Jun 29, 2026
6045b04
fix: Close() disposes borrowed (in-use) senders, racing the borrower'…
ideoma Jun 30, 2026
169e468
fix: Uppercase WS::/WSS:: builds a query pool whose connection string…
ideoma Jun 30, 2026
603eb0d
fix: A retired SF slot can permanently shrink pool capacity when the …
ideoma Jun 30, 2026
94c63ed
fix: BorrowAsync allocates a linked CancellationTokenSource on every …
ideoma Jun 30, 2026
784cf1b
fix: Test coverage gaps: No SF-mode concurrency stress — both multi-t…
ideoma Jun 30, 2026
ae1a8fa
remove risk of using of sender after return via Borrowed sender
ideoma Jun 30, 2026
3dd6cbc
avoid GC on rescaling decimals that fit BCL decimal range
ideoma Jun 30, 2026
7d9a0cc
add connection timeout property. Change to zero GC BCL decimal rescaling
ideoma Jun 30, 2026
cc4d46c
Make Dispose not throwing
ideoma Jul 1, 2026
4165684
support lazy_connect
ideoma Jul 1, 2026
1e3b345
fix: SF pool: reaping an idle sender can make a concurrent borrow thr…
ideoma Jul 1, 2026
72a0105
fix: standalone ws:: Send() throws on every call when close_flush_tim…
ideoma Jul 1, 2026
9e5e15d
fixes
ideoma Jul 2, 2026
8de8ad4
fix: transactional sender returned with staged rows is now discarded…
ideoma Jul 2, 2026
d6a4504
fix: Cancel() TOCTOU can cancel a different borrower's query
ideoma Jul 2, 2026
b5667ca
fix: IsManagedSlot uses lax int.TryParse, stranding lookalike orphan …
ideoma Jul 2, 2026
132713b
fix: BorrowedSender.cs — The handle implements IQwpWebSocketSender un…
ideoma Jul 2, 2026
46a875e
add missing files
ideoma Jul 2, 2026
a061006
fix flaky tests
ideoma Jul 2, 2026
ecb6eab
fix: lazy_connect conflict check rejects
ideoma Jul 2, 2026
38ab608
fix: Housekeeper reaps idle pooled WS senders with un-ACKed frames;
ideoma Jul 2, 2026
2784be4
add missing files
ideoma Jul 2, 2026
65272b6
fix sender reaper vs borrow race
ideoma Jul 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 162 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,12 @@ behaviours:
via `ValidateWebSocketKeys` (string-ctor path) or
`ValidateWebSocketKeysAgainstDefaults` (programmatic-init path,
default-comparison heuristic).
- `transaction=on` (WS-only): connection-scoped transactional mode —
auto-flush stages rows with `FLAG_DEFER_COMMIT`, `Send()`/`Commit()`
commits, and the server drops staged rows on connection close.
**Mutually exclusive with `sf_dir`** (`ConfigError`): SF replays
persisted frames across connections, which would re-stage and later
publish uncommitted (possibly abandoned) transactional rows.
- `auto_flush=off` zeros `auto_flush_rows` / `auto_flush_bytes` /
`auto_flush_interval` to `-1`. WS-specific defaults
(`auto_flush_rows=1000`, `auto_flush_bytes=8 MiB`, `auto_flush_interval=100ms`)
Expand All @@ -351,12 +357,162 @@ behaviours:

### Connection pooling

HTTP is thread-safe at the underlying `HttpClient` level; the Sender
itself is **not** thread-safe — one Sender per producer thread, or wrap
your own pool. There is no in-tree `LineSenderPool`; the HTTP transport
already shares `HttpClient`s under the hood via `IHttpClientFactory`
semantics in `HttpSender`. WS / SF manage their own concurrency model
(in-flight window, slot lock) and explicitly reject pooling.
A single `ISender` (or `IQwpQueryClient`) is **not** thread-safe — one per
producer thread. For multi-threaded producers there is now an in-tree pool,
`QuestDBClient` (mirrors the Java client's `QuestDB` handle), which pools
**both** ingest senders and (net7.0+) egress query clients. Files live in
`Pooling/` (public `QuestDBClient`/`IQuestDBClient`/`QuestDBClientBuilder` in
namespace `QuestDB`; internal `SenderPool`/`PooledSender`/`BorrowedSender`/`QueryClientPool`/
`PooledQueryClient`/`PoolHousekeeper`/`QuestDBClientImpl` in `QuestDB.Pooling`).
The public `Query` builder (namespace `QuestDB`, `Query.cs`) is the query-side
surface.

- **Entry points** mirror `Sender`/`ISender`: `QuestDBClient.Connect(confStr)`
or `QuestDBClient.Builder()…Build()` returns `IQuestDBClient`. Construct
once, share across threads. For distinct ingest/query endpoints use
`QuestDBClient.Connect(ingestConfStr, queryConfStr)` or
`Builder().IngestConfig(...).QueryConfig(...)` (Java `connect(ingest, query)`
parity); a single `ws`/`wss` `FromConfig` string serves both pools.
- **Borrow / return**: `BorrowSender()` (+ `BorrowSenderAsync`) returns an
`ISender` that is a fresh per-borrow `BorrowedSender` handle wrapping a
reusable `PooledSender` pool entry. **Dispose does not send** — it is pure
resource release: it discards any buffered-but-unsent rows (`Clear`),
**returns the entry to the pool** (it does NOT close the underlying sender),
and **never throws**. Call `Send()`/`SendAsync()` to hand rows to the
transport, and `Flush(timeout)`/`FlushAsync(timeout)` (drain = send + await
ACK) for delivery confirmation, **before** disposing; a sender whose buffer
can't be cleared (terminally failed) is discarded instead of re-pooled, as is
a transactional (`transaction=on`) ws sender still owing a commit for
auto-flushed rows staged server-side under `FLAG_DEFER_COMMIT` (the
`IPooledTransactionalSender` seam) — QWP has no rollback, so re-pooling the
live connection would let the next borrower's first commit publish the
abandoned rows; discarding closes the connection, which drops them. This is
airtight only because `transaction=on` + `sf_dir` is rejected at config
validation — SF replay would otherwise resurrect the discarded borrower's
deferred frames into the successor slot's connection.
**Pooled WS `Send()` is fast flush-to-ring** (the pool sets
`SenderOptions.SendAwaitsAck=false`): the pooled connection ships async after
return and delivery is confirmed via the pool-wide `Flush` below — unlike a
**standalone** WS `Send()`, which drains (`SendAwaitsAck=true` default) so
`Send()`-before-`Dispose` delivers on its own. The
handle is **use-after-return safe**: once disposed every ingest member throws
`ObjectDisposedException` (so a stale reference can't alias the entry a later
borrower now holds), and a second dispose is a tolerated no-op. There is no
context-affine / pinned-sender API — borrow per unit of work and dispose to
return (a single `ISender` is not thread-safe, so never share a borrowed one
across threads).
- **Pool-wide drain**: `IQuestDBClient.Flush(timeout)` / `FlushAsync(timeout)`
(→ `SenderPool.Flush`) fans `ISender.Flush` across every pooled sender — the
quiescence barrier for "confirm everything landed, then mark done". Call it
after all borrowed senders are returned; it does not synchronise against a
concurrent borrow. Returns `true` only if every sender drained within the
timeout. `close_flush_timeout_millis` is the default timeout for the no-arg
`Flush()` overloads (it no longer governs Dispose, which does not drain).
- **Sizing**: elastic between `sender_pool_min` and `sender_pool_max`,
bounded by a `SemaphoreSlim` capacity gate (counts in-use senders;
creation happens outside the lock). Idle senders sit in an
idle-time-sorted deque: borrowers pop/push the hot end (LIFO reuse, so
the excess goes genuinely cold and shrinks toward `min`), and the
housekeeper's `ReapIdle` sweeps from the cold end, stopping at the
first entry inside its timeout — O(reaped), not O(idle). `max_lifetime`
follows `CreatedAtUtc` (which the deque is NOT sorted by — an entry
can cross it while parked), so it runs as a separate walk gated on
`_idleOldestCreatedUtc`, a conservative lower bound over parked
entries' creation times that only fires when something actually
crossed the lifetime.
`BorrowSender` blocks up to `acquire_timeout_ms` then throws
`IngressError(ErrorCode.PoolExhausted)`. Reaping is **gated on full
drain**: an idle ws sender whose cursor-engine ring still holds
un-acked frames is skipped by both the idle and max-lifetime paths
(the idle sweep re-parks it at the hot end with a refreshed
`IdleSinceUtc`, so the idle clock effectively restarts at full-drain;
the age walk just leaves it for the next sweep), because tearing it down
would, in RAM mode, free the ring and silently drop those frames — the
pool-wide `Flush` can't cover an already-reaped sender. Surfaced via the net-agnostic `IPooledDrainAwareSender.IsFullyDrained`
seam (implemented by `QwpWebSocketSender` → `QwpCursorSendEngine.IsFullyDrained`;
HTTP/TCP deliver synchronously and are always drained). There is **no
bound**: a permanently wedged sender that never drains lives until the
pool is closed (data retained, not silently dropped) — consistent with
the "Flush before close" contract, since Dispose does not drain.
- **Config keys** (all-protocol, on `SenderOptions`, `[JsonIgnore]`d out of
`ToString` so a plain sender round-trips byte-identically): `sender_pool_min`
(1), `sender_pool_max` (4), `acquire_timeout_ms` (5000), `idle_timeout_ms`
(60000), `max_lifetime_ms` (1800000), `housekeeper_interval_ms` (5000),
`lazy_connect` (off). Validated by `SenderOptions.ValidatePoolOptions()`.
- **Tolerant startup (`lazy_connect=on`)**: a facade-only pool key (also
`QuestDBClientBuilder.LazyConnect()`) — in `QwpConnectStringKeys.Shared`, so a
plain `Sender` parses and ignores it on every scheme. It lets
`QuestDBClient` build while the server is down: the `ws`/`wss` ingest side
connects asynchronously (buffering writes) and the read pool defaults
`query_pool_min=0` so nothing connects eagerly — the read pool stays
**enabled** and a query connects lazily on first `NewQuery`. `QuestDBClientBuilder.Build`
reads it (`ResolveLazyConnect`), injects `initial_connect_retry=async` for the
pooled ws senders (via `SenderPool`'s `forceWsAsyncConnect`) when unset, and
rejects a conflicting blocking-startup knob up front (`IngressError`/`ConfigError`):
an explicit `initial_connect_retry` other than `async`, or an explicit
`query_pool_min > 0` (connect string or `QueryPoolMin(...)` builder call, tracked
across ingest + separate-query configs). Mirrors java-questdb-client `lazy_connect`.
- **Query pooling (net7.0+)**: `IQuestDBClient.NewQuery()` returns a fresh
`Query` builder (`Sql`/`Binds`/`Handler` then `ExecuteAsync`/`Execute`);
`ExecuteSqlAsync(sql, handler, ct)` is the convenience shortcut. There is
**no** public borrow/release for queries — the client lease is implicit per
`ExecuteAsync` and self-returns in a `finally` (Java `newQuery()`/
`executeSql()` parity; Java's thread-local `query()` is **not** ported). The
`Task` returned by `ExecuteAsync` replaces Java's `Completion`. A clean return
re-pools the client (`GiveBack`); any throw — transport/protocol or a hard
`CancellationToken` cancel (which is permanently terminal) — discards it
(`MarkBroken` → `DiscardBroken`), since the egress client's terminal state is
sticky with no reset. `Query.Cancel()` is the **cooperative** path (forwards to
the in-flight client's `Cancel()`; the query ends normally and the client is
re-pooled). `PooledQueryClient` also checks an internal `IPooledQueryClientInner.
IsTerminalOrDisposed` seam (implemented by `QwpQueryWebSocketClient`) before
re-pooling. The query pool is the **stripped sibling** of `SenderPool` — no
SF/slot/leak-debt machinery (read side has no store-and-forward) and no
AsyncLocal pin. Query-pool config: `query_pool_min` (1), `query_pool_max` (4)
on `SenderOptions` (`[JsonIgnore]`, in `QwpConnectStringKeys.Shared` so both
`SenderOptions` and `QueryOptions` accept them), validated by the separate
`ValidateQueryPoolOptions()` (kept out of `EnsureValid` so a plain `Sender`
stays lenient); the acquire/idle/lifetime/housekeeper knobs are **shared** with
the sender pool, and one `PoolHousekeeper` sweeps both. The query pool is built
only when a `ws`/`wss` query config is present (single `ws` string, explicit
`QueryConfig`, or `Connect(ingest, query)`); an `http`/`tcp` ingest handle with
no query config throws `IngressError(ConfigError)` on `NewQuery`. All query
members are gated `#if NET7_0_OR_GREATER` (so `IQuestDBClient` differs by TFM);
net6.0 keeps the sender-only surface.
- **Store-and-forward**: when pooling `ws::`+`sf_dir`, each pooled sender
gets a distinct slot identity `sender_id = <base>-<index>` (via
`SenderPool.ApplySlotIdentity`) so siblings never collide on a slot
directory / flock. Free indices live in a LIFO stack (most recently freed
reused first, keeping the on-disk slot-directory working set compact). A
discarded/reaped sender's index is freed only after
`IPooledSlotSender.IsSlotLockReleased` confirms the lock dropped (a
net-agnostic seam implemented by `QwpWebSocketSender`, backed by
`QwpSlotLock.IsReleased`); otherwise it is **retired** (`_retired`) and one
capacity permit stays physically withheld on its behalf, shrinking effective
`max` (a discarded sender's own permit is simply not released; the reaper
competes through the same gate as borrowers — it takes a free permit up
front via `TryWithholdPermit` and **stops the sweep** when a mid-borrow
thread already drained the last one — that borrower reuses the idle
sender instead, so a legitimate borrow never sees a spurious `PoolExhausted`).
This shrink is **not permanent**: when a wedged/deferred engine teardown
(`QwpCursorSendEngine.Dispose` defers `ReleaseSharedResources` past its 5 s
pump-join budget) leaves the lock still held at dispose time, the housekeeper's
`ReclaimRetiredSlots` re-tests `IsInnerSlotLockReleased` each sweep — once it
flips true the index is freed and the withheld permit released
(`_capacity.Release()`). So `LeakedSlotCount` (= `_retired.Count`) is a live
gauge of currently-retired slots, not a monotonic counter. Pooled
senders pass their managed family to `QwpOrphanScanner.ClaimOrphans(...,
managedBase, managedCount)` so orphan adoption skips live/future siblings
but still drains true out-of-family orphans (when `drain_orphans=on`).
In-range stranded slots recover lazily — a pooled sender replays its own
slot's segments on open (`QwpSegmentRing.Open`) when the index is
(re)allocated. **Known limitation:** unlike the Java pool, there is no
proactive housekeeper-driven two-pass startup drain of in-range slots
that are not currently allocated (e.g. a pool that permanently shrank);
their data stays on disk until a future run reallocates the index.
- HTTP still shares `HttpClient`s under the hood per address inside
`HttpSender`; the pool is layered above the `ISender` seam and is
protocol-agnostic.

### Value types

Expand Down
Loading
Loading