diff --git a/packages/job-queue/src/job/Job.ts b/packages/job-queue/src/job/Job.ts index c5a6b00ce..fdab8308d 100644 --- a/packages/job-queue/src/job/Job.ts +++ b/packages/job-queue/src/job/Job.ts @@ -6,7 +6,7 @@ import { JobStatus } from "../queue-storage/IQueueStorage"; import { JobError } from "./JobError"; -import type { JobProgressListener } from "./JobQueueEventListeners"; +import type { JobProgressListener, StreamEventLike } from "./JobQueueEventListeners"; export { JobStatus }; @@ -20,6 +20,12 @@ export interface IJobExecuteContext { message?: string, details?: Record | null ) => Promise; + /** + * OPTIONAL. Present only when the worker's transport can deliver stream + * events. Jobs MUST NOT retain references to chunk buffers after calling + * this (buffers may be transferred across a worker boundary and detached). + */ + emitStreamEvent?: (event: StreamEventLike) => void; } /** diff --git a/packages/job-queue/src/job/JobQueueClient.ts b/packages/job-queue/src/job/JobQueueClient.ts index 2ae48100e..2ae4b142e 100644 --- a/packages/job-queue/src/job/JobQueueClient.ts +++ b/packages/job-queue/src/job/JobQueueClient.ts @@ -26,6 +26,8 @@ import { JobQueueEventListeners, JobQueueEventParameters, JobQueueEvents, + JobStreamListener, + type StreamEventLike, } from "./JobQueueEventListeners"; import type { JobQueueServer } from "./JobQueueServer"; import { storageToClass } from "./JobStorageConverters"; @@ -38,6 +40,12 @@ export interface JobHandle { waitFor(): Promise; abort(): Promise; onProgress(callback: JobProgressListener): () => void; + /** + * OPTIONAL — present only when this handle's transport can deliver stream + * events (a same-process server-attached queue). Absent on storage-only + * backends; callers branch on `typeof handle.onStream === "function"`. + */ + onStream?(callback: JobStreamListener): () => void; } /** @@ -78,6 +86,11 @@ export class JobQueueClient { */ protected readonly jobProgressListeners: Map> = new Map(); + /** + * Map of job IDs to their stream listeners + */ + protected readonly jobStreamListeners: Map> = new Map(); + /** * Last known progress state for each job */ @@ -391,6 +404,27 @@ export class JobQueueClient { }; } + /** + * Subscribe to stream events for a specific job + */ + public onJobStream(jobId: unknown, listener: JobStreamListener): () => void { + if (!this.jobStreamListeners.has(jobId)) { + this.jobStreamListeners.set(jobId, new Set()); + } + const listeners = this.jobStreamListeners.get(jobId)!; + listeners.add(listener); + + return () => { + const listeners = this.jobStreamListeners.get(jobId); + if (listeners) { + listeners.delete(listener); + if (listeners.size === 0) { + this.jobStreamListeners.delete(jobId); + } + } + }; + } + // ======================================================================== // Event handling // ======================================================================== @@ -524,23 +558,54 @@ export class JobQueueClient { } } + /** + * Called by server when a job emits a stream event. Listener throws are + * isolated per-listener — one misbehaving subscriber does not interrupt + * delivery to the rest or abort the dispatch itself. + * @internal + */ + public handleJobStream(jobId: unknown, event: StreamEventLike): void { + this.events.emit("job_stream", this.queueName, jobId, event); + + const listeners = this.jobStreamListeners.get(jobId); + if (!listeners) return; + for (const listener of listeners) { + try { + listener(event); + } catch (err) { + getLogger().error("JobHandle.onStream listener threw", { + jobId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + // ======================================================================== // Private helpers // ======================================================================== private createJobHandle(id: unknown): JobHandle { - return { + const handle: JobHandle = { id, waitFor: () => this.waitFor(id), abort: () => this.abort(id), onProgress: (callback: JobProgressListener) => this.onJobProgress(id, callback), }; + // Stream delivery requires a same-process server-attached transport — the + // same signal `connect()` uses. Storage-only backends omit `onStream`, so + // callers branch on `typeof handle.onStream === "function"`. + if (this.server) { + handle.onStream = (callback: JobStreamListener) => this.onJobStream(id, callback); + } + return handle; } private cleanupJob(jobId: unknown): void { this.activeJobPromises.delete(jobId); this.lastKnownProgress.delete(jobId); this.jobProgressListeners.delete(jobId); + this.jobStreamListeners.delete(jobId); } private handleStorageChange(change: QueueChangePayload): void { diff --git a/packages/job-queue/src/job/JobQueueEventListeners.ts b/packages/job-queue/src/job/JobQueueEventListeners.ts index ad3b7c457..18913f196 100644 --- a/packages/job-queue/src/job/JobQueueEventListeners.ts +++ b/packages/job-queue/src/job/JobQueueEventListeners.ts @@ -25,6 +25,7 @@ export type JobQueueEventListeners = { message: string, details: Record | null ) => void; + job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void; }; export type JobQueueEvents = keyof JobQueueEventListeners; @@ -46,3 +47,16 @@ export type JobProgressListener = ( message: string, details: Record | null ) => void; + +/** + * Minimal structural shape of a stream event crossing the job-queue boundary. + * + * `@workglow/job-queue` sits below `@workglow/task-graph` in the dependency + * graph, so it cannot import task-graph's `StreamEvent`. This structural type + * captures just what the queue plumbing needs; task-graph's `StreamEvent` is + * assignable to it, so real stream producers interoperate transparently. + */ +export type StreamEventLike = { type: string; port?: string; [k: string]: unknown }; + +/** Listener for cross-process stream events emitted by an executing job. */ +export type JobStreamListener = (event: StreamEventLike) => void; diff --git a/packages/job-queue/src/job/JobQueueServer.ts b/packages/job-queue/src/job/JobQueueServer.ts index b26df7028..72df5af96 100644 --- a/packages/job-queue/src/job/JobQueueServer.ts +++ b/packages/job-queue/src/job/JobQueueServer.ts @@ -13,6 +13,7 @@ import type { JobStorageFormat, QueueChangePayload } from "../queue-storage/IQue import { JobStatus } from "../queue-storage/IQueueStorage"; import type { DeadLetter } from "./DeadLetter"; import { Job, JobClass } from "./Job"; +import type { StreamEventLike } from "./JobQueueEventListeners"; import { JobQueueClient } from "./JobQueueClient"; import { JobQueueWorker } from "./JobQueueWorker"; import { classToStorage, storageToClass } from "./JobStorageConverters"; @@ -49,6 +50,7 @@ export type JobQueueServerEventListeners = { message: string, details: Record | null ) => void; + job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void; }; export type JobQueueServerEvents = keyof JobQueueServerEventListeners; @@ -479,6 +481,11 @@ export class JobQueueServer< this.forwardToClients("handleJobProgress", jobId, progress, message, details); }); + worker.on("job_stream", (jobId, event) => { + this.events.emit("job_stream", this.queueName, jobId, event); + this.forwardToClients("handleJobStream", jobId, event); + }); + return worker; } @@ -502,6 +509,11 @@ export class JobQueueServer< message: string, details: Record | null ): void; + protected forwardToClients( + method: "handleJobStream", + jobId: unknown, + event: StreamEventLike + ): void; protected forwardToClients(method: string, ...args: unknown[]): void { for (const client of this.clients) { const fn = (client as any)[method]; diff --git a/packages/job-queue/src/job/JobQueueWorker.ts b/packages/job-queue/src/job/JobQueueWorker.ts index 8b740f55a..5f726346c 100644 --- a/packages/job-queue/src/job/JobQueueWorker.ts +++ b/packages/job-queue/src/job/JobQueueWorker.ts @@ -31,6 +31,7 @@ import { RetryableJobError, } from "./JobError"; import { withJobErrorDiagnostics } from "./JobErrorDiagnostics"; +import type { StreamEventLike } from "./JobQueueEventListeners"; import { classToStorage, storageToClass } from "./JobStorageConverters"; /** @@ -56,6 +57,7 @@ export type JobQueueWorkerEventListeners = { message: string, details: Record | null ) => void; + job_stream: (jobId: unknown, event: StreamEventLike) => void; worker_start: () => void; worker_stop: () => void; }; @@ -812,6 +814,7 @@ export class JobQueueWorker< return await job.execute(job.input, { signal, updateProgress: this.updateProgress.bind(this, job.id), + emitStreamEvent: (event) => this.emitStreamEvent(job.id, event), }); } @@ -833,6 +836,17 @@ export class JobQueueWorker< this.events.emit("job_progress", jobId, progress, message, details); } + /** + * Emit a cross-process stream event for a job. + * + * Mirrors {@link updateProgress}: stream events are delivered in-memory via + * the `job_stream` event and forwarded by an attached `JobQueueServer` to + * subscribed clients. Storage is not touched. + */ + protected emitStreamEvent(jobId: unknown, event: StreamEventLike): void { + this.events.emit("job_stream", jobId, event); + } + /** Internal — resolve the active claim for a job id, throw if missing. */ private getClaim(jobId: unknown): IClaim> | undefined { return this.activeClaims.get(jobId); diff --git a/packages/task-graph/src/cache/CacheRef.ts b/packages/task-graph/src/cache/CacheRef.ts new file mode 100644 index 000000000..20222a48e --- /dev/null +++ b/packages/task-graph/src/cache/CacheRef.ts @@ -0,0 +1,57 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * A reference to bytes that live in the configured cache backing rather than + * inline in a task `Output`. Emitted by `TaskRunner` for binary output ports + * whose committed size meets the `IRunConfig.referenceThresholdBytes` and + * whose cache backing implements `saveOutputStream`. + * + * `$ref` is opaque to consumers: only the cache backing knows how to translate + * it back into bytes. `size` and `mime` are best-effort hints populated when + * known at finish time; absent values do not imply unknown failure. + * + * Resolution is best-effort: the cache backing's TTL is the lifetime contract, + * and `resolveOutputRef` returns `undefined` when the underlying entry has + * been evicted. + */ +export type CacheRef = { + readonly $ref: string; + readonly size?: number; + readonly mime?: string; +}; + +/** + * Narrow an unknown value to {@link CacheRef}. The discriminator is a `$ref` + * property of type `string`; other fields are optional and not inspected. + */ +export function isCacheRef(value: unknown): value is CacheRef { + if (typeof value !== "object" || value === null) return false; + const candidate = value as { readonly $ref?: unknown }; + return typeof candidate.$ref === "string"; +} + +/** + * Default threshold (in bytes) at which a binary output port becomes a + * {@link CacheRef} instead of being inlined in `Output`. Below this size, the + * runner inlines the bytes; at or above, it emits a reference. + * + * `0` is a sentinel meaning "always emit a reference" and is honored by the + * runtime path (a callsite that wants to force refs sets `0` explicitly via + * `IRunConfig.referenceThresholdBytes`). + */ +export const REFERENCE_THRESHOLD_BYTES_DEFAULT = 65_536; + +/** + * Resolve the effective reference threshold for a run, falling back to + * {@link REFERENCE_THRESHOLD_BYTES_DEFAULT} when unset. A negative value is + * treated as the default (negative thresholds are nonsensical). + */ +export function resolveReferenceThreshold(threshold: number | undefined): number { + if (threshold === undefined) return REFERENCE_THRESHOLD_BYTES_DEFAULT; + if (threshold < 0) return REFERENCE_THRESHOLD_BYTES_DEFAULT; + return threshold; +} diff --git a/packages/task-graph/src/cache/RunPrivateCacheRepo.ts b/packages/task-graph/src/cache/RunPrivateCacheRepo.ts index 9b0e137b9..33e7b8bf0 100644 --- a/packages/task-graph/src/cache/RunPrivateCacheRepo.ts +++ b/packages/task-graph/src/cache/RunPrivateCacheRepo.ts @@ -6,6 +6,7 @@ import { TaskOutputRepository } from "../storage/TaskOutputRepository"; import type { TaskInput, TaskOutput } from "../task/TaskTypes"; +import type { CacheRef } from "./CacheRef"; export interface RunPrivateCacheRepoOptions { backing: TaskOutputRepository; @@ -43,6 +44,20 @@ export class RunPrivateCacheRepo extends TaskOutputRepository { super({ outputCompression: backing.outputCompression }); this.backing = backing; this.runId = runId; + // Mirror the backing's optional-method shape on this instance so callers + // probing `typeof repo.saveOutputStream === "function"` (or the + // getOutputByRef/getOutputStreamByRef siblings) see the true capability + // instead of the always-present wrapper override. Class methods live on + // the prototype; assigning `undefined` on the instance shadows them. + if (typeof backing.saveOutputStream !== "function") { + (this as { saveOutputStream?: unknown }).saveOutputStream = undefined; + } + if (typeof backing.getOutputByRef !== "function") { + (this as { getOutputByRef?: unknown }).getOutputByRef = undefined; + } + if (typeof backing.getOutputStreamByRef !== "function") { + (this as { getOutputStreamByRef?: unknown }).getOutputStreamByRef = undefined; + } } /** @@ -86,6 +101,55 @@ export class RunPrivateCacheRepo extends TaskOutputRepository { return this.backing.getOutput(this.ns(cacheIdentity), inputs); } + /** + * Forwards the streaming sink to the backing repository, applying the same + * `runId` namespacing as `saveOutput`. Only present in effect when the + * backing repo supports streaming; `supportsStreaming()` (below) reflects the + * backing repo so callers branch correctly before calling this. + * + * Returns whatever {@link CacheRef} the backing produced (already namespaced + * via the wrapped `taskType`). Resolvers calling `getOutputByRef` on this + * wrapper forward to the backing, which decodes its own `$ref`. + */ + public override saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise { + const fn = this.backing.saveOutputStream; + if (typeof fn !== "function") { + return Promise.reject( + new Error( + `RunPrivateCacheRepo: backing repository does not implement saveOutputStream. ` + + `Call supportsStreaming() before saveOutputStream.` + ) + ); + } + return fn.call(this.backing, this.ns(taskType), inputs, chunks, metadata); + } + + /** + * Forwards by-ref retrieval to the backing repository. The `$ref` already + * encodes whatever the backing needs to locate the entry; no namespacing is + * re-applied here. + */ + public override getOutputByRef(ref: CacheRef): Promise { + if (typeof this.backing.getOutputByRef !== "function") return Promise.resolve(undefined); + return this.backing.getOutputByRef(ref); + } + + /** Forwards streaming by-ref retrieval to the backing repository. */ + public override getOutputStreamByRef(ref: CacheRef): AsyncIterable | undefined { + if (typeof this.backing.getOutputStreamByRef !== "function") return undefined; + return this.backing.getOutputStreamByRef(ref); + } + + /** Mirrors the backing repository's streaming capability. */ + public override supportsStreaming(): boolean { + return this.backing.supportsStreaming(); + } + /** * Override of `TaskOutputRepository.clear()` that only deletes entries * namespaced under THIS wrapper's `runId`. Entries from other runs are not diff --git a/packages/task-graph/src/cache/index.ts b/packages/task-graph/src/cache/index.ts index ac1518d67..708d1c5de 100644 --- a/packages/task-graph/src/cache/index.ts +++ b/packages/task-graph/src/cache/index.ts @@ -6,5 +6,8 @@ export * from "./CacheJanitor"; export * from "./CachePolicy"; +export * from "./CacheRef"; export * from "./CacheRegistry"; +export * from "./resolveJobOutput"; +export * from "./resolveRef"; export * from "./RunPrivateCacheRepo"; diff --git a/packages/task-graph/src/cache/resolveJobOutput.ts b/packages/task-graph/src/cache/resolveJobOutput.ts new file mode 100644 index 000000000..82b8ee704 --- /dev/null +++ b/packages/task-graph/src/cache/resolveJobOutput.ts @@ -0,0 +1,55 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef } from "./CacheRef"; +import type { CacheRefResolver, ResolveOutputOptions } from "./resolveRef"; +import { resolveOutput } from "./resolveRef"; + +/** + * Structural type matching `@workglow/job-queue`'s `JobHandle`. Declared + * locally so this module doesn't have to import from job-queue (avoiding a + * runtime dependency edge for a structural shape). + */ +export interface JobHandleLike { + waitFor(): Promise; +} + +/** + * Carrier of the resolver. Two-shape input: either a {@link CacheRefResolver} + * function directly, or anything with a `getOutputByRef` method (the shape + * `TaskOutputRepository` exposes). + */ +export type RefBacking = + | CacheRefResolver + | { readonly getOutputByRef?: (ref: CacheRef) => Promise }; + +/** + * Await a job's completion and hydrate every {@link CacheRef} inside its + * `Output` to inline bytes via the supplied backing. The backing can be a + * raw resolver function or any object exposing `getOutputByRef` (e.g. a + * `TaskOutputRepository`). + * + * On cache miss the placeholder is replaced by `undefined` (best-effort + * resolution). Backings that don't implement `getOutputByRef` + * leave every ref in place. + */ +export async function resolveJobOutput( + handle: JobHandleLike, + backing: RefBacking, + options?: ResolveOutputOptions +): Promise { + const output = await handle.waitFor(); + const resolver = asResolver(backing); + if (resolver === undefined) return output; + return resolveOutput(output, resolver, options); +} + +function asResolver(backing: RefBacking): CacheRefResolver | undefined { + if (typeof backing === "function") return backing; + const get = backing.getOutputByRef; + if (typeof get !== "function") return undefined; + return (ref) => get.call(backing, ref); +} diff --git a/packages/task-graph/src/cache/resolveRef.ts b/packages/task-graph/src/cache/resolveRef.ts new file mode 100644 index 000000000..585317a07 --- /dev/null +++ b/packages/task-graph/src/cache/resolveRef.ts @@ -0,0 +1,195 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { isCacheRef } from "./CacheRef"; +import type { CacheRef } from "./CacheRef"; + +/** + * Resolves a single {@link CacheRef} to bytes (or `undefined` on cache miss). + * Wired up by callers against their configured cache backing; this module is + * unaware of any specific repository implementation. + */ +export type CacheRefResolver = (ref: CacheRef) => Promise; + +/** + * Streaming counterpart of {@link CacheRefResolver}. Returns an async iterable + * of chunks for consumers that want to pipe the bytes further (e.g. into an + * HTTP response) without materializing the full payload. Returns `undefined` + * if the backing has no streaming retrieval for this ref or the entry is + * absent. + */ +export type CacheRefStreamResolver = ( + ref: CacheRef +) => AsyncIterable | undefined; + +/** Options accepted by {@link resolveOutput}. */ +export type ResolveOutputOptions = { + /** + * Maximum number of concurrent resolver calls. Defaults to unbounded + * (`Infinity`), suitable for backings that handle their own pacing. + * Set a finite value when the backing is rate-limited. + */ + readonly concurrency?: number; + /** + * Predicate deciding which refs are resolved. Refs that fail the filter are + * left in place (the slot keeps the original {@link CacheRef}). When omitted, + * every ref is resolved. + */ + readonly filter?: (ref: CacheRef) => boolean; +}; + +/** + * Recursively visit a task output and replace every {@link CacheRef} encountered + * with the value produced by the resolver. Non-ref values are returned as-is. + * + * Identity is preserved when the input contains no refs (or none that match the + * optional filter): the same object reference comes back, so callers can rely + * on `===` / `WeakMap` keys not being silently invalidated by an auto-resolve. + * + * Plain objects and arrays are walked structurally; objects with a non-Object + * prototype (class instances such as `Error`, `URL`) are also walked, and the + * returned clone preserves their prototype. `Blob`, `ArrayBuffer`, typed + * arrays, `Date`, `RegExp`, and `Promise` are treated as opaque leaves. + * `Map`/`Set` are walked through so that refs nested inside them can resolve. + * + * On cache miss the resolver returns `undefined`; the corresponding slot in + * the returned output is `undefined`. This is the documented best-effort + * behavior — callers either tolerate missing bytes or check explicitly. + */ +export async function resolveOutput( + output: T, + resolver: CacheRefResolver, + options?: ResolveOutputOptions +): Promise { + if (!hasMatchingRef(output, options?.filter)) return output; + const limit = createLimiter(options?.concurrency); + return (await walk(output, resolver, limit, options?.filter)) as T; +} + +/** + * Cheap pre-scan: returns `true` if any {@link CacheRef} (matching the + * optional filter) is reachable inside `value`. Lets `resolveOutput` + * short-circuit and preserve identity when nothing needs resolving. + */ +function hasMatchingRef(value: unknown, filter: ((ref: CacheRef) => boolean) | undefined): boolean { + if (isCacheRef(value)) return filter ? filter(value) : true; + if (value === null || value === undefined) return false; + if (isLeaf(value)) return false; + if (Array.isArray(value)) { + for (const v of value) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (value instanceof Map) { + for (const v of value.values()) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (value instanceof Set) { + for (const v of value) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (typeof value === "object") { + const source = value as Record; + for (const k of Object.keys(source)) { + if (hasMatchingRef(source[k], filter)) return true; + } + return false; + } + return false; +} + +async function walk( + value: unknown, + resolver: CacheRefResolver, + limit: Limiter, + filter: ((ref: CacheRef) => boolean) | undefined +): Promise { + if (isCacheRef(value)) { + if (filter && !filter(value)) return value; + return limit.run(() => resolver(value)); + } + if (value === null || value === undefined) return value; + if (isLeaf(value)) return value; + if (!hasMatchingRef(value, filter)) return value; + if (Array.isArray(value)) { + return Promise.all(value.map((v) => walk(v, resolver, limit, filter))); + } + if (value instanceof Map) { + const out = new Map(); + const entries = Array.from(value.entries()); + const resolved = await Promise.all( + entries.map(async ([k, v]) => [k, await walk(v, resolver, limit, filter)] as const) + ); + for (const [k, v] of resolved) out.set(k, v); + return out; + } + if (value instanceof Set) { + const out = new Set(); + const resolved = await Promise.all( + Array.from(value).map((v) => walk(v, resolver, limit, filter)) + ); + for (const v of resolved) out.add(v); + return out; + } + if (typeof value === "object") { + const source = value as Record; + // Preserve prototype so class instances (Error, URL, custom classes) + // survive the walk without losing methods/instanceof identity. + const proto = Object.getPrototypeOf(source); + const out: Record = + proto === null || proto === Object.prototype ? {} : Object.create(proto); + // Iterate in source order so the returned object's enumeration order + // matches the input even though resolutions race. + const keys = Object.keys(source); + const resolvedValues = await Promise.all( + keys.map((k) => walk(source[k], resolver, limit, filter)) + ); + for (let i = 0; i < keys.length; i++) out[keys[i]!] = resolvedValues[i]; + return out; + } + return value; +} + +function isLeaf(value: unknown): boolean { + if (typeof value !== "object" || value === null) return true; + if (value instanceof Blob) return true; + if (value instanceof ArrayBuffer) return true; + if (ArrayBuffer.isView(value)) return true; + if (value instanceof Date) return true; + if (value instanceof RegExp) return true; + if (value instanceof Promise) return true; + return false; +} + +type Limiter = { run(fn: () => Promise): Promise }; + +function createLimiter(concurrency: number | undefined): Limiter { + if (concurrency === undefined || concurrency === Infinity) { + return { run: (fn) => fn() }; + } + let free = Math.max(1, Math.floor(concurrency)); + const waiters: Array<() => void> = []; + return { + async run(fn: () => Promise): Promise { + while (free <= 0) { + await new Promise((resolve) => waiters.push(resolve)); + } + free--; + try { + return await fn(); + } finally { + free++; + const next = waiters.shift(); + if (next) next(); + } + }, + }; +} diff --git a/packages/task-graph/src/storage/TaskOutputRepository.ts b/packages/task-graph/src/storage/TaskOutputRepository.ts index 1a3b8ed5e..277237836 100644 --- a/packages/task-graph/src/storage/TaskOutputRepository.ts +++ b/packages/task-graph/src/storage/TaskOutputRepository.ts @@ -5,6 +5,7 @@ */ import { createServiceToken, EventEmitter, EventParameters } from "@workglow/util"; +import type { CacheRef } from "../cache/CacheRef"; import { TaskInput, TaskOutput } from "../task/TaskTypes"; export const TASK_OUTPUT_REPOSITORY = createServiceToken( @@ -79,6 +80,51 @@ export abstract class TaskOutputRepository { createdAt?: Date // for testing purposes ): Promise; + /** + * OPTIONAL streaming sink. Implementations that can ingest a byte stream + * without materializing the full payload (e.g. a file-backed cache) declare + * this method; the runner pipes `binary-delta` chunks straight to it. The + * default base class does NOT implement it — call `supportsStreaming()` to + * branch. `metadata` carries side-band data (e.g. HTTP response headers). + * + * Returns a {@link CacheRef} that the runner places into `Output` at the + * binary port slot when the reference threshold is met. The `$ref` string is + * opaque; only this repository (and any wrapping namespacer like + * {@link RunPrivateCacheRepo}) needs to know how to decode it via + * {@link getOutputByRef} / {@link getOutputStreamByRef}. + * + * Implementations that provide `saveOutputStream` MUST also provide + * `getOutputByRef` (and ideally `getOutputStreamByRef`); a ref written by + * one without a paired reader is unresolvable. + */ + saveOutputStream?( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise; + + /** + * OPTIONAL reader counterpart of {@link saveOutputStream}. Resolves a + * {@link CacheRef} previously produced by `saveOutputStream` to a `Blob`. + * Returns `undefined` on cache miss (TTL expiry, manual clear). The runner + * never calls this directly; consumers calling `JobHandle.result()` or + * `resolveOutput` reach it through the resolver layer. + */ + getOutputByRef?(ref: CacheRef): Promise; + + /** + * OPTIONAL streaming reader counterpart of {@link saveOutputStream}. Returns + * an async iterable of bytes for the referenced entry, or `undefined` when + * the entry is absent or this backing does not support streaming retrieval. + */ + getOutputStreamByRef?(ref: CacheRef): AsyncIterable | undefined; + + /** True when this repository implements `saveOutputStream`. */ + supportsStreaming(): boolean { + return typeof this.saveOutputStream === "function"; + } + abstract getOutput(taskType: string, inputs: TaskInput): Promise; abstract clear(): Promise; diff --git a/packages/task-graph/src/task-graph/StreamPump.ts b/packages/task-graph/src/task-graph/StreamPump.ts index b53515cc4..f38c039bb 100644 --- a/packages/task-graph/src/task-graph/StreamPump.ts +++ b/packages/task-graph/src/task-graph/StreamPump.ts @@ -193,6 +193,11 @@ export class StreamPump { registry: options.registry, resourceScope: options.resourceScope, runId: options.runId, + // Sinks are installed regardless of downstream needs: when both an + // accumulator and a router exist (downstream needs materialized + cache + // can stream), StreamProcessor tees — accumulator drives the enriched + // finish event for edge consumers; the router's CacheRef takes the + // port slot in finalOutput so the queue/cache row stays small. }); await this.edgeMaterializer.pushOutputFromNodeToEdges(task, results); @@ -229,7 +234,15 @@ export class StreamPump { outputCache: TaskOutputRepository | undefined, accumulateLeafOutputs: boolean ): boolean { - if (outputCache) return true; + if (outputCache) { + // Relaxation: when the cache can ingest a byte stream, the task streams + // ONLY binary, and no downstream edge needs the materialized value, the + // bytes are piped straight to the cache sink instead of being buffered + // into an enriched finish event. This is the memory win for large binary + // outputs (e.g. file/image producers). + if (StreamPump.canStreamBinaryToCache(this.graph, task, outputCache)) return false; + return true; + } const outEdges = this.graph.getTargetDataflows(task.id); if (outEdges.length === 0) return accumulateLeafOutputs; @@ -256,6 +269,161 @@ export class StreamPump { return false; } + /** + * Decides whether a streaming task's binary output can be piped straight to a + * stream-capable cache sink (skipping in-memory accumulation). True when: + * + * 1. The cache reports `supportsStreaming()` (NOT a `typeof saveOutputStream` + * duck-type — wrappers like `RunPrivateCacheRepo` always expose a concrete + * `saveOutputStream` but their `supportsStreaming()` reflects the BACKING + * repo, so the duck-type would falsely report `true` over a non-streaming + * backing store). + * 2. The task's only streaming output port(s) are binary. + * 3. No downstream dataflow edge needs the materialized value (every consumer + * accepts the raw binary stream, or there are no consumers). + * + * Exposed as a static (taking the graph explicitly) so the decision is + * unit-testable in isolation from a live run — mirroring + * {@link StreamPump.pipeBinaryToCache}. + */ + static canStreamBinaryToCache( + graph: TaskGraph, + task: ITask, + outputCache: TaskOutputRepository | undefined + ): boolean { + // Defensive: a repository may not implement `supportsStreaming` (the base + // class does, but test doubles / partial mocks may not). Treat anything + // that cannot affirmatively report streaming support as non-streaming. + if (typeof outputCache?.supportsStreaming !== "function") return false; + if (!outputCache.supportsStreaming()) return false; + + const outSchema = task.outputSchema(); + const streamingPorts = getStreamingPorts(outSchema); + const binaryOnly = + streamingPorts.length > 0 && streamingPorts.every((p) => p.mode === "binary"); + if (!binaryOnly) return false; + + return !StreamPump.anyConsumerNeedsMaterialized(graph, task); + } + + /** + * Returns `true` when any outgoing dataflow edge from {@link task} has a + * target task whose input port can't consume the source's stream mode + * directly (per {@link edgeNeedsAccumulation}). Independent of the cache — + * used by the graph runner to decide whether to inhibit binary-stream sinks + * on the source task's runner (refs can't survive across an edge whose + * target expects a materialized value). + * + * Treats fan-out `*` edges as always-needs-materialized (conservative). + */ + static anyConsumerNeedsMaterialized(graph: TaskGraph, task: ITask): boolean { + const outSchema = task.outputSchema(); + const outEdges = graph.getTargetDataflows(task.id); + return outEdges.some((df) => { + if (df.sourceTaskPortId === DATAFLOW_ALL_PORTS) return true; + const targetTask = graph.getTask(df.targetTaskId); + if (!targetTask) return false; + return edgeNeedsAccumulation( + outSchema, + df.sourceTaskPortId, + targetTask.inputSchema(), + df.targetTaskPortId + ); + }); + } + + /** + * Drives a stream-capable cache sink from a streaming task's `binary-delta` + * events. Returns an `{ promise, detach }` pair: `promise` resolves once the + * cache's `saveOutputStream` has consumed every chunk (after the task emits + * `stream_end`); `detach` removes the listeners. The chunk iterable is fed by + * the task's `stream_chunk` events and closed on `stream_end`. + * + * Abort/error contract: `StreamProcessor` emits `stream_end` only on success + * (it throws on abort/error before emitting it). To avoid a hang + listener + * leak when the source task aborts or errors mid-stream, the iterable is also + * terminated by the task's `abort`/`error` events and by an optional + * `AbortSignal`. On any of those the + * iterable ends gracefully so the sink can finalize the bytes seen so far — + * the returned promise ALWAYS settles and `detach` ALWAYS runs. + * + * Exposed as a static so the assembly (binary-delta events → AsyncIterable → + * `saveOutputStream`) is unit-testable in isolation from a graph run. + */ + static pipeBinaryToCache( + task: ITask, + binaryPortId: string | undefined, + sink: (chunks: AsyncIterable) => Promise, + signal?: AbortSignal + ): { promise: Promise; detach: () => void } { + const queue: Uint8Array[] = []; + let done = false; + let notify: (() => void) | undefined; + + const wake = () => { + const n = notify; + notify = undefined; + n?.(); + }; + + const onChunk = (event: StreamEvent) => { + if (event.type === "binary-delta") { + if (binaryPortId === undefined || event.port === binaryPortId) { + queue.push(event.binaryDelta); + wake(); + } + } + }; + const onEnd = () => { + done = true; + wake(); + }; + // Abort/error termination: StreamProcessor never emits `stream_end` on these + // paths, so without this the iterable would await forever. Terminate the + // iterable (don't throw) so the sink finalizes the bytes seen so far and the + // promise settles — the source's own abort/error already surfaces to the run. + const onTerminate = () => { + done = true; + wake(); + }; + + task.on("stream_chunk", onChunk); + task.on("stream_end", onEnd); + task.on("abort", onTerminate); + task.on("error", onTerminate); + if (signal) { + if (signal.aborted) onTerminate(); + else signal.addEventListener("abort", onTerminate); + } + + const detach = () => { + task.off("stream_chunk", onChunk); + task.off("stream_end", onEnd); + task.off("abort", onTerminate); + task.off("error", onTerminate); + signal?.removeEventListener("abort", onTerminate); + }; + + async function* chunkIterable(): AsyncIterable { + while (true) { + while (queue.length > 0) { + yield queue.shift()!; + } + if (done) return; + await new Promise((resolve) => { + notify = resolve; + }); + } + } + + // Discard the sink's return value (helper signals completion only; callers + // wanting a CacheRef should hold the sink-returning promise themselves). + const promise = sink(chunkIterable()) + .finally(detach) + .then(() => undefined); + return { promise, detach }; + } + /** * Returns true if an event carries a port-specific delta (text-delta or object-delta). */ diff --git a/packages/task-graph/src/task/CacheCoordinator.ts b/packages/task-graph/src/task/CacheCoordinator.ts index 187c9ce63..c72c2c1a0 100644 --- a/packages/task-graph/src/task/CacheCoordinator.ts +++ b/packages/task-graph/src/task/CacheCoordinator.ts @@ -5,10 +5,15 @@ */ import { getPortCodec } from "@workglow/util"; +import type { DataPortSchema } from "@workglow/util/schema"; import { type CachePolicy, isPolicyCached, isPolicyPrivate } from "../cache/CachePolicy"; +import type { CacheRef } from "../cache/CacheRef"; +import { isCacheRef } from "../cache/CacheRef"; import type { CacheRegistry } from "../cache/CacheRegistry"; import { RunPrivateCacheRepo } from "../cache/RunPrivateCacheRepo"; import type { TaskOutputRepository } from "../storage/TaskOutputRepository"; +import type { BinaryRefSink } from "./StreamProcessor"; +import { getBinaryPortFormat, getBinaryPortId, getStreamingPorts } from "./StreamTypes"; import type { ITask } from "./ITask"; import type { StreamEvent } from "./StreamTypes"; import { Task } from "./Task"; @@ -138,6 +143,23 @@ export class CacheCoordinator, + metadata: Record, + outputCache: TaskOutputRepository | undefined + ): Promise { + if (!outputCache || !this.task.cacheable) return undefined; + if (!outputCache.supportsStreaming()) return undefined; + return outputCache.saveOutputStream!(this.task.type, keyInputs, chunks, metadata); + } + // ======================================================================== // Policy-aware routing methods // ======================================================================== @@ -183,6 +205,100 @@ export class CacheCoordinator | undefined { + if (!this.task.cacheable) return undefined; + const cache = this.repoFor(registry, policy); + if (!cache || !cache.supportsStreaming()) return undefined; + const port = getBinaryPortId(outputSchema); + if (port === undefined) return undefined; + const taskType = this.task.type; + const sink: BinaryRefSink = (chunks) => + cache.saveOutputStream!(taskType, keyInputs, chunks, {}); + return new Map([[port, sink]]); + } + + /** + * Post-process the streaming task's `Output`: for every **binary streaming + * port** (per the schema) whose value is a {@link CacheRef} with + * `size < referenceThresholdBytes`, rehydrate the bytes via `getOutputByRef` + * and inline them as `Blob`/`ArrayBuffer` (per the port's `format` + * annotation). Refs at or above the threshold are left in place. + * `referenceThresholdBytes === 0` forces every ref to survive regardless of + * size. + * + * Restricted to schema-declared binary streaming ports so that legitimate + * non-binary fields that happen to carry a `{$ref: string}` shape (e.g. a + * JSON-Schema reference embedded in metadata) are not mistakenly resolved + * against the cache. + * + * Refs without a known `size` are kept as-is (the writer didn't measure; + * conservatively assume "large enough to keep as ref"). Backings that want + * threshold-based rehydration MUST populate `size` on the CacheRef they + * return from `saveOutputStream`. + */ + public async hydrateRefsBelowThreshold( + output: Output, + registry: CacheRegistry | undefined, + policy: CachePolicy, + outputSchema: DataPortSchema, + referenceThresholdBytes: number + ): Promise { + if (referenceThresholdBytes === 0) return output; + if (output === null || typeof output !== "object") return output; + const cache = this.repoFor(registry, policy); + if (!cache || typeof cache.getOutputByRef !== "function") return output; + + const binaryPorts = getStreamingPorts(outputSchema) + .filter((p) => p.mode === "binary") + .map((p) => p.port); + if (binaryPorts.length === 0) return output; + + const source = output as Record; + let out: Record | undefined; + const rehydrations = await Promise.all( + binaryPorts.map(async (port) => { + const value = source[port]; + if (!isCacheRef(value)) return undefined; + const size = value.size; + if (size === undefined || size >= referenceThresholdBytes) return undefined; + const blob = await cache.getOutputByRef!(value); + if (blob === undefined) return undefined; + const format = getBinaryPortFormat(outputSchema, port); + const inlined = format === "binary" ? await blob.arrayBuffer() : blob; + return { port, inlined }; + }) + ); + for (const r of rehydrations) { + if (!r) continue; + out ??= { ...source }; + out[r.port] = r.inlined; + } + return (out ?? source) as Output; + } + // ======================================================================== // Private static helpers (lifted from current module-private functions in // TaskRunner.ts) diff --git a/packages/task-graph/src/task/ITask.ts b/packages/task-graph/src/task/ITask.ts index b05c2386c..d1b93c7f1 100644 --- a/packages/task-graph/src/task/ITask.ts +++ b/packages/task-graph/src/task/ITask.ts @@ -109,6 +109,22 @@ export interface IRunConfig { */ shouldAccumulate?: boolean; + /** + * Threshold (in bytes) at which a binary output port's value is replaced by + * a {@link CacheRef} in `Output` instead of being inlined. Below this size, + * the runner inlines the bytes; at or above, it emits a reference and the + * bytes live only in the cache backing. + * + * `0` forces a reference for every binary port regardless of size. Negative + * values and `undefined` fall back to + * {@link REFERENCE_THRESHOLD_BYTES_DEFAULT} (64 KB). + * + * Only applied when the cache backing implements `saveOutputStream` and the + * port carries binary stream events; otherwise the value is always inlined + * regardless of this setting. + */ + referenceThresholdBytes?: number; + /** * Optional callback invoked whenever a task's progress changes during execution. * @param task - The task whose progress changed. diff --git a/packages/task-graph/src/task/StreamProcessor.ts b/packages/task-graph/src/task/StreamProcessor.ts index 386fd0553..a121c17b4 100644 --- a/packages/task-graph/src/task/StreamProcessor.ts +++ b/packages/task-graph/src/task/StreamProcessor.ts @@ -5,13 +5,30 @@ */ import type { ResourceScope, ServiceRegistry } from "@workglow/util"; +import type { CacheRef } from "../cache/CacheRef"; import type { Taskish } from "../task-graph/Conversions"; import type { ITask } from "./ITask"; import type { StreamEvent, StreamMode } from "./StreamTypes"; -import { getOutputStreamMode, getStreamingPorts } from "./StreamTypes"; +import { + getBinaryPortFormat, + getOutputStreamMode, + getStreamingPorts, + materializeBinary, +} from "./StreamTypes"; import { TaskAbortedError, TaskError } from "./TaskError"; import type { TaskRunContext } from "./TaskRunContext"; import type { TaskInput, TaskOutput } from "./TaskTypes"; + +/** + * Consumer for a port's binary-delta stream. The processor exposes chunks as + * an async iterable; the sink returns the {@link CacheRef} the processor + * places into `Output` at the port slot. + * + * Implementations are typically thin wrappers around + * `TaskOutputRepository.saveOutputStream` — the runner supplies the wrapper + * once it knows the cache key. + */ +export type BinaryRefSink = (chunks: AsyncIterable) => Promise; import { TaskStatus } from "./TaskTypes"; /** @@ -31,6 +48,18 @@ export interface StreamProcessorDeps { ...args: any[] ) => Promise; readonly own: >(i: T) => T; + /** + * Per-port binary-stream sinks. When a port has a sink registered, the + * processor routes that port's `binary-delta` chunks to the sink (as an + * async iterable) **instead** of accumulating them into a `Blob` / + * `ArrayBuffer` in memory. At finish, the sink's returned {@link CacheRef} + * replaces the port's slot in the output object — unless an explicit + * binary finish payload is present for that port, which always wins + * (artifact precedence: an explicit whole payload wins over a delta-built one). + * + * Ports without a sink follow the normal accumulation path. + */ + readonly binaryRefSinks?: ReadonlyMap; } /** @@ -70,6 +99,27 @@ export class StreamProcessor const accumulatedObjects = ctx.shouldAccumulate ? new Map | unknown[]>() : undefined; + const accumulatedBinary = ctx.shouldAccumulate + ? new Map() + : undefined; + // Per-port routers: lazily created on the first binary-delta whose port has + // a sink in `deps.binaryRefSinks`. Routes chunks to the sink instead of + // accumulating in memory; at finish, awaits the sink's returned CacheRef + // and writes it into the output at the port slot. + const sinks = deps.binaryRefSinks; + const routers = new Map(); + const ensureRouter = (port: string): BinaryStreamRouter | undefined => { + if (!sinks) return undefined; + const sink = sinks.get(port); + if (!sink) return undefined; + let r = routers.get(port); + if (!r) { + r = new BinaryStreamRouter(sink); + routers.set(port, r); + } + return r; + }; + let streamingStarted = false; let finalOutput: Output | undefined; @@ -84,6 +134,7 @@ export class StreamProcessor inputStreams: deps.inputStreams, }); + try { for await (const event of stream) { // For snapshot events, update runOutputData BEFORE emitting stream_chunk // so listeners see the latest snapshot when they handle the event. @@ -149,6 +200,28 @@ export class StreamProcessor this.task.emit("stream_chunk", event as StreamEvent); break; } + case "binary-delta": { + if (!streamingStarted) { + streamingStarted = true; + this.task.status = TaskStatus.STREAMING; + this.task.emit("status", this.task.status); + } + // Tee: when both a router AND an accumulator exist + // for this port (graph context where the cache can stream but a + // downstream edge needs the materialized value), push to BOTH — + // router writes to the cache for the small ref-bearing Output, + // accumulator drives the enriched finish event so edge consumers + // still receive a Blob/ArrayBuffer. + const router = ensureRouter(event.port); + if (router) router.push(event.binaryDelta); + if (accumulatedBinary) { + const arr = accumulatedBinary.get(event.port) ?? []; + arr.push(event.binaryDelta); + accumulatedBinary.set(event.port, arr); + } + this.task.emit("stream_chunk", event as StreamEvent); + break; + } case "snapshot": { if (!streamingStarted) { streamingStarted = true; @@ -159,11 +232,17 @@ export class StreamProcessor break; } case "finish": { - if (accumulated || accumulatedObjects) { + const hasEnrichment = + accumulated !== undefined || + accumulatedObjects !== undefined || + accumulatedBinary !== undefined || + routers.size > 0; + if (hasEnrichment) { // Emit an enriched finish event: merge accumulated deltas into // the finish payload so downstream dataflows get complete port data // without needing to re-accumulate themselves. - const merged: Record = { ...(event.data || {}) }; + const explicitPayload = (event.data || {}) as Record; + const merged: Record = { ...explicitPayload }; if (accumulated) { for (const [port, text] of accumulated) { if (text.length > 0) merged[port] = text; @@ -174,13 +253,42 @@ export class StreamProcessor merged[port] = obj; } } + if (accumulatedBinary) { + const outSchema = this.task.outputSchema(); + for (const [port, chunks] of accumulatedBinary) { + // Explicit binary finish payload wins. (Unlike text/object + // deltas above, which overwrite event.data, binary yields to + // an explicit payload — it's a whole artifact, not a partial.) + if (port in explicitPayload) continue; + const format = getBinaryPortFormat(outSchema, port); + merged[port] = materializeBinary(chunks, format); + } + } + // Close routers and collect refs. Explicit binary finish payload + // still wins for the OUTPUT slot (artifact precedence); the + // router's CacheRef is discarded in that case but the cache + // write already happened. + for (const router of routers.values()) router.end(); + const refs = new Map(); + for (const [port, router] of routers) { + if (port in explicitPayload) { + // Drain the promise so the sink doesn't leak; ignore the ref. + router.ref().catch(() => {}); + continue; + } + refs.set(port, await router.ref()); + } // For replace-mode streams, finish carries data: {} by convention. // Fall back to the last snapshot (runOutputData) so the final output - // is not silently cleared when the finish payload is empty. + // is not silently cleared when the finish payload is empty — + // overlaying router refs on top so cache-written bytes are not + // orphaned (the ref still lands in the OUTPUT slot). if (streamMode === "replace" && Object.keys(merged).length === 0) { const lastSnapshot = this.task.runOutputData; if (lastSnapshot && Object.keys(lastSnapshot).length > 0) { - finalOutput = lastSnapshot as Output; + const snapshotWithRefs: Record = { ...lastSnapshot }; + for (const [port, ref] of refs) snapshotWithRefs[port] = ref; + finalOutput = snapshotWithRefs as Output; this.task.emit("stream_chunk", { type: "finish", data: lastSnapshot, @@ -188,8 +296,20 @@ export class StreamProcessor break; } } - finalOutput = merged as unknown as Output; + // The emitted finish event always carries the materialized payload + // (from accumulators) so edge consumers see Blob/ArrayBuffer. + // finalOutput diverges only when a router produced a ref for a + // port that wasn't already pinned by an explicit payload — that + // ref takes the slot in the return value so the queue/cache row + // stays small (the tee path). this.task.emit("stream_chunk", { type: "finish", data: merged } as StreamEvent); + if (refs.size === 0) { + finalOutput = merged as unknown as Output; + } else { + const finalMerged: Record = { ...merged }; + for (const [port, ref] of refs) finalMerged[port] = ref; + finalOutput = finalMerged as unknown as Output; + } } else { // No accumulation. For replace-mode streams the provider's finish // event carries `data: {}` by convention — the snapshots already @@ -219,6 +339,19 @@ export class StreamProcessor } } } + } catch (err) { + // Surface the error to any in-flight router sinks so they reject + // (rather than waiting forever on the producer). The original error is + // rethrown unchanged. + const failure = err instanceof Error ? err : new Error(String(err)); + for (const router of routers.values()) router.fail(failure); + throw err; + } finally { + // Defensive: if the loop exited without seeing a `finish` event + // (e.g. abort, generator return without yield), close routers so their + // sinks see end-of-stream rather than blocking on the next chunk. + for (const router of routers.values()) router.end(); + } // Check if the task was aborted during streaming if (ctx.abortController.signal.aborted) { @@ -234,3 +367,82 @@ export class StreamProcessor return this.task.runOutputData as Output; } } + +/** + * Producer-consumer router used by {@link StreamProcessor} to forward a single + * binary output port's `binary-delta` chunks to a {@link BinaryRefSink}. The + * sink consumes the chunks via the async iterable and returns a + * {@link CacheRef} that the processor places into `Output` at finish. + * + * Lifecycle: chunks pushed via `push()` are yielded to the sink in order. + * `end()` signals end-of-stream (sink completes consumption, refPromise + * resolves). `fail(err)` causes the iterable to throw on the next read + * (refPromise rejects). `end()` and `fail()` are idempotent. + * + * Backpressure: there is none — the producer (`executeStream`) writes into + * `buffer` synchronously; if the sink consumes more slowly than the producer + * emits chunks, the buffer grows unbounded. This is acceptable for cache + * backings whose write throughput meets or exceeds the upstream source + * (the common case: cache is a local SSD/memory FS; source is bounded by + * network or compute). For genuinely slow backings (remote object stores, + * throttled FS), wrap the sink in a chunked-uploader that applies its own + * pacing — there is no signal we can send back into `binary-delta`. + */ +class BinaryStreamRouter { + private readonly buffer: Uint8Array[] = []; + private finished = false; + private failure: Error | undefined; + private notify: (() => void) | undefined; + private readonly refPromise: Promise; + + constructor(sink: BinaryRefSink) { + this.refPromise = sink(this.iterable()); + // Observe rejection so an unawaited refPromise (e.g. after fail() in an + // error path) doesn't surface as an unhandled rejection. Subsequent + // `await this.refPromise` still rejects. + this.refPromise.catch(() => {}); + } + + push(chunk: Uint8Array): void { + if (this.finished) return; + this.buffer.push(chunk); + this.wake(); + } + + end(): void { + if (this.finished) return; + this.finished = true; + this.wake(); + } + + fail(err: Error): void { + if (this.finished) return; + this.failure = err; + this.finished = true; + this.wake(); + } + + ref(): Promise { + return this.refPromise; + } + + private wake(): void { + const n = this.notify; + this.notify = undefined; + n?.(); + } + + private async *iterable(): AsyncIterable { + while (true) { + while (this.buffer.length > 0) { + yield this.buffer.shift()!; + } + if (this.failure) throw this.failure; + if (this.finished) return; + await new Promise((res) => { + this.notify = res; + }); + } + } +} + diff --git a/packages/task-graph/src/task/StreamTypes.ts b/packages/task-graph/src/task/StreamTypes.ts index 3f79f20d3..2220a370e 100644 --- a/packages/task-graph/src/task/StreamTypes.ts +++ b/packages/task-graph/src/task/StreamTypes.ts @@ -12,12 +12,13 @@ import type { DataPortSchema, JsonSchema } from "@workglow/util/schema"; * - `append`: Each chunk is a delta (e.g., a new token). * - `replace`: Each chunk is a corrected/revised snapshot of the complete output so far. * - `object`: Each chunk is a progressively more complete partial object snapshot. + * - `binary`: Each chunk is an ordered byte slice; consumer concatenates into a Blob/ArrayBuffer. * - `mixed`: Multiple ports use different stream modes (e.g., append + object). * * Declared per-port via the `x-stream` schema extension property. * Absent `x-stream` = `"none"`. */ -export type StreamMode = "none" | "append" | "replace" | "object" | "mixed"; +export type StreamMode = "none" | "append" | "replace" | "object" | "binary" | "mixed"; /** * Append mode: delta chunk (consumer accumulates). @@ -45,6 +46,18 @@ export type StreamObjectDelta = { objectDelta: Record | unknown[]; }; +/** + * Binary mode: an ordered, append-only chunk of bytes (consumer concatenates). + * `port` identifies which output port this delta belongs to. Chunks are + * materialized on `finish` into a `Blob` or `ArrayBuffer` per the port's + * schema `format` (see `materializeBinary`). + */ +export type StreamBinaryDelta = { + type: "binary-delta"; + port: string; + binaryDelta: Uint8Array; +}; + /** * Replace mode: full snapshot chunk (replaces previous state). */ @@ -104,6 +117,7 @@ export type StreamPhase = { export type StreamEvent> = | StreamTextDelta | StreamObjectDelta + | StreamBinaryDelta | StreamSnapshot | StreamFinish | StreamError @@ -126,7 +140,8 @@ export function getPortStreamMode(schema: DataPortSchema | JsonSchema, portId: s const prop = (schema.properties as Record)?.[portId]; if (!prop || typeof prop === "boolean") return "none"; const xStream = prop["x-stream"]; - if (xStream === "append" || xStream === "replace" || xStream === "object") return xStream; + if (xStream === "append" || xStream === "replace" || xStream === "object" || xStream === "binary") + return xStream; return "none"; } @@ -147,7 +162,12 @@ export function getStreamingPorts( for (const [name, prop] of Object.entries(props)) { if (!prop || typeof prop === "boolean") continue; const xStream = (prop as any)["x-stream"]; - if (xStream === "append" || xStream === "replace" || xStream === "object") { + if ( + xStream === "append" || + xStream === "replace" || + xStream === "object" || + xStream === "binary" + ) { result.push({ port: name, mode: xStream }); } } @@ -251,6 +271,69 @@ export function getObjectPortId(schema: DataPortSchema): string | undefined { return undefined; } +/** + * Returns the port ID (property name) of the first output port that declares + * `x-stream: "binary"`, or `undefined` if no such port exists. + * + * @param schema - The task's output DataPortSchema + * @returns The port name with binary streaming, or undefined + */ +export function getBinaryPortId(schema: DataPortSchema): string | undefined { + if (typeof schema === "boolean") return undefined; + const props = schema.properties; + if (!props) return undefined; + + for (const [name, prop] of Object.entries(props)) { + if (!prop || typeof prop === "boolean") continue; + if ((prop as any)["x-stream"] === "binary") return name; + } + return undefined; +} + +/** + * Reads the `format` annotation of a single output port from the task's output + * schema. Used to decide whether accumulated binary chunks materialize into a + * `Blob` (`format: "blob"` or absent) or an `ArrayBuffer` (`format: "binary"`). + */ +export function getBinaryPortFormat(schema: DataPortSchema, port: string): string | undefined { + if (typeof schema === "boolean") return undefined; + const prop = (schema.properties as Record)?.[port]; + if (!prop || typeof prop === "boolean") return undefined; + return prop.format as string | undefined; +} + +/** + * Materializes ordered binary chunks into the value type declared by the + * output port's schema `format`: + * - `"binary"` → `ArrayBuffer` + * - `"blob"` (or absent) → `Blob` (the default) + * + * Chunks are concatenated in arrival order. Callers MUST pass chunks in the + * order they were emitted. + * + * @param chunks - Ordered binary chunks to concatenate + * @param format - The output port's schema `format` (e.g. `"binary"` or `"blob"`) + * @returns The materialized `Blob` or `ArrayBuffer` + */ +export function materializeBinary( + chunks: readonly Uint8Array[], + format: string | undefined +): Blob | ArrayBuffer { + if (format === "blob" || format === undefined) { + return new Blob(chunks as unknown as BlobPart[]); + } + // format === "binary" (and any other non-blob value) → ArrayBuffer + let total = 0; + for (const c of chunks) total += c.byteLength; + const merged = new Uint8Array(total); + let offset = 0; + for (const c of chunks) { + merged.set(c, offset); + offset += c.byteLength; + } + return merged.buffer; +} + /** * Returns a map of port names to their JSON Schemas for every output port * that declares `"x-structured-output": true`. diff --git a/packages/task-graph/src/task/TaskRunner.ts b/packages/task-graph/src/task/TaskRunner.ts index 5dcfaa009..68a8ff88a 100644 --- a/packages/task-graph/src/task/TaskRunner.ts +++ b/packages/task-graph/src/task/TaskRunner.ts @@ -13,7 +13,12 @@ import { SpanStatusCode, } from "@workglow/util"; import type { CacheRegistry } from "../cache"; -import { CACHE_REGISTRY, DefaultCacheRegistry, RunPrivateCacheRepo } from "../cache"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + resolveReferenceThreshold, + RunPrivateCacheRepo, +} from "../cache"; import { TASK_OUTPUT_REPOSITORY, TaskOutputRepository } from "../storage/TaskOutputRepository"; import type { Taskish } from "../task-graph/Conversions"; import { ensureTask } from "../task-graph/Conversions"; @@ -248,6 +253,23 @@ export class TaskRunner< ); if (outputs === undefined) { + // Build per-port binary-stream sinks when the cache supports + // streaming and the schema has a binary port. The sinks always run + // (memory-bounded write to cache); the runtime threshold controls + // whether the resulting CacheRef SURVIVES in Output or gets + // rehydrated to an inline Blob/ArrayBuffer below. + const referenceThresholdBytes = resolveReferenceThreshold( + config.referenceThresholdBytes ?? this.task.runConfig.referenceThresholdBytes + ); + const binaryRefSinks = isStreamable + ? this.cacheCoordinator.getBinaryRefSinksByPolicy( + keyInputs, + this.cacheRegistry, + policy, + this.task.outputSchema() + ) + : undefined; + outputs = isStreamable ? await this.streamProcessor.run(inputs, ctx, { registry: this.registry, @@ -255,9 +277,24 @@ export class TaskRunner< inputStreams: this.inputStreams, onProgress: this.handleProgress.bind(this), own: this.own, + binaryRefSinks, }) : await this.executeTask(inputs, ctx); + // Rehydrate refs whose committed size is below the configured + // threshold so callers see inline bytes for small outputs (threshold + // default = 64 KiB). Refs at/above threshold survive. threshold = 0 + // forces every ref to survive regardless of size. + if (outputs !== undefined && binaryRefSinks !== undefined) { + outputs = await this.cacheCoordinator.hydrateRefsBelowThreshold( + outputs as Output, + this.cacheRegistry, + policy, + this.task.outputSchema(), + referenceThresholdBytes + ); + } + await this.cacheCoordinator.saveByPolicy( keyInputs, outputs as Output, diff --git a/packages/test/src/test/job-queue/JobQueueStream.test.ts b/packages/test/src/test/job-queue/JobQueueStream.test.ts new file mode 100644 index 000000000..9523d94f5 --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueStream.test.ts @@ -0,0 +1,111 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { IJobExecuteContext, StreamEventLike } from "@workglow/job-queue"; +import { + InMemoryQueueStorage, + Job, + JobQueueClient, + JobQueueServer, + wrapQueueStorage, +} from "@workglow/job-queue"; +import { uuid4 } from "@workglow/util"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +interface SInput { + readonly [key: string]: unknown; +} +interface SOutput { + readonly ok: true; + readonly [key: string]: unknown; +} + +/** + * A job that emits a few stream events during execution via the OPTIONAL + * `emitStreamEvent` context hook, then returns a result. Two ordered + * `binary-delta` chunks followed by a `finish` exercise both binary payload + * delivery and ordering across the same-process server-attached channel. + */ +class StreamEmittingJob extends Job { + public override async execute(_input: SInput, context: IJobExecuteContext): Promise { + context.emitStreamEvent?.({ + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + context.emitStreamEvent?.({ + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([3]), + }); + context.emitStreamEvent?.({ type: "finish", data: {} }); + return { ok: true }; + } +} + +// Same-process server-attached harness, mirroring genericJobQueueTests.ts: +// InMemory queue storage + JobQueueServer + JobQueueClient, with the client +// attached to the server (`client.attach(server)`) so the client's `this.server` +// is set and `JobHandle.onStream` is present. These same-process queue tests run +// unconditionally in the repo (see InMemoryJobQueue.test.ts), so this suite is +// not gated behind any RUN_QUEUE_TESTS flag. +describe("job-queue stream delivery (same-process)", () => { + let server: JobQueueServer; + let client: JobQueueClient; + let storage: InMemoryQueueStorage; + let queueName: string; + + beforeEach(async () => { + queueName = `test-stream-${uuid4()}`; + storage = new InMemoryQueueStorage(queueName); + await storage.migrate(); + + const { messageQueue, jobStore } = wrapQueueStorage(storage); + server = new JobQueueServer(StreamEmittingJob, { + messageQueue, + jobStore, + queueName, + pollIntervalMs: 1, + stopTimeoutMs: 0, + }); + client = new JobQueueClient({ messageQueue, jobStore, queueName }); + // Attach for same-process optimization → sets client.server → enables onStream. + client.attach(server); + }); + + afterEach(async () => { + if (server) await server.stop(); + if (storage) await storage.deleteAll(); + }); + + it("delivers stream events in order via handle.onStream", async () => { + await server.start(); + + const handle = await client.send({ taskType: "stream" }); + + // onStream is present only on a server-attached handle (capability gate). + expect(typeof handle.onStream).toBe("function"); + + const received: StreamEventLike[] = []; + const cleanup = handle.onStream!((event) => { + received.push(event); + }); + + const output = await handle.waitFor(); + cleanup(); + + expect(output).toEqual({ ok: true }); + + // Events arrived in emission order. + expect(received.map((e) => e.type)).toEqual(["binary-delta", "binary-delta", "finish"]); + + // Binary payloads preserved byte-for-byte across the channel. + const firstBytes = received[0].binaryDelta as Uint8Array; + const secondBytes = received[1].binaryDelta as Uint8Array; + expect(Array.from(firstBytes)).toEqual([1, 2]); + expect(Array.from(secondBytes)).toEqual([3]); + }); +}); diff --git a/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts b/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts new file mode 100644 index 000000000..5e5a26d9c --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts @@ -0,0 +1,93 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Worker } from "node:worker_threads"; +import { afterEach, describe, expect, it } from "vitest"; + +/** + * Node primitive validation: structured-clone + transferable buffers across a + * `worker_threads` boundary. Navigational marker for a future cross-thread + * queue host — NOT a test of current `@workglow/job-queue` behavior. + * + * SCOPE — read before changing this test: today, `@workglow/job-queue`'s own + * stream channel (`IJobExecuteContext.emitStreamEvent` → worker `job_stream` + * event → `JobQueueServer.forwardToClients("handleJobStream", …)` → + * `JobQueueClient` → `JobHandle.onStream`) is entirely SAME-PROCESS: the + * `JobQueueWorker` runs in-process inside `JobQueueServer`, and + * `forwardToClients` invokes attached-client methods directly (no postMessage, + * no worker thread, no transferables). Cross-PROCESS coordination is handled + * by the message-queue storage layer via `IMessageQueue.subscribeToChanges` + * with serialized rows — also not a transferables path. There is no + * `WorkerManager`-hosted queue transport anywhere in the package. The actual + * same-process delivery path is proven by JobQueueStream.test.ts. + * + * This test therefore exercises the underlying Node primitive that a future + * `WorkerServer`-hosted queue would have to rely on: binary chunks emitted + * from a worker thread can be TRANSFERRED (not copied) to the host via + * `postMessage`, which is what `WorkerServerBase.extractTransferables` + * (packages/util/src/worker/WorkerServerBase.ts ~line 30) walks payloads to + * arrange. Note that `WorkerServerBase` currently applies that walk only in + * `postResult` (terminal complete message), not in `postStreamChunk` — so + * even on that boundary, incremental chunks are structure-cloned today; this + * test validates that the transfer semantics work for the binary-delta payload + * shape if anyone later wires them up. The worker emits two `binary-delta` + * events across the thread boundary (see jobQueueStreamWorker.fixture.mjs); + * the host receives the full byte sequence in order, and the worker's + * transferred views detach (`byteLength` becomes 0). Run under Node (vitest's + * default pool); bun's worker_threads does not detach transferred buffers, + * which is why this is an `.integration` test executed under the Node ABI per + * libs/.claude/CLAUDE.md. + */ +describe("worker_threads transfer mechanism — payload validation for a future cross-thread queue host (not current job-queue code)", () => { + let worker: Worker | undefined; + + afterEach(async () => { + if (worker) { + await worker.terminate(); + worker = undefined; + } + }); + + it("host receives the full byte sequence in order; worker buffers detach", async () => { + const fixtureUrl = new URL("./jobQueueStreamWorker.fixture.mjs", import.meta.url); + worker = new Worker(fixtureUrl); + + const received: Array<{ type: string; port?: string; binaryDelta?: Uint8Array }> = []; + + const done = await new Promise<{ firstByteLength: number; secondByteLength: number }>( + (resolve, reject) => { + worker!.on("error", reject); + worker!.on("message", (msg: Record) => { + // The host plays the role of `JobHandle.onStream` listener: collect + // every stream event the worker emits across the thread boundary. + if (msg.type === "binary-delta" || msg.type === "finish") { + received.push(msg as { type: string; port?: string; binaryDelta?: Uint8Array }); + } else if (msg.type === "done") { + resolve(msg as { firstByteLength: number; secondByteLength: number }); + } + }); + worker!.postMessage("start"); + } + ); + + // Events arrived in emission order across the thread boundary. + expect(received.map((e) => e.type)).toEqual(["binary-delta", "binary-delta", "finish"]); + + // Host received the full byte sequence in order across the two events. + const hostBytes = received + .filter((e) => e.type === "binary-delta") + .flatMap((e) => Array.from(e.binaryDelta as Uint8Array)); + expect(hostBytes).toEqual([1, 2, 3]); + + // Detachment: the worker transferred (did not copy) each chunk buffer, so + // its own views are now detached (byteLength === 0). This is the + // `WorkerServerBase.extractTransferables` behavior. Asserted under Node, + // which detaches transferred buffers per the structured-clone transfer + // semantics the design depends on. + expect(done.firstByteLength).toBe(0); + expect(done.secondByteLength).toBe(0); + }); +}); diff --git a/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs b/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs new file mode 100644 index 000000000..2b031a718 --- /dev/null +++ b/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs @@ -0,0 +1,53 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Worker-thread fixture for JobQueueStreamWorker.integration.test.ts. + * + * Simulates a job executing inside a real worker thread that emits two ordered + * `binary-delta` stream events. Each chunk is posted to the host with the + * chunk's underlying `ArrayBuffer` in the transfer list — the exact mechanism + * `WorkerServerBase.extractTransferables` (packages/util/src/worker/WorkerServerBase.ts) + * applies automatically when a TypedArray crosses a worker boundary. The + * transfer (rather than copy) detaches the worker's view of the buffer, which + * the host asserts via the reported `byteLength` values in the terminal "done" + * message (a stand-in for the job result the job would otherwise return). + * + * Plain `.mjs` (not `.ts`) so it can be launched directly by `worker_threads` + * under the Node runtime vitest uses, without a TypeScript transform step. + */ + +import { parentPort } from "node:worker_threads"; + +if (!parentPort) { + throw new Error("jobQueueStreamWorker.fixture.mjs must run as a worker thread"); +} + +parentPort.on("message", () => { + // Two ordered binary-delta chunks, then a finish — mirrors a job calling + // ctx.emitStreamEvent?.(...) during execution. + const first = new Uint8Array([1, 2]); + const second = new Uint8Array([3]); + + parentPort.postMessage( + { type: "binary-delta", port: "bytes", binaryDelta: first }, + [first.buffer] + ); + parentPort.postMessage( + { type: "binary-delta", port: "bytes", binaryDelta: second }, + [second.buffer] + ); + parentPort.postMessage({ type: "finish", data: {} }); + + // Report the worker-side byteLength of the retained chunk views AFTER they + // were transferred. A genuine transfer detaches the underlying buffer, so + // these are 0 under Node. (Stands in for the job result.) + parentPort.postMessage({ + type: "done", + firstByteLength: first.byteLength, + secondByteLength: second.byteLength, + }); +}); diff --git a/packages/test/src/test/task-graph/CacheRef.test.ts b/packages/test/src/test/task-graph/CacheRef.test.ts new file mode 100644 index 000000000..b736d7b51 --- /dev/null +++ b/packages/test/src/test/task-graph/CacheRef.test.ts @@ -0,0 +1,86 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from "vitest"; +import { + isCacheRef, + REFERENCE_THRESHOLD_BYTES_DEFAULT, + resolveReferenceThreshold, +} from "@workglow/task-graph"; +import type { CacheRef, IRunConfig } from "@workglow/task-graph"; + +describe("isCacheRef", () => { + it("accepts a minimal ref carrying only $ref", () => { + const ref: CacheRef = { $ref: "cache://k1" }; + expect(isCacheRef(ref)).toBe(true); + }); + + it("accepts a ref carrying size and mime hints", () => { + const ref: CacheRef = { $ref: "cache://k2", size: 1024, mime: "audio/wav" }; + expect(isCacheRef(ref)).toBe(true); + }); + + it("rejects values without a string $ref", () => { + expect(isCacheRef({})).toBe(false); + expect(isCacheRef({ ref: "cache://k" })).toBe(false); + expect(isCacheRef({ $ref: 42 })).toBe(false); + expect(isCacheRef({ $ref: null })).toBe(false); + }); + + it("rejects primitives and null", () => { + expect(isCacheRef(null)).toBe(false); + expect(isCacheRef(undefined)).toBe(false); + expect(isCacheRef("cache://k")).toBe(false); + expect(isCacheRef(42)).toBe(false); + expect(isCacheRef(true)).toBe(false); + }); + + it("accepts a ref where $ref is the empty string (still string-typed)", () => { + expect(isCacheRef({ $ref: "" })).toBe(true); + }); + + it("does not confuse JSON-Schema $ref strings with cache refs by shape", () => { + // JSON Schema $ref also uses { $ref: string }. Shape is identical at this + // layer; discrimination by call site / port-context is the contract, not + // shape inspection. This test documents the limitation. + const jsonSchemaRef = { $ref: "#/definitions/Foo" }; + expect(isCacheRef(jsonSchemaRef)).toBe(true); + }); +}); + +describe("resolveReferenceThreshold", () => { + it("returns the default constant when threshold is undefined", () => { + expect(resolveReferenceThreshold(undefined)).toBe(REFERENCE_THRESHOLD_BYTES_DEFAULT); + }); + + it("returns the configured threshold when set to a positive number", () => { + expect(resolveReferenceThreshold(1024)).toBe(1024); + expect(resolveReferenceThreshold(1_000_000)).toBe(1_000_000); + }); + + it("returns 0 when set to 0 (sentinel: always emit a reference)", () => { + expect(resolveReferenceThreshold(0)).toBe(0); + }); + + it("falls back to the default when given a negative value", () => { + expect(resolveReferenceThreshold(-1)).toBe(REFERENCE_THRESHOLD_BYTES_DEFAULT); + }); + + it("the default is 64 KiB", () => { + expect(REFERENCE_THRESHOLD_BYTES_DEFAULT).toBe(65_536); + }); + + it("IRunConfig accepts referenceThresholdBytes as a number", () => { + const cfg: IRunConfig = { referenceThresholdBytes: 0 }; + expect(resolveReferenceThreshold(cfg.referenceThresholdBytes)).toBe(0); + const cfg2: IRunConfig = { referenceThresholdBytes: 2048 }; + expect(resolveReferenceThreshold(cfg2.referenceThresholdBytes)).toBe(2048); + const cfg3: IRunConfig = {}; + expect(resolveReferenceThreshold(cfg3.referenceThresholdBytes)).toBe( + REFERENCE_THRESHOLD_BYTES_DEFAULT + ); + }); +}); diff --git a/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts b/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts new file mode 100644 index 000000000..f453b30ec --- /dev/null +++ b/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts @@ -0,0 +1,241 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef, StreamEvent } from "@workglow/task-graph"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + IExecuteContext, + isCacheRef, + resolveJobOutput, + Task, + TaskOutputRepository, + TaskRegistry, +} from "@workglow/task-graph"; +import type { JobHandleLike, TaskInput, TaskOutput } from "@workglow/task-graph"; +import { Container, ServiceRegistry, sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, beforeEach, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob }; + +/** + * Streaming memory cache that exposes both `saveOutputStream` (Spec 2 path, + * returns CacheRef + stores bytes in a side map) and `getOutputByRef` so the + * cross-process resolution test below can hydrate refs without touching the + * main `saveOutput` row. + */ +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + public readonly streamed = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + const parts: Uint8Array[] = []; + let size = 0; + for await (const c of chunks) { + parts.push(c); + size += c.byteLength; + } + const merged = new Uint8Array(size); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = `inmem://${taskType}::${JSON.stringify(inputs)}`; + this.streamed.set(key, merged); + return { $ref: key, size, mime: "application/octet-stream" }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const bytes = this.streamed.get(ref.$ref); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } +} + +class NonStreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } +} + +const CHUNK = 4 * 1024; // 4 KiB +const CHUNKS = 16; // 64 KiB total — large enough that inline-vs-ref is dramatic + +class BigBlobStreamTask extends Task, BinOut> { + public static override type = "Spec2QueueRowTest_BigBlobStream"; + public static override category = "Test"; + public static override cacheable = true; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + for (let i = 0; i < CHUNKS; i++) { + const chunk = new Uint8Array(CHUNK).fill(i & 0xff); + yield { type: "binary-delta", port: "bytes", binaryDelta: chunk }; + if (i % 4 === 3) await sleep(0); + } + yield { type: "finish", data: {} as BinOut }; + } +} + +beforeAll(() => { + TaskRegistry.registerTask(BigBlobStreamTask as any); +}); + +let services: ServiceRegistry; +let repo: StreamingMemoryRepo; +beforeEach(() => { + repo = new StreamingMemoryRepo({}); + services = new ServiceRegistry(new Container()); + services.registerInstance(CACHE_REGISTRY, new DefaultCacheRegistry({ deterministic: repo })); +}); + +/** + * The principal user value of Spec 2: the SAVED ROW in the cache (the same + * value job-queue would carry through `JobStorageFormat.output`) stays small + * regardless of payload size when the ref path is taken. These tests measure + * the wire size by JSON-serializing the saved output the way a real storage + * backend (Postgres/SQLite) would. + */ +describe("Spec 2 — saved-row size & cross-process rehydration", () => { + it("force-ref keeps the saved row tiny (CacheRef envelope only); bytes live in the streaming cache", async () => { + const task = new BigBlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + // Wire shape of the cached small row. + expect(repo.saved.size).toBe(1); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + expect(isCacheRef(savedOutput.bytes)).toBe(true); + + const savedJson = JSON.stringify(savedOutput); + // CacheRef envelope is well under 1 KiB regardless of payload size. + expect(savedJson.length).toBeLessThan(1024); + + // Bytes are present in the streaming side of the cache (full size). + const ref = savedOutput.bytes as CacheRef; + expect(ref.size).toBe(CHUNKS * CHUNK); + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + expect(hydrated!.size).toBe(CHUNKS * CHUNK); + + // And Output's port slot is a ref (not a Blob). + expect(isCacheRef(output.bytes)).toBe(true); + }); + + it("contrast: a non-streaming cache embeds the full Blob in the saved row (the old bloat path)", async () => { + // Same task, but the cache cannot stream — the runner falls through to + // accumulation and the saved row contains the serialized payload. + const nonStreamRepo = new NonStreamingMemoryRepo({}); + const altServices = new ServiceRegistry(new Container()); + altServices.registerInstance( + CACHE_REGISTRY, + new DefaultCacheRegistry({ deterministic: nonStreamRepo }) + ); + + const task = new BigBlobStreamTask(); + await task.run({}, { registry: altServices, referenceThresholdBytes: 0 }); + + expect(nonStreamRepo.saved.size).toBe(1); + const [savedOutput] = Array.from(nonStreamRepo.saved.values()) as Array< + Record + >; + expect(isCacheRef(savedOutput.bytes)).toBe(false); + expect(savedOutput.bytes).toBeInstanceOf(Blob); + // The Blob itself isn't JSON-encoded inline by default, but the + // observable point is: this row carries the artifact, not a reference. + expect((savedOutput.bytes as Blob).size).toBe(CHUNKS * CHUNK); + }); + + it("cross-process simulation: serialize the small row, deserialize elsewhere, resolveJobOutput against shared cache", async () => { + const task = new BigBlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + + // "Process A" → wire: serialize the small row to a string (what Postgres + // would store in JSONB / SQLite would store in TEXT for the JobStorageFormat + // output column). + const wire = JSON.stringify(savedOutput); + expect(wire.length).toBeLessThan(1024); + + // "Process B" pulls the small row off the queue and reconstructs Output. + // The CacheRef survives JSON round-trip unchanged (just data). + const received = JSON.parse(wire) as { bytes: CacheRef }; + expect(isCacheRef(received.bytes)).toBe(true); + + // Process B resolves the ref against the SHARED cache (in real + // deployments: S3, networked FS, shared Postgres) — here `repo` is the + // shared backing for the test. resolveJobOutput is the queue-boundary + // bridge that callers wrap their JobHandle in. + const handle: JobHandleLike<{ bytes: Blob }> = { + waitFor: async () => received as unknown as { bytes: Blob }, + }; + const resolved = await resolveJobOutput(handle, repo); + + expect(resolved.bytes).toBeInstanceOf(Blob); + expect((resolved.bytes as Blob).size).toBe(CHUNKS * CHUNK); + }); + + it("dangling refs (cache cleared between save and read) resolve to undefined — best-effort contract", async () => { + const task = new BigBlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + + // Cache TTL expired / explicit clear / different deployment with no + // backing access — the ref now points nowhere. + await repo.clear(); + + const handle: JobHandleLike<{ bytes: Blob }> = { + waitFor: async () => savedOutput as unknown as { bytes: Blob }, + }; + const resolved = await resolveJobOutput(handle, repo); + // Per Spec 2 §2: best-effort, returns undefined on cache miss. + expect(resolved.bytes).toBeUndefined(); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts b/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts new file mode 100644 index 000000000..d97534e46 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts @@ -0,0 +1,95 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { StreamEvent } from "@workglow/task-graph"; +import { IExecuteContext, Task, TaskRegistry } from "@workglow/task-graph"; +import { sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; + +/** + * A streaming source task (binary mode) that yields two byte chunks and an + * empty finish, mirroring how real binary producers emit `binary-delta` events. + */ +class BlobStreamTask extends Task, BinOut> { + public static override type = "BlobStreamTask"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(2); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3, 4]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class ArrayBufferStreamTask extends BlobStreamTask { + public static override type = "ArrayBufferStreamTask"; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "binary", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } +} + +class BinaryFinishOverrideTask extends BlobStreamTask { + public static override type = "BinaryFinishOverrideTask"; + + override async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([9, 9]) }; + // Explicit finish payload at the binary port must win over accumulation. + yield { type: "finish", data: { bytes: new Blob([new Uint8Array([7])]) } as BinOut }; + } +} + +describe("StreamProcessor binary accumulation", () => { + beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask); + TaskRegistry.registerTask(ArrayBufferStreamTask); + TaskRegistry.registerTask(BinaryFinishOverrideTask); + }); + + it("accumulates binary deltas into a Blob (format: blob)", async () => { + const task = new BlobStreamTask({}); + const out = (await task.run()) as BinOut; + expect(out.bytes).toBeInstanceOf(Blob); + const buf = await (out.bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + }); + + it("accumulates binary deltas into an ArrayBuffer (format: binary)", async () => { + const task = new ArrayBufferStreamTask({}); + const out = (await task.run()) as BinOut; + expect(out.bytes).toBeInstanceOf(ArrayBuffer); + expect(Array.from(new Uint8Array(out.bytes as ArrayBuffer))).toEqual([1, 2, 3, 4]); + }); + + it("uses explicit finish payload at the binary port verbatim", async () => { + const out = (await new BinaryFinishOverrideTask({}).run()) as BinOut; + const buf = await (out.bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([7]); // not [9,9] + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryPump.test.ts b/packages/test/src/test/task-graph/StreamBinaryPump.test.ts new file mode 100644 index 000000000..2968139b7 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryPump.test.ts @@ -0,0 +1,625 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * StreamPump binary-stream behavior. + * + * C1 (regression guard): a binary source feeding a NON-binary consumer must + * MATERIALIZE across the edge — `edgeNeedsAccumulation(binary → non-stream)` is + * `true`, so the pump accumulates and the sink receives a finished `Blob`. + * + * C2 (cache-streaming decision + assembly): + * - The DECISION (`StreamPump.canStreamBinaryToCache`) is asserted directly, in + * isolation from a live run: `true` for a streaming-capable cache + binary-only + * leaf with no value-needing consumer; `false` for a buffered cache, for a + * downstream edge that needs the materialized value, and (defensively) for a + * cache that cannot report `supportsStreaming()`. + * - The byte-stream assembly (`binary-delta` events → `AsyncIterable` + * → `saveOutputStream`) is unit-tested in isolation via + * `StreamPump.pipeBinaryToCache`, which asserts the cache RECEIVES the bytes. + * + * NOTE (reduced scope): the live cache pipe through `TaskRunner` (suppressing the + * buffered save and driving `saveOutputStream` with the real normalized cache + * key) is deferred to Spec 2 — `TaskRunner.run()` owns `keyInputs`/policy slot + * resolution and StreamPump calls it as a black box. See the NOTE in + * `StreamPump.runStreamingTask`. + * + * IMPORTANT: in the current reduced scope nothing drives `saveOutputStream` during + * a REAL graph run, so a streaming-cache run currently emits a finish with the + * binary port absent AND the cache never receives the bytes. That absence is NOT + * the desired outcome — it is a known-incomplete path. The goal we assert here is + * "decision = don't accumulate"; live delivery (the cache actually getting the + * bytes on a real run) is completed in Spec 2. We deliberately do NOT assert that + * a binary-less finish from a real streaming-cache run is "correct", because that + * would bless silent data loss. + */ + +import type { CacheRef, ITask, StreamEvent, TaskInput, TaskOutput } from "@workglow/task-graph"; +import type { DataPortSchema } from "@workglow/util/schema"; +import { + Dataflow, + getBinaryPortId, + IExecuteContext, + StreamPump, + Task, + TaskGraph, + TaskGraphRunner, + TaskOutputRepository, + TaskStatus, +} from "@workglow/task-graph"; +import { setLogger, sleep } from "@workglow/util"; +import { beforeEach, describe, expect, it } from "vitest"; +import { getTestingLogger } from "../../binding/TestingLogger"; + +setLogger(getTestingLogger()); + +// ============================================================================ +// Test tasks +// ============================================================================ + +type BinOut = { bytes: Blob | ArrayBuffer }; + +/** + * Binary streaming source: yields two `binary-delta` chunks then an empty + * `finish` (mirrors a real producer that does not re-buffer its output). + */ +class BinaryStreamSource extends Task, BinOut> { + public static override type = "StreamBinaryPump_Source"; + public static override category = "Test"; + public static override cacheable = false; + + public static override inputSchema(): DataPortSchema { + return { type: "object", properties: {}, additionalProperties: false } as const; + } + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + context: IExecuteContext + ): AsyncIterable> { + if (context.signal.aborted) return; + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(2); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3, 4]) }; + yield { type: "finish", data: {} as BinOut }; + } + + override async execute(): Promise { + return { bytes: new Blob([new Uint8Array([1, 2, 3, 4])]) }; + } +} + +/** + * A cacheable variant — needed to exercise the cache-streaming decision (the + * cache is only consulted for cacheable tasks). + */ +class CacheableBinaryStreamSource extends BinaryStreamSource { + public static override type = "StreamBinaryPump_CacheableSource"; + public static override cacheable = true; +} + +type SinkInput = { bytes: Blob | ArrayBuffer }; +type SinkOutput = { length: number; isBlob: boolean }; + +/** + * Non-binary consumer: its `bytes` input port has NO `x-stream`, so a binary + * source feeding it MUST materialize across the edge. + */ +class BinarySinkTask extends Task { + public static override type = "StreamBinaryPump_Sink"; + public static override category = "Test"; + public static override cacheable = false; + + public received: Blob | ArrayBuffer | undefined = undefined; + + public static override inputSchema(): DataPortSchema { + // No `type` constraint (accepts the materialized Blob at runtime) and NO + // `x-stream` ⇒ a non-streaming consumer that needs the value across the edge. + return { + type: "object", + properties: { bytes: { title: "Bytes", description: "materialized binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { + length: { type: "number" }, + isBlob: { type: "boolean" }, + }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + override async execute(input: SinkInput): Promise { + this.received = input.bytes; + if (input.bytes instanceof Blob) { + return { length: input.bytes.size, isBlob: true }; + } + if (input.bytes instanceof ArrayBuffer) { + return { length: input.bytes.byteLength, isBlob: false }; + } + return { length: -1, isBlob: false }; + } +} + +// ============================================================================ +// Cache repositories (in-test) +// ============================================================================ + +/** + * Records whether `saveOutputStream` (streaming) vs `saveOutput` (buffered) was + * invoked, and the total bytes seen through the streaming path. + */ +class StreamingMemoryRepo extends TaskOutputRepository { + public saveOutputCalls = 0; + public saveOutputStreamCalls = 0; + public streamedBytes: number[] = []; + private store = new Map(); + + constructor() { + super({ outputCompression: false }); + } + + override async saveOutput( + taskType: string, + inputs: TaskInput, + output: TaskOutput + ): Promise { + this.saveOutputCalls++; + this.store.set(taskType + JSON.stringify(inputs), output); + } + + override async getOutput(taskType: string, inputs: TaskInput): Promise { + return this.store.get(taskType + JSON.stringify(inputs)); + } + + override async clear(): Promise { + this.store.clear(); + } + + override async size(): Promise { + return this.store.size; + } + + override async clearOlderThan(): Promise {} + + override isDurable(): boolean { + return false; + } + + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + this.saveOutputStreamCalls++; + let size = 0; + for await (const c of chunks) { + size += c.byteLength; + for (const b of c) this.streamedBytes.push(b); + } + return { $ref: `inmem://${taskType}::${JSON.stringify(inputs)}`, size }; + } +} + +/** + * A buffered-only cache: extends the streaming repo but removes the streaming + * capability so `supportsStreaming()` returns `false`. + */ +class BufferedMemoryRepo extends StreamingMemoryRepo { + public override saveOutputStream = + undefined as unknown as StreamingMemoryRepo["saveOutputStream"]; +} + +// ============================================================================ +// Helpers +// ============================================================================ + +function blobFromFinish(event: StreamEvent | undefined): Blob | ArrayBuffer | undefined { + if (!event || event.type !== "finish") return undefined; + return (event.data as Record)?.bytes as Blob | ArrayBuffer | undefined; +} + +async function* gen(...chunks: Uint8Array[]): AsyncIterable { + for (const c of chunks) yield c; +} + +/** + * Resolves to the awaited promise, or rejects with a sentinel if `ms` elapses + * first. Used so a regression (the promise never settling) fails fast instead + * of hanging the whole suite. + */ +function withTimeout(p: Promise, ms: number, label: string): Promise { + return Promise.race([ + p, + new Promise((_resolve, reject) => + setTimeout(() => reject(new Error(`timeout: ${label} did not settle within ${ms}ms`)), ms) + ), + ]); +} + +/** Minimal `ITask`-shaped event source for the `pipeBinaryToCache` assembly test. */ +class FakeEmitter { + private listeners = new Map void>>(); + on(name: string, fn: (...args: any[]) => void): void { + let s = this.listeners.get(name); + if (!s) this.listeners.set(name, (s = new Set())); + s.add(fn); + } + off(name: string, fn: (...args: any[]) => void): void { + this.listeners.get(name)?.delete(fn); + } + emit(name: string, ...args: any[]): void { + for (const fn of this.listeners.get(name) ?? []) fn(...args); + } + /** Total live listeners across all event names — used to assert detachment. */ + listenerCount(): number { + let n = 0; + for (const s of this.listeners.values()) n += s.size; + return n; + } +} + +// ============================================================================ +// C1: regression guard — binary source materializes across a non-binary edge +// ============================================================================ + +describe("StreamBinaryPump — C1 binary source → non-binary consumer", () => { + it("materializes a Blob across the edge (no production change)", async () => { + const graph = new TaskGraph(); + const source = new BinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + + const runner = new TaskGraphRunner(graph); + const results = await runner.runGraph({}); + + expect(source.status).toBe(TaskStatus.COMPLETED); + expect(sink.status).toBe(TaskStatus.COMPLETED); + + // The sink received a materialized Blob with the concatenated bytes. + expect(sink.received).toBeInstanceOf(Blob); + const buf = await (sink.received as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + + const sinkResult = results.find((r) => r.id === "sink"); + expect(sinkResult).toBeDefined(); + expect((sinkResult!.data as SinkOutput).isBlob).toBe(true); + expect((sinkResult!.data as SinkOutput).length).toBe(4); + }); +}); + +// ============================================================================ +// C2: cache-streaming decision — asserted DIRECTLY via canStreamBinaryToCache +// +// These tests assert the DECISION in isolation, not a real-run outcome. We +// deliberately do NOT run a streaming-cache graph and assert "binary port absent +// from finish" as correct: in the reduced scope nothing drives saveOutputStream +// on a real run, so absent bytes there means SILENT DATA LOSS, not success. The +// live pipe (cache actually receiving the bytes on a real run) lands in Spec 2. +// ============================================================================ + +describe("StreamBinaryPump.canStreamBinaryToCache — decision", () => { + it("returns true: streaming cache + binary-only leaf + no value-needing consumer", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + expect(StreamPump.canStreamBinaryToCache(graph, source, new StreamingMemoryRepo())).toBe(true); + }); + + it("returns false: buffered (non-streaming) cache", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + const cache = new BufferedMemoryRepo(); + expect(cache.supportsStreaming()).toBe(false); + expect(StreamPump.canStreamBinaryToCache(graph, source, cache)).toBe(false); + }); + + it("returns false: a downstream edge needs the materialized value", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + + // Streaming-capable cache present, but the non-binary consumer needs the + // value across the edge ⇒ must still accumulate. + expect(StreamPump.canStreamBinaryToCache(graph, source, new StreamingMemoryRepo())).toBe(false); + }); + + it("returns false (defensive): a cache that cannot report supportsStreaming()", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + // A `{}`-style partial double with no `supportsStreaming` method: the guard + // must treat anything that can't affirmatively report streaming support as + // non-streaming, never optimistically piping. + const partialCache = {} as unknown as TaskOutputRepository; + expect(StreamPump.canStreamBinaryToCache(graph, source, partialCache)).toBe(false); + }); +}); + +// ============================================================================ +// C2: cache-streaming decision — observed on a real run via the source's finish. +// +// These run a real graph and assert the bytes ARE materialized (present) when the +// decision is "accumulate". They guard the POSITIVE outcome (bytes delivered), not +// the absence of bytes, so they do not bless data loss. +// ============================================================================ + +describe("StreamBinaryPump — C2 accumulation materializes bytes on a real run", () => { + let logger = getTestingLogger(); + setLogger(logger); + + it("DOES accumulate a leaf binary task when the cache cannot stream", async () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + const runner = new TaskGraphRunner(graph); + + const finishes: StreamEvent[] = []; + source.on("stream_chunk", (e) => { + if (e.type === "finish") finishes.push(e); + }); + + const cache = new BufferedMemoryRepo(); + expect(cache.supportsStreaming()).toBe(false); + await runner.runGraph({}, { outputCache: cache }); + + // Decision = true ⇒ enriched finish ⇒ binary port materialized to a Blob. + expect(finishes.length).toBe(1); + const bytes = blobFromFinish(finishes[0]); + expect(bytes).toBeInstanceOf(Blob); + const buf = await (bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + }); + + it("tees when a downstream edge needs materialized AND the cache can stream", async () => { + // Spec 2 Phase E: cache-can-stream + downstream-needs-materialized used to + // inhibit refs entirely. Now both paths fire — accumulator drives the + // enriched finish event (Blob for the edge consumer) and the router + // writes to the cache so the queue/cache row stays small. + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + const runner = new TaskGraphRunner(graph); + + const finishes: StreamEvent[] = []; + source.on("stream_chunk", (e) => { + if (e.type === "finish") finishes.push(e); + }); + + const cache = new StreamingMemoryRepo(); + await runner.runGraph({}, { outputCache: cache }); + + // Edge path: downstream still receives a materialized Blob. + expect(finishes.length).toBe(1); + const bytes = blobFromFinish(finishes[0]); + expect(bytes).toBeInstanceOf(Blob); + expect(sink.received).toBeInstanceOf(Blob); + + // Cache path: the streaming sink fired too (tee). + expect(cache.saveOutputStreamCalls).toBeGreaterThanOrEqual(1); + }); +}); + +// ============================================================================ +// C2: byte-stream assembly (binary-delta events → AsyncIterable → sink) +// ============================================================================ + +describe("StreamBinaryPump.pipeBinaryToCache — assembly", () => { + it("feeds binary-delta chunks to the sink and resolves on stream_end", async () => { + const emitter = new FakeEmitter(); + const repo = new StreamingMemoryRepo(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + (chunks) => repo.saveOutputStream("T", { k: 1 }, chunks, {}) + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([3]), + }); + emitter.emit("stream_end", {}); + + await promise; + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(repo.saveOutputCalls).toBe(0); + expect(repo.streamedBytes).toEqual([1, 2, 3]); + }); + + it("filters chunks to the requested binary port", async () => { + const emitter = new FakeEmitter(); + const seen: number[] = []; + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const c of chunks) for (const b of c) seen.push(b); + } + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "other", + binaryDelta: new Uint8Array([9]), + }); + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([5, 6]), + }); + emitter.emit("stream_end", {}); + + await promise; + expect(seen).toEqual([5, 6]); + }); + + it("uses getBinaryPortId to resolve the source's binary port", () => { + expect(getBinaryPortId(CacheableBinaryStreamSource.outputSchema())).toBe("bytes"); + }); + + // -------------------------------------------------------------------------- + // Failure path: source aborts/errors WITHOUT emitting stream_end. + // + // StreamProcessor emits `stream_end` ONLY on success — on abort/error it + // throws before emitting it. Against the OLD helper (which terminated the + // iterable only on `stream_end`) the iterable would await forever, the sink + // would never resolve, the returned promise would never settle, and `detach` + // (wired via `.finally`) would never run → permanent listener leak + hang. + // These tests force settlement under a timeout guard so a regression fails + // fast rather than hanging the suite, and assert all listeners are detached. + // -------------------------------------------------------------------------- + + it("settles and detaches when the source ABORTS without stream_end", async () => { + const emitter = new FakeEmitter(); + const seen: number[] = []; + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const c of chunks) for (const b of c) seen.push(b); + } + ); + + // A chunk or two, then abort — NO stream_end. + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + emitter.emit("abort", new Error("aborted")); + + // Against the OLD code this never settles → withTimeout rejects → test fails. + await withTimeout(promise, 500, "pipeBinaryToCache(abort)"); + + // The bytes seen before the abort were finalized, and listeners are gone. + expect(seen).toEqual([1, 2]); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles and detaches when the source ERRORS without stream_end", async () => { + const emitter = new FakeEmitter(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + // Consume whatever arrives; the helper must close the iterable on error. + for await (const _c of chunks) { + /* drain */ + } + } + ); + + emitter.emit("error", new Error("boom")); + + await withTimeout(promise, 500, "pipeBinaryToCache(error)"); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles and detaches when an AbortSignal fires without stream_end", async () => { + const emitter = new FakeEmitter(); + const controller = new AbortController(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const _c of chunks) { + /* drain */ + } + }, + controller.signal + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([7]), + }); + controller.abort(); + + await withTimeout(promise, 500, "pipeBinaryToCache(signal)"); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles when the AbortSignal is ALREADY aborted at call time", async () => { + const emitter = new FakeEmitter(); + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const _c of chunks) { + /* drain */ + } + }, + AbortSignal.abort() + ); + + await withTimeout(promise, 500, "pipeBinaryToCache(pre-aborted)"); + expect(emitter.listenerCount()).toBe(0); + }); +}); + +// ============================================================================ +// Sanity: the in-test repos behave as expected +// ============================================================================ + +describe("StreamBinaryPump — repo capability sanity", () => { + let repo: StreamingMemoryRepo; + beforeEach(() => { + repo = new StreamingMemoryRepo(); + }); + + it("streaming repo reports supportsStreaming() === true", () => { + expect(repo.supportsStreaming()).toBe(true); + }); + + it("buffered repo reports supportsStreaming() === false", () => { + expect(new BufferedMemoryRepo().supportsStreaming()).toBe(false); + }); + + it("saveOutputStream concatenates all delivered bytes", async () => { + await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([1, 2]), new Uint8Array([3])), + {} + ); + expect(repo.streamedBytes).toEqual([1, 2, 3]); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts b/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts new file mode 100644 index 000000000..5a48d3e85 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts @@ -0,0 +1,126 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ +import { describe, expect, it } from "vitest"; +import type { StreamBinaryDelta, StreamEvent, StreamMode } from "@workglow/task-graph"; +import { + getPortStreamMode, + getStreamingPorts, + getOutputStreamMode, + getBinaryPortId, + edgeNeedsAccumulation, + materializeBinary, +} from "@workglow/task-graph"; +import type { DataPortSchema } from "@workglow/util/schema"; + +const binarySchema = { + type: "object", + properties: { + bytes: { type: "object", format: "blob", "x-stream": "binary" }, + }, + additionalProperties: false, +} as const satisfies DataPortSchema; + +const mixedSchema = { + type: "object", + properties: { + text: { type: "string", "x-stream": "append" }, + bytes: { type: "object", format: "binary", "x-stream": "binary" }, + }, + additionalProperties: false, +} as const satisfies DataPortSchema; + +describe("StreamBinaryDelta type", () => { + it("is assignable to StreamEvent and carries a Uint8Array delta", () => { + const evt: StreamEvent = { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2, 3]), + } satisfies StreamBinaryDelta; + expect(evt.type).toBe("binary-delta"); + if (evt.type === "binary-delta") { + expect(evt.binaryDelta).toBeInstanceOf(Uint8Array); + expect(Array.from(evt.binaryDelta)).toEqual([1, 2, 3]); + } + }); + + it("admits 'binary' as a StreamMode", () => { + const mode: StreamMode = "binary"; + expect(mode).toBe("binary"); + }); +}); + +describe("binary-aware port helpers", () => { + it("getPortStreamMode returns 'binary'", () => { + expect(getPortStreamMode(binarySchema, "bytes")).toBe("binary"); + }); + + it("getStreamingPorts includes binary ports", () => { + expect(getStreamingPorts(binarySchema)).toEqual([{ port: "bytes", mode: "binary" }]); + }); + + it("getOutputStreamMode returns 'binary' for a single binary port", () => { + expect(getOutputStreamMode(binarySchema)).toBe("binary"); + }); + + it("getOutputStreamMode returns 'mixed' for append + binary", () => { + expect(getOutputStreamMode(mixedSchema)).toBe("mixed"); + }); + + it("getBinaryPortId finds the first binary port", () => { + expect(getBinaryPortId(binarySchema)).toBe("bytes"); + expect(getBinaryPortId(mixedSchema)).toBe("bytes"); + }); + + it("getBinaryPortId returns undefined when no binary port", () => { + const noBinary = { + type: "object", + properties: { text: { type: "string", "x-stream": "append" } }, + } as const satisfies DataPortSchema; + expect(getBinaryPortId(noBinary)).toBeUndefined(); + }); + + it("edgeNeedsAccumulation: binary source → non-binary target accumulates", () => { + const target = { + type: "object", + properties: { bytes: { type: "object" } }, + } as const satisfies DataPortSchema; + expect(edgeNeedsAccumulation(binarySchema, "bytes", target, "bytes")).toBe(true); + }); + + it("edgeNeedsAccumulation: binary → binary passes through", () => { + expect(edgeNeedsAccumulation(binarySchema, "bytes", binarySchema, "bytes")).toBe(false); + }); +}); + +describe("materializeBinary", () => { + const chunks = [new Uint8Array([1, 2]), new Uint8Array([3, 4, 5])]; + + it("concatenates to an ArrayBuffer when format is 'binary'", async () => { + const out = materializeBinary(chunks, "binary"); + expect(out).toBeInstanceOf(ArrayBuffer); + expect(Array.from(new Uint8Array(out as ArrayBuffer))).toEqual([1, 2, 3, 4, 5]); + }); + + it("concatenates to a Blob when format is 'blob'", async () => { + const out = materializeBinary(chunks, "blob"); + expect(out).toBeInstanceOf(Blob); + const buf = await (out as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4, 5]); + }); + + it("defaults to Blob when format is undefined", () => { + expect(materializeBinary(chunks, undefined)).toBeInstanceOf(Blob); + }); + + it("handles an empty chunk list", () => { + expect(materializeBinary([], "binary")).toBeInstanceOf(ArrayBuffer); + expect((materializeBinary([], "binary") as ArrayBuffer).byteLength).toBe(0); + }); + + it("treats an unknown format as binary (ArrayBuffer)", () => { + expect(materializeBinary(chunks, "wat")).toBeInstanceOf(ArrayBuffer); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts b/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts new file mode 100644 index 000000000..f14b9d42d --- /dev/null +++ b/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts @@ -0,0 +1,225 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { BinaryRefSink, CacheRef, StreamEvent } from "@workglow/task-graph"; +import { IExecuteContext, isCacheRef, Task, TaskRegistry } from "@workglow/task-graph"; +import { sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; +type TwoBinOut = { audio: Blob; transcript: Blob }; + +class BlobStreamTask extends Task, BinOut> { + public static override type = "BinaryRefSinkTest_BlobStream"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(1); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class TwoPortStreamTask extends Task, TwoBinOut> { + public static override type = "BinaryRefSinkTest_TwoPort"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { + audio: { type: "object", format: "blob", "x-stream": "binary" }, + transcript: { type: "object", format: "blob", "x-stream": "binary" }, + }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "audio", binaryDelta: new Uint8Array([10, 11]) }; + yield { type: "binary-delta", port: "transcript", binaryDelta: new Uint8Array([20]) }; + yield { type: "binary-delta", port: "audio", binaryDelta: new Uint8Array([12]) }; + yield { type: "finish", data: {} as TwoBinOut }; + } +} + +class ExplicitFinishPayloadTask extends BlobStreamTask { + public static override type = "BinaryRefSinkTest_ExplicitFinish"; + + override async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([9, 9]) }; + yield { type: "finish", data: { bytes: new Blob([new Uint8Array([7])]) } as BinOut }; + } +} + +beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask as any); + TaskRegistry.registerTask(TwoPortStreamTask as any); + TaskRegistry.registerTask(ExplicitFinishPayloadTask as any); +}); + +function makeSink(): { + sink: BinaryRefSink; + collected: Promise<{ ref: CacheRef; bytes: number[] }>; +} { + const $ref = `inmem://test/${Math.random().toString(36).slice(2)}`; + let resolveCollected: (v: { ref: CacheRef; bytes: number[] }) => void = () => {}; + let rejectCollected: (e: unknown) => void = () => {}; + const collected = new Promise<{ ref: CacheRef; bytes: number[] }>((res, rej) => { + resolveCollected = res; + rejectCollected = rej; + }); + const sink: BinaryRefSink = async (chunks) => { + const bytes: number[] = []; + try { + for await (const c of chunks) { + for (const b of c) bytes.push(b); + } + } catch (err) { + rejectCollected(err); + throw err; + } + const ref = { $ref, size: bytes.length, mime: "application/octet-stream" }; + resolveCollected({ ref, bytes }); + return ref; + }; + return { sink, collected }; +} + +describe("StreamProcessor — binaryRefSinks (direct deps wiring)", () => { + it("routes a single binary port to its sink and produces CacheRef in Output", async () => { + const task = new BlobStreamTask(); + const { sink, collected } = makeSink(); + + // Drive the streamProcessor directly with sinks injected. + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + + // Mimic minimal ctx + deps the processor needs. + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["bytes", sink]]), + })) as BinOut; + + expect(output).toBeDefined(); + expect(isCacheRef((output as any).bytes)).toBe(true); + const { ref, bytes } = await collected; + expect(ref.size).toBe(3); + expect(bytes).toEqual([1, 2, 3]); + expect((output as any).bytes.$ref).toBe(ref.$ref); + }); + + it("routes only the configured port; other binary ports continue to accumulate", async () => { + const task = new TwoPortStreamTask(); + const { sink: audioSink, collected: audioCollected } = makeSink(); + + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["audio", audioSink]]), + })) as TwoBinOut; + + expect(isCacheRef((output as any).audio)).toBe(true); + expect((output as any).transcript).toBeInstanceOf(Blob); + const { bytes: audioBytes } = await audioCollected; + expect(audioBytes).toEqual([10, 11, 12]); + const transcriptBytes = new Uint8Array(await (output.transcript as Blob).arrayBuffer()); + expect(Array.from(transcriptBytes)).toEqual([20]); + }); + + it("explicit binary finish payload wins over the sink's CacheRef (artifact precedence)", async () => { + const task = new ExplicitFinishPayloadTask(); + const { sink, collected } = makeSink(); + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["bytes", sink]]), + })) as BinOut; + + // The explicit finish payload (Blob of [7]) takes the slot, not the ref. + expect(isCacheRef((output as any).bytes)).toBe(false); + expect((output as any).bytes).toBeInstanceOf(Blob); + const blobBytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(blobBytes)).toEqual([7]); + // The sink still observed the deltas (just lost the race for the slot). + const { bytes } = await collected; + expect(bytes).toEqual([9, 9]); + }); +}); diff --git a/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts b/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts new file mode 100644 index 000000000..aa852b90f --- /dev/null +++ b/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts @@ -0,0 +1,190 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ +import { describe, expect, it } from "vitest"; +import { RunPrivateCacheRepo, TaskOutputRepository } from "@workglow/task-graph"; +import type { CacheRef, TaskInput, TaskOutput } from "@workglow/task-graph"; + +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly streamed = new Map(); + public readonly streamedMetadata = new Map>(); + private store = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.store.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.store.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.store.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.store.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise { + const parts: Uint8Array[] = []; + for await (const c of chunks) parts.push(c); + let total = 0; + for (const p of parts) total += p.byteLength; + const merged = new Uint8Array(total); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = taskType + JSON.stringify(inputs); + this.streamed.set(key, merged); + this.streamedMetadata.set(key, metadata); + return { $ref: `inmem://${key}`, size: total }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const key = ref.$ref.replace(/^inmem:\/\//, ""); + const bytes = this.streamed.get(key); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } + override getOutputStreamByRef(ref: CacheRef): AsyncIterable | undefined { + const key = ref.$ref.replace(/^inmem:\/\//, ""); + const bytes = this.streamed.get(key); + if (bytes === undefined) return undefined; + return (async function* () { + yield bytes; + })(); + } +} + +// A minimal repo that simply does NOT define `saveOutputStream`, so it has no +// streaming capability (no double-cast shadowing). +class NonStreamingMemoryRepo extends TaskOutputRepository { + private store = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.store.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.store.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.store.clear(); + } + override async size(): Promise { + return this.store.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } +} + +async function* gen(...chunks: Uint8Array[]): AsyncIterable { + for (const c of chunks) yield c; +} + +describe("TaskOutputRepository.saveOutputStream", () => { + it("supportsStreaming reflects presence of saveOutputStream", () => { + expect(new StreamingMemoryRepo({}).supportsStreaming()).toBe(true); + expect(new NonStreamingMemoryRepo({}).supportsStreaming()).toBe(false); + }); + + it("streams chunks and the total equals total bytes streamed", async () => { + const repo = new StreamingMemoryRepo({}); + await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([1, 2]), new Uint8Array([3])), + {} + ); + expect(Array.from(repo.streamed.get('T{"k":1}')!)).toEqual([1, 2, 3]); + }); + + it("an empty stream stores a zero-length Uint8Array", async () => { + const repo = new StreamingMemoryRepo({}); + await repo.saveOutputStream("T", { k: 1 }, gen(), {}); + const stored = repo.streamed.get('T{"k":1}')!; + expect(stored).toBeInstanceOf(Uint8Array); + expect(stored.byteLength).toBe(0); + }); + + it("passes the metadata arg through to the repo (side-band contract)", async () => { + const repo = new StreamingMemoryRepo({}); + const metadata = { contentType: "application/octet-stream", status: 200 }; + await repo.saveOutputStream("T", { k: 1 }, gen(new Uint8Array([9])), metadata); + expect(repo.streamedMetadata.get('T{"k":1}')).toEqual(metadata); + }); + + it("RunPrivateCacheRepo forwards streaming with namespaced taskType", async () => { + const backing = new StreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-A" }); + + expect(wrapper.supportsStreaming()).toBe(true); + + await wrapper.saveOutputStream("T", { k: 1 }, gen(new Uint8Array([1, 2, 3])), {}); + + // taskType is namespaced exactly as saveOutput namespaces it. + const namespacedKey = `__run:run-A::T${JSON.stringify({ k: 1 })}`; + expect(Array.from(backing.streamed.get(namespacedKey)!)).toEqual([1, 2, 3]); + expect(backing.streamed.has('T{"k":1}')).toBe(false); + }); + + it("RunPrivateCacheRepo.supportsStreaming() is false when backing lacks it", () => { + const backing = new NonStreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-A" }); + expect(wrapper.supportsStreaming()).toBe(false); + }); + + it("saveOutputStream returns a CacheRef the same backing can resolve to bytes", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([7, 8, 9])), + {} + ); + expect(typeof ref.$ref).toBe("string"); + expect(ref.size).toBe(3); + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([7, 8, 9]); + }); + + it("getOutputStreamByRef yields bytes for a saved ref", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream("T", { k: 2 }, gen(new Uint8Array([4, 5])), {}); + const stream = repo.getOutputStreamByRef(ref); + expect(stream).toBeDefined(); + const collected: number[] = []; + for await (const chunk of stream!) { + for (const b of chunk) collected.push(b); + } + expect(collected).toEqual([4, 5]); + }); + + it("getOutputByRef returns undefined after clear (dangling reference)", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream("T", { k: 3 }, gen(new Uint8Array([1])), {}); + expect(await repo.getOutputByRef(ref)).toBeInstanceOf(Blob); + await repo.clear(); + expect(await repo.getOutputByRef(ref)).toBeUndefined(); + }); + + it("RunPrivateCacheRepo forwards getOutputByRef to backing", async () => { + const backing = new StreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-B" }); + const ref = await wrapper.saveOutputStream("T", { k: 4 }, gen(new Uint8Array([42])), {}); + const hydrated = await wrapper.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([42]); + }); +}); diff --git a/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts b/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts new file mode 100644 index 000000000..b644da9b2 --- /dev/null +++ b/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts @@ -0,0 +1,205 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef, StreamEvent } from "@workglow/task-graph"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + IExecuteContext, + isCacheRef, + Task, + TaskOutputRepository, + TaskRegistry, +} from "@workglow/task-graph"; +import type { TaskInput, TaskOutput } from "@workglow/task-graph"; +import { Container, ServiceRegistry, sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, beforeEach, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; + +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + public readonly streamed = new Map(); + public saveOutputCalls = 0; + public saveOutputStreamCalls = 0; + + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saveOutputCalls++; + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + this.saveOutputStreamCalls++; + const parts: Uint8Array[] = []; + let size = 0; + for await (const c of chunks) { + parts.push(c); + size += c.byteLength; + } + const merged = new Uint8Array(size); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = `inmem://${taskType}::${JSON.stringify(inputs)}`; + this.streamed.set(key, merged); + return { $ref: key, size, mime: "application/octet-stream" }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const bytes = this.streamed.get(ref.$ref); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } +} + +class BlobStreamTask extends Task, BinOut> { + public static override type = "TaskRunnerRefPathTest_BlobStream"; + public static override category = "Test"; + public static override cacheable = true; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2, 3]) }; + await sleep(1); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([4, 5]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class NonCacheableBlobStreamTask extends BlobStreamTask { + public static override type = "TaskRunnerRefPathTest_NonCacheableBlobStream"; + public static override cacheable = false; +} + +beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask as any); + TaskRegistry.registerTask(NonCacheableBlobStreamTask as any); +}); + +let repo: StreamingMemoryRepo; +let services: ServiceRegistry; +beforeEach(() => { + repo = new StreamingMemoryRepo({}); + services = new ServiceRegistry(new Container()); + services.registerInstance(CACHE_REGISTRY, new DefaultCacheRegistry({ deterministic: repo })); +}); + +describe("TaskRunner — referenceThresholdBytes: 0 (force-ref) ref path", () => { + it("Output carries a CacheRef at the binary port; bytes live in the streaming cache", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + + const ref = output.bytes as unknown as CacheRef; + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + expect(ref.size).toBe(5); + }); + + it("saveOutput still runs (small Output with embedded ref → small queue/cache row)", async () => { + const task = new BlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputCalls).toBe(1); + // The cached small row contains the ref, NOT the bytes. + const [savedOutput] = Array.from(repo.saved.values()); + expect(isCacheRef((savedOutput as any).bytes)).toBe(true); + }); + + it("defaults (threshold 64 KiB) produce inline Blob in Output — small outputs rehydrate", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services }); + + // D.4: sink runs unconditionally when cache supports streaming; the + // rehydrate step converts the ref back to an inline Blob because total + // bytes (5) is below the 64 KiB default threshold. + expect(repo.saveOutputStreamCalls).toBe(1); + expect(output.bytes).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + }); + + it("non-cacheable tasks fall through to accumulation even with threshold=0", async () => { + const task = new NonCacheableBlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(0); + expect(output.bytes).toBeInstanceOf(Blob); + }); +}); + +describe("TaskRunner — Phase D.4 threshold-based size decision", () => { + it("output below threshold is rehydrated to an inline Blob (sink still ran for memory bound)", async () => { + const task = new BlobStreamTask(); + // Threshold well above the 5 bytes the task produces → rehydrate inline. + const output = await task.run({}, { registry: services, referenceThresholdBytes: 100 }); + + expect(repo.saveOutputStreamCalls).toBe(1); // sink ran (memory-bounded write) + expect(output.bytes).toBeInstanceOf(Blob); // but the slot is now an inline Blob + const bytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + }); + + it("output at or above threshold keeps the CacheRef", async () => { + const task = new BlobStreamTask(); + // 5 bytes >= threshold 5 → ref survives. + const output = await task.run({}, { registry: services, referenceThresholdBytes: 5 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + expect((output.bytes as unknown as CacheRef).size).toBe(5); + }); + + it("threshold=0 (force-ref) overrides the size check; the ref survives regardless", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + }); + + it("default threshold (64 KiB) rehydrates the small-output path automatically", async () => { + const task = new BlobStreamTask(); + // No threshold specified → resolves to 64 KiB default; 5 bytes is below. + const output = await task.run({}, { registry: services }); + + expect(repo.saveOutputStreamCalls).toBe(1); // sink now always runs when cache supports it + expect(output.bytes).toBeInstanceOf(Blob); + }); +}); diff --git a/packages/test/src/test/task-graph/resolveJobOutput.test.ts b/packages/test/src/test/task-graph/resolveJobOutput.test.ts new file mode 100644 index 000000000..0eb6d58e2 --- /dev/null +++ b/packages/test/src/test/task-graph/resolveJobOutput.test.ts @@ -0,0 +1,86 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from "vitest"; +import { resolveJobOutput } from "@workglow/task-graph"; +import type { CacheRef, CacheRefResolver, JobHandleLike } from "@workglow/task-graph"; + +const handleOf = (value: T): JobHandleLike => ({ + waitFor: async () => value, +}); + +const ref = (key: string, size = 0): CacheRef => ({ $ref: key, size }); + +describe("resolveJobOutput", () => { + it("awaits the job and hydrates a top-level ref through a function resolver", async () => { + const blob = new Blob([new Uint8Array([1, 2, 3])]); + const resolver: CacheRefResolver = async (r) => + r.$ref === "cache://A" ? blob : undefined; + const handle = handleOf({ bytes: ref("cache://A", 3) as unknown as Blob }); + const out = await resolveJobOutput(handle, resolver); + expect(out.bytes).toBe(blob); + }); + + it("accepts an object with getOutputByRef (TaskOutputRepository shape)", async () => { + const blob = new Blob([new Uint8Array([7, 8])]); + const backing = { + getOutputByRef: async (r: CacheRef) => (r.$ref === "cache://B" ? blob : undefined), + }; + const handle = handleOf({ payload: ref("cache://B", 2) as unknown as Blob }); + const out = await resolveJobOutput(handle, backing); + expect(out.payload).toBe(blob); + }); + + it("returns the output unchanged when the backing has no getOutputByRef", async () => { + const original = { bytes: ref("cache://x", 1) as unknown as Blob }; + const handle = handleOf(original); + const out = await resolveJobOutput(handle, {}); + expect(out).toBe(original); + }); + + it("replaces refs with undefined on cache miss (best-effort)", async () => { + const handle = handleOf({ bytes: ref("cache://missing", 1) as unknown as Blob }); + const out = await resolveJobOutput(handle, async () => undefined); + expect(out.bytes).toBeUndefined(); + }); + + it("walks nested structures", async () => { + const blob = new Blob([new Uint8Array([42])]); + const handle = handleOf({ + meta: { lang: "en" }, + payload: { audio: ref("cache://A", 1) as unknown as Blob }, + }); + const out = await resolveJobOutput(handle, async () => blob); + expect(out.meta).toEqual({ lang: "en" }); + expect(out.payload.audio).toBe(blob); + }); + + it("propagates rejection from the underlying handle.waitFor()", async () => { + const handle: JobHandleLike = { + waitFor: async () => { + throw new Error("job failed"); + }, + }; + await expect(resolveJobOutput(handle, async () => undefined)).rejects.toThrow("job failed"); + }); + + it("forwards ResolveOutputOptions to the underlying walker", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async () => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob(); + }; + const handle = handleOf( + Array.from({ length: 6 }, (_, i) => ref(`cache://r${i}`, 1) as unknown as Blob) + ); + await resolveJobOutput(handle, resolver, { concurrency: 2 }); + expect(observedMax).toBeLessThanOrEqual(2); + }); +}); diff --git a/packages/test/src/test/task-graph/resolveOutput.test.ts b/packages/test/src/test/task-graph/resolveOutput.test.ts new file mode 100644 index 000000000..1a5a4ead7 --- /dev/null +++ b/packages/test/src/test/task-graph/resolveOutput.test.ts @@ -0,0 +1,136 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it, vi } from "vitest"; +import { resolveOutput } from "@workglow/task-graph"; +import type { CacheRef, CacheRefResolver } from "@workglow/task-graph"; + +const ref = (key: string, size?: number, mime?: string): CacheRef => ({ + $ref: key, + ...(size !== undefined ? { size } : {}), + ...(mime !== undefined ? { mime } : {}), +}); + +const fakeResolver = + (table: Record): CacheRefResolver => + async (r) => + table[r.$ref]; + +describe("resolveOutput", () => { + it("returns primitives and non-ref objects unchanged", async () => { + const resolver = vi.fn(fakeResolver({})); + const input = { a: 1, b: "two", c: true, d: null }; + expect(await resolveOutput(input, resolver)).toEqual(input); + expect(resolver).not.toHaveBeenCalled(); + }); + + it("resolves a top-level ref to bytes", async () => { + const blob = new Blob([new Uint8Array([1, 2, 3])]); + const table = { "cache://x": blob }; + const out = await resolveOutput(ref("cache://x") as unknown as Blob, fakeResolver(table)); + expect(out).toBe(blob); + }); + + it("resolves refs nested inside a plain object, leaving siblings alone", async () => { + const audio = new Blob([new Uint8Array([9, 9, 9])]); + const input = { + transcript: "hello", + audio: ref("cache://a", 3, "audio/wav") as unknown as Blob, + meta: { lang: "en" }, + }; + const out = await resolveOutput(input, fakeResolver({ "cache://a": audio })); + expect(out.transcript).toBe("hello"); + expect(out.audio).toBe(audio); + expect(out.meta).toEqual({ lang: "en" }); + }); + + it("resolves refs inside arrays", async () => { + const b1 = new Blob([new Uint8Array([1])]); + const b2 = new Blob([new Uint8Array([2])]); + const input = [ + ref("cache://1") as unknown as Blob, + "plain", + ref("cache://2") as unknown as Blob, + ]; + const out = await resolveOutput(input, fakeResolver({ "cache://1": b1, "cache://2": b2 })); + expect(out[0]).toBe(b1); + expect(out[1]).toBe("plain"); + expect(out[2]).toBe(b2); + }); + + it("treats Blob, ArrayBuffer, typed arrays, Date as opaque leaves (not walked)", async () => { + const blob = new Blob([new Uint8Array([1])]); + const ab = new ArrayBuffer(8); + const u8 = new Uint8Array([5, 6, 7]); + const date = new Date(2026, 0, 1); + const resolver = vi.fn(); + const input = { blob, ab, u8, date }; + const out = await resolveOutput(input, resolver); + expect(out.blob).toBe(blob); + expect(out.ab).toBe(ab); + expect(out.u8).toBe(u8); + expect(out.date).toBe(date); + expect(resolver).not.toHaveBeenCalled(); + }); + + it("returns undefined for refs the resolver cannot resolve (best-effort)", async () => { + const input = { audio: ref("cache://missing") as unknown as Blob }; + const out = await resolveOutput(input, fakeResolver({})); + expect(out.audio).toBeUndefined(); + }); + + it("propagates resolver rejections (caller-controlled error policy)", async () => { + const failingResolver: CacheRefResolver = async () => { + throw new Error("backing down"); + }; + await expect( + resolveOutput({ x: ref("cache://k") as unknown as Blob }, failingResolver) + ).rejects.toThrow("backing down"); + }); + + it("resolves refs in deeply nested structures", async () => { + const b = new Blob([new Uint8Array([42])]); + const input = { + level1: { + level2: { + items: [{ payload: ref("cache://deep") as unknown as Blob }], + }, + }, + }; + const out = await resolveOutput(input, fakeResolver({ "cache://deep": b })); + expect(out.level1.level2.items[0].payload).toBe(b); + }); + + it("honors a concurrency bound: never exceeds the configured maximum in flight", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async (r) => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob([new Uint8Array([Number(r.$ref.slice(-1))])]); + }; + const refs = Array.from({ length: 8 }, (_, i) => ref(`cache://r${i}`)); + await resolveOutput(refs as unknown as Blob[], resolver, { concurrency: 2 }); + expect(observedMax).toBeLessThanOrEqual(2); + }); + + it("with concurrency undefined runs all resolutions in parallel", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async () => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob(); + }; + const refs = Array.from({ length: 6 }, (_, i) => ref(`cache://r${i}`)); + await resolveOutput(refs as unknown as Blob[], resolver); + expect(observedMax).toBe(6); + }); +}); diff --git a/packages/util/src/json-schema/JsonSchema.ts b/packages/util/src/json-schema/JsonSchema.ts index 4b5a1d4c0..7d5d7774a 100644 --- a/packages/util/src/json-schema/JsonSchema.ts +++ b/packages/util/src/json-schema/JsonSchema.ts @@ -24,7 +24,7 @@ export type JsonSchemaCustomProps = { "x-ui"?: unknown; "x-ui-iteration"?: boolean; // marks property as iteration-injected (hidden from parent, read-only in subgraph) "x-auto-generated"?: boolean; // marks a primary key column as auto-generated by storage backend - "x-stream"?: "append" | "replace" | "object"; // streaming mode for this port (absent = none/non-streaming) + "x-stream"?: "append" | "replace" | "object" | "binary"; // streaming mode for this port (absent = none/non-streaming) "x-structured-output"?: boolean; // marks a port as requiring structured output from the AI provider };