Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions packages/task-graph/src/cache/CacheRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

/**
 * Brand value stored in the `kind` field of every {@link CacheRef}.
 *
 * A literal string (not a Symbol) so the brand survives JSON serialization
 * across queue rows / IPC boundaries — a Symbol-based brand would be erased by
 * `JSON.stringify` and the resulting object would no longer be identifiable as
 * a cache reference on the receiving side.
 *
 * `as const` pins the type to the exact string literal, so
 * `typeof CACHE_REF_KIND` can serve as the type of the `kind` discriminant.
 */
export const CACHE_REF_KIND = "task-graph/CacheRef" as const;

/**
* A reference to bytes that live in the configured cache backing rather than
* inline in a task `Output`. Emitted by `TaskRunner` for binary output ports
Expand All @@ -14,24 +22,53 @@
* it back into bytes. `size` and `mime` are best-effort hints populated when
* known at finish time; absent values do not imply unknown failure.
*
* The `kind` brand discriminates a cache ref from other `{$ref: string}`
* shapes (e.g. JSON-Schema references) so the resolver never walks an
* untrusted `$ref` string into the cache. The brand is a literal so it survives
* JSON round-trip across queue boundaries.
*
* Resolution is best-effort: the cache backing's TTL is the lifetime contract,
* and `resolveOutputRef` returns `undefined` when the underlying entry has
* been evicted.
*/
export interface ICacheRef {
  /** Brand discriminator; always the {@link CACHE_REF_KIND} literal. */
  readonly kind: typeof CACHE_REF_KIND;
  /** Reference string that the cache backing decodes to locate the bytes. */
  readonly $ref: string;
  /** Best-effort size hint; absent when not known at finish time. */
  readonly size?: number;
  /** Best-effort MIME type hint; absent when not known at finish time. */
  readonly mime?: string;
}

/** Alias of {@link ICacheRef}, so modules importing `CacheRef` need no change. */
export type CacheRef = ICacheRef;

/**
 * Narrow an unknown value to {@link CacheRef}.
 *
 * Discriminates on the literal {@link CACHE_REF_KIND} brand AND a string
 * `$ref`; shape-only `{$ref: string}` objects (JSON-Schema refs, user metadata)
 * do NOT match. The optional `size` / `mime` hints are not inspected.
 *
 * @param value - Any value, typically parsed JSON or a task output field.
 * @returns `true` only for non-null objects whose `kind` equals the brand and
 *   whose `$ref` is a string.
 */
export function isCacheRef(value: unknown): value is CacheRef {
  // `typeof null === "object"`, so null must be rejected explicitly.
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { readonly kind?: unknown; readonly $ref?: unknown };
  // Both conditions are required: the brand alone is not enough without a
  // usable `$ref`, and a bare `$ref` must never be treated as a cache pointer.
  return candidate.kind === CACHE_REF_KIND && typeof candidate.$ref === "string";
}

/**
 * Build a {@link CacheRef} that carries the {@link CACHE_REF_KIND} brand.
 *
 * Cache backings MUST produce their refs through this helper (or spread
 * `{kind: CACHE_REF_KIND, ...}` themselves) so the result is recognised by
 * `isCacheRef`. Helpers in {@link CacheCoordinator} / {@link RunPrivateCacheRepo}
 * use it to defensively re-wrap legacy backings whose `saveOutputStream`
 * predates the brand and returns an unbranded `{$ref}` shape.
 *
 * @param raw - Unbranded reference data; `size` and `mime` are optional hints.
 * @returns A new object with `kind` and `$ref`, plus `size` / `mime` only when
 *   the corresponding input is not `undefined`.
 */
export function makeCacheRef(raw: {
  readonly $ref: string;
  readonly size?: number;
  readonly mime?: string;
}): CacheRef {
  const { $ref, size, mime } = raw;
  // Omit a hint entirely rather than emitting an explicit `undefined` key.
  const sizeHint = size === undefined ? {} : { size };
  const mimeHint = mime === undefined ? {} : { mime };
  return { kind: CACHE_REF_KIND, $ref, ...sizeHint, ...mimeHint };
}

