-
Notifications
You must be signed in to change notification settings - Fork 15
fix(core): add per-request timeout to Requester to catch silent fetch hangs #579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4f65577
312fe8b
2d1eba8
5360ba8
4487187
c0487bc
235db2b
17fcce9
f1cb41c
9716b6d
f660549
5ff5209
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,8 @@ const { Delegate } = require('../../core'); | |
| const { FetchError } = require('../../errors'); | ||
| const { get } = require('../../assertions'); | ||
|
|
||
| const DEFAULT_REQUEST_TIMEOUT_MS = 60_000; | ||
|
|
||
| class Requester extends Delegate { | ||
| constructor(params) { | ||
| super(params); | ||
|
|
@@ -13,6 +15,30 @@ class Requester extends Delegate { | |
| this.delegateTypes.push(this.DLGT_INVALID_AUTH); | ||
| this.agent = get(params, 'agent', null); | ||
|
|
||
| // Per-attempt HTTP timeout. Without this the framework called fetch() | ||
| // with no AbortController and no timeout — a silently-hung TCP | ||
| // connection (server accepts but never responds) blocked the calling | ||
| // promise forever, cascading into stalled batches, stalled syncs, | ||
| // and worker-lambda timeouts. | ||
| // | ||
| // Configuration precedence: | ||
| // 1. Instance param: new Requester({ requestTimeoutMs: 30_000 }) | ||
| // 2. Class static: static requestTimeoutMs = 30_000 | ||
| // 3. Default: DEFAULT_REQUEST_TIMEOUT_MS (60s) | ||
| // | ||
| // Pass 0 (or null) to disable the timeout entirely — reserved for | ||
| // test doubles and documented long-running endpoints. | ||
| // Intentionally NOT using `get(params, ...)` here — the Frigg | ||
| // `get` helper throws RequiredPropertyError if the key is missing | ||
| // and no default is provided, which would collide with the fall- | ||
| // through to the class-level static override. | ||
| const instanceTimeout = params?.requestTimeoutMs; | ||
| this.requestTimeoutMs = | ||
| instanceTimeout !== undefined && instanceTimeout !== null | ||
| ? instanceTimeout | ||
| : this.constructor.requestTimeoutMs ?? | ||
| DEFAULT_REQUEST_TIMEOUT_MS; | ||
|
|
||
| // Allow passing in the fetch function | ||
| // Instance methods can use this.fetch without differentiating | ||
| this.fetch = get(params, 'fetch', fetch); | ||
|
|
@@ -48,52 +74,134 @@ class Requester extends Delegate { | |
|
|
||
| if (this.agent) options.agent = this.agent; | ||
|
|
||
| let response; | ||
| // Per-attempt timeout — fresh AbortController per call so the retry | ||
| // recursion (with its own backoff sleeps) always gets a clean | ||
| // signal. Timer is cleared in the finally block regardless of | ||
| // outcome. | ||
| const timeoutMs = this.requestTimeoutMs; | ||
| const controller = timeoutMs > 0 ? new AbortController() : null; | ||
| const timeoutHandle = controller | ||
| ? setTimeout(() => controller.abort(), timeoutMs) | ||
| : null; | ||
| const fetchOptions = controller | ||
| ? { ...options, signal: controller.signal } | ||
| : options; | ||
|
|
||
| // Timer must stay active through body consumption. node-fetch v2 | ||
| // resolves the fetch() promise when headers arrive, not when the | ||
| // body is fully read — so a server that sends headers and then | ||
| // stalls the body would still hang parsedBody() or | ||
| // FetchError.create()'s response.text() call. We clear the timer | ||
| // only after the body is fully consumed (success path) or | ||
| // deliberately before each recursive retry so the new attempt | ||
| // starts with its own fresh timer. | ||
| let timerCleared = false; | ||
| const clearRequestTimer = () => { | ||
| if (!timerCleared && timeoutHandle) { | ||
| clearTimeout(timeoutHandle); | ||
| timerCleared = true; | ||
| } | ||
| }; | ||
|
|
||
| try { | ||
| response = await this.fetch(encodedUrl, options); | ||
| } catch (e) { | ||
| if (e.code === 'ECONNRESET' && i < this.backOff.length) { | ||
| let response; | ||
| try { | ||
| response = await this.fetch(encodedUrl, fetchOptions); | ||
| } catch (e) { | ||
| // AbortController fires AbortError (name) / ETIMEDOUT-shaped | ||
| // errors (type on node-fetch) when we hit the timeout. No | ||
| // retry on timeout: a slow endpoint is a downstream problem, | ||
| // and each retry would wait another `timeoutMs` before giving | ||
| // up — amplifying the hang into a per-record multi-minute | ||
| // stall at batch scale. | ||
| const isTimeout = | ||
| e?.name === 'AbortError' || e?.type === 'aborted'; | ||
| if (e?.code === 'ECONNRESET' && i < this.backOff.length) { | ||
| clearRequestTimer(); | ||
| const delay = this.backOff[i] * 1000; | ||
| await new Promise((resolve) => | ||
| setTimeout(resolve, delay) | ||
| ); | ||
| return this._request(url, options, i + 1); | ||
| } | ||
| const fetchError = await FetchError.create({ | ||
| resource: encodedUrl, | ||
| init: options, | ||
| responseBody: isTimeout | ||
| ? `Request timed out after ${timeoutMs}ms` | ||
| : e, | ||
| }); | ||
| if (isTimeout) { | ||
| // Flag + machine-readable fields so callers can | ||
| // distinguish a timeout from a generic network error | ||
| // without parsing the message (which FetchError | ||
| // sanitizes outside of STAGE=dev). | ||
| fetchError.isTimeout = true; | ||
| fetchError.timeoutMs = timeoutMs; | ||
| } | ||
| throw fetchError; | ||
| } | ||
|
|
||
| const { status } = response; | ||
|
|
||
| // If the status is retriable and there are back off requests left, retry the request | ||
| if ((status === 429 || status >= 500) && i < this.backOff.length) { | ||
| clearRequestTimer(); | ||
| const delay = this.backOff[i] * 1000; | ||
| await new Promise((resolve) => setTimeout(resolve, delay)); | ||
| return this._request(url, options, i + 1); | ||
| } | ||
| throw await FetchError.create({ | ||
| resource: encodedUrl, | ||
| init: options, | ||
| responseBody: e, | ||
| }); | ||
| } | ||
| const { status } = response; | ||
|
|
||
| // If the status is retriable and there are back off requests left, retry the request | ||
| if ((status === 429 || status >= 500) && i < this.backOff.length) { | ||
| const delay = this.backOff[i] * 1000; | ||
| await new Promise((resolve) => setTimeout(resolve, delay)); | ||
| return this._request(url, options, i + 1); | ||
| } else if (status === 401) { | ||
| if (!this.isRefreshable || this.refreshCount > 0) { | ||
| await this.notify(this.DLGT_INVALID_AUTH); | ||
| } else { | ||
| this.refreshCount++; | ||
| const refreshSucceeded = await this.refreshAuth(); | ||
| if (refreshSucceeded) { | ||
| return this._request(url, options, i + 1); | ||
| } else if (status === 401) { | ||
| if (!this.isRefreshable || this.refreshCount > 0) { | ||
| await this.notify(this.DLGT_INVALID_AUTH); | ||
| } else { | ||
| this.refreshCount++; | ||
| const refreshSucceeded = await this.refreshAuth(); | ||
| if (refreshSucceeded) { | ||
| clearRequestTimer(); | ||
| return this._request(url, options, i + 1); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // If the error wasn't retried, throw. | ||
| if (status >= 400) { | ||
| throw await FetchError.create({ | ||
| resource: encodedUrl, | ||
| init: options, | ||
| response, | ||
| }); | ||
| // If the error wasn't retried, throw. FetchError.create reads | ||
| // the response body (response.text()) — timer must still be | ||
| // alive to catch a stalled body stream. | ||
| if (status >= 400) { | ||
| const fetchError = await FetchError.create({ | ||
| resource: encodedUrl, | ||
| init: options, | ||
| response, | ||
| }); | ||
| throw this._maybeFlagTimeoutDuringBodyRead( | ||
| fetchError, | ||
| timeoutMs | ||
| ); | ||
| } | ||
|
|
||
| // parsedBody consumes the response body stream. If the server | ||
| // stalls mid-stream the timer (still armed) aborts it. | ||
| return options.returnFullRes | ||
| ? response | ||
| : await this.parsedBody(response); | ||
| } catch (e) { | ||
| // If the abort fired during body consumption, node-fetch emits | ||
| // the error as an AbortError on the body stream. Surface the | ||
| // same isTimeout flag callers use for header-phase timeouts. | ||
| throw this._maybeFlagTimeoutDuringBodyRead(e, timeoutMs); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If a 4xx/5xx response stalls while Useful? React with 👍 / 👎. |
||
| } finally { | ||
| clearRequestTimer(); | ||
| } | ||
| } | ||
|
|
||
| return options.returnFullRes | ||
| ? response | ||
| : await this.parsedBody(response); | ||
| _maybeFlagTimeoutDuringBodyRead(err, timeoutMs) { | ||
| if (!err || typeof err !== 'object') return err; | ||
| if (err.isTimeout) return err; | ||
| const isAbort = | ||
| err.name === 'AbortError' || err.type === 'aborted'; | ||
| if (!isAbort) return err; | ||
| err.isTimeout = true; | ||
| err.timeoutMs = timeoutMs; | ||
| return err; | ||
| } | ||
|
|
||
| async _get(options) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nulltimeout override as disabledThe constructor currently treats
requestTimeoutMs: nullas “use class/default timeout” becausenullis excluded byinstanceTimeout !== undefined && instanceTimeout !== null. That contradicts the new in-code API note sayingnulldisables timeouts, so integrations passingnullfrom config to opt out will still time out unexpectedly. Please align the condition with the documented behavior (or update docs/tests if fallback is intended).Useful? React with 👍 / 👎.