Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions packages/task-graph/src/cache/CacheRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ export function isCacheRef(value: unknown): value is CacheRef {
return candidate.kind === CACHE_REF_KIND && typeof candidate.$ref === "string";
}

/**
* Matches the pre-brand on-disk shape `{ $ref: string, size?, mime? }` without
* the `kind` discriminator. Used ONLY at declared binary streaming port slots
* by {@link CacheCoordinator} to upgrade legacy cache rows in place. Never
* call this on arbitrary nested fields — that would defeat the brand check
* and re-introduce the shape-only collision risk that the brand exists to
* close.
*
* An already-branded {@link CacheRef} is rejected (returns `false`) so callers
* can use this as a strict "needs upgrade" predicate without first filtering
* out the branded case.
*/
export function isLegacyUnbrandedCacheRefShape(
value: unknown
): value is { $ref: string; size?: number; mime?: string } {
if (typeof value !== "object" || value === null) return false;
const candidate = value as {
readonly kind?: unknown;
readonly $ref?: unknown;
readonly size?: unknown;
readonly mime?: unknown;
};
if (candidate.kind === CACHE_REF_KIND) return false;
if (typeof candidate.$ref !== "string") return false;
if (candidate.size !== undefined && typeof candidate.size !== "number") return false;
if (candidate.mime !== undefined && typeof candidate.mime !== "string") return false;
return true;
}

/**
* Construct a branded {@link CacheRef}. Cache backings MUST use this helper (or
* spread `{kind: CACHE_REF_KIND, ...}` themselves) so the resulting ref carries
Expand Down
8 changes: 8 additions & 0 deletions packages/task-graph/src/cache/RunPrivateCacheRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ export class RunPrivateCacheRepo extends TaskOutputRepository {
await this.backing.saveOutput(this.ns(cacheIdentity), inputs, output, createdAt);
}

/**
* Legacy unbranded `{$ref}` shapes are NOT re-wrapped here; that is the
* responsibility of {@link CacheCoordinator}, which has the output schema
* in scope and can restrict re-wrap to declared binary streaming ports.
* Re-wrapping at this layer would have to walk arbitrary nested fields and
* would re-introduce the shape-only collision risk that the brand exists
* to close.
*/
public async getOutput(
cacheIdentity: string,
inputs: TaskInput
Expand Down
38 changes: 34 additions & 4 deletions packages/task-graph/src/task/CacheCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { getPortCodec } from "@workglow/util";
import type { DataPortSchema } from "@workglow/util/schema";
import { type CachePolicy, isPolicyCached, isPolicyPrivate } from "../cache/CachePolicy";
import type { CacheRef } from "../cache/CacheRef";
import { isCacheRef, makeCacheRef } from "../cache/CacheRef";
import { isCacheRef, isLegacyUnbrandedCacheRefShape, makeCacheRef } from "../cache/CacheRef";
import type { CacheRegistry } from "../cache/CacheRegistry";
import { RunPrivateCacheRepo } from "../cache/RunPrivateCacheRepo";
import type { TaskOutputRepository } from "../storage/TaskOutputRepository";
Expand Down Expand Up @@ -106,6 +106,26 @@ export class CacheCoordinator<Input extends TaskInput, Output extends TaskOutput
outputSchema as unknown as SchemaProperties
)) as Output;

// Upgrade legacy unbranded `{$ref}` rows in place at every declared binary
// streaming port. Pre-brand cache writes landed without the literal `kind`
// discriminator, so subsequent `isCacheRef` probes would treat them as
// ordinary objects and skip rehydration / threshold logic. Scope is the
// same set of schema-declared binary ports `hydrateRefsBelowThreshold`
// uses, so non-binary fields that legitimately carry a `{$ref: string}`
// shape (JSON-Schema refs, user metadata) are NEVER auto-promoted.
if (outputs !== null && typeof outputs === "object") {
const binaryPorts = getStreamingPorts(outputSchema).filter((p) => p.mode === "binary");
if (binaryPorts.length > 0) {
const slots = outputs as Record<string, unknown>;
for (const { port } of binaryPorts) {
const value = slots[port];
if (isLegacyUnbrandedCacheRefShape(value)) {
slots[port] = makeCacheRef(value);
}
}
}
}

ctx.telemetrySpan?.addEvent("workglow.task.cache_hit");

if (isStreamable) {
Expand Down Expand Up @@ -286,10 +306,20 @@ export class CacheCoordinator<Input extends TaskInput, Output extends TaskOutput
const rehydrations = await Promise.all(
binaryPorts.map(async (port) => {
const value = source[port];
if (!isCacheRef(value)) return undefined;
const size = value.size;
// Accept both branded refs and the pre-brand on-disk shape. The
// streaming finish-event path can flow into this method without
// going through `lookup`, so re-wrap inline here as defence in
// depth — restricted to declared binary streaming ports for the
// same scoping reasons as the lookup-path upgrade.
const ref: CacheRef | undefined = isCacheRef(value)
? value
: isLegacyUnbrandedCacheRefShape(value)
? makeCacheRef(value)
: undefined;
if (ref === undefined) return undefined;
const size = ref.size;
if (size === undefined || size >= referenceThresholdBytes) return undefined;
const blob = await cache.getOutputByRef!(value);
const blob = await cache.getOutputByRef!(ref);
if (blob === undefined) return undefined;
const format = assertBinaryFormat(outputSchema, port);
const inlined = format === "binary" ? await blob.arrayBuffer() : blob;
Expand Down
9 changes: 9 additions & 0 deletions packages/task-graph/src/task/TaskRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ function hasRunConfig(i: unknown): i is { runConfig: Partial<IRunConfig> } {
return i !== null && typeof i === "object" && "runConfig" in (i as object);
}

/**
* Default {@link IExecuteContext.binaryBackpressure} used by non-streaming
* execute paths. Module-private and shared across calls so tasks can
* `await ctx.binaryBackpressure()` unconditionally without per-call
* allocation; only {@link StreamProcessor} installs a router-aware version.
*/
const NOOP_BACKPRESSURE = (): Promise<void> => Promise.resolve();

/**
* Responsible for running tasks
* Manages the execution lifecycle of individual tasks
Expand Down Expand Up @@ -590,6 +598,7 @@ export class TaskRunner<
registry: this.registry,
resourceScope: this.resourceScope,
runId: this.runId,
binaryBackpressure: NOOP_BACKPRESSURE,
});
return result;
}
Expand Down
8 changes: 8 additions & 0 deletions packages/task-graph/src/task/WhileTaskRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import type { TaskRunContext } from "./TaskRunContext";
import type { TaskInput, TaskOutput } from "./TaskTypes";
import type { WhileTask, WhileTaskConfig } from "./WhileTask";

/**
* Default {@link IExecuteContext.binaryBackpressure} used by the WhileTask
* runner. Shared across calls so tasks can call it unconditionally without
* paying a per-call allocation cost.
*/
const WHILE_NOOP_BACKPRESSURE = (): Promise<void> => Promise.resolve();

/**
* Runner for WhileTask that delegates to the task's execute() method
* instead of directly running the subgraph once (which is what
Expand Down Expand Up @@ -37,6 +44,7 @@ export class WhileTaskRunner<
updateProgress: this.handleProgress.bind(this),
own: this.own,
registry: this.registry,
binaryBackpressure: WHILE_NOOP_BACKPRESSURE,
});

return result;
Expand Down
Loading
Loading