/**
Expand Down
17 changes: 10 additions & 7 deletions packages/task-graph/src/cache/RunPrivateCacheRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { TaskOutputRepository } from "../storage/TaskOutputRepository";
import type { TaskInput, TaskOutput } from "../task/TaskTypes";
import type { CacheRef } from "./CacheRef";
import { isCacheRef, makeCacheRef } from "./CacheRef";

export interface RunPrivateCacheRepoOptions {
backing: TaskOutputRepository;
Expand Down Expand Up @@ -111,22 +112,24 @@ export class RunPrivateCacheRepo extends TaskOutputRepository {
* via the wrapped `taskType`). Resolvers calling `getOutputByRef` on this
* wrapper forward to the backing, which decodes its own `$ref`.
*/
public override async saveOutputStream(
  taskType: string,
  inputs: TaskInput,
  chunks: AsyncIterable<Uint8Array>,
  metadata: Record<string, unknown>
): Promise<CacheRef> {
  const fn = this.backing.saveOutputStream;
  if (typeof fn !== "function") {
    // Inside an async method a throw surfaces as a rejected promise, so
    // callers still observe the failure asynchronously.
    throw new Error(
      `RunPrivateCacheRepo: backing repository does not implement saveOutputStream. ` +
        `Call supportsStreaming() before saveOutputStream.`
    );
  }
  // Re-wrap the backing's CacheRef so legacy backings that pre-date the
  // `kind` brand still produce a discriminator-bearing ref through this
  // wrapper. Branded refs pass through unchanged.
  const raw = await fn.call(this.backing, this.ns(taskType), inputs, chunks, metadata);
  return isCacheRef(raw) ? raw : makeCacheRef(raw);
}

/**
Expand Down
79 changes: 66 additions & 13 deletions packages/task-graph/src/task-graph/StreamPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ import type { ResourceScope, ServiceRegistry } from "@workglow/util";
import type { TaskOutputRepository } from "../storage/TaskOutputRepository";
import type { ITask } from "../task/ITask";
import type { StreamEvent, StreamMode } from "../task/StreamTypes";
import { edgeNeedsAccumulation, getOutputStreamMode, getStreamingPorts } from "../task/StreamTypes";
import {
DEFAULT_BINARY_HIGH_WATER_BYTES,
edgeNeedsAccumulation,
getOutputStreamMode,
getStreamingPorts,
} from "../task/StreamTypes";
import type { TaskInput } from "../task/TaskTypes";
import { TaskStatus } from "../task/TaskTypes";
import { Dataflow, DATAFLOW_ALL_PORTS } from "./Dataflow";
Expand Down Expand Up @@ -354,37 +359,58 @@ export class StreamPump {
task: ITask,
binaryPortId: string | undefined,
sink: (chunks: AsyncIterable<Uint8Array>) => Promise<unknown>,
signal?: AbortSignal
): { promise: Promise<void>; detach: () => void } {
signal?: AbortSignal,
options?: { readonly highWaterMarkBytes?: number }
): {
promise: Promise<void>;
detach: () => void;
backpressure: () => Promise<void>;
} {
const queue: Uint8Array[] = [];
let bufferedBytes = 0;
const highWaterMarkBytes = Math.max(
1,
options?.highWaterMarkBytes ?? DEFAULT_BINARY_HIGH_WATER_BYTES
);
let done = false;
let notify: (() => void) | undefined;
let chunkNotify: (() => void) | undefined;
let drainNotify: (() => void) | undefined;

const wake = () => {
const n = notify;
notify = undefined;
const wakeChunk = () => {
const n = chunkNotify;
chunkNotify = undefined;
n?.();
};
const wakeDrain = () => {
const n = drainNotify;
drainNotify = undefined;
n?.();
};

const onChunk = (event: StreamEvent) => {
if (event.type === "binary-delta") {
if (binaryPortId === undefined || event.port === binaryPortId) {
queue.push(event.binaryDelta);
wake();
bufferedBytes += event.binaryDelta.byteLength;
wakeChunk();
}
}
};
const onEnd = () => {
done = true;
wake();
wakeChunk();
// Release any cooperative-backpressure awaiter that was parked at
// high-water; without this an abort-while-parked would leak.
wakeDrain();
};
// Abort/error termination: StreamProcessor never emits `stream_end` on these
// paths, so without this the iterable would await forever. Terminate the
// iterable (don't throw) so the sink finalizes the bytes seen so far and the
// promise settles — the source's own abort/error already surfaces to the run.
const onTerminate = () => {
done = true;
wake();
wakeChunk();
wakeDrain();
};

task.on("stream_chunk", onChunk);
Expand All @@ -407,21 +433,48 @@ export class StreamPump {
async function* chunkIterable(): AsyncIterable<Uint8Array> {
while (true) {
while (queue.length > 0) {
yield queue.shift()!;
const chunk = queue.shift()!;
bufferedBytes -= chunk.byteLength;
if (drainNotify && bufferedBytes < highWaterMarkBytes) wakeDrain();
yield chunk;
}
if (done) return;
await new Promise<void>((resolve) => {
notify = resolve;
chunkNotify = resolve;
});
}
}

/**
 * Cooperative backpressure hook.
 *
 * Resolves immediately once the stream is closed or while the buffered byte
 * count is below the high-water mark. Otherwise the returned promise stays
 * pending until the drain notifier fires.
 *
 * The EventEmitter delivery path cannot apply mandatory backpressure (the
 * listener fires synchronously), so this is opt-in: a task can
 * `await ctx.binaryBackpressure()` between large yields. Tasks that never
 * call it pay nothing.
 */
