fix(task-graph): harden CacheRef, canonicalize binary format, byte-bounded backpressure#557
Open
sroussey wants to merge 3 commits into
Open
fix(task-graph): harden CacheRef, canonicalize binary format, byte-bounded backpressure#557sroussey wants to merge 3 commits into
sroussey wants to merge 3 commits into
Conversation
…ly collisions
`CacheRef` was discriminated by shape alone: any object with `{ $ref: string }`
satisfied `isCacheRef`, including JSON-Schema `$ref` pointers embedded in
metadata. The cache-ref resolver walks task outputs and calls
`getOutputByRef(ref)` on every match — so any code path that surfaces an
attacker-influenced `{$ref: "cache://OTHER_RUN/secret"}` shape (e.g. a tool
result, an AI structured-output field, a parsed-JSON document) could trick
`resolveJobOutput` / `resolveOutput` into reading bytes from another run or
tenant's private cache slot.
This patch adds a literal `kind: "task-graph/CacheRef"` brand discriminator
that:
- survives JSON serialization across queue rows / IPC (Symbol-based brands
would be erased by `JSON.stringify` and break cross-process resolution);
- is checked by `isCacheRef` alongside the `$ref` string;
- is applied uniformly by a new `makeCacheRef(...)` helper that callers
use to construct refs.
`CacheCoordinator.getBinaryRefSinksByPolicy` and
`RunPrivateCacheRepo.saveOutputStream` now defensively re-wrap the value
returned by legacy backings (`isCacheRef(raw) ? raw : makeCacheRef(raw)`),
so a backing that pre-dates the brand still produces a discriminator-bearing
ref when seen through the framework. In-tree test repositories and callers
are updated to use `makeCacheRef`.
Test coverage:
- `CacheRef.test.ts` now expects shape-only `{$ref: string}` to be rejected
and exercises JSON round-trip preserving the brand.
- `resolveOutput.test.ts` adds a case where a JSON-Schema-shaped
`{schema: {$ref: "#/\$defs/Foo"}}` is left untouched and the resolver is
never called (identity preserved).
- `resolveJobOutput.test.ts` adds the cross-tenant attack case: an
attacker-supplied `{note: {\$ref: "cache://OTHER_RUN/secret"}}` never
invokes `getOutputByRef`.
…b"|"binary"
`materializeBinary` previously accepted any string and silently coerced
unknown values (including casing typos like `"Blob"`) to the ArrayBuffer
branch. A task author writing `format: "Blob"` would unknowingly produce an
ArrayBuffer where every downstream consumer expected a Blob — and the
mismatch only surfaced at the consumer (often as a misleading runtime
error during streaming, or worse, silent data corruption when the consumer
duck-typed both shapes).
This patch establishes a canonical `BinaryFormat = "blob" | "binary"` type
and routes every binary-port consumer through a single
`assertBinaryFormat(schema, port)` helper:
- `undefined` and `"blob"` resolve to `"blob"` (the documented default);
- `"binary"` resolves to `"binary"`;
- anything else throws with the allowed vocabulary in the message.
`materializeBinary` now takes the canonical `BinaryFormat` directly and
`StreamProcessor` / `CacheCoordinator.hydrateRefsBelowThreshold` both call
`assertBinaryFormat` before invoking it.
`TaskRegistry.registerTask` runs the same check at registration time over
every output port with `x-stream: "binary"`, so the typo fails near the
task definition rather than during a streaming run. The task is not added
to the registry when the check fails.
Test coverage:
- `StreamBinaryTypes.test.ts` replaces the now-removed "unknown format =
binary" behavior with `assertBinaryFormat` cases for `"blob"`,
`"binary"`, undefined-default, the casing typo `"Blob"`, and an unknown
value (`"wat"`).
- `TaskRegistry.test.ts` adds cases asserting registration throws on a
binary port with `format: "Blob"`, and succeeds on `"blob"` /
`"binary"`.
- `Spec2QueueRowAndRehydrate.test.ts` adds symmetric rehydration cases:
`format: "blob"` rehydrates into a `Blob`, `format: "binary"` into an
`ArrayBuffer`.
…efault 8 MiB)
The streaming binary router buffered chunks without bound. A fast producer
(e.g. an AI image / audio generator yielding 1 MiB chunks) feeding a slow
sink (remote object store, throttled FS) would let the producer race ahead
and accumulate the entire payload in memory before the sink saw the first
chunk — turning a notionally O(1) streaming path into peak-residency O(N).
The old comment even acknowledged the issue ("backpressure: there is none")
and offloaded the problem onto the sink author.
This patch:
- Introduces `DEFAULT_BINARY_HIGH_WATER_BYTES = 8 MiB` in `StreamTypes.ts`.
- `BinaryStreamRouter` now tracks `bufferedBytes` (sum of un-consumed
chunk sizes). `push(chunk)` returns a Promise that resolves
immediately while `bufferedBytes < highWaterMarkBytes`, and parks the
producer until the consumer drains under the mark otherwise. `end()`
and `fail()` BOTH release any parked producer so an abort mid-park
does not leak the Promise.
- `StreamProcessor` `await router.push(...)` on every `binary-delta`
yield, so the byte-bounded backpressure applies for tasks running
through the standard streaming path.
- `IRunConfig.binaryHighWaterBytes` lets callers override per-run.
Threaded through `TaskRunner` → `StreamProcessor.run` deps.
- `IExecuteContext.binaryBackpressure?: () => Promise<void>` is a
cooperative hook for tasks that emit via a side channel and cannot
use the awaited `push` path; the StreamProcessor and StreamPump
install router-aware implementations, and an absent runtime supplies
a no-op (free for tasks that don't call it).
- `StreamPump.pipeBinaryToCache` (the EventEmitter path used for the
cache-ingest tee) gets the same byte-counted queue and returns a
`backpressure()` function alongside `promise` / `detach`.
Test coverage in `StreamingBackpressure.test.ts` adds a "binary
backpressure" describe block:
- 100 × 1 MiB through a slow (50 ms / chunk) sink with a 4 MiB
high-water mark: peak buffer stays at or below `mark + 1 chunk`
and every byte is delivered.
- End-to-end: 100 MiB through `StreamProcessor.run` with the same
high-water mark, asserting full delivery without drops.
- Abort-while-parked: a producer parked at the high-water mark sees
its `push()` Promise settle within 100 ms of `r.end()`.
Coverage Report
File CoverageNo changed files found. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Three follow-up fixes for the binary-streaming framework + result-as-reference work in #545. Targets
claude/stoic-bell-RLJWTso the patches merge with the rest of that PR.Fix 1 (CRITICAL) — Brand
CacheRefwith a literalkindFiles:
packages/task-graph/src/cache/CacheRef.ts,packages/task-graph/src/task/CacheCoordinator.ts,packages/task-graph/src/cache/RunPrivateCacheRepo.ts+ test updates.CacheRefwas discriminated by shape alone: any object with{$ref: string}satisfiedisCacheRef. The cache-ref resolver walks task outputs and callsgetOutputByRef(ref)on every match — so any code path surfacing an attacker-influenced{$ref: "cache://OTHER_RUN/secret"}(tool result, parsed JSON, AI structured output, embedded JSON-Schema reference) could trickresolveJobOutput/resolveOutputinto reading bytes from another run or tenant's private cache slot.Adds a literal
kind: "task-graph/CacheRef"brand (a string, not a Symbol — so it survives JSON round-trip across queue rows / IPC), checked alongside$refinisCacheRef. NewmakeCacheRef(...)helper constructs branded refs;CacheCoordinator.getBinaryRefSinksByPolicyandRunPrivateCacheRepo.saveOutputStreamdefensively re-wrap legacy backings.Fix 2 (HIGH) — Canonical
BinaryFormatvocabularyFiles:
packages/task-graph/src/task/StreamTypes.ts,StreamProcessor.ts,CacheCoordinator.ts,TaskRegistry.ts+ test updates.materializeBinarypreviously accepted any string and silently coerced unknown values (including casing typos like"Blob") to the ArrayBuffer branch — aformat: "Blob"mistake produced ArrayBuffers where every downstream consumer expected Blobs, with the mismatch only surfacing far from the task definition.Introduces
BinaryFormat = "blob" | "binary"and a singleassertBinaryFormat(schema, port)helper used byStreamProcessor,CacheCoordinator.hydrateRefsBelowThreshold, andTaskRegistry.registerTask. Invalidformaton anx-stream: "binary"port now throws at registration time, not during a streaming run.Fix 3 (HIGH) — Byte-bounded backpressure (default 8 MiB)
Files:
packages/task-graph/src/task/StreamProcessor.ts(BinaryStreamRouter),StreamTypes.ts,task-graph/StreamPump.ts(pipeBinaryToCache),ITask.ts,TaskRunner.ts+ test updates.BinaryStreamRouterbuffered chunks without bound. A fast producer (AI image / audio generator yielding 1 MiB chunks) feeding a slow sink (remote object store, throttled FS) would race ahead and accumulate the whole payload in memory before the sink saw the first chunk.Adds
DEFAULT_BINARY_HIGH_WATER_BYTES = 8 MiBand convertspush(chunk)to a Promise that parks the producer oncebufferedBytes >= highWaterMarkBytes.end()andfail()release any parked producer so abort mid-park does not leak the Promise.IRunConfig.binaryHighWaterBytesallows per-run override.IExecuteContext.binaryBackpressure?: () => Promise<void>lets tasks emitting via a side channel cooperate; absent runtime supplies a no-op.StreamPump.pipeBinaryToCachegets the same byte-counted queue and exposes abackpressure()callable.Test plan
claude/stoic-bell-RLJWT)bun run build:packagesclean across all 71 turbo tasksbun scripts/test.ts task vitest— 1117 pass / 24 skipped (no regressions)isCacheRefrejects shape-only{$ref: string};resolveOutputdoes not walk JSON-Schema refs;resolveJobOutputnever callsgetOutputByReffor attacker-supplied unbranded shapes;assertBinaryFormatthrows for"Blob"/"wat"/ accepts"blob"/"binary"/ undefined;TaskRegistry.registerTaskrejectsformat: "Blob"on a binary port; symmetricBlob/ArrayBufferrehydration; 100 × 1 MiB through a 50 ms / chunk sink keeps peak buffer ≤ 4 MiB + 1 chunk; abort-while-parked settles within 100 ms; 100 MiB end-to-end throughStreamProcessor.runwith backpressure delivers every byte.Follow-up to #545.
Generated by Claude Code