Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions packages/runtime/src/cloudflare/agent-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ import type {
} from '../agent-execution-store.ts';
import type { FlueContextInternal } from '../client.ts';
import type { FlueTraceCarrier } from '../execution-interceptor.ts';
import { assertProductEventV3 } from '../product-event.ts';
import {
createAgentSubmissionObserverRegistry,
type createAgentSubmissionSessionHandler,
createDirectAgentSubmissionInput,
processSubmission,
reconcileInterruptedSubmission,
type SubmissionInputOutbox,
submissionSyntheticRequest,
} from '../runtime/agent-submissions.ts';
import type { AgentInteractionStart } from '../runtime/dev-lifecycle-logger.ts';
import { agentStreamPath } from '../runtime/event-stream-store.ts';
import { assertAgentDispatchAdmissionInput, handleAgentRequest } from '../runtime/handle-agent.ts';
import { handleStreamHead, handleStreamRead } from '../runtime/handle-stream-routes.ts';
import { isStreamExcludedEvent } from '../runtime/run-store.ts';
import { assertProductEventV3 } from '../product-event.ts';
import { deleteSessionTree } from '../session.ts';
import type { AttachedAgentEvent, DirectAgentPayload } from '../types.ts';
import { createSqlAgentExecutionStore } from './agent-execution-store.ts';
Expand Down Expand Up @@ -290,8 +291,9 @@ class CloudflareAgentCoordinator {
private createDurableContext(
request: Request,
dispatchId?: string,
initialEventIndex?: number,
): FlueContextInternal {
const ctx = this.createContext(request, undefined, dispatchId);
const ctx = this.createContext(request, initialEventIndex, dispatchId);
const streamPath = agentStreamPath(this.agentName, this.instance.name);
ctx.subscribeEvent(async (event) => {
if (isStreamExcludedEvent(event) || event.type === 'submission_settled') return;
Expand All @@ -300,6 +302,14 @@ class CloudflareAgentCoordinator {
return ctx;
}

private deliverInputEvent(input: AgentSubmission, outbox: SubmissionInputOutbox) {
return this.eventStreamStore.appendEventOnce(
agentStreamPath(input.input.agent, input.input.id),
outbox.eventKey,
outbox.event,
);
}

private assertAgentsDurabilityApi(method: 'runFiber' | 'schedule'): void {
if (typeof this.instance[method] !== 'function') {
throw new Error(
Expand Down Expand Up @@ -345,13 +355,15 @@ class CloudflareAgentCoordinator {
await this.submissions.recordSubmissionTerminalOffset(attempt, terminal.eventKey, offset);
if (await this.submissions.finalizeSubmissionTerminal(attempt, terminal.eventKey)) {
const journal = await this.submissions.getTurnJournal(terminal.submissionId);
if (journal?.streamKey) await this.submissions.deleteStreamChunkSegments(journal.streamKey);
if (journal?.streamKey)
await this.submissions.deleteStreamChunkSegments(journal.streamKey);
const event = terminal.event as {
outcome?: 'completed' | 'failed';
result?: unknown;
error?: { message?: string };
};
if (event.outcome === 'completed') this.observers.complete(terminal.submissionId, event.result);
if (event.outcome === 'completed')
this.observers.complete(terminal.submissionId, event.result);
if (event.outcome === 'failed') {
this.observers.fail(
terminal.submissionId,
Expand Down Expand Up @@ -468,7 +480,12 @@ class CloudflareAgentCoordinator {
submission,
agent,
(dispatchId) =>
this.createDurableContext(submissionSyntheticRequest(submission.input), dispatchId),
this.createDurableContext(
submissionSyntheticRequest(submission.input),
dispatchId,
submission.kind === 'direct' ? 1 : undefined,
),
(outbox) => this.deliverInputEvent(submission, outbox),
(terminal) =>
this.eventStreamStore.appendEventOnce(
agentStreamPath(this.agentName, this.instance.name),
Expand Down Expand Up @@ -587,8 +604,13 @@ class CloudflareAgentCoordinator {
return agent;
},
createContext: (dispatchId) =>
this.createDurableContext(submissionSyntheticRequest(submission.input), dispatchId),
this.createDurableContext(
submissionSyntheticRequest(submission.input),
dispatchId,
submission.kind === 'direct' ? 1 : undefined,
),
observers: this.observers,
deliverInputEvent: (outbox) => this.deliverInputEvent(submission, outbox),
deliverTerminalEvent: (terminal) =>
this.eventStreamStore.appendEventOnce(
agentStreamPath(this.agentName, this.instance.name),
Expand Down
47 changes: 39 additions & 8 deletions packages/runtime/src/node/agent-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@ import type {
AgentSubmissionStore,
} from '../agent-execution-store.ts';
import { LEASE_DURATION_MS } from '../agent-execution-store.ts';
import { assertProductEventV3 } from '../product-event.ts';
import {
type AgentSubmissionInput,
type AttachedAgentSubmissionAdmission,
createAgentSubmissionObserverRegistry,
createDirectAgentSubmissionInput,
processSubmission,
reconcileInterruptedSubmission,
type SubmissionInputOutbox,
submissionSyntheticRequest,
} from '../runtime/agent-submissions.ts';
import type { AgentInteractionStart } from '../runtime/dev-lifecycle-logger.ts';
import type { DispatchInput, DispatchQueue } from '../runtime/dispatch-queue.ts';
import { agentStreamPath } from '../runtime/event-stream-store.ts';
import type { CreateAgentContextFn } from '../runtime/handle-agent.ts';
import type { RuntimeActivityGate, RuntimeActivityLease } from '../runtime/runtime-activity-gate.ts';
import { isStreamExcludedEvent } from '../runtime/run-store.ts';
import { assertProductEventV3 } from '../product-event.ts';
import type {
RuntimeActivityGate,
RuntimeActivityLease,
} from '../runtime/runtime-activity-gate.ts';
import { deleteSessionTree } from '../session.ts';
import type {
AttachedAgentEvent,
AgentDefinition,
AttachedAgentEvent,
DirectAgentPayload,
DispatchReceipt,
SessionStore,
Expand Down Expand Up @@ -100,7 +104,15 @@ export function createNodeAgentCoordinator(options: {
onInteractionStart?: (interaction: AgentInteractionStart) => void;
activityGate?: RuntimeActivityGate;
}): NodeAgentCoordinator {
const { submissions, sessions, agents, createContext, eventStreamStore, onInteractionStart, activityGate } = options;
const {
submissions,
sessions,
agents,
createContext,
eventStreamStore,
onInteractionStart,
activityGate,
} = options;
const observers = createAgentSubmissionObserverRegistry();

// ── Lease ownership ──────────────────────────────────────────────────
Expand Down Expand Up @@ -171,6 +183,7 @@ export function createNodeAgentCoordinator(options: {
id: input.id,
agentName: input.agent,
request: submissionSyntheticRequest(input),
...(input.kind === 'direct' ? { initialEventIndex: 1 } : {}),
dispatchId,
});
// Subscribe to events for durable agent event persistence.
Expand All @@ -186,10 +199,19 @@ export function createNodeAgentCoordinator(options: {

function resolveAgent(name: string): AgentDefinition {
const agent = agents.find((record) => record.name === name)?.definition;
if (!agent) throw new Error(`[flue] submission target agent "${name}" has no agent definition.`);
if (!agent)
throw new Error(`[flue] submission target agent "${name}" has no agent definition.`);
return agent;
}

function deliverInputEvent(input: AgentSubmissionInput, outbox: SubmissionInputOutbox) {
return eventStreamStore.appendEventOnce(
agentStreamPath(input.agent, input.id),
outbox.eventKey,
outbox.event,
);
}

/**
* Start processing a claimed submission as an independent async task.
* Adds itself to `activeSubmissions`, removes on completion, and
Expand All @@ -210,6 +232,7 @@ export function createNodeAgentCoordinator(options: {
resolveAgent,
createContext: makeSubmissionContext(claimed.input),
observers,
deliverInputEvent: (outbox) => deliverInputEvent(claimed.input, outbox),
deliverTerminalEvent: (terminal) =>
eventStreamStore.appendEventOnce(
agentStreamPath(claimed.input.agent, claimed.input.id),
Expand Down Expand Up @@ -416,7 +439,9 @@ export function createNodeAgentCoordinator(options: {
if (!submission || submission.kind !== 'direct') continue;
const streamPath = agentStreamPath(submission.input.agent, submission.input.id);
await eventStreamStore.createStream(streamPath);
const offset = terminal.offset ?? (await eventStreamStore.appendEventOnce(streamPath, terminal.eventKey, terminal.event));
const offset =
terminal.offset ??
(await eventStreamStore.appendEventOnce(streamPath, terminal.eventKey, terminal.event));
const attempt = { submissionId: terminal.submissionId, attemptId: terminal.attemptId };
await submissions.recordSubmissionTerminalOffset(attempt, terminal.eventKey, offset);
if (await submissions.finalizeSubmissionTerminal(attempt, terminal.eventKey)) {
Expand All @@ -429,7 +454,10 @@ export function createNodeAgentCoordinator(options: {
};
if (event.outcome === 'completed') observers.complete(terminal.submissionId, event.result);
if (event.outcome === 'failed') {
observers.fail(terminal.submissionId, new Error(event.error?.message ?? 'Agent submission failed.'));
observers.fail(
terminal.submissionId,
new Error(event.error?.message ?? 'Agent submission failed.'),
);
}
}
}
Expand Down Expand Up @@ -466,6 +494,7 @@ export function createNodeAgentCoordinator(options: {
submission,
agent,
makeSubmissionContext(submission.input),
(outbox) => deliverInputEvent(submission.input, outbox),
(terminal) =>
eventStreamStore.appendEventOnce(
agentStreamPath(submission.input.agent, submission.input.id),
Expand Down Expand Up @@ -575,7 +604,9 @@ export function createNodeAgentCoordinator(options: {
const agent = agents.find((record) => record.name === agentName)?.definition;
if (!agent) {
activityLease?.release();
throw new Error(`[flue] direct prompt target agent "${agentName}" has no agent definition.`);
throw new Error(
`[flue] direct prompt target agent "${agentName}" has no agent definition.`,
);
}

const input = createDirectAgentSubmissionInput({
Expand Down
59 changes: 55 additions & 4 deletions packages/runtime/src/runtime/agent-submissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import {
SubmissionRetryExhaustedError,
SubmissionTimeoutError,
} from '../errors.ts';
import { interceptExecution, type FlueTraceCarrier } from '../execution-interceptor.ts';
import { redactEventImages } from '../event-redaction.ts';
import { type FlueTraceCarrier, interceptExecution } from '../execution-interceptor.ts';
import { getInternalSession } from '../session.ts';
import { createUserContextMessage } from '../session-history.ts';
import type {
AgentDefinition,
AttachedAgentEvent,
CallHandle,
AgentDefinition,
DirectAgentPayload,
PromptResponse,
} from '../types.ts';
Expand All @@ -43,6 +45,11 @@ export interface DirectAgentSubmissionInput {

export type AgentSubmissionInput = DispatchAgentSubmissionInput | DirectAgentSubmissionInput;

export interface SubmissionInputOutbox {
readonly eventKey: string;
readonly event: AttachedAgentEvent;
}

export interface AgentSubmissionInterruption {
readonly submissionId: string;
readonly kind: AgentSubmissionInput['kind'];
Expand Down Expand Up @@ -162,6 +169,38 @@ export function createDirectAgentSubmissionInput(options: {
};
}

export function createDirectSubmissionInputOutbox(
input: DirectAgentSubmissionInput,
): SubmissionInputOutbox {
const turnId = `turn_${input.submissionId}`;
const redacted = redactEventImages({
type: 'message_end',
message: createUserContextMessage(
input.payload.message,
input.acceptedAt,
input.payload.images,
),
turnId,
});
if (redacted.type !== 'message_end') {
throw new Error('[flue] Direct submission input event changed type during redaction.');
}
return {
eventKey: `direct-submission:${input.submissionId}:input`,
event: {
type: 'message_end',
message: redacted.message,
turnId,
v: 3,
eventIndex: 0,
timestamp: input.acceptedAt,
agentName: input.agent,
instanceId: input.id,
submissionId: input.submissionId,
},
};
}

export function createAgentSubmissionSessionHandler(
agent: AgentDefinition,
input: AgentSubmissionInput,
Expand Down Expand Up @@ -315,12 +354,16 @@ export async function reconcileInterruptedSubmission(
submission: AgentSubmission,
agent: AgentDefinition,
createContext: (dispatchId: string | undefined) => FlueContextInternal,
deliverInputEvent: (input: SubmissionInputOutbox) => Promise<string>,
deliverTerminalEvent: (terminal: SubmissionTerminalOutbox) => Promise<string>,
lease?: { ownerId: string; leaseExpiresAt: number },
): Promise<ReconciliationResult> {
const { input } = submission;
const attempt = submissionAttemptRef(submission);
if (!attempt) return { disposition: 'stale' };
if (input.kind === 'direct') {
await deliverInputEvent(createDirectSubmissionInputOutbox(input));
}

// Inspect canonical session state first: a completed canonical response
// is finished provider work and settles as success unconditionally. The
Expand Down Expand Up @@ -610,6 +653,7 @@ export interface ProcessSubmissionOptions {
createContext: (dispatchId: string | undefined) => FlueContextInternal;
/** Observer registry for direct submission events and settlement. */
observers: Pick<AgentSubmissionObserverRegistry, 'publish' | 'complete' | 'fail'>;
deliverInputEvent: (input: SubmissionInputOutbox) => Promise<string>;
deliverTerminalEvent: (terminal: SubmissionTerminalOutbox) => Promise<string>;
onInteractionStart?: (interaction: {
agentName: string;
Expand Down Expand Up @@ -659,6 +703,9 @@ export async function processSubmission(opts: ProcessSubmissionOptions): Promise
};
const persisted = await submissions.getSubmission(submission.submissionId);
if (persisted?.status !== 'running' || persisted.attemptId !== attempt.attemptId) return;
if (input.kind === 'direct') {
await opts.deliverInputEvent(createDirectSubmissionInputOutbox(input));
}
if (submission.attemptCount === 1 && opts.onInteractionStart) {
try {
opts.onInteractionStart({
Expand Down Expand Up @@ -770,7 +817,8 @@ export async function processSubmission(opts: ProcessSubmissionOptions): Promise
result,
)
: await submissions.completeSubmission(attempt);
if (submission.kind === 'direct' && settled) observers.complete(submission.submissionId, result);
if (submission.kind === 'direct' && settled)
observers.complete(submission.submissionId, result);
} finally {
if (submission.kind === 'direct') ctx.setEventCallback(undefined);
opts.onSettled?.();
Expand Down Expand Up @@ -858,7 +906,10 @@ async function settleDirectSubmission(
try {
await ctx.flushEventCallbacks();
} catch (callbackError) {
console.error('[flue:event-stream] Event persistence failed before terminal settlement:', callbackError);
console.error(
'[flue:event-stream] Event persistence failed before terminal settlement:',
callbackError,
);
}
const offset = terminal.offset ?? (await deliverTerminalEvent(terminal));
if (!(await submissions.recordSubmissionTerminalOffset(attempt, eventKey, offset))) return false;
Expand Down
Loading
Loading