diff --git a/Cargo.lock b/Cargo.lock index 5d416939..a5e7b261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1875,7 +1875,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pg_doorman" -version = "3.4.0" +version = "3.5.2" dependencies = [ "ahash", "arc-swap", @@ -1902,6 +1902,7 @@ dependencies = [ "num_cpus", "once_cell", "openssl", + "openssl-sys", "pam-client", "parking_lot", "pin-project", @@ -1915,6 +1916,7 @@ dependencies = [ "rand 0.9.1", "reqwest", "scopeguard", + "sd-notify", "serde", "serde-toml-merge", "serde_derive", @@ -2489,6 +2491,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sd-notify" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b943eadf71d8b69e661330cb0e2656e31040acf21ee7708e2c238a0ec6af2bf4" +dependencies = [ + "libc", +] + [[package]] name = "sdd" version = "3.0.10" diff --git a/Cargo.toml b/Cargo.toml index 4f16d782..4e1b2200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_doorman" -version = "3.5.1" +version = "3.5.2" edition = "2021" rust-version = "1.87.0" license = "MIT" diff --git a/documentation/en/src/changelog.md b/documentation/en/src/changelog.md index 47674862..d9d536b9 100644 --- a/documentation/en/src/changelog.md +++ b/documentation/en/src/changelog.md @@ -1,5 +1,37 @@ # Changelog +### 3.5.2 Apr 21, 2026 + +#### Semaphore permit leak on direct handoff + +Each `return_object` handoff (delivering a connection to a waiting client via oneshot channel) permanently consumed one semaphore permit. After `max_size` handoffs the pool semaphore was fully drained, blocking all new `timeout_get` callers. The pool could not create connections and stabilized at whatever size it reached during cold start (typically 4-8 out of 40). + +Root cause: `wrap_checkout` calls `permit.forget()`, and the handoff path in `return_object` skipped `add_permits(1)`. Now `return_object` restores the permit on both the handoff and idle-queue paths. Compensating `add_permits(1)` in `pre_replace_one` removed (no longer needed). + +#### Burst gate select race + +The `tokio::select!` in the burst gate loop randomly picked among ready branches. When `sleep(5ms)` or `create_done` won over an already-delivered oneshot, the connection was silently dropped, inflating `slots.size` without a live server. Fixed with `biased;` (oneshot checked first) and a `try_recv` drain that pushes orphaned connections to idle without double-counting the permit. + +#### Migration fixes + +- **Client ID collision after migration.** The new process started its connection counter at 0, colliding with migrated client IDs. Now the counter advances past the highest migrated ID. + +- **SCRAM passthrough state preserved.** The ClientKey from the first client's SCRAM handshake is serialized in the migration payload (v2 format, backward compatible). The new process skips the `ScramPending` fallback to `server_password`. + +#### Session mode statistics fix + +`xact_time` percentiles in session mode showed the entire session duration instead of individual transaction time. Now recorded per-transaction at each `ReadyForQuery(Idle)`, matching transaction mode semantics. + +`query_time` had the same accumulation bug: the timer was set once before the inner loop and never reset, so each subsequent query reported the cumulative session duration. Now reset per-query in session mode. + +#### Adaptive anticipation budget + +Anticipation wait (formerly fixed 300-500ms) scales with real transaction latency: `xact_p99 * 2 +/- 20%` jitter, clamped to [5ms, 500ms]. Cold start default: 100ms. + +#### Diagnostic logging + +Slow checkout warnings (>500ms) now include pool state: `size`, `avail`, `waiting`, `inflight`, `creates`, `gate_waits`, `antic_ok`, `antic_to`, `fallback`. Phase-specific warnings added for semaphore timeout, burst gate timeout, coordinator exhaustion, and create failure. + ### 3.5.1 Apr 20, 2026 #### systemd Type=notify support diff --git a/documentation/en/src/tutorials/pool-pressure.md b/documentation/en/src/tutorials/pool-pressure.md index 0fad9fbe..96e30191 100644 --- a/documentation/en/src/tutorials/pool-pressure.md +++ b/documentation/en/src/tutorials/pool-pressure.md @@ -163,14 +163,21 @@ dropped. `return_object` detects the dropped receiver (`send` returns This way timed-out waiters are cleaned up lazily without a separate garbage-collection pass. -The deadline is `min(query_wait_timeout - 500 ms, PHASE_4_HARD_CAP)` -where `PHASE_4_HARD_CAP` is randomly chosen between **300 ms and -500 ms** (uniform jitter) for each checkout attempt, measured against -a timestamp captured at the top of `timeout_get`. Phase 1/2 semaphore -wait consumes from the same budget, so the cumulative wait across -phases cannot exceed the caller's `query_wait_timeout`. - -The jitter prevents a **timeout cliff**: without it, N clients that +The deadline is adaptive: `min(query_wait_timeout - 500 ms, adaptive_cap)` +where `adaptive_cap` is derived from real transaction latency: + +| Pool state | Budget | Example | +|------------|--------|---------| +| Cold start (no stats) | 100ms ± 20% jitter | 80-120ms | +| Steady state | xact_p99 × 2 ± 20% jitter | p99=0.7ms → 5ms (min); p99=50ms → 100ms | +| High latency | Capped at 500ms | p99=300ms → 500ms | + +The budget is measured against a timestamp captured at the top of +`timeout_get`. Phase 1/2 semaphore wait consumes from the same budget, +so the cumulative wait across phases cannot exceed the caller's +`query_wait_timeout`. + +The ±20% jitter prevents a **timeout cliff**: without it, N clients that entered Phase 4 at the same instant all exit simultaneously and stampede into the burst gate, creating N new backend connections for a pool that needs far fewer. With jitter, clients exit in staggered @@ -183,9 +190,11 @@ to the idle queue for recycling. connects. If a slot is free, take it and call `connect()` against PostgreSQL. If all slots are full, register a direct-handoff oneshot waiter and also listen for `create_done` (another in-flight create -finishing). If a connection arrives via the oneshot channel, recycle it -and return. Otherwise, re-try the recycle and the gate after the wake. -A 5 ms backoff acts as a safety net if both wake sources are missed. +finishing). The `select!` uses `biased;` to always check the oneshot +first, preventing a race where `create_done` or the 5 ms backoff timer +wins and silently drops the delivered connection. If a connection +arrives via the oneshot channel, recycle it and return. Otherwise, +re-try the recycle and the gate after the wake. **Phase 6 — Backend connect.** Run `connect()`, authenticate, hand the connection to the client. The burst slot is released automatically when diff --git a/documentation/ru/pool-pressure.md b/documentation/ru/pool-pressure.md index 9cefa2a1..86f27842 100644 --- a/documentation/ru/pool-pressure.md +++ b/documentation/ru/pool-pressure.md @@ -2,16 +2,17 @@ **Pool pressure** (давление на пул) описывает, как pg_doorman ведёт себя, когда множество клиентов одновременно запрашивают -backend-соединение, а в idle-пуле пусто. **Idle pool** здесь значит -внутреннюю очередь свободных, готовых к выдаче серверных соединений; -«idle» означает «открыто, но никем не используется прямо сейчас». +backend-соединение, а в **idle-пуле** (очереди свободных, готовых к +выдаче серверных соединений; «idle» означает «открыто, но никем не +используется прямо сейчас») пусто. Решение о том, кто получит соединение, кто подождёт, кто инициирует свежий `connect()` к PostgreSQL, а кому будет отказано, принимают два механизма. Локально в каждом пуле `(database, user)` работают -**anticipation** (ожидание возврата соединения от соседа прежде чем -тратить ресурсы на новый `connect()`) и **bounded burst gate** -(жёсткий лимит на одновременные backend-`connect()` внутри одного -пула). Поверх них при необходимости подключается **coordinator**, +**упреждающее ожидание (anticipation)** — ожидание возврата соединения +от соседа прежде чем тратить ресурсы на новый `connect()` — и +**ограничитель всплесков (bounded burst gate)** — жёсткий лимит на +одновременные backend-`connect()` внутри одного пула. Поверх них при +необходимости подключается **координатор (coordinator)**, общий для всех пулов ограничитель суммарного числа backend-соединений к одной базе данных. @@ -32,7 +33,7 @@ backend-соединение, а в idle-пуле пусто. **Idle pool** зд хвосте распределения, p99/p99.9, то есть редкие, но самые медленные запросы) уже подключённых клиентов растёт, потому что postmaster PostgreSQL занят порождением backend'ов вместо выполнения запросов. -Это **thundering herd** («стадо в панике»): ситуация, когда множество +Это **thundering herd** (лавинообразный эффект): ситуация, когда множество независимых задач одновременно реагируют на одно состояние и устраивают шторм одинаковых запросов к общему ресурсу. В нашем сценарии это 196 одновременных `connect()` к одному PostgreSQL listener'у @@ -61,7 +62,7 @@ PostgreSQL: 196 spawning backends + 4 running queries небольшим числом уже идущих backend-connect-ов. Частота `connect()` к PostgreSQL остаётся ограниченной даже при всплесках клиентов. -## Plain pool mode +## Режим простого пула (plain pool mode) Этот режим работает, когда `max_db_connections` не задан. Пулы независимы, общей координации между ними нет, давление управляется @@ -81,17 +82,17 @@ backend-соединение. Второй делает то же самое, т холодный, цена ожидания выше цены коннекта, и клиенты не могут конкурировать за idle-соединения, которых не существует. -Выше порога активируется **anticipation zone** (зона активного -ожидания возврата): pg_doorman считает, что пул уже разогрет и кто-то +Выше порога активируется **зона упреждающего ожидания (anticipation zone)**: +pg_doorman считает, что пул уже разогрет и кто-то из параллельных клиентов скоро вернёт занятое соединение, поэтому имеет смысл подождать его возврата вместо того, чтобы тратить ресурсы на новый `connect()`. Когда клиент не находит соединения в idle pool, pg_doorman сначала пытается перехватить такой возврат. Третья зона накладывается поверх обеих: при любом размере пула, если -`inflight_creates` достигает `scaling_max_parallel_creates` (по -умолчанию 2), для новых создаваемых соединений пул упирается в -лимит burst gate. Дополнительные вызовы ждут свободный слот независимо +число соединений, создаваемых прямо сейчас, достигает +`scaling_max_parallel_creates` (по умолчанию 2), пул упирается в +лимит ограничителя всплесков. Дополнительные вызовы ждут свободный слот независимо от того, сколько idle-соединений существует. ``` @@ -106,23 +107,24 @@ Pool size: 0 ----------- 8 ---------------------------- 40 | size < | size >= warm_threshold | | warm_thr | | | | | - | Skip | Phase 3: fast spin | - | phases 3 | Phase 4: direct handoff | - | and 4. | (oneshot channel, deadline | - | Go straight| = wait_timeout - 500ms, | - | to phase 5 | shared w/ Phase 1/2 wait) | - | (burst gate| Then phase 5 | + | Пропускает | Фаза 3: быстрый опрос | + | фазы 3 | Фаза 4: прямая передача | + | и 4. | (ожидание возврата от | + | Сразу к | соседа, дедлайн = | + | фазе 5 | wait_timeout − 500ms) | + | (огранич. | Затем фаза 5 | | + connect) | | - Burst-capped state (orthogonal) - ------------------------------- + Ограничение параллельных создаваемых соединений + (ортогонально размеру пула) + ----------------------------------------------- -inflight_creates: 0 ---- 1 ---- 2 (= scaling_max_parallel_creates) - ^ - | At cap: any caller reaching the - | burst gate registers a handoff - | waiter and listens for a peer - | create completion. +Создаваемых сейчас: 0 ---- 1 ---- 2 (= scaling_max_parallel_creates) + ^ + | На лимите: новый вызов встаёт + | в очередь на получение + | возвращённого соединения и ждёт + | завершения чужого создания. ``` Зоны warm и anticipation отслеживают *текущий размер пула*. Отдельно @@ -142,80 +144,94 @@ inflight_creates: 0 ---- 1 ---- 2 (= scaling_max_parallel_creates) **Фаза 1 — горячий путь recycle.** Берём первое соединение из **idle-очереди** (двусторонняя очередь свободных серверных соединений -в памяти пула; «голова очереди» — это то соединение, которое было -возвращено раньше всех). Если оно проходит recycle-проверку, отдаём -его клиенту. -Recycle-проверка откатывает любые открытые транзакции, запускает -liveness probe если соединение простояло дольше -`server_idle_check_timeout`, и сверяет reconnect-эпоху соединения с -текущей эпохой пула. Пул увеличивает свою reconnect-эпоху по admin -команде `RECONNECT` и при обнаруженных сбоях backend-а; соединения, -созданные до увеличения, эту проверку не проходят и удаляются вместо -возврата клиенту. Здоровый пул в установившемся режиме идёт только -этим путём. Стоимость: захват мьютекса и recycle-проверка. +«голова очереди» — это то соединение, которое было возвращено раньше +всех). Если оно проходит проверку пригодности (recycle), отдаём его +клиенту. +Проверка пригодности откатывает любые открытые транзакции, запускает +проверку живости (liveness probe) если соединение простояло дольше +`server_idle_check_timeout`, и сверяет поколение соединения с текущим +поколением пула (reconnect-эпоха — счётчик, который увеличивается при +admin-команде `RECONNECT` и при обнаруженных сбоях backend-а). +Соединения, созданные до увеличения счётчика, эту проверку не проходят +и удаляются вместо возврата клиенту. Здоровый пул в установившемся +режиме идёт только этим путём. **Фаза 2 — warm zone gate.** Если размер пула ниже порога прогрева, пропускаем anticipation и сразу переходим к созданию нового backend-соединения. Холодные пулы заполняются быстро. **Фаза 3 — anticipation spin.** Выше порога прогрева повторяем -recycle 10 раз в плотном цикле `yield_now` (контролируется параметром -`scaling_fast_retries`). Так перехватывается случай, когда другой +проверку пригодности до 10 раз в плотном цикле без пауз (контролируется +параметром `scaling_fast_retries`). Так перехватывается случай, когда +другой клиент завершил свой запрос в том же микросекундном диапазоне и вот-вот вернёт соединение. Полная стоимость порядка 10–50 микросекунд. Без sleep, без блокирующего I/O. **Фаза 4 — direct handoff (прямая передача).** Если spin не поймал -возврат, задача регистрирует oneshot-канал в очереди `waiters` -(`VecDeque` внутри `Slots`). Когда любой клиент возвращает соединение -через `return_object()`, возвращённое соединение отправляется напрямую -через старейший зарегистрированный oneshot-канал, минуя idle-`VecDeque` -целиком. Получатель забирает соединение без конкуренции с другими -задачами — гонки с Phase 1/2 semaphore-waiter'ами нет, потому что -соединение никогда не попадает в idle-очередь. - -Если oneshot-receive удался, соединение проходит recycle-проверку -(`recycle_handoff`). При успешном recycle соединение возвращается -вызывающему. При ошибке (устаревший backend) пул уменьшает -`slots.size`, и вызов проваливается в create path. - -Если соединение не пришло до дедлайна, receiver дропается. -`return_object` обнаруживает дропнутый receiver (`send` возвращает -`Err`), пропускает устаревший waiter и пробует следующий в очереди. -Таким образом протухшие waiter'ы вычищаются лениво, без отдельного -прохода сборки мусора. - -Дедлайн равен `min(query_wait_timeout - 500 ms, PHASE_4_HARD_CAP)`, -где `PHASE_4_HARD_CAP` выбирается случайно в диапазоне **300–500 ms** -(равномерный jitter) для каждой попытки checkout'а. Фаза 1/2 ожидания -семафора тратит из того же бюджета, поэтому суммарное ожидание не -может увести клиента за его `query_wait_timeout`. - -Jitter предотвращает **timeout cliff**: без него N клиентов, вошедших -в Phase 4 одновременно, выходят одновременно и лавиной входят в burst -gate, создавая N новых backend-соединений для пула, которому нужно -значительно меньше. С jitter'ом клиенты выходят порциями — первые +возврат, задача встаёт в очередь ожидающих. Когда любой клиент +возвращает соединение, оно отправляется напрямую старейшему +ожидающему, минуя idle-очередь целиком. Получатель забирает соединение +без конкуренции с другими задачами — соединение никогда не попадает +в общую idle-очередь и достаётся ровно одному адресату. + +Если передача удалась, соединение проходит проверку пригодности. При +успехе соединение возвращается вызывающему. При ошибке (устаревший +backend) пул уменьшает текущий размер, и вызов проваливается в путь +создания нового соединения. + +Если соединение не пришло до дедлайна, ожидающий снимается с очереди. +При попытке доставки пул обнаруживает снятого ожидающего, пропускает +его и пробует следующего. Таким образом устаревшие ожидающие +вычищаются по мере обработки очереди, без отдельного прохода. + +Дедлайн адаптивный: `min(query_wait_timeout - 500 ms, adaptive_cap)`, +где `adaptive_cap` вычисляется из реальной latency транзакций: + +| Состояние пула | Бюджет | Пример | +|---------------|--------|--------| +| Холодный старт (нет данных) | 100ms ± 20% jitter | 80-120ms | +| Устойчивый режим (steady state) | xact_p99 × 2 ± 20% jitter | p99=0.7ms → 5ms (min); p99=50ms → 100ms | +| Высокая latency | Ограничено 500ms | p99=300ms → 500ms | + +Фаза 1/2 ожидания семафора тратит из того же бюджета, поэтому +суммарное ожидание не может увести клиента за его `query_wait_timeout`. + +Случайный разброс (jitter) ±20% предотвращает **обрыв таймаутов (timeout cliff)**: без него N клиентов, +вошедших в Phase 4 одновременно, выходят одновременно и лавиной входят +в burst gate, создавая N новых backend-соединений для пула, которому +нужно значительно меньше. С jitter'ом клиенты выходят порциями — первые создают соединения, и к моменту выхода последних эти соединения уже использованы и возвращены в idle queue для переиспользования. Если дедлайн истёк без получения соединения, переходим к фазе 5. -**Фаза 5 — bounded burst gate.** Перед тем как пойти на новый -`connect()`, задача должна получить *слот* в **bounded burst gate**: -ограниченном per-pool semaphore-подобном механизме, который -параллельно пропускает не больше `scaling_max_parallel_creates` (по -умолчанию 2) задач. Если слот свободен, задача забирает его и идёт -вызывать `connect()`. Если все слоты заняты, задача регистрирует -oneshot-waiter для direct handoff и слушает `create_done` (завершение -чужого create). Если соединение приходит через oneshot-канал, оно -recycl'ится и возвращается. Иначе задача снова пробует recycle и -gate. Короткий `sleep(5 ms)` подстраховывает на случай, если оба -источника пробуждения пропущены. Так лимитируется именно скорость +**Фаза 5 — ограничитель всплесков (bounded burst gate).** Перед тем +как пойти на новый `connect()`, задача должна получить *слот* в +ограничителе — механизме, который параллельно пропускает не больше +`scaling_max_parallel_creates` (по умолчанию 2) задач на пул. Если +слот свободен, задача забирает его и идёт вызывать `connect()`. Если +все слоты заняты, задача встаёт в очередь на прямую передачу и +одновременно ожидает завершения чужого создания. Приоритет отдаётся +прямой передаче: если соединение вернулось, пока задача ждала +освобождения слота, оно доставляется напрямую — задача проверяет его +пригодность и возвращает клиенту. Если передачи не было, задача снова +пробует проверку пригодности и захват слота. Так лимитируется скорость рождения новых backend-соединений в одном пуле, а не размер самого пула. +**Адаптивный таймаут ограничителя всплесков (adaptive timeout burst +gate).** Цикл ограничителя лимитирован адаптивным бюджетом: +`xact_p99 × 2 ± 20%` jitter (min 20 ms, max 500 ms). Если задача +провела в цикле дольше бюджета, она прекращает ожидать прямую +передачу и переходит к захвату слота ограничителя напрямую. Без этого +механизма пул мог застрять на пороге прогрева навсегда: клиенты +бесконечно получали переиспользованные соединения через прямую +передачу, а до создания нового соединения дело не доходило. Счётчик +`burst_gate_budget_exhausted` отслеживает срабатывания. + **Фаза 6 — backend connect.** Запускаем `connect()`, аутентифицируемся, -отдаём соединение клиенту. Burst slot освобождается автоматически по +отдаём соединение клиенту. Слот ограничителя освобождается автоматически по завершении этой фазы независимо от исхода. ``` @@ -292,7 +308,7 @@ PostgreSQL: at most 2 spawning backends at any moment Тот же пул обслуживает все 200 клиентов, но PostgreSQL никогда не видит больше `scaling_max_parallel_creates` (по умолчанию 2) одновременных -backend-spawn'ов из этого пула. Большинство клиентов попадают на +запусков backend-процессов из этого пула. Большинство клиентов попадают на переиспользованное соединение от соседа, который завершил работу мгновением раньше, а не на свежий `connect()`. @@ -324,7 +340,7 @@ gate занят, она немедленно сдаётся и повторяе Логика такова: во время всплеска нагрузки клиенты уже насыщают gate, создавая соединения, которые им нужны *прямо сейчас*. Если фоновый -replenish будет драться с ними за слоты, толку не будет: пул всё равно +replenish будет конкурировать с ними за слоты, толку не будет: пул всё равно поднимется выше `min_pool_size` за счёт клиентских запросов на создание. При каждом таком отступлении фоновой задачи увеличивается счётчик `replenish_deferred`. @@ -335,36 +351,34 @@ replenish будет драться с ними за слоты, толку не бюджет). Если нужна жёсткая гарантия минимума, см. раздел troubleshooting. -### Direct handoff при возврате +### Прямая передача при возврате (direct handoff) -Когда соединение возвращается, `return_object` первым делом проверяет -очередь `waiters` (direct-handoff) внутри `Slots`. Если хотя бы один -waiter зарегистрирован, соединение отправляется через старейший -oneshot-канал, минуя idle-`VecDeque` и семафор. Waiter уже держит -semaphore permit, поэтому вызов `add_permits` не нужен. Waiter'ы, -чей receiver был дропнут (вызывающий протаймаутился), пропускаются: -`send` возвращает `Err` с соединением, и `return_object` пробует -следующий waiter в очереди. +Когда соединение возвращается, пул первым делом проверяет очередь +ожидающих (direct-handoff). Если хотя бы один ожидающий +зарегистрирован, соединение отправляется напрямую старейшему из них, +минуя idle-очередь. Ожидающие, чей вызывающий уже отвалился по +таймауту, пропускаются: пул обнаруживает недоступного получателя и +пробует следующего в очереди. -Если зарегистрированных waiter'ов нет (типичный случай при высоком -throughput, когда каждый checkout попадает в hot path), соединение -кладётся в idle-`VecDeque` и `semaphore.add_permits(1)` будит Phase -1/2 ожидающего, как и раньше. +Если зарегистрированных ожидающих нет (типичный случай при высоком +throughput, когда каждый checkout попадает в горячий путь), соединение +кладётся в idle-очередь и будит ближайшего клиента, ожидающего на +Фазе 1/2. -В обоих случаях координатор (если настроен) уведомляется через -`notify_return_observers`, чтобы Phase C waiter'ы соседних пулов -могли просканировать кандидатов для eviction. Same-pool waiter'ы -никогда не паркуются на `Notify` — они получают соединения напрямую -через oneshot-канал. +В обоих случаях координатор (если настроен) уведомляется о возврате, +чтобы ожидающие Фазы C из соседних пулов могли просканировать +кандидатов для eviction. Ожидающие внутри того же пула получают +соединения напрямую, а не через общее уведомление. ### FIFO-честность и распределение латенси -Очередь `waiters` — `VecDeque`. `push_back` при регистрации, -`pop_front` при доставке. Старейший waiter всегда получает следующее -возвращённое соединение. +Очередь ожидающих — двусторонняя очередь (FIFO): новые ожидающие +добавляются в конец, соединения доставляются из начала. Старейший +ожидающий всегда получает следующее возвращённое соединение. Это даёт измеримо другой профиль латенси по сравнению с пулерами, -использующими broadcast-notify или LIFO-планирование. При 500 +использующими широковещательное пробуждение (broadcast-notify) или +LIFO-планирование. При 500 клиентах на пул из 40 соединений (AWS Fargate): | Пулер | p50 (мс) | p95 (мс) | p99 (мс) | p99/p50 | @@ -400,40 +414,43 @@ p50 у Odyssey в 11 раз ниже, чем у pg_doorman — большинс планирования FIFO минимизирует дисперсию времени ожидания при том же среднем, что и LIFO. Среднее одинаково — разница только в хвосте. -### Pre-replacement при истечении lifetime +### Упреждающая замена при истечении lifetime (pre-replacement) Когда настроен `server_lifetime`, backend-соединения закрываются по достижении индивидуального лимита (базовый ± 20% jitter). Закрытие означает, что в пуле на одно idle-соединение меньше — последующие -checkout'ы могут попасть в anticipation или create path, добавляя -несколько миллисекунд к p99 во время кластеров lifetime expiry. - -**Pre-replacement** убирает этот спайк. Когда checkout recycl'ит -соединение, достигшее **95%** своего lifetime, фоновая задача создаёт -замену и помещает её в idle-очередь. Когда старое соединение +checkout'ы могут попасть в зону упреждающего ожидания или путь +создания нового соединения, добавляя несколько миллисекунд к p99 во +время кластеров истечения lifetime. + +**Упреждающая замена (pre-replacement)** убирает этот всплеск +задержки. Когда checkout проверяет пригодность соединения и +обнаруживает, что оно достигло **95%** своего lifetime, фоновая задача +создаёт замену и помещает её в idle-очередь. Когда старое соединение отклоняется при 100% lifetime, следующий checkout находит -предсозданную замену через hot path — ноль ожидания. - -Параллельно может работать до 3 pre-replacement'ов на пул. Во время -окна перекрытия пул временно держит `max_size + 3` соединений и -соответствующее число дополнительных semaphore permit'ов. Когда старые -соединения умирают, `slots.size` возвращается к `max_size`. - -Guard'ы, предотвращающие неконтролируемый рост: - -| Guard | Предотвращает | -|-------|---------------| -| `!under_pressure()` | Создание лишних при насыщении пула (старое соединение выживет через `skip_lifetime`) | -| `idle_ratio < 25%` | Замену в переразмеренном пуле, который должен сжаться | -| `coordinator headroom >= 2` | Захват последнего coordinator-permit'а у соседнего пула | -| `lifetime >= 60 s` | Срабатывание на коротких lifetime где окно перекрытия слишком мало | -| `slots.size <= max_size + cap` | Накопление нескольких pre-replacement overshoot'ов | -| `try_take_burst_slot` (cap=3) | Ограничение параллельных фоновых create | - -Pre-replacement срабатывает только на пути **checkout** (`try_recycle_one`), -не из retain loop. Idle-соединения, истекающие без checkout'а, -закрываются retain loop **без замены** — так пул естественно сжимается -при падении нагрузки. +предсозданную замену через горячий путь — ноль ожидания. + +Параллельно может работать до 3 упреждающих замен на пул. Во время +окна перекрытия пул временно держит `max_size + 3` соединений. Когда +старые соединения умирают, текущий размер пула возвращается к +`max_size`. + +Условия, предотвращающие неконтролируемый рост: + +| Условие | Предотвращает | +|---------|---------------| +| Пул не под давлением | Создание лишних при насыщении пула (старое соединение выживет, пропустив закрытие по lifetime) | +| Доля idle-соединений < 25% | Замену в переразмеренном пуле, который должен сжаться | +| Запас координатора >= 2 | Захват последнего слота координатора у соседнего пула | +| Lifetime >= 60 s | Срабатывание на коротких lifetime, где окно перекрытия слишком мало | +| Текущий размер <= max_size + cap | Накопление нескольких параллельных превышений | +| Лимит параллельных фоновых создаваемых (cap=3) | Неограниченное число фоновых создаваемых соединений | + +Упреждающая замена срабатывает только на пути **checkout** (при +проверке пригодности), не из фоновой задачи обслуживания. +Idle-соединения, истекающие без checkout'а, закрываются фоновой +задачей **без замены** — так пул естественно сжимается при падении +нагрузки. ## Согласование лимита с PostgreSQL @@ -465,7 +482,7 @@ backend-соединений** к этой базе, ограниченных т превышать 80% от `PostgreSQL max_connections - superuser_reserved_connections`. Оставшиеся 20% оставьте под admin-соединения, репликацию и всплески. -## Coordinator mode +## Режим координатора (coordinator mode) Режим координатора активируется, когда у пула задан `max_db_connections`. Он добавляет второй слой давления **поверх** того, что работает внутри @@ -558,29 +575,30 @@ idle-соединение старше `min_connection_lifetime` у верхне Permit выселенного соединения освобождается синхронно, слот становится доступен сразу. Повторяем захват семафора. Если два вызова конкурируют, проигравший переходит к следующей фазе. p95 кешируется каждые 15 -секунд (stats cycle) как atomic — сканирование читает один `AtomicU64` -на кандидата без блокировки histogram. +секунд (stats cycle) — сканирование читает одно кешированное значение +на кандидата без блокировки гистограммы. **Фаза C — Wait.** Выполняется, когда резерв отключён или полностью -занят *и* Фаза B не нашла что выселить. Регистрируем `Notify`, -который срабатывает на двух событиях: +занят *и* Фаза B не нашла что выселить. Регистрируется подписка +на уведомления, которая срабатывает на двух событиях: -1. Был уничтожен `CoordinatorPermit` соседнего пула — серверное +1. Был освобождён permit координатора соседнего пула — серверное соединение физически закрыто (истёк `server_lifetime`, ошибка - `recycle`, `RECONNECT`), и слот семафора теперь свободен. -2. Соседний пул вернул соединение в свою idle-очередь через - `Pool::return_object` — слот семафора НЕ освободился, но - `spare_above_min` этого соседа только что вырос. - -На каждое пробуждение Фаза C **сначала** вызывает `try_acquire` и -только если дешёвый путь не сработал — вызывает `try_evict_one`. -Пробуждение от drop permit-а оставляет свободный слот в семафоре — -дешёвый путь берёт его, и ни один соседний backend не закрывается. -Пробуждение от idle-return не освобождает слот напрямую, но могло -вырастить `spare_above_min` соседа, поэтому повторная попытка -eviction находит кандидата, которого мгновение назад не было, -дропает permit соседа, и следующий `try_acquire` срабатывает. Этот -порядок (cheap first, evict second) закреплён регрессионным тестом: + проверки пригодности, `RECONNECT`), и слот семафора теперь свободен. +2. Соседний пул вернул соединение в свою idle-очередь — слот + семафора НЕ освободился, но излишек над минимумом этого соседа + только что вырос. + +На каждое пробуждение Фаза C **сначала** пытается захватить свободный +слот неблокирующей проверкой, и только если дешёвый путь не +сработал — пробует выселение. Пробуждение от освобождения permit-а +оставляет свободный слот в семафоре — дешёвый путь берёт его, и ни +один соседний backend не закрывается. Пробуждение от idle-return не +освобождает слот напрямую, но могло вырастить излишек соседа, поэтому +повторная попытка eviction находит кандидата, которого мгновение назад +не было, освобождает permit соседа, и следующая неблокирующая проверка +срабатывает. Этот порядок (дешёвый путь сначала, выселение потом) +закреплён регрессионным тестом: будущий рефакторинг не сможет случайно вернуть закрытия соседних backend-ов на пробуждениях от drop permit-а. @@ -598,28 +616,30 @@ backend-ов на пробуждениях от drop permit-а. бюджет ожидания — на случай, если за время ожидания соседний reserve-держатель отпустил свой permit. Запросы ранжируются по паре `(starving, queued_clients)`, где `starving` означает, что пул сейчас -ниже своего эффективного минимума. Арбитр — это отдельная tokio-задача, -которая раздаёт permit-ы резервного пула из приоритетной очереди. +ниже своего эффективного минимума. Арбитр — это отдельный фоновый +процесс, который раздаёт permit-ы резервного пула из приоритетной +очереди. **Фаза E — Error.** Если Фаза D тоже не выдала permit или резерв не настроен, клиент получает ошибку: `all server connections to database 'X' are in use (max=N, ...)`. -### Reserve → main upgrade (retain task) +### Повышение из резерва в основной пул (reserve → main upgrade, retain task) Reserve-permit — это буфер под всплеск, а не постоянное состояние. После того как всплеск прошёл, backend, получивший reserve-permit, продолжает жить как обычное idle-соединение, но его -`CoordinatorPermit` по-прежнему учитывается в `reserve_in_use` — даже +permit координатора по-прежнему учитывается в `reserve_in_use` — даже когда `current < max_db_connections` и в main-семафоре есть свободные слоты. Без активного обслуживания `SHOW POOL_COORDINATOR` показывал бы занятый резерв при том, что реальная ёмкость для всплеска пустая, и следующему всплеску некуда расти. Retain task запускается каждые `retain_connections_time` (по умолчанию -30 секунд) и делает бухгалтерский swap: для каждого пула, который не -находится **под давлением** (см. определение ниже), он обходит idle -vec и для каждого backend-а, удерживающего reserve-permit, пытается +30 секунд) и делает бухгалтерскую перестановку: для каждого пула, +который не находится **под давлением** (см. определение ниже), он +обходит idle-очередь и для каждого backend-а, удерживающего +reserve-permit, пытается забрать permit из main-семафора. Пул считается **под давлением**, когда его per-pool семафор имеет @@ -644,12 +664,12 @@ Retain task пропускает пулы под давлением по дву цикле. При успехе reserve-permit возвращается в reserve-семафор, `reserve_in_use` уменьшается на единицу, а тип permit у этого backend переключается с -reserve на main. Никакого reconnect, никакого дёргания соседа — только -две атомарные операции. Обход прерывается на первом неудачном upgrade +reserve на main. Никакого переподключения, никакого дёргания соседа. +Обход прерывается на первом неудачном upgrade в пуле: это доказывает, что main-семафор заполнен, и остальные reserve-permit-ы этого пула проверять бессмысленно. Тот же retain-цикл -затем вызывает `close_idle_reserve_connections`, чтобы закрыть -reserve-backend-ы, которые не удалось апгрейднуть и которые +затем закрывает reserve-backend-ы, которые не удалось повысить до +основного и которые простаивают дольше `min_connection_lifetime`. При такой схеме `reserve_in_use > 0` означает ровно одно: либо @@ -657,7 +677,7 @@ reserve-backend-ы, которые не удалось апгрейднуть и `retain_connections_time` назад. Исторический остаток reserve-ёмкости сходится к нулю, как только в main появляется свободное место. -### JIT coordinator permits (burst gate первым) +### Получение permit координатора по требованию (JIT coordinator permits, burst gate первым) Внутри пути получения соединения burst gate работает **до** координатора. Это **JIT (just-in-time)** порядок: coordinator-permit @@ -665,7 +685,7 @@ reserve-backend-ы, которые не удалось апгрейднуть и `connect()`. Предыдущий порядок (координатор первый, потом gate) вызывал -**phantom permits**: N вызовов захватывали по coordinator-permit и +**фантомные permit-ы (phantom permits)**: N вызовов захватывали по coordinator-permit и вставали в очередь за burst gate (cap=2). Реально создавали соединения только 2, но координатор видел N permit'ов в использовании и начинал выдавать reserve-permit'ы — хотя БД была далеко от предела. @@ -674,11 +694,12 @@ reserve-backend-ы, которые не удалось апгрейднуть и `max_parallel_creates` вызовов. Остальные ждут gate слот без расходования бюджета координатора. -**Head-of-line blocking** снимается разделением координатора на -быстрый и медленный путь. Быстрый — неблокирующий `try_acquire()` -внутри gate слота (мгновенно). Если не прошёл — вызов **освобождает -gate слот**, ждёт координатора (eviction / возврат от соседа), и -затем снова занимает gate слот. +**Блокировка головы очереди (head-of-line blocking)** снимается +разделением координатора на быстрый и медленный путь. Быстрый — +неблокирующая проверка доступности слота координатора внутри слота +ограничителя (мгновенно). Если не прошла — вызов **освобождает слот +ограничителя**, ждёт координатора (eviction / возврат от соседа), и +затем снова занимает слот ограничителя. ``` Coordinator + plain mode acquisition flow (JIT) @@ -706,7 +727,7 @@ gate слот**, ждёт координатора (eviction / возврат о v +---------------------------+ | JIT coordinator acquire | only when max_db_connections > 0 - | fast: try_acquire() | неблокирующий CAS + | fast: неблокир. проверка | мгновенный ответ | slow: release gate slot | ожидание координатора (evict/return) | → re-acquire slot | затем продолжить create +------------+--------------+ @@ -737,8 +758,8 @@ gate слот**, ждёт координатора (eviction / возврат о ### Фоновый replenish под координатором -`replenish` берёт permit координатора через `try_acquire` в -неблокирующем режиме. Если база уже на лимите, replenish сдаётся и +`replenish` берёт permit координатора через неблокирующую проверку +доступности. Если база уже на лимите, replenish сдаётся и повторяет попытку на следующем retain-цикле. Та же логика, что и у burst gate: фоновая задача не должна бороться с клиентским трафиком за скудные permit-ы. @@ -754,7 +775,7 @@ Scaling-параметры по умолчанию глобальные. Для | Параметр | По умолчанию | Где | Что делает | |---|---|---|---| | `scaling_warm_pool_ratio` | `20` (процент) | `general`, per-pool | Порог, ниже которого соединения создаются без anticipation. Ниже `pool_size × ratio / 100` каждый запрос нового соединения идёт сразу к `connect()`. | -| `scaling_fast_retries` | `10` | `general`, per-pool | Число `yield_now`-spin retry в фазе anticipation перед переходом к direct handoff (ожиданию на oneshot-канале). | +| `scaling_fast_retries` | `10` | `general`, per-pool | Число быстрых повторных проверок пригодности в фазе anticipation перед переходом к прямой передаче (ожиданию возврата от соседа). | | `scaling_max_parallel_creates` | `2` | `general` | Жёсткий лимит одновременно идущих backend-`connect()` на пул. Задачи сверх лимита ждут возврата idle-соединения или завершения чужого создания. Должен быть `>= 1`. | | `max_db_connections` | не задан (выключено) | per-pool | Лимит суммарного числа backend-соединений к базе по всем user-пулам. Когда не задан, координатор не создаётся. | | `min_connection_lifetime` | `30000` (ms) | per-pool | Минимальный возраст idle-соединения, после которого координатор может выселить его в пользу другого пула. 30-секундный порог подавляет циклический reconnect между соседними пулами. | @@ -919,7 +940,7 @@ reserve координатора отрабатывают полностью, н клиента, которому нужен результат, уже нет. На старте валидатор конфига pg_doorman выдаёт предупреждение; реагируйте на него. -## Observability +## Наблюдаемость (observability) pg_doorman экспортирует состояние давления на пул через admin-консоль и через Prometheus. Оба показывают одни и те же счётчики; выбирайте то, @@ -941,9 +962,9 @@ pgdoorman=> SHOW POOL_SCALING; | `inflight` | gauge | Вызовы `connect()` к бэкенду, выполняемые в этом пуле прямо сейчас. Ограничено `scaling_max_parallel_creates`. | | `creates` | counter | Сколько всего backend-соединений пул начинал создавать с момента старта. В паре с `gate_waits` используется для расчёта частоты попаданий на gate. | | `gate_waits` | counter | Сколько раз вызов наткнулся на заполненный burst gate и был вынужден ждать слот. Высокие значения говорят, что `scaling_max_parallel_creates` слишком низкий. | -| `antic_notify` | counter | Попытки anticipation в Фазе 4, где direct-handoff доставка через oneshot-канал удалась. Инкрементируется один раз на успешный receive, до recycle-проверки. Высокий `antic_notify` при низком `create_fallback` — хороший признак: handoff ловит возвраты, клиенты не платят за `connect()`. | -| `antic_timeout` | counter | Попытки anticipation в Фазе 4, где oneshot протаймаутился без получения соединения, либо бюджет был нулевой. Инкрементируется ровно один раз при каждом провале Фазы 4 в create path. Высокий `antic_timeout` означает, что клиенты упираются в `query_wait_timeout`, не успев получить соединение через handoff. | -| `create_fallback` | counter | Фаза 4 не получила соединение через direct-handoff oneshot-канал: дедлайн исчерпан или бюджет был нулевой. Именно эти ожидания превращаются в новый `connect()`. Стабильно ненулевой `create_fallback` значит, что клиентского бюджета не хватает на перехват возвратов: пул либо мал, либо запросы длиннее `query_wait_timeout`. | +| `antic_notify` | counter | Попытки anticipation в Фазе 4, где прямая передача удалась. Инкрементируется один раз на успешное получение, до проверки пригодности. Высокий `antic_notify` при низком `create_fallback` — хороший признак: прямая передача ловит возвраты, клиенты не платят за `connect()`. | +| `antic_timeout` | counter | Попытки anticipation в Фазе 4, где ожидание истекло без получения соединения, либо бюджет был нулевой. Инкрементируется ровно один раз при каждом провале Фазы 4 в путь создания. Высокий `antic_timeout` означает, что клиенты упираются в `query_wait_timeout`, не успев получить соединение через прямую передачу. | +| `create_fallback` | counter | Фаза 4 не получила соединение через прямую передачу: дедлайн исчерпан или бюджет был нулевой. Именно эти ожидания превращаются в новый `connect()`. Стабильно ненулевой `create_fallback` значит, что клиентского бюджета не хватает на перехват возвратов: пул либо мал, либо запросы длиннее `query_wait_timeout`. | | `replenish_def` | counter | Запуски фонового replenish, упёршиеся в лимит burst gate и отложенные до следующего retain-цикла. Устойчиво ненулевые значения означают, что `min_pool_size` нельзя поддержать при текущей нагрузке. | Все счётчики монотонные с момента старта. Считайте дельты между @@ -990,7 +1011,7 @@ evictions нет, exhaustions нет. Алерты здесь должны мо ``` Всплеск занял большую часть `max_db_connections` и три соединения перелились в резерв. `current < max_db_conn` означает, что в main -есть место, поэтому retain task апгрейднёт эти три permit-а в main +есть место, поэтому retain task повысит эти три permit-а до main на следующем цикле; `reserve_used` должен упасть до 0 в течение `retain_connections_time` (по умолчанию 30 секунд). Если не падает, смотрите раздел troubleshooting ниже. `evictions = 0` и @@ -1025,6 +1046,7 @@ eviction ротирует соединения между пользовател | `pg_doorman_pool_scaling{type="inflight_creates"}` | gauge | `user`, `database` | `inflight` из `SHOW POOL_SCALING` | | `pg_doorman_pool_scaling_total{type="creates_started"}` | counter | `user`, `database` | `creates` | | `pg_doorman_pool_scaling_total{type="burst_gate_waits"}` | counter | `user`, `database` | `gate_waits` | +| `pg_doorman_pool_scaling_total{type="burst_gate_budget_exhausted"}` | counter | `user`, `database` | `gate_budget_ex` — adaptive timeout сработал, клиент перешёл к созданию | | `pg_doorman_pool_scaling_total{type="anticipation_wakes_notify"}` | counter | `user`, `database` | `antic_notify` | | `pg_doorman_pool_scaling_total{type="anticipation_wakes_timeout"}` | counter | `user`, `database` | `antic_timeout` | | `pg_doorman_pool_scaling_total{type="create_fallback"}` | counter | `user`, `database` | `create_fallback` | @@ -1299,7 +1321,7 @@ psql -h 127.0.0.1 -p 6432 -U admin pgdoorman \ ``` Позиции полей в `awk` соответствуют порядку колонок, описанному выше: -для `POOL_SCALING` это `user|database|inflight|creates|gate_waits|antic_notify|antic_timeout|create_fallback|replenish_def`, +для `POOL_SCALING` это `user|database|inflight|creates|gate_waits|gate_budget_ex|antic_notify|antic_timeout|create_fallback|replenish_def`, для `POOL_COORDINATOR` это `database|max_db_conn|current|reserve_size|reserve_used|evictions|reserve_acq|exhaustions`. ## Сравнение с PgBouncer @@ -1342,7 +1364,7 @@ PgBouncer и pg_doorman оба пулят соединения, но давле и отдаёт следующее свободное соединение, но PgBouncer обрабатывает события на одном потоке — под нагрузкой порядок зависит от callback'ов libevent. pg_doorman отправляет возвращённые соединения - через per-waiter oneshot-канал в строгом FIFO-порядке. Результат — + напрямую каждому ожидающему в строгом FIFO-порядке. Результат — p99/p50 в пределах 1.1x при любом числе клиентов, тогда как пулеры без строгого FIFO показывают 10-25x раздувание хвоста при той же нагрузке. @@ -1400,7 +1422,7 @@ PgBouncer и pg_doorman оба пулят соединения, но давле **Причина.** Фаза 4 anticipation (direct handoff) держит клиентов в ожидании возврата до `query_wait_timeout - 500 ms`, но возвраты приходят медленнее, чем клиент готов ждать. Ожидающие либо получают -соединение через oneshot-канал, либо проваливаются в create path +соединение через прямую передачу, либо проваливаются в путь создания по истечении бюджета. Проверьте `create_fallback` в `SHOW POOL_SCALING`: если он ненулевой и растёт, клиенты уходят по таймауту. @@ -1486,13 +1508,13 @@ database 'X' are in use (max=80, ...)`. **Исправление.** В текущих сборках это решается автоматически: retain task каждые `retain_connections_time` (по умолчанию 30 секунд) -запускает `upgrade_reserve_to_main`. Для каждого reserve-backend в -пуле без давления permit меняется с reserve на main, если в -`db_semaphore` есть свободное место. Gauge `reserve_used` должен +запускает повышение reserve-permit-ов до main. Для каждого +reserve-backend в пуле без давления permit меняется с reserve на main, +если в основном семафоре есть свободное место. Gauge `reserve_used` должен упасть до нуля в течение одного retain-цикла. Если `reserve_used` всё равно не уходит, значит пул либо под -устойчивым давлением (`under_pressure() == true` пропускает upgrade — +устойчивым давлением (повышение пропускается, когда пул под давлением — и это правильно, иначе ожидающий клиент тут же заберёт освободившийся слот), либо `current == max_db_connections` (нет main-слота, который можно забрать). Оба случая означают, что база честно исчерпана; @@ -1600,61 +1622,61 @@ postmaster. Если `connect()` действительно быстрый (< 50 ## Глоссарий -- **`bounded burst gate`** — per-pool ограничитель, пропускающий не - более `scaling_max_parallel_creates` одновременных вызовов - `connect()` к бэкенду. Задачи сверх лимита регистрируют - direct-handoff waiter и слушают завершение чужого create, пока - слот не освободится. -- **`CoordinatorPermit`** — RAII-guard, учитывающий один слот - координатора. Несёт флаг `is_reserve`. Дропается, когда backend - физически уничтожается (а не когда он возвращается в idle vec); - при дропе слот возвращается либо в `db_semaphore` (main), либо в - `reserve_semaphore` (reserve). +- **ограничитель всплесков (bounded burst gate)** — ограничитель + на уровне пула, пропускающий не более + `scaling_max_parallel_creates` одновременных вызовов `connect()` к + бэкенду. Задачи сверх лимита встают в очередь на прямую передачу + и ожидают завершения чужого создания, пока слот не освободится. +- **permit координатора (coordinator permit)** — разрешение на + удержание одного слота в общем лимите координатора. Может быть + основным (main) или резервным (reserve). Освобождается, когда + backend-соединение физически уничтожается (а не когда оно + возвращается в idle-очередь); при освобождении слот возвращается + либо в основной, либо в резервный семафор. - **эффективный минимум** — пол для eviction у user-пула, равный `max(user.min_pool_size, pool.min_guaranteed_pool_size)`. Координатор защищает именно столько соединений на пользователя от выселения соседями. -- **direct handoff (прямая передача)** — механизм доставки в Фазе 4. - `return_object` отправляет соединение через oneshot-канал старейшему - зарегистрированному waiter'у, минуя idle-очередь. Гонки с Phase 1/2 - semaphore waiter'ами нет — соединение идёт конкретному вызывающему. +- **прямая передача (direct handoff)** — механизм доставки в Фазе 4. + При возврате соединение отправляется напрямую старейшему + зарегистрированному ожидающему, минуя idle-очередь. Соединение + достаётся ровно одному адресату, конкуренции нет. - **Фаза R (reserve-first)** — короткое замыкание координатора, вставленное между Фазой A и Фазой B. Когда база заполнена, а в резерве есть место, Фаза R выдаёт reserve-permit напрямую через арбитра, вместо того чтобы закрывать соседний backend или парковать клиента в Фазе C. -- **`PHASE_4_HARD_CAP`** — compile-time константа с равномерным - jitter: каждый checkout выбирает случайный cap в диапазоне - 300–500 ms. Верхняя граница wall-time Фазы 4 anticipation, - независимо от `query_wait_timeout`. Не настраивается. Jitter - предотвращает синхронные таймауты, вызывающие stampede в burst - gate. -- **reserve arbiter (арбитр резерва)** — отдельная tokio-задача, - владеющая reserve-permit-ами. Запросы на reserve ранжируются по +- **жёсткий потолок Фазы 4** — каждый checkout выбирает случайный + потолок в диапазоне 300–500 ms. Верхняя граница времени ожидания + Фазы 4 anticipation, независимо от `query_wait_timeout`. Не + настраивается. Случайный разброс предотвращает синхронные + таймауты, вызывающие лавину запросов в ограничитель всплесков. +- **арбитр резерва (reserve arbiter)** — отдельный фоновый процесс, + владеющий reserve-permit-ами. Запросы на reserve ранжируются по паре `(starving, queued_clients)` и разгружаются из приоритетной очереди так, чтобы самые нуждающиеся пользователи получали permit первыми. -- **reserve → main upgrade** — retain-time бухгалтерский swap. Когда - idle backend удерживает reserve-permit, а в `db_semaphore` есть - запас, retain task забирает main-permit, возвращает reserve-слот и - переключает `is_reserve` на permit. Без reconnect. -- **`spare_above_min`** — `slots.size - эффективный_минимум` для - user-пула, где `slots.size` — это количество аллоцированных - соединений пула (активные + idle вместе, а не только idle). +- **повышение из резерва в основной (reserve → main upgrade)** — + периодическая бухгалтерская перестановка. Когда idle-backend + удерживает reserve-permit, а в основном семафоре есть запас, retain + task забирает основной permit, возвращает резервный слот и + переключает тип permit. Без переподключения. +- **излишек над минимумом (spare_above_min)** — текущий размер пула + минус эффективный минимум, где текущий размер — это количество + всех соединений пула (активные + idle вместе, а не только idle). Координатор использует это значение, чтобы выбрать жертву eviction: - пул с самым большим `spare_above_min` теряет соединение первым. - Само соединение, чтобы его можно было выселить, всё равно должно - лежать idle в vec — `spare_above_min` выбирает пул, а не - конкретное соединение. -- **`starving` пользователь** — user-пул, у которого текущее число + пул с самым большим излишком теряет соединение первым. Само + соединение, чтобы его можно было выселить, всё равно должно быть + свободным — излишек выбирает пул, а не конкретное соединение. +- **голодающий пользователь (starving)** — user-пул, у которого текущее число соединений ниже эффективного минимума. Арбитр резерва даёт starving-пользователям абсолютный приоритет перед обычными. -- **`under_pressure()`** — предикат, возвращающий `true`, когда - per-pool семафор пула имеет ноль свободных permit-ов — эквивалентно - тому, что каждый слот сейчас занят. Retain task пропускает - upgrade/close на таких пулах, потому что иначе слот просто перейдёт - ожидающему клиенту. -- **warm threshold** — `pool_size × scaling_warm_pool_ratio / 100`. - Ниже этого размера пул пропускает anticipation и идёт сразу в - `connect()`. Выше — anticipation активна, и пул пытается ловить +- **пул под давлением (under pressure)** — состояние, при котором + все permit-ы пула заняты, то есть каждый слот сейчас используется. + Retain task пропускает повышение/закрытие на таких пулах, потому + что иначе слот просто перейдёт ожидающему клиенту. +- **порог прогрева (warm threshold)** — + `pool_size × scaling_warm_pool_ratio / 100`. Ниже этого размера + пул пропускает упреждающее ожидание и идёт сразу в `connect()`. + Выше — упреждающее ожидание активно, и пул пытается ловить возвраты, прежде чем создавать новые backend-ы. diff --git a/src/admin/show.rs b/src/admin/show.rs index 51e60300..55b1e27e 100644 --- a/src/admin/show.rs +++ b/src/admin/show.rs @@ -653,6 +653,7 @@ where ("inflight", DataType::Numeric), ("creates", DataType::Numeric), ("gate_waits", DataType::Numeric), + ("gate_budget_ex", DataType::Numeric), ("antic_notify", DataType::Numeric), ("antic_timeout", DataType::Numeric), ("create_fallback", DataType::Numeric), @@ -675,6 +676,7 @@ where snapshot.inflight_creates.to_string(), snapshot.creates_started.to_string(), snapshot.burst_gate_waits.to_string(), + snapshot.burst_gate_budget_exhausted.to_string(), snapshot.anticipation_wakes_notify.to_string(), snapshot.anticipation_wakes_timeout.to_string(), snapshot.create_fallback.to_string(), diff --git a/src/app/server.rs b/src/app/server.rs index 6d2bf82d..c7938da5 100644 --- a/src/app/server.rs +++ b/src/app/server.rs @@ -815,7 +815,7 @@ async fn binary_upgrade_and_shutdown( // restart the service when we exit. if let Err(e) = sd_notify::notify( false, - &[sd_notify::NotifyState::MainPid(child_pid.into())], + &[sd_notify::NotifyState::MainPid(child_pid)], ) { warn!("sd_notify MAINPID failed: {e}. systemd may restart the service after old process exits."); } @@ -834,8 +834,15 @@ async fn binary_upgrade_and_shutdown( let _ = MIGRATION_TX.set(tx); // MIGRATION_IN_PROGRESS already set above (before SHUTDOWN) let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); - let sender_handle = tokio::spawn(migration_sender_task(migration_parent_fd, rx, shutdown_rx)); - migration_handles = Some(MigrationHandles { shutdown_tx, sender_handle }); + let sender_handle = tokio::spawn(migration_sender_task( + migration_parent_fd, + rx, + shutdown_rx, + )); + migration_handles = Some(MigrationHandles { + shutdown_tx, + sender_handle, + }); info!("Client migration enabled"); } diff --git a/src/client/core.rs b/src/client/core.rs index 29f6b567..c0ea0389 100644 --- a/src/client/core.rs +++ b/src/client/core.rs @@ -358,6 +358,11 @@ pub struct Client { /// Connected to server pub(crate) connected_to_server: bool, + /// Session mode: transaction start timestamp for per-transaction xact_time. + /// Set when server transitions into a transaction (ReadyForQuery 'T'/'E'). + /// Consumed when transaction ends (ReadyForQuery 'I'). + pub(crate) session_xact_start: Option, + /// Name of the server pool for this client (This comes from the database name in the connection string) pub(crate) pool_name: String, diff --git a/src/client/migration.rs b/src/client/migration.rs index 00cd9e36..7276c795 100644 --- a/src/client/migration.rs +++ b/src/client/migration.rs @@ -12,7 +12,7 @@ use std::ffi::c_void; use crate::client::buffer_pool::PooledBuffer; use crate::client::core::{CachedStatement, Client, PreparedStatementKey}; use crate::client::util::PREPARED_STATEMENT_COUNTER; -use crate::config::get_config; +use crate::config::{get_config, BackendAuthMethod}; use crate::errors::Error; use crate::messages::config_socket::configure_tcp_socket; use crate::messages::Parse; @@ -22,8 +22,34 @@ use crate::stats::ClientStats; use super::core::PreparedStatementState; +/// Restore backend auth state (e.g. SCRAM ClientKey) from a migrated client +/// so the new process can authenticate to PostgreSQL via passthrough without +/// waiting for a fresh client SCRAM handshake. +/// +/// Only overwrites if the current state is `ScramPending` — avoids clobbering +/// a valid auth state already established by an earlier migrated client. +fn restore_backend_auth_if_pending( + pool: Option<&ConnectionPool>, + migrated_auth: Option<&BackendAuthMethod>, + username: &str, + pool_name: &str, +) { + if let (Some(p), Some(auth)) = (pool, migrated_auth) { + if let Some(ref ba_lock) = p.address.backend_auth { + let needs_update = matches!(*ba_lock.read(), BackendAuthMethod::ScramPending); + if needs_update { + *ba_lock.write() = auth.clone(); + info!( + "[{}@{}] restored backend auth from migrated client", + username, pool_name + ); + } + } + } +} + const MIGRATION_MAGIC: u32 = 0x50474D47; // "PGMG" -const MIGRATION_VERSION: u16 = 1; +const MIGRATION_VERSION: u16 = 2; /// Fixed-size header: magic(4) + version(2) + connection_id(8) + secret_key(4) + transaction_mode(1) const HEADER_SIZE: usize = 4 + 2 + 8 + 4 + 1; const MAX_PREPARED_ENTRIES: usize = 100_000; @@ -193,6 +219,31 @@ where buf.put_u8(use_tls as u8); + // Backend auth state (v2): allows new process to skip ScramPending + // fallback by receiving the ClientKey from the old process. + if let Some(pool) = get_pool(&self.pool_name, &self.username) { + if let Some(ref ba_lock) = pool.address.backend_auth { + match &*ba_lock.read() { + BackendAuthMethod::Md5PassTheHash(hash) => { + buf.put_u8(1); + put_str(&mut buf, hash); + } + BackendAuthMethod::ScramPassthrough(client_key) => { + buf.put_u8(2); + buf.put_u16(client_key.len() as u16); + buf.put_slice(client_key); + } + BackendAuthMethod::ScramPending => { + buf.put_u8(3); + } + } + } else { + buf.put_u8(0); // no backend auth + } + } else { + buf.put_u8(0); + } + buf } } @@ -243,6 +294,7 @@ struct DeserializedState { prepared_entries: Vec, #[allow(dead_code)] use_tls: bool, + backend_auth: Option, } struct PreparedEntry { @@ -262,7 +314,7 @@ fn deserialize_state(mut buf: BytesMut) -> Result { ))); } let version = buf.get_u16(); - if version != MIGRATION_VERSION { + if version != 1 && version != MIGRATION_VERSION { return Err(Error::ClientError(format!( "migration: unsupported version {version}" ))); @@ -349,6 +401,31 @@ fn deserialize_state(mut buf: BytesMut) -> Result { require(&buf, 1)?; let use_tls = buf.get_u8() != 0; + // v2: backend auth state (ScramPassthrough ClientKey, Md5 hash) + let backend_auth = if version >= 2 && buf.remaining() > 0 { + let tag = buf.get_u8(); + match tag { + 1 => { + // Md5PassTheHash + let hash = get_str(&mut buf)?; + Some(BackendAuthMethod::Md5PassTheHash(hash)) + } + 2 => { + // ScramPassthrough(ClientKey) + require(&buf, 2)?; + let key_len = buf.get_u16() as usize; + require(&buf, key_len)?; + let mut key = vec![0u8; key_len]; + buf.copy_to_slice(&mut key); + Some(BackendAuthMethod::ScramPassthrough(key)) + } + 3 => Some(BackendAuthMethod::ScramPending), + _ => None, + } + } else { + None // v1 format: no auth state + }; + Ok(DeserializedState { connection_id, secret_key, @@ -361,6 +438,7 @@ fn deserialize_state(mut buf: BytesMut) -> Result { async_client, prepared_entries, use_tls, + backend_auth, }) } @@ -389,6 +467,14 @@ pub async fn reconstruct_client( // Reconstruct prepared statement cache let pool = get_pool(&state.pool_name, &state.username); + + restore_backend_auth_if_pending( + pool.as_ref(), + state.backend_auth.as_ref(), + &state.username, + &state.pool_name, + ); + let prepared = reconstruct_prepared_state( state.prepared_enabled, state.async_client, @@ -430,6 +516,7 @@ pub async fn reconstruct_client( admin: false, last_server_stats: None, connected_to_server: false, + session_xact_start: None, pool_name: state.pool_name, username: state.username, server_parameters: state.server_parameters, @@ -485,6 +572,14 @@ pub async fn reconstruct_tls_client( let config = get_config(); let pool = get_pool(&state.pool_name, &state.username); + + restore_backend_auth_if_pending( + pool.as_ref(), + state.backend_auth.as_ref(), + &state.username, + &state.pool_name, + ); + let prepared = reconstruct_prepared_state( state.prepared_enabled, state.async_client, @@ -526,6 +621,7 @@ pub async fn reconstruct_tls_client( admin: false, last_server_stats: None, connected_to_server: false, + session_xact_start: None, pool_name: state.pool_name, username: state.username, server_parameters: state.server_parameters, @@ -861,6 +957,7 @@ pub async fn migration_receiver_task( #[cfg(all(target_os = "linux", feature = "tls-migration"))] let tls_acceptor = _tls_acceptor; use crate::app::server::CURRENT_CLIENT_COUNT; + use crate::stats::TOTAL_CONNECTION_COUNTER; use std::sync::atomic::Ordering; info!("migration receiver: listening for migrated clients"); @@ -882,6 +979,10 @@ pub async fn migration_receiver_task( { Ok(mut client) => { CURRENT_CLIENT_COUNT.fetch_add(1, Ordering::SeqCst); + TOTAL_CONNECTION_COUNTER.fetch_max( + client.connection_id as usize, + Ordering::Relaxed, + ); info!( "[{}@{} #c{}] migrated TLS client from {}", client.username, @@ -915,6 +1016,10 @@ pub async fn migration_receiver_task( match reconstruct_client(fd, state_buf, csm).await { Ok(mut client) => { CURRENT_CLIENT_COUNT.fetch_add(1, Ordering::SeqCst); + // Advance the global counter past the migrated id so new + // connections don't collide with migrated client ids. + TOTAL_CONNECTION_COUNTER + .fetch_max(client.connection_id as usize, Ordering::Relaxed); info!( "[{}@{} #c{}] migrated client accepted from {}", client.username, diff --git a/src/client/startup.rs b/src/client/startup.rs index 7b6f4787..14a85790 100644 --- a/src/client/startup.rs +++ b/src/client/startup.rs @@ -411,6 +411,7 @@ where admin, last_server_stats: None, connected_to_server: false, + session_xact_start: None, pool_name, username: std::mem::take(&mut client_identifier.username), server_parameters, @@ -460,6 +461,7 @@ where server_parameters: ServerParameters::new(), prepared: PreparedStatementState::default(), connected_to_server: false, + session_xact_start: None, client_last_messages_in_tx: PooledBuffer::new(), max_memory_usage: 128 * 1024 * 1024, pooler_check_query_request_vec: Vec::new(), diff --git a/src/client/transaction.rs b/src/client/transaction.rs index f26c409e..fd5a345e 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -10,8 +10,7 @@ use crate::utils::clock::now; use crate::admin::handle_admin; use crate::app::server::{ - CLIENTS_IN_TRANSACTIONS, MIGRATION_IN_PROGRESS, MIGRATION_TX, - SHUTDOWN_IN_PROGRESS, + CLIENTS_IN_TRANSACTIONS, MIGRATION_IN_PROGRESS, MIGRATION_TX, SHUTDOWN_IN_PROGRESS, }; use crate::client::batch_handling::PARSE_COMPLETE_MSG; use crate::client::core::{BatchOperation, Client, PreparedStatementKey}; @@ -169,25 +168,32 @@ where S: tokio::io::AsyncRead + std::marker::Unpin, T: tokio::io::AsyncWrite + std::marker::Unpin, { - /// Complete transaction statistics and check if server should be released - /// Returns true if the transaction loop should break (server should be released) #[inline(always)] fn complete_transaction_if_needed(&mut self, server: &Server, check_async: bool) -> bool { - if !server.in_transaction() { - self.stats.transaction(); - server - .stats - .transaction(self.server_parameters.get_application_name()); - - // Release server back to the pool if we are in transaction mode. - // If we are in session mode, we keep the server until the client disconnects. - if self.transaction_mode && !server.in_copy_mode() { - // Don't release if in async mode (when check_async is true) - if !check_async || !server.is_async() { - return true; - } + if server.in_transaction() { + if self.session_xact_start.is_none() { + self.session_xact_start = Some(crate::utils::clock::now()); + } + return false; + } + + self.stats.transaction(); + server + .stats + .transaction(self.server_parameters.get_application_name()); + + if !self.transaction_mode { + if let Some(start) = self.session_xact_start.take() { + server + .stats + .add_xact_time_and_idle(start.elapsed().as_micros() as u64); } } + + if self.transaction_mode && !server.in_copy_mode() && (!check_async || !server.is_async()) { + return true; + } + false } @@ -609,7 +615,10 @@ where Ok(()) => { info!( "[{}@{} #c{}] client {} migrated to new process", - self.username, self.pool_name, self.connection_id, self.addr + self.username, + self.pool_name, + self.connection_id, + self.addr ); // Note: do NOT decrement CURRENT_CLIENT_COUNT here. // The caller (server.rs accept loop) decrements it @@ -765,13 +774,25 @@ where .max_wait_time .fetch_max(checkout_us, Ordering::Relaxed); if checkout_us >= 500_000 { + let status = current_pool.database.status(); + let scaling = current_pool.database.scaling_stats(); warn!( - "[{}@{} #c{}] slow checkout: {}ms (server pid={})", + "[{}@{} #c{}] slow checkout: {}ms pid={} size={}/{} avail={} waiting={} inflight={} creates={} gate_waits={} bg_timeout={} antic_ok={} antic_to={} fallback={}", self.username, self.pool_name, self.connection_id, checkout_us / 1_000, server.get_process_id(), + status.size, status.max_size, + status.available, + status.waiting, + scaling.inflight_creates, + scaling.creates_started, + scaling.burst_gate_waits, + scaling.burst_gate_budget_exhausted, + scaling.anticipation_wakes_notify, + scaling.anticipation_wakes_timeout, + scaling.create_fallback, ); } let server_active_at = now(); @@ -905,6 +926,12 @@ where }; self.stats.active_idle(); + // Session mode: reset query timer per message so query_time + // reflects individual queries, not cumulative session duration. + if !self.transaction_mode { + query_start_at = now(); + } + // The message will be forwarded to the server intact. We still would like to // parse it below to figure out what to do with it. @@ -1006,9 +1033,11 @@ where } else if !server.is_async() { server.checkin_cleanup().await?; } - server - .stats - .add_xact_time_and_idle(server_active_at.elapsed().as_micros() as u64); + if self.transaction_mode { + server + .stats + .add_xact_time_and_idle(server_active_at.elapsed().as_micros() as u64); + } // The server is no longer bound to us, we can't cancel it's queries anymore. self.release(); server.stats.wait_idle(); @@ -1046,6 +1075,9 @@ where message: Option<&BytesMut>, server: &mut Server, ) -> Result<(), Error> { + if !self.transaction_mode && self.session_xact_start.is_none() { + self.session_xact_start = Some(crate::utils::clock::now()); + } let message = message.unwrap_or(&self.buffer); // Send message with timeout diff --git a/src/pool/inner.rs b/src/pool/inner.rs index 9ebb0a9a..a58bb618 100644 --- a/src/pool/inner.rs +++ b/src/pool/inner.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use log::debug; +use log::{debug, warn}; use rand::Rng as _; use crate::utils::clock; @@ -155,6 +155,11 @@ pub(crate) struct ScalingStats { /// and deferred its work to the next retain cycle. Persistent non-zero /// values indicate `min_pool_size` cannot be sustained under current load. pub(crate) replenish_deferred: AtomicU64, + /// Number of times the burst gate adaptive budget was exhausted. + /// A sustained non-zero rate means the pool is undersized: clients wait + /// longer than 2× xact_p99 for a recycled connection before proceeding + /// to create a new one. + pub(crate) burst_gate_budget_exhausted: AtomicU64, /// Number of pre-replacement connections created ahead of lifetime expiry. pub(crate) pre_replacements_triggered: AtomicU64, /// Number of pre-replacement attempts skipped (coordinator full, pressure, @@ -167,6 +172,7 @@ pub(crate) struct ScalingStats { pub struct ScalingStatsSnapshot { pub creates_started: u64, pub burst_gate_waits: u64, + pub burst_gate_budget_exhausted: u64, pub anticipation_wakes_notify: u64, pub anticipation_wakes_timeout: u64, pub create_fallback: u64, @@ -233,6 +239,58 @@ const PRE_REPLACE_THRESHOLD_PCT: u64 = 95; /// ensures each one gets a warm replacement without serialization. const MAX_CONCURRENT_PRE_REPLACEMENTS: usize = 3; +/// Anticipation budget: absolute maximum wait before falling through to create. +const ANTICIPATION_HARD_CAP_MS: u64 = 500; + +/// Anticipation budget at cold start when xact_p99 histogram has no data. +/// Conservative enough to not overwhelm coordinator when all pools start +/// simultaneously, fast enough to fill the pool within seconds. +const ANTICIPATION_COLD_START_MS: u64 = 100; + +/// Anticipation budget: minimum wait. Even with xact_p99 < 3ms, wait at +/// least this long to give the direct-handoff a chance before creating. +const ANTICIPATION_MIN_BUDGET_MS: u64 = 5; + +/// Time reserved after anticipation for the actual create() call. +/// Subtracted from the total budget before entering the handoff wait. +const ANTICIPATION_CREATE_RESERVE: Duration = Duration::from_millis(500); + +/// Fallback total budget when `timeouts.wait` is None (no query_wait_timeout). +const ANTICIPATION_FALLBACK_BUDGET_MS: u64 = 100; + +/// Backoff between retries when the burst gate budget is exhausted. +/// The client stops registering as a handoff waiter and just listens +/// for `create_done` notifications with this timeout between retries. +const BURST_GATE_EXHAUSTED_BACKOFF: Duration = Duration::from_millis(50); + +/// Burst gate adaptive timeout: minimum budget before exiting the handoff loop. +/// Below 20ms, fork() + shared_buffers attach on large instances can take longer, +/// causing unnecessary creates during brief spikes. +const BURST_GATE_MIN_BUDGET_MS: u64 = 20; + +/// Compute the base anticipation budget (before jitter) from xact_p99. +/// Pure function, deterministic, safe to call from tests. +#[inline] +fn anticipation_base_ms(xact_p99_us: u64) -> u64 { + if xact_p99_us == 0 { + ANTICIPATION_COLD_START_MS + } else { + xact_p99_us.saturating_mul(2) / 1000 + } +} + +/// Compute burst gate adaptive budget from xact_p99. +/// Reuses `anticipation_base_ms` for the base, adds ±20% jitter. +#[inline] +fn burst_gate_budget(xact_p99_us: u64) -> Duration { + let base_ms = anticipation_base_ms(xact_p99_us); + let jitter_range = (base_ms / 5).max(1); + let jitter = rand::rng().random_range(0..=jitter_range * 2); + let budget_ms = (base_ms.saturating_sub(jitter_range) + jitter) + .clamp(BURST_GATE_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + Duration::from_millis(budget_ms) +} + /// Push a connection into the idle queue respecting the configured /// queue mode (FIFO/LIFO). Caller must hold the slots lock. #[inline(always)] @@ -360,11 +418,11 @@ impl PoolInner { push_idle(self.config.queue_mode, &mut slots.vec, inner); } - // Add a semaphore permit so a client can reach the pre-created - // connection through the normal checkout path. The extra permit - // is consumed when the old connection fails recycle: the caller - // holds it and retries try_recycle_one which finds the replacement. - self.semaphore.add_permits(1); + // No semaphore.add_permits needed: return_object now always + // restores the returning client's permit (both handoff and idle + // paths), so no extra permit is required to compensate for future + // handoff drain. The client checking out this pre-created + // connection will acquire its own permit normally. self.scaling_stats .pre_replacements_triggered @@ -469,10 +527,14 @@ impl PoolInner { match sender.send(inner) { Ok(()) => { drop(slots); - // No semaphore.add_permits — the waiter already holds - // a permit. No coordinator notify — the connection was - // handed off, not placed in the idle queue, so a peer - // eviction scan would find nothing. + // Restore the returning client's semaphore permit. + // The waiter holds its OWN permit (from acquire_semaphore), + // so this is not double-counting — it compensates for the + // permit.forget() when this connection was last wrapped. + // Without this, each handoff permanently drains one permit + // because the returning client re-enters timeout_get and + // acquires a NEW permit, but the old one was never restored. + self.semaphore.add_permits(1); return; } Err(returned_inner) => { @@ -558,8 +620,8 @@ enum CoordinatorJitResult<'a> { impl Pool { /// Wrap a recycled/created ObjectInner into an Object, consuming - /// the semaphore permit so it stays with the connection until - /// Object::drop returns it. + /// the semaphore permit. The permit is restored by `return_object` + /// (via `add_permits(1)`) when the Object is dropped. #[inline(always)] fn wrap_checkout(&self, inner: ObjectInner, permit: SemaphorePermit<'_>) -> Object { permit.forget(); @@ -578,6 +640,15 @@ impl Pool { timeouts: &Timeouts, non_blocking: bool, ) -> BurstGateOutcome<'_> { + let (_, _, _, xact_p99_us) = self + .inner + .server_pool + .address() + .stats + .get_xact_percentiles(); + let budget = burst_gate_budget(xact_p99_us); + let loop_start = tokio::time::Instant::now(); + loop { if let Some(guard) = self.inner.try_acquire_burst_gate() { return BurstGateOutcome::Acquired(guard); @@ -601,13 +672,30 @@ impl Pool { return BurstGateOutcome::Recycled(inner); } + // Adaptive timeout: waited longer than 2× xact_p99 — pool is undersized. + // Stop accepting recycled connections, wait for the burst gate directly. + if loop_start.elapsed() > budget { + self.inner + .scaling_stats + .burst_gate_budget_exhausted + .fetch_add(1, Ordering::Relaxed); + let notify = self.inner.create_done.notified(); + let _ = tokio::time::timeout(BURST_GATE_EXHAUSTED_BACKOFF, notify).await; + continue; + } + // Register a direct-handoff waiter AND listen on create_done. - let (tx, rx) = oneshot::channel(); + // `biased;` ensures rx is always checked first: without it, + // tokio::select! randomly picks among ready branches, and a + // connection delivered to rx can be silently dropped when + // on_create or sleep wins the race — leaking slots.size. + let (tx, mut rx) = oneshot::channel(); self.inner.slots.lock().waiters.push_back(tx); let on_create = self.inner.create_done.notified(); tokio::select! { - result = rx => { + biased; + result = &mut rx => { if let Ok(inner) = result { if let Ok(inner) = self.recycle_handoff(inner, timeouts).await { return BurstGateOutcome::Recycled(Box::new(inner)); @@ -618,6 +706,18 @@ impl Pool { _ = tokio::time::sleep(BURST_BACKOFF) => {} } + // A connection could arrive between the poll of rx and the + // drop of the select future. Push it to idle directly — + // the original return_object that sent it here already + // called add_permits(1), so calling return_object again + // would double-count the permit. + if let Ok(inner) = rx.try_recv() { + let mut slots = self.inner.slots.lock(); + push_idle(self.inner.config.queue_mode, &mut slots.vec, inner); + drop(slots); + self.inner.notify_return_observers(); + } + // After wake — try recycle once before retrying the gate. if let RecycleOutcome::Reused(inner) = self.inner.try_recycle_one(timeouts).await { return BurstGateOutcome::Recycled(inner); @@ -662,6 +762,15 @@ impl Pool { { Ok(p) => p, Err(pool_coordinator::AcquireError::NoConnection(info)) => { + let slots = self.inner.slots.lock(); + warn!( + "[{}@{}] checkout failed at phase=coordinator size={} waiters={} info={}", + self.inner.pool_name, + self.inner.username, + slots.size, + slots.waiters.len(), + info, + ); return Err(PoolError::DbLimitExhausted(info)); } }; @@ -802,21 +911,29 @@ impl Pool { // Direct handoff via oneshot channel. if !capacity_deficit && !non_blocking { - const CREATE_RESERVE: Duration = Duration::from_millis(500); - const FALLBACK_BUDGET_MS: u64 = 100; - let total_budget = match timeouts.wait { Some(wait) => wait .saturating_sub(start.elapsed()) - .saturating_sub(CREATE_RESERVE), - None => Duration::from_millis(FALLBACK_BUDGET_MS), + .saturating_sub(ANTICIPATION_CREATE_RESERVE), + None => Duration::from_millis(ANTICIPATION_FALLBACK_BUDGET_MS), }; if !total_budget.is_zero() { - const PHASE_4_HARD_CAP_BASE_MS: u64 = 500; - const PHASE_4_HARD_CAP_JITTER_MS: u64 = 200; - let cap_ms = PHASE_4_HARD_CAP_BASE_MS - - rand::rng().random_range(0..=PHASE_4_HARD_CAP_JITTER_MS); + // Adaptive anticipation budget: wait proportionally to actual + // transaction latency. If a return doesn't arrive within 2x + // the p99 xact time, creating is cheaper than waiting. + let (_, _, _, xact_p99_us) = self + .inner + .server_pool + .address() + .stats + .get_xact_percentiles(); + let base_ms = anticipation_base_ms(xact_p99_us); + // ±20% jitter to prevent synchronized creates across pools + let jitter_range = (base_ms / 5).max(1); + let jitter = rand::rng().random_range(0..=jitter_range * 2); + let cap_ms = (base_ms.saturating_sub(jitter_range) + jitter) + .clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); let effective_budget = total_budget.min(Duration::from_millis(cap_ms)); let (tx, rx) = oneshot::channel(); @@ -895,20 +1012,25 @@ impl Pool { let start = tokio::time::Instant::now(); self.wait_if_paused(timeouts).await?; - let permit = self.acquire_semaphore(timeouts).await?; + let permit = self.acquire_semaphore(timeouts).await.inspect_err(|_e| { + let slots = self.inner.slots.lock(); + warn!( + "[{}@{}] checkout timeout at phase=semaphore elapsed={}ms size={} max={} waiters={} semaphore_avail={}", + self.inner.pool_name, self.inner.username, + start.elapsed().as_millis(), slots.size, slots.max_size, + slots.waiters.len(), self.inner.semaphore.available_permits(), + ); + })?; - // Hot-path recycle: pop an idle connection if available. if let RecycleOutcome::Reused(inner) = self.inner.try_recycle_one(timeouts).await { self.maybe_trigger_pre_replacement(&inner.metrics); return Ok(self.wrap_checkout(*inner, permit)); } - // Anticipation: warm zone gate → fast spin → direct handoff. if let Some(inner) = self.try_anticipate(timeouts, start).await { return Ok(self.wrap_checkout(inner, permit)); } - // Drain remaining recyclable connections before creating new ones. loop { match self.inner.try_recycle_one(timeouts).await { RecycleOutcome::Reused(inner) => { @@ -919,7 +1041,6 @@ impl Pool { } } - // Burst gate + coordinator + backend connect. let non_blocking = timeouts.wait.is_some_and(|t| t.as_nanos() == 0); let _create_gate = match self.acquire_burst_gate(timeouts, non_blocking).await { BurstGateOutcome::Acquired(guard) => guard, @@ -927,6 +1048,14 @@ impl Pool { return Ok(self.wrap_checkout(*inner, permit)); } BurstGateOutcome::Timeout => { + let slots = self.inner.slots.lock(); + warn!( + "[{}@{}] checkout timeout at phase=burst_gate elapsed={}ms size={} inflight={} waiters={}", + self.inner.pool_name, self.inner.username, + start.elapsed().as_millis(), slots.size, + self.inner.inflight_creates.load(Ordering::Relaxed), + slots.waiters.len(), + ); return Err(PoolError::Timeout(TimeoutType::Wait)); } }; @@ -945,7 +1074,19 @@ impl Pool { let obj_inner = self .inner .create_connection(timeouts, coordinator_permit) - .await?; + .await + .map_err(|e| { + let slots = self.inner.slots.lock(); + warn!( + "[{}@{}] checkout failed at phase=create elapsed={}ms size={} err={}", + self.inner.pool_name, + self.inner.username, + start.elapsed().as_millis(), + slots.size, + e, + ); + e + })?; Ok(self.wrap_checkout(obj_inner, permit)) } @@ -1398,6 +1539,7 @@ impl Pool { ScalingStatsSnapshot { creates_started: s.creates_started.load(Ordering::Relaxed), burst_gate_waits: s.burst_gate_waits.load(Ordering::Relaxed), + burst_gate_budget_exhausted: s.burst_gate_budget_exhausted.load(Ordering::Relaxed), anticipation_wakes_notify: s.anticipation_wakes_notify.load(Ordering::Relaxed), anticipation_wakes_timeout: s.anticipation_wakes_timeout.load(Ordering::Relaxed), create_fallback: s.create_fallback.load(Ordering::Relaxed), @@ -2194,4 +2336,575 @@ mod tests { assert!(waiters.is_empty()); // return_object would push to vec + add_permits here. } + + // ------------------------------------------------------------------ + // Adaptive anticipation budget + // ------------------------------------------------------------------ + + #[test] + fn anticipation_budget_cold_start() { + // No histogram data (fresh process). Default 100ms. + assert_eq!(anticipation_base_ms(0), 100); + } + + #[test] + fn anticipation_budget_fast_workload() { + // xact_p99 = 700us (0.7ms). base = 0.7ms * 2 = 1ms. + // Clamped to MIN_BUDGET_MS = 5ms during jitter step. + assert_eq!(anticipation_base_ms(700), 1); + } + + #[test] + fn anticipation_budget_medium_workload() { + // xact_p99 = 50ms (50000us). base = 50 * 2 = 100ms. + assert_eq!(anticipation_base_ms(50_000), 100); + } + + #[test] + fn anticipation_budget_high_latency() { + // xact_p99 = 300ms (300000us). base = 300 * 2 = 600ms. + // Clamped to HARD_CAP (500ms) during jitter step. + assert_eq!(anticipation_base_ms(300_000), 600); + } + + #[test] + fn anticipation_budget_clamp_range() { + // Verify the full pipeline: base → jitter → clamp + for p99_us in [0, 500, 1000, 5000, 50_000, 200_000, 500_000] { + let base = anticipation_base_ms(p99_us); + let jitter_range = (base / 5).max(1); + // Min possible after jitter + let min_val = base.saturating_sub(jitter_range); + // Max possible after jitter + let max_val = base + jitter_range; + // After clamp + let clamped_min = min_val.clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + let clamped_max = max_val.clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + assert!(clamped_min >= ANTICIPATION_MIN_BUDGET_MS); + assert!(clamped_max <= ANTICIPATION_HARD_CAP_MS); + } + } + + #[test] + fn burst_gate_budget_cold_start() { + let budget = burst_gate_budget(0); + assert!(budget.as_millis() >= BURST_GATE_MIN_BUDGET_MS as u128); + assert!(budget.as_millis() <= ANTICIPATION_HARD_CAP_MS as u128); + } + + #[test] + fn burst_gate_budget_normal_workload() { + // xact_p99 = 67ms (67000us). base = 134ms. jitter ±27ms. + let budget = burst_gate_budget(67_000); + assert!(budget.as_millis() >= BURST_GATE_MIN_BUDGET_MS as u128); + assert!(budget.as_millis() <= ANTICIPATION_HARD_CAP_MS as u128); + } + + #[test] + fn burst_gate_budget_fast_workload() { + // xact_p99 = 700us. base = 1ms. Clamped to min 20ms. + let budget = burst_gate_budget(700); + assert_eq!(budget.as_millis(), BURST_GATE_MIN_BUDGET_MS as u128); + } + + #[test] + fn burst_gate_budget_clamp_range() { + for p99_us in [0, 500, 1000, 5000, 50_000, 200_000, 500_000] { + let budget = burst_gate_budget(p99_us); + assert!(budget.as_millis() >= BURST_GATE_MIN_BUDGET_MS as u128); + assert!(budget.as_millis() <= ANTICIPATION_HARD_CAP_MS as u128); + } + } + + // ------------------------------------------------------------------ + // anticipation_base_ms — additional edge cases + // ------------------------------------------------------------------ + + #[test] + fn anticipation_base_ms_u64_max_saturates() { + // saturating_mul(2) must not wrap on extreme input. + let result = anticipation_base_ms(u64::MAX); + // u64::MAX * 2 saturates to u64::MAX, then / 1000. + assert_eq!(result, u64::MAX / 1000); + } + + #[test] + fn anticipation_base_ms_one_microsecond() { + // 1us * 2 / 1000 = 0 (integer truncation). + assert_eq!(anticipation_base_ms(1), 0); + } + + #[test] + fn anticipation_base_ms_boundary_500us() { + // 500us * 2 / 1000 = 1ms exactly. + assert_eq!(anticipation_base_ms(500), 1); + } + + #[test] + fn anticipation_base_ms_boundary_499us() { + // 499us * 2 / 1000 = 998/1000 = 0 (truncated). + assert_eq!(anticipation_base_ms(499), 0); + } + + #[test] + fn anticipation_base_ms_hard_cap_boundary() { + // Find the input that produces exactly ANTICIPATION_HARD_CAP_MS. + // cap = 500ms, so base = 500 when xact_p99_us = 250_000. + assert_eq!(anticipation_base_ms(250_000), 500); + assert_eq!(anticipation_base_ms(250_000), ANTICIPATION_HARD_CAP_MS); + } + + // ------------------------------------------------------------------ + // Jitter + clamp pipeline — exhaustive range invariant + // ------------------------------------------------------------------ + + #[test] + fn anticipation_jitter_clamp_always_in_bounds() { + // For a wide range of xact_p99 values (including extreme ones), + // the full jitter + clamp pipeline must always produce a result + // in [ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS]. + // Run each value multiple times to exercise jitter randomness. + let inputs = [ + 0, + 1, + 10, + 100, + 499, + 500, + 501, + 1_000, + 2_500, + 5_000, + 10_000, + 25_000, + 50_000, + 100_000, + 200_000, + 250_000, + 300_000, + 500_000, + 1_000_000, + u64::MAX / 2, + u64::MAX, + ]; + for &p99_us in &inputs { + for _ in 0..20 { + let base_ms = anticipation_base_ms(p99_us); + let jitter_range = (base_ms / 5).max(1); + let jitter = rand::rng().random_range(0..=jitter_range * 2); + let clamped = (base_ms.saturating_sub(jitter_range) + jitter) + .clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + assert!( + clamped >= ANTICIPATION_MIN_BUDGET_MS, + "p99_us={p99_us} base={base_ms} jitter={jitter}: result {clamped} < min {}", + ANTICIPATION_MIN_BUDGET_MS, + ); + assert!( + clamped <= ANTICIPATION_HARD_CAP_MS, + "p99_us={p99_us} base={base_ms} jitter={jitter}: result {clamped} > cap {}", + ANTICIPATION_HARD_CAP_MS, + ); + } + } + } + + #[test] + fn anticipation_jitter_clamp_zero_base_clamps_to_min() { + // When base_ms = 0 (from very small xact_p99), jitter_range = max(0/5, 1) = 1. + // min_val = 0 - 1 = saturates to 0. After clamp: ANTICIPATION_MIN_BUDGET_MS. + // max_val = 0 + 1 = 1. After clamp: max(1, 5) = 5. + // Both endpoints clamp to MIN_BUDGET_MS. + let base_ms = anticipation_base_ms(1); // = 0 + assert_eq!(base_ms, 0); + let jitter_range = (base_ms / 5).max(1); + let min_possible = base_ms + .saturating_sub(jitter_range) + .clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + let max_possible = (base_ms + jitter_range * 2) + .clamp(ANTICIPATION_MIN_BUDGET_MS, ANTICIPATION_HARD_CAP_MS); + assert_eq!(min_possible, ANTICIPATION_MIN_BUDGET_MS); + assert_eq!(max_possible, ANTICIPATION_MIN_BUDGET_MS); + } + + // ------------------------------------------------------------------ + // Semaphore invariant: return_object restores permits in both paths + // ------------------------------------------------------------------ + // + // ObjectInner requires a Server (live TCP stream), so we cannot call + // return_object directly. Instead we model its exact logic using the + // same primitives (Semaphore, Mutex, oneshot channels) and + // verify the semaphore permit count is conserved. + // + // The contract under test: + // 1. Handoff path (waiter present): send to waiter + add_permits(1) + // 2. Idle path (no waiter): push to vec + add_permits(1) + // 3. Both paths restore exactly one permit per return. + // + // The OLD bug: handoff path did NOT call add_permits(1), causing + // permanent permit drain. These tests would catch a regression. + + /// Model the return_object handoff path: waiter exists, send succeeds. + /// Verify the semaphore permit is restored. + #[tokio::test] + async fn semaphore_permit_restored_on_handoff() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + // Simulate one connection checked out: acquire + forget. + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + assert_eq!(semaphore.available_permits(), max_size - 1); + + // Waiter registers (simulates a concurrent checkout). + let (tx, rx) = oneshot::channel::(); + let mut waiters: VecDeque> = VecDeque::new(); + waiters.push_back(tx); + + // Model return_object handoff path: + // pop waiter, send, then add_permits(1). + let sender = waiters.pop_front().unwrap(); + sender.send(42).unwrap(); + semaphore.add_permits(1); + + // The returning client's permit is restored. + assert_eq!(semaphore.available_permits(), max_size); + // The waiter received the connection. + assert_eq!(rx.await.unwrap(), 42); + } + + /// Model the return_object idle path: no waiters, push to vec. + /// Verify the semaphore permit is restored. + #[tokio::test] + async fn semaphore_permit_restored_on_idle_return() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + // Simulate one connection checked out. + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + assert_eq!(semaphore.available_permits(), max_size - 1); + + // Model return_object idle path: + // no waiters -> push to idle vec + add_permits(1). + let waiters: VecDeque> = VecDeque::new(); + assert!(waiters.is_empty()); + semaphore.add_permits(1); + + assert_eq!(semaphore.available_permits(), max_size); + } + + /// After N handoffs, the semaphore must not drain. + /// This is the core regression test for the permit fix. + #[tokio::test] + async fn semaphore_does_not_drain_after_n_handoffs() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + for iteration in 0..100 { + // Step 1: Client A checks out (acquire + forget). + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + + // Step 2: Client B waits (registers a oneshot waiter). + let (tx, rx) = oneshot::channel::(); + let mut waiters: VecDeque> = VecDeque::new(); + waiters.push_back(tx); + + // Step 3: Client B also acquires its own semaphore permit + // (this is what acquire_semaphore does in timeout_get). + let permit_b = semaphore.acquire().await.unwrap(); + permit_b.forget(); + + // Step 4: Client A returns (handoff to B). + // This models return_object: send to waiter + add_permits(1). + let sender = waiters.pop_front().unwrap(); + sender.send(iteration).unwrap(); + semaphore.add_permits(1); // Client A's permit restored + + // Step 5: Client B receives and eventually returns via idle path. + let _ = rx.await.unwrap(); + semaphore.add_permits(1); // Client B's permit restored + + // Invariant: all permits are back. + assert_eq!( + semaphore.available_permits(), + max_size, + "permit leak at iteration {iteration}" + ); + } + } + + /// Model the OLD (broken) handoff path that did NOT add_permits(1). + /// Each cycle: client A checks out (forget permit), returns via + /// handoff WITHOUT restoring the permit. One permit lost per cycle. + /// After max_size cycles every permit is gone. + #[test] + fn semaphore_drains_without_handoff_permit_restore() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + for i in 0..max_size { + // Client A checks out: acquire + forget. + let permit_a = semaphore + .try_acquire() + .expect("must have permits at this point"); + permit_a.forget(); + + // OLD behavior: handoff sends but does NOT add_permits(1). + // semaphore.add_permits(1); // <-- missing in old code + + // Net: lost one permit (client A's). + assert_eq!( + semaphore.available_permits(), + max_size - (i + 1), + "iteration {i}: expected {} leaked permits", + i + 1, + ); + } + + // All permits are gone. + assert_eq!(semaphore.available_permits(), 0); + assert!(semaphore.try_acquire().is_err()); + } + + /// Full checkout-use-return cycle via handoff path. + /// Models: acquire_semaphore -> wrap_checkout(forget) -> return_object(handoff). + #[tokio::test] + async fn full_cycle_handoff_preserves_permits() { + let max_size = 8; + let semaphore = Semaphore::new(max_size); + + for _ in 0..50 { + // Phase 1: checkout — acquire permit, then forget it. + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + + // Phase 2: a waiter exists, handoff succeeds. + let (tx, _rx) = oneshot::channel::(); + let sent = tx.send(1).is_ok(); + assert!(sent); + + // Phase 3: return_object handoff path adds permit. + semaphore.add_permits(1); + } + + assert_eq!(semaphore.available_permits(), max_size); + } + + /// Full checkout-use-return cycle via idle path. + /// Models: acquire_semaphore -> wrap_checkout(forget) -> return_object(idle). + #[tokio::test] + async fn full_cycle_idle_preserves_permits() { + let max_size = 8; + let semaphore = Semaphore::new(max_size); + + for _ in 0..50 { + // Phase 1: checkout. + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + + // Phase 2: no waiters, return to idle. + semaphore.add_permits(1); + } + + assert_eq!(semaphore.available_permits(), max_size); + } + + /// Mixed handoff + idle returns must preserve permits. + #[tokio::test] + async fn mixed_handoff_and_idle_preserves_permits() { + let max_size = 8; + let semaphore = Semaphore::new(max_size); + + for i in 0..100 { + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + + if i % 3 == 0 { + // Handoff path: waiter exists. + let (tx, _rx) = oneshot::channel::(); + let _ = tx.send(1); + semaphore.add_permits(1); + } else if i % 3 == 1 { + // Handoff path: waiter dropped (timed out), falls through to idle. + let (tx, rx) = oneshot::channel::(); + drop(rx); + let failed = tx.send(1).is_err(); + assert!(failed); + // After skipping dead waiters, falls to idle path. + semaphore.add_permits(1); + } else { + // Idle path: no waiters. + semaphore.add_permits(1); + } + } + + assert_eq!(semaphore.available_permits(), max_size); + } + + // ------------------------------------------------------------------ + // pre_replace_one does NOT inflate the semaphore + // ------------------------------------------------------------------ + // + // pre_replace_one creates a new connection and pushes it to idle + // WITHOUT calling add_permits. The created connection sits in idle + // until a client checks it out via acquire_semaphore. If pre_replace_one + // incorrectly called add_permits, the semaphore would have more + // permits than max_size, allowing more concurrent checkouts than the + // pool can serve. + // + // We model the pre_replace_one contract: push to idle vec, bump + // slots.size, but do NOT touch the semaphore. + + #[tokio::test] + async fn pre_replace_does_not_inflate_semaphore() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + let initial_permits = semaphore.available_permits(); + + // Model pre_replace_one: creates a connection, pushes to idle, + // increments slots.size. No semaphore interaction. + // (In production code: slots.size += 1; push_idle(...)) + // The semaphore is intentionally untouched. + + // Simulate 3 pre-replacements. + for _ in 0..3 { + // pre_replace_one: only touches slots, not semaphore. + // Nothing here — the test asserts the semaphore stays flat. + } + + assert_eq!( + semaphore.available_permits(), + initial_permits, + "pre_replace_one must not inflate the semaphore" + ); + + // Verify that the semaphore still caps at max_size checkouts. + let mut held = Vec::new(); + for _ in 0..max_size { + held.push(semaphore.acquire().await.unwrap()); + } + assert_eq!(semaphore.available_permits(), 0); + // One more acquire must block. + let try_result = semaphore.try_acquire(); + assert!(try_result.is_err()); + } + + /// Verify that if pre_replace_one DID call add_permits, the + /// semaphore would exceed max_size — proving the invariant matters. + #[tokio::test] + async fn pre_replace_add_permits_would_inflate() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + // Wrong behavior: pre_replace_one calls add_permits(1). + semaphore.add_permits(1); + + // Now the semaphore has max_size + 1 permits. + assert_eq!( + semaphore.available_permits(), + max_size + 1, + "add_permits(1) on pre-replace inflates the semaphore above max_size" + ); + + // This would allow max_size + 1 concurrent checkouts — a bug. + let mut held = Vec::new(); + for _ in 0..=max_size { + held.push(semaphore.acquire().await.unwrap()); + } + assert_eq!( + held.len(), + max_size + 1, + "inflated semaphore allows more checkouts than max_size" + ); + } + + // ------------------------------------------------------------------ + // Burst gate try_recv drain: no double add_permits + // ------------------------------------------------------------------ + // + // In acquire_burst_gate, after the tokio::select! completes, + // try_recv() may pull a late-arriving connection. This connection + // is pushed to idle WITHOUT calling return_object (which would + // add_permits again). The original return_object that sent to the + // oneshot channel already called add_permits(1), so calling it + // again would double-count. + + #[tokio::test] + async fn try_recv_drain_must_not_double_add_permits() { + let max_size = 4; + let semaphore = Semaphore::new(max_size); + + // Client A checks out. + let permit = semaphore.acquire().await.unwrap(); + permit.forget(); + assert_eq!(semaphore.available_permits(), max_size - 1); + + // Client B registers as a waiter. + let (tx, mut rx) = oneshot::channel::(); + + // Client A returns via handoff: send + add_permits(1). + tx.send(42).unwrap(); + semaphore.add_permits(1); + assert_eq!(semaphore.available_permits(), max_size); + + // The select! in burst gate finishes WITHOUT polling rx. + // try_recv() pulls the connection. + let value = rx.try_recv().unwrap(); + assert_eq!(value, 42); + + // The correct behavior: push to idle, do NOT call add_permits. + // (return_object already did it above.) + // If we incorrectly called add_permits again: + // semaphore.add_permits(1); // WRONG — would make permits = max_size + 1 + // The test verifies permits stay at max_size. + assert_eq!( + semaphore.available_permits(), + max_size, + "try_recv drain must not add extra permits" + ); + } + + // ------------------------------------------------------------------ + // Concurrent handoff + idle: permit conservation under contention + // ------------------------------------------------------------------ + + #[tokio::test] + async fn concurrent_returns_preserve_permits() { + use std::sync::Arc; + + let max_size = 16; + let semaphore = Arc::new(Semaphore::new(max_size)); + let tasks = 100; + + let mut handles = Vec::with_capacity(tasks); + for i in 0..tasks { + let sem = Arc::clone(&semaphore); + handles.push(tokio::spawn(async move { + // Checkout. + let permit = sem.acquire().await.unwrap(); + permit.forget(); + + // Yield to interleave with other tasks. + tokio::task::yield_now().await; + + // Return via handoff or idle. + if i % 2 == 0 { + let (tx, _rx) = oneshot::channel::(); + let _ = tx.send(1); + } + sem.add_permits(1); + })); + } + + for h in handles { + h.await.unwrap(); + } + + assert_eq!( + semaphore.available_permits(), + max_size, + "all permits must be restored after concurrent checkout-return cycles" + ); + } } diff --git a/src/prometheus/metrics.rs b/src/prometheus/metrics.rs index cbd64b08..9f39bda3 100644 --- a/src/prometheus/metrics.rs +++ b/src/prometheus/metrics.rs @@ -428,6 +428,7 @@ static POOL_SCALING_PREV: Lazy> = const POOL_SCALING_TOTAL_TYPES: &[&str] = &[ "creates_started", "burst_gate_waits", + "burst_gate_budget_exhausted", "anticipation_wakes_notify", "anticipation_wakes_timeout", "create_fallback", @@ -467,9 +468,13 @@ fn update_pool_scaling_metrics() { // Translate snapshot fields to (type_label, value) pairs and emit // them as monotonic counter deltas. Pools have unique (user, db) keys // so prev tracking is per (type, user, db). - let totals: [(&str, u64); 6] = [ + let totals: [(&str, u64); 7] = [ ("creates_started", snapshot.creates_started), ("burst_gate_waits", snapshot.burst_gate_waits), + ( + "burst_gate_budget_exhausted", + snapshot.burst_gate_budget_exhausted, + ), ( "anticipation_wakes_notify", snapshot.anticipation_wakes_notify, diff --git a/src/prometheus/mod.rs b/src/prometheus/mod.rs index a0a6b728..eaa9f689 100644 --- a/src/prometheus/mod.rs +++ b/src/prometheus/mod.rs @@ -392,6 +392,7 @@ pub(crate) static POOL_SCALING_TOTALS: Lazy = Lazy::new(|| { "Per-pool cumulative counters for the anticipation + bounded burst path. Types: \ creates_started (took a burst slot), \ burst_gate_waits (had to wait on a Notify), \ + burst_gate_budget_exhausted (adaptive timeout fired, stopped waiting for handoff), \ anticipation_wakes_notify (anticipation woke on idle return), \ anticipation_wakes_timeout (anticipation budget elapsed without return), \ create_fallback (anticipation did not avoid an allocation), \ diff --git a/src/stats/mod.rs b/src/stats/mod.rs index abd34e1b..9049c135 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -123,7 +123,7 @@ impl Reporter { use std::collections::hash_map::Entry; match CLIENT_STATS.write().entry(client_id) { Entry::Occupied(_) => { - warn!("Client stats: duplicate registration for client_id={client_id}"); + warn!("[#c{client_id}] duplicate stats registration, skipping (likely migrated client id collision)"); } Entry::Vacant(entry) => { entry.insert(stats); diff --git a/tests/bdd/features/pool-pressure.feature b/tests/bdd/features/pool-pressure.feature index 25b1c6a1..c9c2c1c7 100644 --- a/tests/bdd/features/pool-pressure.feature +++ b/tests/bdd/features/pool-pressure.feature @@ -89,3 +89,24 @@ Feature: Pool under sustained client pressure And cascade "recycle_resume_burst" max latency is bounded by 20x p50 And cascade "recycle_resume_quiet" reports zero errors And cascade "recycle_resume_quiet" creates_started is at least 1 + + @cascade-semaphore-conservation + Scenario: Semaphore permits are not drained by sustained direct-handoff traffic + # Regression test for the semaphore permit leak: prior to the fix, + # each return_object handoff permanently consumed one semaphore permit. + # After max_size handoffs the pool could not accept new timeout_get + # callers and stopped creating connections. The pool stabilized at + # whatever size it reached during cold start (3-4 out of 40). + # + # This scenario runs 200 clients against a pool of 10 for 5 seconds. + # Each client holds the connection for 5ms, returns it (triggering a + # handoff to the next waiter), then re-enters timeout_get with a new + # semaphore acquire. After 5 seconds of continuous handoff traffic the + # pool must still be at full size and a fresh client must be able to + # check out within 100ms — proving the semaphore has not drained. + Given internal pool with size 10 and mode transaction + When I run cascade load "semaphore_conservation" with 200 clients for 5 seconds holding 5 ms + Then cascade "semaphore_conservation" reports zero errors + And cascade "semaphore_conservation" p99 latency is below 600 ms + And cascade "semaphore_conservation" pool size is at least 10 + And cascade "semaphore_conservation" pool size is at most 10 diff --git a/tests/bdd/features/session-mode-stats.feature b/tests/bdd/features/session-mode-stats.feature new file mode 100644 index 00000000..06f6a4ff --- /dev/null +++ b/tests/bdd/features/session-mode-stats.feature @@ -0,0 +1,24 @@ +@pool @session-mode +Feature: Session mode statistics accuracy + Regression test for query_time and xact_time percentiles in session mode. + Without the fix, both metrics accumulate the entire session duration + instead of individual query/transaction time — producing p99 values + of 100+ seconds on long-lived sessions. + + Background: + Given PostgreSQL started with options "-c max_connections=200" and pg_hba.conf: + """ + host all all 127.0.0.1/32 trust + """ + + @session-query-time + Scenario: query_time reflects individual queries, not session duration + # 20 clients in session mode, each holding the connection for 5ms + # (simulating a short query), repeated for 3 seconds. Individual + # queries take ~5ms. If query_time accumulates, p99 will be in the + # seconds range. The 500ms bound is generous enough for scheduling + # noise but catches the 100-second accumulation bug. + Given internal pool with size 10 and mode session + When I run cascade load "session_query_check" with 20 clients for 3 seconds holding 5 ms + Then cascade "session_query_check" reports zero errors + And cascade "session_query_check" server query_p99 is below 500 ms diff --git a/tests/bdd/pool_bench_helper.rs b/tests/bdd/pool_bench_helper.rs index 8c6cf676..be4dd6b8 100644 --- a/tests/bdd/pool_bench_helper.rs +++ b/tests/bdd/pool_bench_helper.rs @@ -850,3 +850,25 @@ async fn cascade_iter_spread_bounded(world: &mut DoormanWorld, name: String, mul multiple, ); } + +/// Assert that server-side query_time p99 (from the address stats histogram) +/// is below a threshold. Catches session-mode bugs where query_time accumulates +/// the entire session duration instead of individual query time. +#[then(regex = r#"^cascade "([^"]+)" server query_p99 is below (\d+) ms$"#)] +async fn cascade_server_query_p99_below(world: &mut DoormanWorld, _name: String, limit_ms: u64) { + let pool = world + .internal_pool + .as_ref() + .expect("Internal pool must be set up") + .pool + .clone(); + let (_, _, _, query_p99_us) = pool.server_pool().address().stats.get_query_percentiles(); + let query_p99_ms = query_p99_us as f64 / 1_000.0; + assert!( + query_p99_ms < limit_ms as f64, + "server-side query_p99 {:.2}ms exceeded limit {}ms \ + (likely session-mode accumulation bug — query_time not reset per-query)", + query_p99_ms, + limit_ms, + ); +}