diff --git a/packages/runtime/src/errors.ts b/packages/runtime/src/errors.ts index 3dc2d3c0..3dfd04fa 100644 --- a/packages/runtime/src/errors.ts +++ b/packages/runtime/src/errors.ts @@ -433,6 +433,19 @@ export class SessionDataVersionError extends FlueError { } } +export class StreamChunkSegmentTooLargeError extends FlueError { + constructor({ serializedBytes, maximumBytes }: { serializedBytes: number; maximumBytes: number }) { + super({ + type: 'stream_chunk_segment_too_large', + message: `Stream recovery segment is too large to persist (${serializedBytes} bytes; maximum ${maximumBytes} bytes).`, + details: 'The model produced more recovery data in one flush interval than the persistence backend can store safely.', + dev: 'Reduce individual streamed output size or increase stream flush frequency before retrying.', + meta: { serializedBytes, maximumBytes }, + }); + this.name = 'StreamChunkSegmentTooLargeError'; + } +} + export class PersistedSchemaVersionError extends FlueError { constructor({ storedVersion, diff --git a/packages/runtime/src/index.ts b/packages/runtime/src/index.ts index fb2a47b4..b2f2ea9b 100644 --- a/packages/runtime/src/index.ts +++ b/packages/runtime/src/index.ts @@ -35,6 +35,7 @@ export { SessionDeletedError, SessionNotFoundError, SessionDataVersionError, + StreamChunkSegmentTooLargeError, SkillDefinitionValidationError, SkillNotRegisteredError, SubagentNotDeclaredError, diff --git a/packages/runtime/src/runtime/stream-chunks.ts b/packages/runtime/src/runtime/stream-chunks.ts index 49358eab..16457e22 100644 --- a/packages/runtime/src/runtime/stream-chunks.ts +++ b/packages/runtime/src/runtime/stream-chunks.ts @@ -1,13 +1,30 @@ import type { AssistantMessage, AssistantMessageEvent } from '@earendil-works/pi-ai'; import type { AgentSubmissionStore } from '../agent-execution-store.ts'; +import { StreamChunkSegmentTooLargeError } from '../errors.ts'; import type { SignalMessage } from '../types.ts'; const STREAM_FLUSH_INTERVAL_MS = 3_000; +export const MAX_STREAM_CHUNK_SEGMENT_BYTES = 1_900_000; -type StreamChunkEvent = AssistantMessageEvent; +const textEncoder = new TextEncoder(); + +type CompactPartial = Omit< + AssistantMessage, + 'role' | 'content' | 'stopReason' | 'errorMessage' +>; +type CompactStreamEvent = + | { type: 'text_delta'; contentIndex: number; delta: string } + | { type: 'text_end'; contentIndex: number; content: string } + | { type: 'thinking_start'; contentIndex: number } + | { type: 'thinking_delta'; contentIndex: number; delta: string } + | { type: 'thinking_end'; contentIndex: number; content: string } + | { type: 'toolcall' }; + +type StoredStreamEvent = CompactStreamEvent & { partial?: CompactPartial }; export class StreamChunkWriter { - private pending: StreamChunkEvent[] = []; + private pending: CompactStreamEvent[] = []; + private pendingPartial: AssistantMessage | undefined; private segmentIndex = 0; private timer: ReturnType | undefined; private flushing: Promise | undefined; @@ -19,10 +36,15 @@ export class StreamChunkWriter { readonly streamKey: string, ) {} - write(event: StreamChunkEvent): void { + write(event: AssistantMessageEvent): void { if (!this.active || this.failed) return; - this.pending.push(event); - if (!this.timer) { + this.pendingPartial = + 'partial' in event ? event.partial : event.type === 'done' ? event.message : event.error; + const compact = compactStreamEvent(event); + if (compact && (compact.type !== 'toolcall' || !this.pending.some(isToolCallMarker))) { + this.pending.push(compact); + } + if (!this.timer && this.pending.length > 0) { this.timer = setTimeout(() => { this.timer = undefined; void this.flush().catch((err) => { @@ -39,12 +61,23 @@ export class StreamChunkWriter { this.timer = undefined; } if (this.flushing) await this.flushing; - if (this.failed || this.pending.length === 0) return; - const events = this.pending; + if (this.failed || this.pending.length === 0 || !this.pendingPartial) return; + const pending = this.pending; + const partial = this.pendingPartial; this.pending = []; + this.pendingPartial = undefined; + const body = serializeStreamEvents(pending, partial); + const serializedBytes = textEncoder.encode(body).byteLength; + if (serializedBytes > MAX_STREAM_CHUNK_SEGMENT_BYTES) { + this.failed = true; + throw new StreamChunkSegmentTooLargeError({ + serializedBytes, + maximumBytes: MAX_STREAM_CHUNK_SEGMENT_BYTES, + }); + } const segmentIndex = this.segmentIndex++; this.flushing = this.store - .appendStreamChunkSegment(this.streamKey, segmentIndex, JSON.stringify(events)) + .appendStreamChunkSegment(this.streamKey, segmentIndex, body) .then((inserted) => { if (!inserted) this.failed = true; }); @@ -56,7 +89,6 @@ export class StreamChunkWriter { } finally { this.flushing = undefined; } - // Only re-schedule if the writer is still active (not closed). if (this.active && this.pending.length > 0 && !this.timer && !this.failed) { this.timer = setTimeout(() => { this.timer = undefined; @@ -88,11 +120,12 @@ export function reconstructInterruptedStream( ): { partial: AssistantMessage; interrupted: SignalMessage; continued: SignalMessage } | null { const events = segments.flatMap((segment) => parseSegment(segment.body)); const blocks: Array = []; - let partial: AssistantMessage | undefined; + let partial: AssistantMessage | CompactPartial | undefined; let sawToolCall = false; for (const update of events) { - if ('partial' in update) partial = update.partial; + if ('partial' in update && update.partial) partial = update.partial; if ( + update.type === 'toolcall' || update.type === 'toolcall_start' || update.type === 'toolcall_delta' || update.type === 'toolcall_end' @@ -101,44 +134,18 @@ export function reconstructInterruptedStream( continue; } if (update.type === 'text_delta') { - const existing = blocks[update.contentIndex]; - if (existing?.type === 'text') { - existing.text += update.delta; - } else { - blocks[update.contentIndex] = { type: 'text', text: update.delta }; - } + appendText(blocks, update.contentIndex, update.delta); } else if (update.type === 'text_end') { - const existing = blocks[update.contentIndex]; - if (existing?.type === 'text') { - existing.text = update.content; - } else { - blocks[update.contentIndex] = { type: 'text', text: update.content }; - } + blocks[update.contentIndex] = { type: 'text', text: update.content }; } else if (update.type === 'thinking_start') { blocks[update.contentIndex] = { type: 'thinking', thinking: '' }; } else if (update.type === 'thinking_delta') { - const existing = blocks[update.contentIndex]; - if (existing?.type === 'thinking') { - existing.thinking += update.delta; - } else { - blocks[update.contentIndex] = { type: 'thinking', thinking: update.delta }; - } + appendThinking(blocks, update.contentIndex, update.delta); } else if (update.type === 'thinking_end') { - const existing = blocks[update.contentIndex]; - if (existing?.type === 'thinking') { - existing.thinking = update.content; - } else { - blocks[update.contentIndex] = { type: 'thinking', thinking: update.content }; - } + blocks[update.contentIndex] = { type: 'thinking', thinking: update.content }; } } if (sawToolCall || !partial) return null; - // Reconstructed blocks intentionally omit provider signature metadata - // (textSignature, thinkingSignature) because stream deltas don't carry them. - // This is safe: recovered content is rendered as signal messages (XML) for the - // model, not sent back as provider-facing assistant blocks. If the architecture - // changes to feed recovered content directly to the provider, signatures must - // be preserved from the original partial AssistantMessage. const content = blocks.filter((block): block is AssistantMessage['content'][number] => { if (!block) return false; return block.type === 'text' @@ -148,6 +155,7 @@ export function reconstructInterruptedStream( if (content.length === 0) return null; const recovered: AssistantMessage = { ...partial, + role: 'assistant', content, stopReason: 'aborted', errorMessage: 'Stream interrupted before completion.', @@ -172,10 +180,74 @@ export function reconstructInterruptedStream( }; } -function parseSegment(body: string): StreamChunkEvent[] { +function compactStreamEvent(event: AssistantMessageEvent): CompactStreamEvent | undefined { + switch (event.type) { + case 'text_delta': + return { type: event.type, contentIndex: event.contentIndex, delta: event.delta }; + case 'text_end': + return { type: event.type, contentIndex: event.contentIndex, content: event.content }; + case 'thinking_start': + return { type: event.type, contentIndex: event.contentIndex }; + case 'thinking_delta': + return { type: event.type, contentIndex: event.contentIndex, delta: event.delta }; + case 'thinking_end': + return { type: event.type, contentIndex: event.contentIndex, content: event.content }; + case 'toolcall_start': + case 'toolcall_delta': + case 'toolcall_end': + return { type: 'toolcall' }; + default: + return undefined; + } +} + +function isToolCallMarker(event: CompactStreamEvent): boolean { + return event.type === 'toolcall'; +} + +function serializeStreamEvents( + pending: CompactStreamEvent[], + message: AssistantMessage, +): string { + const events: StoredStreamEvent[] = [...pending]; + const last = events.at(-1); + if (last) { + const { + role: _role, + content: _content, + stopReason: _stopReason, + errorMessage: _errorMessage, + ...partial + } = message; + events[events.length - 1] = { ...last, partial }; + } + return JSON.stringify(events); +} + +function appendText( + blocks: Array, + contentIndex: number, + content: string, +): void { + const existing = blocks[contentIndex]; + if (existing?.type === 'text') existing.text += content; + else blocks[contentIndex] = { type: 'text', text: content }; +} + +function appendThinking( + blocks: Array, + contentIndex: number, + content: string, +): void { + const existing = blocks[contentIndex]; + if (existing?.type === 'thinking') existing.thinking += content; + else blocks[contentIndex] = { type: 'thinking', thinking: content }; +} + +function parseSegment(body: string): Array { try { const parsed = JSON.parse(body) as unknown; - return Array.isArray(parsed) ? (parsed as StreamChunkEvent[]) : []; + return Array.isArray(parsed) ? (parsed as Array) : []; } catch { return []; } diff --git a/packages/runtime/test/stream-chunks.test.ts b/packages/runtime/test/stream-chunks.test.ts index 0f4ae01d..dbc183bd 100644 --- a/packages/runtime/test/stream-chunks.test.ts +++ b/packages/runtime/test/stream-chunks.test.ts @@ -1,6 +1,11 @@ import type { AssistantMessage, AssistantMessageEvent } from '@earendil-works/pi-ai'; import { describe, expect, it, vi } from 'vitest'; -import { reconstructInterruptedStream, StreamChunkWriter } from '../src/runtime/stream-chunks.ts'; +import { StreamChunkSegmentTooLargeError } from '../src/index.ts'; +import { + MAX_STREAM_CHUNK_SEGMENT_BYTES, + reconstructInterruptedStream, + StreamChunkWriter, +} from '../src/runtime/stream-chunks.ts'; // ─── Helpers ──────────────────────────────────────────────────────────────── @@ -166,7 +171,96 @@ describe('StreamChunkWriter', () => { expect(storedSegment.streamKey).toBe('test-key'); expect(storedSegment.segmentIndex).toBe(0); const parsed = JSON.parse(storedSegment.body); - expect(parsed).toHaveLength(2); + expect(parsed).toMatchObject([ + { type: 'text_delta', contentIndex: 0, delta: 'hello' }, + { type: 'text_delta', contentIndex: 0, delta: ' world', partial: expect.any(Object) }, + ]); + expect(parsed[0]).not.toHaveProperty('partial'); + expect(parsed[1].partial).not.toHaveProperty('content'); + }); + + it('stores cumulative provider partials with linear growth when text streams in many deltas', async () => { + const stored: Array<{ segmentIndex: number; body: string }> = []; + const store = { + appendStreamChunkSegment: async (_key: string, segmentIndex: number, body: string) => { + stored.push({ segmentIndex, body }); + return true; + }, + }; + const writer = new StreamChunkWriter(store, 'linear-key'); + let cumulative = ''; + for (let index = 0; index < 500; index++) { + const delta = `token-${String(index)} `; + cumulative += delta; + writer.write({ + type: 'text_delta', + contentIndex: 0, + delta, + partial: fakePartial([{ type: 'text', text: cumulative }]), + }); + } + await writer.flush(); + + const totalBytes = stored.reduce( + (total, item) => total + new TextEncoder().encode(item.body).byteLength, + 0, + ); + expect(totalBytes).toBeLessThan(100_000); + const recovered = reconstructInterruptedStream(stored, 'linear-key'); + expect(recovered?.partial.content).toEqual([{ type: 'text', text: cumulative }]); + }); + + it('rejects an oversized UTF-8 segment before writing it to storage', async () => { + let callCount = 0; + const store = { + appendStreamChunkSegment: async () => { + callCount++; + return true; + }, + }; + const writer = new StreamChunkWriter(store, 'oversized-key'); + const content = '🙂"\\\n'.repeat(250_000); + writer.write({ + type: 'text_delta', + contentIndex: 0, + delta: content, + partial: fakePartial([{ type: 'text', text: content }]), + }); + const error = await writer.flush().catch((cause: unknown) => cause); + + expect(error).toBeInstanceOf(StreamChunkSegmentTooLargeError); + expect(error).toMatchObject({ + type: 'stream_chunk_segment_too_large', + meta: { + maximumBytes: MAX_STREAM_CHUNK_SEGMENT_BYTES, + serializedBytes: expect.any(Number), + }, + }); + expect(callCount).toBe(0); + }); + + it('keeps compact tool-call streams ineligible for recovery without persisting arguments', async () => { + const stored: Array<{ segmentIndex: number; body: string }> = []; + const store = { + appendStreamChunkSegment: async (_key: string, segmentIndex: number, body: string) => { + stored.push({ segmentIndex, body }); + return true; + }, + }; + const writer = new StreamChunkWriter(store, 'tool-key'); + const argumentsText = '{"prompt":"do not persist this marker"}'; + writer.write({ + type: 'toolcall_delta', + contentIndex: 0, + delta: argumentsText, + partial: fakePartial(), + }); + await writer.flush(); + + expect(stored).toHaveLength(1); + expect(stored[0]?.body).not.toContain('do not persist this marker'); + expect(JSON.parse(stored[0]?.body ?? '')).toMatchObject([{ type: 'toolcall' }]); + expect(reconstructInterruptedStream(stored, 'tool-key')).toBeNull(); }); it('marks itself failed on insert rejection and stops writing', async () => {