Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/runtime/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ export class SessionDataVersionError extends FlueError {
}
}

export class StreamChunkSegmentTooLargeError extends FlueError {
constructor({ serializedBytes, maximumBytes }: { serializedBytes: number; maximumBytes: number }) {
super({
type: 'stream_chunk_segment_too_large',
message: `Stream recovery segment is too large to persist (${serializedBytes} bytes; maximum ${maximumBytes} bytes).`,
details: 'The model produced more recovery data in one flush interval than the persistence backend can store safely.',
dev: 'Reduce individual streamed output size or increase stream flush frequency before retrying.',
meta: { serializedBytes, maximumBytes },
});
this.name = 'StreamChunkSegmentTooLargeError';
}
}

export class PersistedSchemaVersionError extends FlueError {
constructor({
storedVersion,
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export {
SessionDeletedError,
SessionNotFoundError,
SessionDataVersionError,
StreamChunkSegmentTooLargeError,
SkillDefinitionValidationError,
SkillNotRegisteredError,
SubagentNotDeclaredError,
Expand Down
158 changes: 115 additions & 43 deletions packages/runtime/src/runtime/stream-chunks.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import type { AssistantMessage, AssistantMessageEvent } from '@earendil-works/pi-ai';
import type { AgentSubmissionStore } from '../agent-execution-store.ts';
import { StreamChunkSegmentTooLargeError } from '../errors.ts';
import type { SignalMessage } from '../types.ts';

const STREAM_FLUSH_INTERVAL_MS = 3_000;
export const MAX_STREAM_CHUNK_SEGMENT_BYTES = 1_900_000;

type StreamChunkEvent = AssistantMessageEvent;
const textEncoder = new TextEncoder();

type CompactPartial = Omit<
AssistantMessage,
'role' | 'content' | 'stopReason' | 'errorMessage'
>;
type CompactStreamEvent =
| { type: 'text_delta'; contentIndex: number; delta: string }
| { type: 'text_end'; contentIndex: number; content: string }
| { type: 'thinking_start'; contentIndex: number }
| { type: 'thinking_delta'; contentIndex: number; delta: string }
| { type: 'thinking_end'; contentIndex: number; content: string }
| { type: 'toolcall' };

type StoredStreamEvent = CompactStreamEvent & { partial?: CompactPartial };

export class StreamChunkWriter {
private pending: StreamChunkEvent[] = [];
private pending: CompactStreamEvent[] = [];
private pendingPartial: AssistantMessage | undefined;
private segmentIndex = 0;
private timer: ReturnType<typeof setTimeout> | undefined;
private flushing: Promise<void> | undefined;
Expand All @@ -19,10 +36,15 @@ export class StreamChunkWriter {
readonly streamKey: string,
) {}

write(event: StreamChunkEvent): void {
write(event: AssistantMessageEvent): void {
if (!this.active || this.failed) return;
this.pending.push(event);
if (!this.timer) {
this.pendingPartial =
'partial' in event ? event.partial : event.type === 'done' ? event.message : event.error;
const compact = compactStreamEvent(event);
if (compact && (compact.type !== 'toolcall' || !this.pending.some(isToolCallMarker))) {
this.pending.push(compact);
}
if (!this.timer && this.pending.length > 0) {
this.timer = setTimeout(() => {
this.timer = undefined;
void this.flush().catch((err) => {
Expand All @@ -39,12 +61,23 @@ export class StreamChunkWriter {
this.timer = undefined;
}
if (this.flushing) await this.flushing;
if (this.failed || this.pending.length === 0) return;
const events = this.pending;
if (this.failed || this.pending.length === 0 || !this.pendingPartial) return;
const pending = this.pending;
const partial = this.pendingPartial;
this.pending = [];
this.pendingPartial = undefined;
const body = serializeStreamEvents(pending, partial);
const serializedBytes = textEncoder.encode(body).byteLength;
if (serializedBytes > MAX_STREAM_CHUNK_SEGMENT_BYTES) {
this.failed = true;
throw new StreamChunkSegmentTooLargeError({
serializedBytes,
maximumBytes: MAX_STREAM_CHUNK_SEGMENT_BYTES,
});
}
const segmentIndex = this.segmentIndex++;
this.flushing = this.store
.appendStreamChunkSegment(this.streamKey, segmentIndex, JSON.stringify(events))
.appendStreamChunkSegment(this.streamKey, segmentIndex, body)
.then((inserted) => {
if (!inserted) this.failed = true;
});
Expand All @@ -56,7 +89,6 @@ export class StreamChunkWriter {
} finally {
this.flushing = undefined;
}
// Only re-schedule if the writer is still active (not closed).
if (this.active && this.pending.length > 0 && !this.timer && !this.failed) {
this.timer = setTimeout(() => {
this.timer = undefined;
Expand Down Expand Up @@ -88,11 +120,12 @@ export function reconstructInterruptedStream(
): { partial: AssistantMessage; interrupted: SignalMessage; continued: SignalMessage } | null {
const events = segments.flatMap((segment) => parseSegment(segment.body));
const blocks: Array<AssistantMessage['content'][number] | undefined> = [];
let partial: AssistantMessage | undefined;
let partial: AssistantMessage | CompactPartial | undefined;
let sawToolCall = false;
for (const update of events) {
if ('partial' in update) partial = update.partial;
if ('partial' in update && update.partial) partial = update.partial;
if (
update.type === 'toolcall' ||
update.type === 'toolcall_start' ||
update.type === 'toolcall_delta' ||
update.type === 'toolcall_end'
Expand All @@ -101,44 +134,18 @@ export function reconstructInterruptedStream(
continue;
}
if (update.type === 'text_delta') {
const existing = blocks[update.contentIndex];
if (existing?.type === 'text') {
existing.text += update.delta;
} else {
blocks[update.contentIndex] = { type: 'text', text: update.delta };
}
appendText(blocks, update.contentIndex, update.delta);
} else if (update.type === 'text_end') {
const existing = blocks[update.contentIndex];
if (existing?.type === 'text') {
existing.text = update.content;
} else {
blocks[update.contentIndex] = { type: 'text', text: update.content };
}
blocks[update.contentIndex] = { type: 'text', text: update.content };
} else if (update.type === 'thinking_start') {
blocks[update.contentIndex] = { type: 'thinking', thinking: '' };
} else if (update.type === 'thinking_delta') {
const existing = blocks[update.contentIndex];
if (existing?.type === 'thinking') {
existing.thinking += update.delta;
} else {
blocks[update.contentIndex] = { type: 'thinking', thinking: update.delta };
}
appendThinking(blocks, update.contentIndex, update.delta);
} else if (update.type === 'thinking_end') {
const existing = blocks[update.contentIndex];
if (existing?.type === 'thinking') {
existing.thinking = update.content;
} else {
blocks[update.contentIndex] = { type: 'thinking', thinking: update.content };
}
blocks[update.contentIndex] = { type: 'thinking', thinking: update.content };
}
}
if (sawToolCall || !partial) return null;
// Reconstructed blocks intentionally omit provider signature metadata
// (textSignature, thinkingSignature) because stream deltas don't carry them.
// This is safe: recovered content is rendered as signal messages (XML) for the
// model, not sent back as provider-facing assistant blocks. If the architecture
// changes to feed recovered content directly to the provider, signatures must
// be preserved from the original partial AssistantMessage.
const content = blocks.filter((block): block is AssistantMessage['content'][number] => {
if (!block) return false;
return block.type === 'text'
Expand All @@ -148,6 +155,7 @@ export function reconstructInterruptedStream(
if (content.length === 0) return null;
const recovered: AssistantMessage = {
...partial,
role: 'assistant',
content,
stopReason: 'aborted',
errorMessage: 'Stream interrupted before completion.',
Expand All @@ -172,10 +180,74 @@ export function reconstructInterruptedStream(
};
}

function parseSegment(body: string): StreamChunkEvent[] {
function compactStreamEvent(event: AssistantMessageEvent): CompactStreamEvent | undefined {
switch (event.type) {
case 'text_delta':
return { type: event.type, contentIndex: event.contentIndex, delta: event.delta };
case 'text_end':
return { type: event.type, contentIndex: event.contentIndex, content: event.content };
case 'thinking_start':
return { type: event.type, contentIndex: event.contentIndex };
case 'thinking_delta':
return { type: event.type, contentIndex: event.contentIndex, delta: event.delta };
case 'thinking_end':
return { type: event.type, contentIndex: event.contentIndex, content: event.content };
case 'toolcall_start':
case 'toolcall_delta':
case 'toolcall_end':
return { type: 'toolcall' };
default:
return undefined;
}
}

function isToolCallMarker(event: CompactStreamEvent): boolean {
return event.type === 'toolcall';
}

function serializeStreamEvents(
pending: CompactStreamEvent[],
message: AssistantMessage,
): string {
const events: StoredStreamEvent[] = [...pending];
const last = events.at(-1);
if (last) {
const {
role: _role,
content: _content,
stopReason: _stopReason,
errorMessage: _errorMessage,
...partial
} = message;
events[events.length - 1] = { ...last, partial };
}
return JSON.stringify(events);
}

function appendText(
blocks: Array<AssistantMessage['content'][number] | undefined>,
contentIndex: number,
content: string,
): void {
const existing = blocks[contentIndex];
if (existing?.type === 'text') existing.text += content;
else blocks[contentIndex] = { type: 'text', text: content };
}

function appendThinking(
blocks: Array<AssistantMessage['content'][number] | undefined>,
contentIndex: number,
content: string,
): void {
const existing = blocks[contentIndex];
if (existing?.type === 'thinking') existing.thinking += content;
else blocks[contentIndex] = { type: 'thinking', thinking: content };
}

function parseSegment(body: string): Array<AssistantMessageEvent | StoredStreamEvent> {
try {
const parsed = JSON.parse(body) as unknown;
return Array.isArray(parsed) ? (parsed as StreamChunkEvent[]) : [];
return Array.isArray(parsed) ? (parsed as Array<AssistantMessageEvent | StoredStreamEvent>) : [];
} catch {
return [];
}
Expand Down
98 changes: 96 additions & 2 deletions packages/runtime/test/stream-chunks.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { AssistantMessage, AssistantMessageEvent } from '@earendil-works/pi-ai';
import { describe, expect, it, vi } from 'vitest';
import { reconstructInterruptedStream, StreamChunkWriter } from '../src/runtime/stream-chunks.ts';
import { StreamChunkSegmentTooLargeError } from '../src/index.ts';
import {
MAX_STREAM_CHUNK_SEGMENT_BYTES,
reconstructInterruptedStream,
StreamChunkWriter,
} from '../src/runtime/stream-chunks.ts';

// ─── Helpers ────────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -166,7 +171,96 @@ describe('StreamChunkWriter', () => {
expect(storedSegment.streamKey).toBe('test-key');
expect(storedSegment.segmentIndex).toBe(0);
const parsed = JSON.parse(storedSegment.body);
expect(parsed).toHaveLength(2);
expect(parsed).toMatchObject([
{ type: 'text_delta', contentIndex: 0, delta: 'hello' },
{ type: 'text_delta', contentIndex: 0, delta: ' world', partial: expect.any(Object) },
]);
expect(parsed[0]).not.toHaveProperty('partial');
expect(parsed[1].partial).not.toHaveProperty('content');
});

it('stores cumulative provider partials with linear growth when text streams in many deltas', async () => {
const stored: Array<{ segmentIndex: number; body: string }> = [];
const store = {
appendStreamChunkSegment: async (_key: string, segmentIndex: number, body: string) => {
stored.push({ segmentIndex, body });
return true;
},
};
const writer = new StreamChunkWriter(store, 'linear-key');
let cumulative = '';
for (let index = 0; index < 500; index++) {
const delta = `token-${String(index)} `;
cumulative += delta;
writer.write({
type: 'text_delta',
contentIndex: 0,
delta,
partial: fakePartial([{ type: 'text', text: cumulative }]),
});
}
await writer.flush();

const totalBytes = stored.reduce(
(total, item) => total + new TextEncoder().encode(item.body).byteLength,
0,
);
expect(totalBytes).toBeLessThan(100_000);
const recovered = reconstructInterruptedStream(stored, 'linear-key');
expect(recovered?.partial.content).toEqual([{ type: 'text', text: cumulative }]);
});

it('rejects an oversized UTF-8 segment before writing it to storage', async () => {
let callCount = 0;
const store = {
appendStreamChunkSegment: async () => {
callCount++;
return true;
},
};
const writer = new StreamChunkWriter(store, 'oversized-key');
const content = '🙂"\\\n'.repeat(250_000);
writer.write({
type: 'text_delta',
contentIndex: 0,
delta: content,
partial: fakePartial([{ type: 'text', text: content }]),
});
const error = await writer.flush().catch((cause: unknown) => cause);

expect(error).toBeInstanceOf(StreamChunkSegmentTooLargeError);
expect(error).toMatchObject({
type: 'stream_chunk_segment_too_large',
meta: {
maximumBytes: MAX_STREAM_CHUNK_SEGMENT_BYTES,
serializedBytes: expect.any(Number),
},
});
expect(callCount).toBe(0);
});

it('keeps compact tool-call streams ineligible for recovery without persisting arguments', async () => {
const stored: Array<{ segmentIndex: number; body: string }> = [];
const store = {
appendStreamChunkSegment: async (_key: string, segmentIndex: number, body: string) => {
stored.push({ segmentIndex, body });
return true;
},
};
const writer = new StreamChunkWriter(store, 'tool-key');
const argumentsText = '{"prompt":"do not persist this marker"}';
writer.write({
type: 'toolcall_delta',
contentIndex: 0,
delta: argumentsText,
partial: fakePartial(),
});
await writer.flush();

expect(stored).toHaveLength(1);
expect(stored[0]?.body).not.toContain('do not persist this marker');
expect(JSON.parse(stored[0]?.body ?? '')).toMatchObject([{ type: 'toolcall' }]);
expect(reconstructInterruptedStream(stored, 'tool-key')).toBeNull();
});

it('marks itself failed on insert rejection and stops writing', async () => {
Expand Down