From 7a30809b8bf4d158feb0e6d417791543bb671af3 Mon Sep 17 00:00:00 2001 From: George Zhang Date: Wed, 24 Jun 2026 16:56:10 -0700 Subject: [PATCH] fix(runtime): persist direct prompt user events --- .../src/cloudflare/agent-coordinator.ts | 34 +++++-- .../runtime/src/node/agent-coordinator.ts | 47 ++++++++-- .../runtime/src/runtime/agent-submissions.ts | 59 +++++++++++- .../test/cloudflare-agent-coordinator.test.ts | 46 ++++++++- .../test/node-agent-coordinator.test.ts | 94 +++++++++++++++++-- 5 files changed, 251 insertions(+), 29 deletions(-) diff --git a/packages/runtime/src/cloudflare/agent-coordinator.ts b/packages/runtime/src/cloudflare/agent-coordinator.ts index 3de35276..9c32155d 100644 --- a/packages/runtime/src/cloudflare/agent-coordinator.ts +++ b/packages/runtime/src/cloudflare/agent-coordinator.ts @@ -5,12 +5,14 @@ import type { } from '../agent-execution-store.ts'; import type { FlueContextInternal } from '../client.ts'; import type { FlueTraceCarrier } from '../execution-interceptor.ts'; +import { assertProductEventV3 } from '../product-event.ts'; import { createAgentSubmissionObserverRegistry, type createAgentSubmissionSessionHandler, createDirectAgentSubmissionInput, processSubmission, reconcileInterruptedSubmission, + type SubmissionInputOutbox, submissionSyntheticRequest, } from '../runtime/agent-submissions.ts'; import type { AgentInteractionStart } from '../runtime/dev-lifecycle-logger.ts'; @@ -18,7 +20,6 @@ import { agentStreamPath } from '../runtime/event-stream-store.ts'; import { assertAgentDispatchAdmissionInput, handleAgentRequest } from '../runtime/handle-agent.ts'; import { handleStreamHead, handleStreamRead } from '../runtime/handle-stream-routes.ts'; import { isStreamExcludedEvent } from '../runtime/run-store.ts'; -import { assertProductEventV3 } from '../product-event.ts'; import { deleteSessionTree } from '../session.ts'; import type { AttachedAgentEvent, DirectAgentPayload } from '../types.ts'; import { createSqlAgentExecutionStore } from './agent-execution-store.ts'; @@ -290,8 +291,9 @@ class CloudflareAgentCoordinator { private createDurableContext( request: Request, dispatchId?: string, + initialEventIndex?: number, ): FlueContextInternal { - const ctx = this.createContext(request, undefined, dispatchId); + const ctx = this.createContext(request, initialEventIndex, dispatchId); const streamPath = agentStreamPath(this.agentName, this.instance.name); ctx.subscribeEvent(async (event) => { if (isStreamExcludedEvent(event) || event.type === 'submission_settled') return; @@ -300,6 +302,14 @@ class CloudflareAgentCoordinator { return ctx; } + private deliverInputEvent(input: AgentSubmission, outbox: SubmissionInputOutbox) { + return this.eventStreamStore.appendEventOnce( + agentStreamPath(input.input.agent, input.input.id), + outbox.eventKey, + outbox.event, + ); + } + private assertAgentsDurabilityApi(method: 'runFiber' | 'schedule'): void { if (typeof this.instance[method] !== 'function') { throw new Error( @@ -345,13 +355,15 @@ class CloudflareAgentCoordinator { await this.submissions.recordSubmissionTerminalOffset(attempt, terminal.eventKey, offset); if (await this.submissions.finalizeSubmissionTerminal(attempt, terminal.eventKey)) { const journal = await this.submissions.getTurnJournal(terminal.submissionId); - if (journal?.streamKey) await this.submissions.deleteStreamChunkSegments(journal.streamKey); + if (journal?.streamKey) + await this.submissions.deleteStreamChunkSegments(journal.streamKey); const event = terminal.event as { outcome?: 'completed' | 'failed'; result?: unknown; error?: { message?: string }; }; - if (event.outcome === 'completed') this.observers.complete(terminal.submissionId, event.result); + if (event.outcome === 'completed') + this.observers.complete(terminal.submissionId, event.result); if (event.outcome === 'failed') { this.observers.fail( terminal.submissionId, @@ -468,7 +480,12 @@ class CloudflareAgentCoordinator { submission, agent, (dispatchId) => - this.createDurableContext(submissionSyntheticRequest(submission.input), dispatchId), + this.createDurableContext( + submissionSyntheticRequest(submission.input), + dispatchId, + submission.kind === 'direct' ? 1 : undefined, + ), + (outbox) => this.deliverInputEvent(submission, outbox), (terminal) => this.eventStreamStore.appendEventOnce( agentStreamPath(this.agentName, this.instance.name), @@ -587,8 +604,13 @@ class CloudflareAgentCoordinator { return agent; }, createContext: (dispatchId) => - this.createDurableContext(submissionSyntheticRequest(submission.input), dispatchId), + this.createDurableContext( + submissionSyntheticRequest(submission.input), + dispatchId, + submission.kind === 'direct' ? 1 : undefined, + ), observers: this.observers, + deliverInputEvent: (outbox) => this.deliverInputEvent(submission, outbox), deliverTerminalEvent: (terminal) => this.eventStreamStore.appendEventOnce( agentStreamPath(this.agentName, this.instance.name), diff --git a/packages/runtime/src/node/agent-coordinator.ts b/packages/runtime/src/node/agent-coordinator.ts index 64f12c19..022b2b78 100644 --- a/packages/runtime/src/node/agent-coordinator.ts +++ b/packages/runtime/src/node/agent-coordinator.ts @@ -4,6 +4,7 @@ import type { AgentSubmissionStore, } from '../agent-execution-store.ts'; import { LEASE_DURATION_MS } from '../agent-execution-store.ts'; +import { assertProductEventV3 } from '../product-event.ts'; import { type AgentSubmissionInput, type AttachedAgentSubmissionAdmission, @@ -11,19 +12,22 @@ import { createDirectAgentSubmissionInput, processSubmission, reconcileInterruptedSubmission, + type SubmissionInputOutbox, submissionSyntheticRequest, } from '../runtime/agent-submissions.ts'; import type { AgentInteractionStart } from '../runtime/dev-lifecycle-logger.ts'; import type { DispatchInput, DispatchQueue } from '../runtime/dispatch-queue.ts'; import { agentStreamPath } from '../runtime/event-stream-store.ts'; import type { CreateAgentContextFn } from '../runtime/handle-agent.ts'; -import type { RuntimeActivityGate, RuntimeActivityLease } from '../runtime/runtime-activity-gate.ts'; import { isStreamExcludedEvent } from '../runtime/run-store.ts'; -import { assertProductEventV3 } from '../product-event.ts'; +import type { + RuntimeActivityGate, + RuntimeActivityLease, +} from '../runtime/runtime-activity-gate.ts'; import { deleteSessionTree } from '../session.ts'; import type { - AttachedAgentEvent, AgentDefinition, + AttachedAgentEvent, DirectAgentPayload, DispatchReceipt, SessionStore, @@ -100,7 +104,15 @@ export function createNodeAgentCoordinator(options: { onInteractionStart?: (interaction: AgentInteractionStart) => void; activityGate?: RuntimeActivityGate; }): NodeAgentCoordinator { - const { submissions, sessions, agents, createContext, eventStreamStore, onInteractionStart, activityGate } = options; + const { + submissions, + sessions, + agents, + createContext, + eventStreamStore, + onInteractionStart, + activityGate, + } = options; const observers = createAgentSubmissionObserverRegistry(); // ── Lease ownership ────────────────────────────────────────────────── @@ -171,6 +183,7 @@ export function createNodeAgentCoordinator(options: { id: input.id, agentName: input.agent, request: submissionSyntheticRequest(input), + ...(input.kind === 'direct' ? { initialEventIndex: 1 } : {}), dispatchId, }); // Subscribe to events for durable agent event persistence. @@ -186,10 +199,19 @@ export function createNodeAgentCoordinator(options: { function resolveAgent(name: string): AgentDefinition { const agent = agents.find((record) => record.name === name)?.definition; - if (!agent) throw new Error(`[flue] submission target agent "${name}" has no agent definition.`); + if (!agent) + throw new Error(`[flue] submission target agent "${name}" has no agent definition.`); return agent; } + function deliverInputEvent(input: AgentSubmissionInput, outbox: SubmissionInputOutbox) { + return eventStreamStore.appendEventOnce( + agentStreamPath(input.agent, input.id), + outbox.eventKey, + outbox.event, + ); + } + /** * Start processing a claimed submission as an independent async task. * Adds itself to `activeSubmissions`, removes on completion, and @@ -210,6 +232,7 @@ export function createNodeAgentCoordinator(options: { resolveAgent, createContext: makeSubmissionContext(claimed.input), observers, + deliverInputEvent: (outbox) => deliverInputEvent(claimed.input, outbox), deliverTerminalEvent: (terminal) => eventStreamStore.appendEventOnce( agentStreamPath(claimed.input.agent, claimed.input.id), @@ -416,7 +439,9 @@ export function createNodeAgentCoordinator(options: { if (!submission || submission.kind !== 'direct') continue; const streamPath = agentStreamPath(submission.input.agent, submission.input.id); await eventStreamStore.createStream(streamPath); - const offset = terminal.offset ?? (await eventStreamStore.appendEventOnce(streamPath, terminal.eventKey, terminal.event)); + const offset = + terminal.offset ?? + (await eventStreamStore.appendEventOnce(streamPath, terminal.eventKey, terminal.event)); const attempt = { submissionId: terminal.submissionId, attemptId: terminal.attemptId }; await submissions.recordSubmissionTerminalOffset(attempt, terminal.eventKey, offset); if (await submissions.finalizeSubmissionTerminal(attempt, terminal.eventKey)) { @@ -429,7 +454,10 @@ export function createNodeAgentCoordinator(options: { }; if (event.outcome === 'completed') observers.complete(terminal.submissionId, event.result); if (event.outcome === 'failed') { - observers.fail(terminal.submissionId, new Error(event.error?.message ?? 'Agent submission failed.')); + observers.fail( + terminal.submissionId, + new Error(event.error?.message ?? 'Agent submission failed.'), + ); } } } @@ -466,6 +494,7 @@ export function createNodeAgentCoordinator(options: { submission, agent, makeSubmissionContext(submission.input), + (outbox) => deliverInputEvent(submission.input, outbox), (terminal) => eventStreamStore.appendEventOnce( agentStreamPath(submission.input.agent, submission.input.id), @@ -575,7 +604,9 @@ export function createNodeAgentCoordinator(options: { const agent = agents.find((record) => record.name === agentName)?.definition; if (!agent) { activityLease?.release(); - throw new Error(`[flue] direct prompt target agent "${agentName}" has no agent definition.`); + throw new Error( + `[flue] direct prompt target agent "${agentName}" has no agent definition.`, + ); } const input = createDirectAgentSubmissionInput({ diff --git a/packages/runtime/src/runtime/agent-submissions.ts b/packages/runtime/src/runtime/agent-submissions.ts index 38fe947c..730a1111 100644 --- a/packages/runtime/src/runtime/agent-submissions.ts +++ b/packages/runtime/src/runtime/agent-submissions.ts @@ -13,12 +13,14 @@ import { SubmissionRetryExhaustedError, SubmissionTimeoutError, } from '../errors.ts'; -import { interceptExecution, type FlueTraceCarrier } from '../execution-interceptor.ts'; +import { redactEventImages } from '../event-redaction.ts'; +import { type FlueTraceCarrier, interceptExecution } from '../execution-interceptor.ts'; import { getInternalSession } from '../session.ts'; +import { createUserContextMessage } from '../session-history.ts'; import type { + AgentDefinition, AttachedAgentEvent, CallHandle, - AgentDefinition, DirectAgentPayload, PromptResponse, } from '../types.ts'; @@ -43,6 +45,11 @@ export interface DirectAgentSubmissionInput { export type AgentSubmissionInput = DispatchAgentSubmissionInput | DirectAgentSubmissionInput; +export interface SubmissionInputOutbox { + readonly eventKey: string; + readonly event: AttachedAgentEvent; +} + export interface AgentSubmissionInterruption { readonly submissionId: string; readonly kind: AgentSubmissionInput['kind']; @@ -162,6 +169,38 @@ export function createDirectAgentSubmissionInput(options: { }; } +export function createDirectSubmissionInputOutbox( + input: DirectAgentSubmissionInput, +): SubmissionInputOutbox { + const turnId = `turn_${input.submissionId}`; + const redacted = redactEventImages({ + type: 'message_end', + message: createUserContextMessage( + input.payload.message, + input.acceptedAt, + input.payload.images, + ), + turnId, + }); + if (redacted.type !== 'message_end') { + throw new Error('[flue] Direct submission input event changed type during redaction.'); + } + return { + eventKey: `direct-submission:${input.submissionId}:input`, + event: { + type: 'message_end', + message: redacted.message, + turnId, + v: 3, + eventIndex: 0, + timestamp: input.acceptedAt, + agentName: input.agent, + instanceId: input.id, + submissionId: input.submissionId, + }, + }; +} + export function createAgentSubmissionSessionHandler( agent: AgentDefinition, input: AgentSubmissionInput, @@ -315,12 +354,16 @@ export async function reconcileInterruptedSubmission( submission: AgentSubmission, agent: AgentDefinition, createContext: (dispatchId: string | undefined) => FlueContextInternal, + deliverInputEvent: (input: SubmissionInputOutbox) => Promise, deliverTerminalEvent: (terminal: SubmissionTerminalOutbox) => Promise, lease?: { ownerId: string; leaseExpiresAt: number }, ): Promise { const { input } = submission; const attempt = submissionAttemptRef(submission); if (!attempt) return { disposition: 'stale' }; + if (input.kind === 'direct') { + await deliverInputEvent(createDirectSubmissionInputOutbox(input)); + } // Inspect canonical session state first: a completed canonical response // is finished provider work and settles as success unconditionally. The @@ -610,6 +653,7 @@ export interface ProcessSubmissionOptions { createContext: (dispatchId: string | undefined) => FlueContextInternal; /** Observer registry for direct submission events and settlement. */ observers: Pick; + deliverInputEvent: (input: SubmissionInputOutbox) => Promise; deliverTerminalEvent: (terminal: SubmissionTerminalOutbox) => Promise; onInteractionStart?: (interaction: { agentName: string; @@ -659,6 +703,9 @@ export async function processSubmission(opts: ProcessSubmissionOptions): Promise }; const persisted = await submissions.getSubmission(submission.submissionId); if (persisted?.status !== 'running' || persisted.attemptId !== attempt.attemptId) return; + if (input.kind === 'direct') { + await opts.deliverInputEvent(createDirectSubmissionInputOutbox(input)); + } if (submission.attemptCount === 1 && opts.onInteractionStart) { try { opts.onInteractionStart({ @@ -770,7 +817,8 @@ export async function processSubmission(opts: ProcessSubmissionOptions): Promise result, ) : await submissions.completeSubmission(attempt); - if (submission.kind === 'direct' && settled) observers.complete(submission.submissionId, result); + if (submission.kind === 'direct' && settled) + observers.complete(submission.submissionId, result); } finally { if (submission.kind === 'direct') ctx.setEventCallback(undefined); opts.onSettled?.(); @@ -858,7 +906,10 @@ async function settleDirectSubmission( try { await ctx.flushEventCallbacks(); } catch (callbackError) { - console.error('[flue:event-stream] Event persistence failed before terminal settlement:', callbackError); + console.error( + '[flue:event-stream] Event persistence failed before terminal settlement:', + callbackError, + ); } const offset = terminal.offset ?? (await deliverTerminalEvent(terminal)); if (!(await submissions.recordSubmissionTerminalOffset(attempt, eventKey, offset))) return false; diff --git a/packages/runtime/test/cloudflare-agent-coordinator.test.ts b/packages/runtime/test/cloudflare-agent-coordinator.test.ts index 316d5e02..55d4d3a6 100644 --- a/packages/runtime/test/cloudflare-agent-coordinator.test.ts +++ b/packages/runtime/test/cloudflare-agent-coordinator.test.ts @@ -8,6 +8,7 @@ import type { AgentSubmissionInterruption, DirectAgentSubmissionInput, } from '../src/runtime/agent-submissions.ts'; +import { agentStreamPath } from '../src/runtime/event-stream-store.ts'; import { createTestEventStreamStore } from './helpers/test-event-stream-store.ts'; afterEach(() => { @@ -60,7 +61,9 @@ function makeFakeSql() { function makeRuntime( options: { - createdAgent?: Parameters[0]['agents'][number]['definition']; + createdAgent?: Parameters< + typeof createCloudflareAgentRuntime + >[0]['agents'][number]['definition']; createContext?: Parameters[0]['createContext']; createEventStreamStore?: Parameters< typeof createCloudflareAgentRuntime @@ -68,9 +71,7 @@ function makeRuntime( } = {}, ) { return createCloudflareAgentRuntime({ - agents: options.createdAgent - ? [{ name: 'assistant', definition: options.createdAgent }] - : [], + agents: options.createdAgent ? [{ name: 'assistant', definition: options.createdAgent }] : [], createContext: options.createContext ?? (() => { @@ -462,6 +463,43 @@ describe('createCloudflareAgentRuntime()', () => { }); }); + it('appends the direct input event while reconciling an interrupted attempt', async () => { + const { storage } = makeFakeSql(); + const recovery = makeRecoveryContext({ inspection: 'absent' }); + const eventStreamStore = createTestEventStreamStore(); + const runtime = makeRuntime({ + createdAgent: {} as never, + createContext: () => recovery.ctx, + createEventStreamStore: () => eventStreamStore, + }); + const instance = makeInstance(storage); + const executionStore = prepare(runtime, instance); + await eventStreamStore.createStream(agentStreamPath('assistant', 'agent-1')); + await executionStore.submissions.admitDirect(directInput()); + await executionStore.submissions.claimSubmission({ + submissionId: 'direct-1', + attemptId: 'attempt-1', + ownerId: 'test-owner', + leaseExpiresAt: Date.now() + 30_000, + }); + + await runtime.onStart(instance, () => {}); + + const stream = await eventStreamStore.readEvents(agentStreamPath('assistant', 'agent-1'), { + offset: '-1', + }); + expect(stream.events.map((event) => event.data)).toEqual([ + expect.objectContaining({ + type: 'message_end', + eventIndex: 0, + timestamp: '2026-06-03T00:00:00.000Z', + instanceId: 'agent-1', + submissionId: 'direct-1', + message: expect.objectContaining({ role: 'user' }), + }), + ]); + }); + it('records interruption before settling applied incomplete canonical input as error', async () => { const events: string[] = []; const { storage } = makeFakeSql(); diff --git a/packages/runtime/test/node-agent-coordinator.test.ts b/packages/runtime/test/node-agent-coordinator.test.ts index d467fe4b..fe244ad2 100644 --- a/packages/runtime/test/node-agent-coordinator.test.ts +++ b/packages/runtime/test/node-agent-coordinator.test.ts @@ -159,8 +159,13 @@ async function createFauxCoordinator( dbPath: string, provider: FauxProviderRegistration, durability?: { maxAttempts?: number; timeoutMs?: number }, -): Promise<{ coordinator: NodeAgentCoordinator; executionStore: AgentExecutionStore }> { +): Promise<{ + coordinator: NodeAgentCoordinator; + executionStore: AgentExecutionStore; + eventStreamStore: ReturnType; +}> { const executionStore = await openExecutionStore(dbPath); + const eventStreamStore = createTestEventStreamStore(); const agent = defineAgent(() => ({ model: `${provider.getModel().provider}/${provider.getModel().id}`, durability, @@ -170,9 +175,9 @@ async function createFauxCoordinator( sessions: executionStore.sessions, agents: [{ name: 'assistant', definition: agent }], createContext: makeFauxCreateContext(provider, executionStore), - eventStreamStore: createTestEventStreamStore(), + eventStreamStore, }); - return { coordinator, executionStore }; + return { coordinator, executionStore, eventStreamStore }; } // --------------------------------------------------------------------------- @@ -1855,6 +1860,65 @@ describe('NodeAgentCoordinator', () => { expect(await executionStore.submissions.hasUnsettledSubmissions()).toBe(false); }); + it('replays the complete transcript after a direct prompt settles', async () => { + const dbPath = createTempDbPath(); + const provider = createFauxProvider(); + provider.setResponses([fauxAssistantMessage('Durable reply.')]); + const executionStore = await openExecutionStore(dbPath); + const eventStreamStore = createTestEventStreamStore(); + const coordinator = createNodeAgentCoordinator({ + submissions: executionStore.submissions, + sessions: executionStore.sessions, + agents: [ + { + name: 'assistant', + definition: defineAgent(() => ({ + model: `${provider.getModel().provider}/${provider.getModel().id}`, + })), + }, + ], + createContext: makeFauxCreateContext(provider, executionStore), + eventStreamStore, + }); + + const receipt = await coordinator.createAdmission( + 'assistant', + 'instance-1', + )({ message: 'Durable prompt' }); + const session = await executionStore.sessions.load( + createSessionStorageKey('instance-1', 'default', 'default'), + ); + const stream = await eventStreamStore.readEvents(agentStreamPath('assistant', 'instance-1'), { + offset: '-1', + }); + const messageEvents = stream.events + .map((event) => event.data as Record) + .filter((event) => event.type === 'message_end'); + + expect( + session?.entries + .filter((entry) => entry.type === 'message') + .map((entry) => entry.message.role), + ).toEqual(['user', 'assistant']); + expect(messageEvents.map((event) => (event.message as { role: string }).role)).toEqual([ + 'user', + 'assistant', + ]); + expect(messageEvents[0]).toMatchObject({ + eventIndex: 0, + submissionId: receipt.submissionId, + message: { + role: 'user', + content: [{ type: 'text', text: 'Durable prompt' }], + }, + }); + expect(messageEvents[1]).toMatchObject({ + submissionId: receipt.submissionId, + message: { role: 'assistant' }, + }); + expect((messageEvents[1]?.eventIndex as number) > 0).toBe(true); + }); + it('appends the terminal result before settling a direct prompt', async () => { const dbPath = createTempDbPath(); const provider = createFauxProvider(); @@ -1889,8 +1953,10 @@ describe('NodeAgentCoordinator', () => { eventStreamStore, }); - const receipt = await coordinator - .createAdmission('assistant', 'instance-1')({ message: 'Hello terminal event' }); + const receipt = await coordinator.createAdmission( + 'assistant', + 'instance-1', + )({ message: 'Hello terminal event' }); const stream = await eventStreamStore.readEvents(agentStreamPath('assistant', 'instance-1')); const terminal = stream.events .map((event) => event.data as Record) @@ -2011,12 +2077,26 @@ describe('NodeAgentCoordinator', () => { // Submission is running with no canonical input — simulates crash before input applied. // "Restart": new coordinator reconciles. - const { coordinator, executionStore } = await createFauxCoordinator(dbPath, provider); + const { coordinator, executionStore, eventStreamStore } = await createFauxCoordinator( + dbPath, + provider, + ); await coordinator.reconcileSubmissions(); const submission = await executionStore.submissions.getSubmission('direct-interrupted'); + const stream = await eventStreamStore.readEvents(agentStreamPath('assistant', 'instance-1'), { + offset: '-1', + }); + const userMessages = stream.events + .map((event) => event.data as Record) + .filter( + (event) => + event.type === 'message_end' && + (event.message as { role?: string } | undefined)?.role === 'user', + ); expect(submission).toMatchObject({ status: 'settled' }); expect(submission?.error).toBeUndefined(); + expect(userMessages).toHaveLength(1); }); it('terminalizes an interrupted direct prompt when input was applied but no response completed', async () => { @@ -2099,7 +2179,7 @@ describe('NodeAgentCoordinator', () => { await executionStore.sessions.save(storageKey, { version: 8, conversationId: 'conv_01KT3P3GZGFBCKHKMQ11A7H2HW', - affinityKey: generateSessionAffinityKey(), + affinityKey: generateSessionAffinityKey(), childSessions: [], entries: [ {