const backpressure = (): Promise<void> => {
  if (done || bufferedBytes < highWaterMarkBytes) {
    return Promise.resolve();
  }
  return new Promise<void>((release) => {
    const earlier = drainNotify;
    if (!earlier) {
      drainNotify = release;
      return;
    }
    // A waiter is already parked: chain so one drain signal releases the
    // earlier waiter first, then this one.
    drainNotify = () => {
      earlier();
      release();
    };
  });
};

// Discard the sink's return value (helper signals completion only; callers
// wanting a CacheRef should hold the sink-returning promise themselves).
const promise = sink(chunkIterable())
.finally(detach)
.then(() => undefined);
return { promise, detach };
return { promise, detach, backpressure };
}

/**
Expand Down
17 changes: 11 additions & 6 deletions packages/task-graph/src/task/CacheCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import { getPortCodec } from "@workglow/util";
import type { DataPortSchema } from "@workglow/util/schema";
import { type CachePolicy, isPolicyCached, isPolicyPrivate } from "../cache/CachePolicy";
import type { CacheRef } from "../cache/CacheRef";
import { isCacheRef } from "../cache/CacheRef";
import { isCacheRef, makeCacheRef } from "../cache/CacheRef";
import type { CacheRegistry } from "../cache/CacheRegistry";
import { RunPrivateCacheRepo } from "../cache/RunPrivateCacheRepo";
import type { TaskOutputRepository } from "../storage/TaskOutputRepository";
import type { BinaryRefSink } from "./StreamProcessor";
import { getBinaryPortFormat, getBinaryPortId, getStreamingPorts } from "./StreamTypes";
import type { ITask } from "./ITask";
import type { BinaryRefSink } from "./StreamProcessor";
import type { StreamEvent } from "./StreamTypes";
import { assertBinaryFormat, getBinaryPortId, getStreamingPorts } from "./StreamTypes";
import { Task } from "./Task";
import type { TaskRunContext } from "./TaskRunContext";
import type { TaskInput, TaskOutput } from "./TaskTypes";
Expand Down Expand Up @@ -235,8 +235,13 @@ export class CacheCoordinator<Input extends TaskInput, Output extends TaskOutput
const port = getBinaryPortId(outputSchema);
if (port === undefined) return undefined;
const taskType = this.task.type;
const sink: BinaryRefSink = (chunks) =>
cache.saveOutputStream!(taskType, keyInputs, chunks, {});
// Re-wrap the backing's CacheRef so legacy `saveOutputStream` implementations
// that pre-date the `kind` brand still produce a discriminator-bearing ref.
// Branded refs pass through unchanged (preserving size/mime hints).
const sink: BinaryRefSink = async (chunks) => {
const raw = await cache.saveOutputStream!(taskType, keyInputs, chunks, {});
return isCacheRef(raw) ? raw : makeCacheRef(raw);
};
return new Map([[port, sink]]);
}

Expand Down Expand Up @@ -286,7 +291,7 @@ export class CacheCoordinator<Input extends TaskInput, Output extends TaskOutput
if (size === undefined || size >= referenceThresholdBytes) return undefined;
const blob = await cache.getOutputByRef!(value);
if (blob === undefined) return undefined;
const format = getBinaryPortFormat(outputSchema, port);
const format = assertBinaryFormat(outputSchema, port);
const inlined = format === "binary" ? await blob.arrayBuffer() : blob;
return { port, inlined };
})
Expand Down
22 changes: 22 additions & 0 deletions packages/task-graph/src/task/ITask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ export interface IExecuteContext {
* did not provide it.
*/
resourceScope?: ResourceScope;
/**
* Optional cooperative backpressure hook for streaming tasks that emit very
* large binary outputs by direct event emission (rather than through the
* StreamProcessor's `await router.push(...)` path). Tasks may `await` this
* between yields/emits to give downstream sinks a chance to drain.
*
* The member is declared optional, so guard the call
* (`await ctx.binaryBackpressure?.()`): when the runtime does not install a
* real backpressure source the call is then a no-op and costs nothing.
*/
binaryBackpressure?: () => Promise<void>;
}

export type IExecutePreviewContext = Pick<IExecuteContext, "own">;
Expand Down Expand Up @@ -125,6 +135,18 @@ export interface IRunConfig {
*/
referenceThresholdBytes?: number;

/**
* High-water mark (bytes) for the streaming runtime's per-port binary
* router buffer. When the buffered (un-consumed) byte total reaches or
* exceeds this threshold, the producer (`executeStream`) is parked between
* `binary-delta` yields until the cache sink drains the buffer back below
* the mark. Bounds peak memory for fast-producer / slow-sink scenarios.
*
* Defaults to {@link DEFAULT_BINARY_HIGH_WATER_BYTES} (8 MiB) when omitted
* or set to a non-positive value.
*/
binaryHighWaterBytes?: number;

/**
* Optional callback invoked whenever a task's progress changes during execution.
* @param task - The task whose progress changed.
Expand Down
Loading