diff --git a/.changeset/famous-gifts-wave.md b/.changeset/famous-gifts-wave.md new file mode 100644 index 000000000..3f7780390 --- /dev/null +++ b/.changeset/famous-gifts-wave.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Resolve 4 Detail bugs (maxToolSteps, JWT caching, fallback concurrency, mergeFrames perf) diff --git a/agents/src/llm/fallback_adapter.ts b/agents/src/llm/fallback_adapter.ts index e9b7255ac..287622d6b 100644 --- a/agents/src/llm/fallback_adapter.ts +++ b/agents/src/llm/fallback_adapter.ts @@ -209,12 +209,10 @@ class FallbackLLMStream extends LLMStream { extraKwargs: this.extraKwargs, }); - // Listen for error events - child LLMs emit errors via their LLM instance, not the stream - let streamError: Error | undefined; - const errorHandler = (ev: { error: Error }) => { - streamError = ev.error; - }; - llm.on('error', errorHandler); + // Suppress unhandled 'error' events from the child LLM — error detection + // is done via stream._runError to avoid cross-request contamination. + const noop = () => {}; + llm.on('error', noop); try { let shouldSetCurrent = !checkRecovery; @@ -226,9 +224,8 @@ class FallbackLLMStream extends LLMStream { yield chunk; } - // If an error was emitted but not thrown through iteration, throw it now - if (streamError) { - throw streamError; + if (stream._runError) { + throw stream._runError; } } catch (error) { if (error instanceof APIError) { @@ -258,7 +255,7 @@ class FallbackLLMStream extends LLMStream { } throw error; } finally { - llm.off('error', errorHandler); + llm.off('error', noop); } } diff --git a/agents/src/llm/llm.ts b/agents/src/llm/llm.ts index a71bd4714..f96b019b4 100644 --- a/agents/src/llm/llm.ts +++ b/agents/src/llm/llm.ts @@ -115,6 +115,8 @@ export abstract class LLMStream implements AsyncIterableIterator { protected abortController = new AbortController(); protected _connOptions: APIConnectOptions; protected logger = log(); + /** @internal – Captured when run() fails so callers can inspect after iteration ends. */ + _runError?: Error; #llm: LLM; #chatCtx: ChatContext; @@ -148,7 +150,13 @@ export abstract class LLMStream implements AsyncIterableIterator { // is run **after** the constructor has finished. Otherwise we get // runtime error when trying to access class variables in the // `run` method. - startSoon(() => this.mainTask().finally(() => this.queue.close())); + startSoon(() => + this.mainTask() + .catch((e) => { + this._runError = e instanceof Error ? e : new Error(String(e)); + }) + .finally(() => this.queue.close()), + ); } private _mainTaskImpl = async (span: Span) => { diff --git a/agents/src/telemetry/otel_http_exporter.ts b/agents/src/telemetry/otel_http_exporter.ts index 43f01faea..3f41e20ba 100644 --- a/agents/src/telemetry/otel_http_exporter.ts +++ b/agents/src/telemetry/otel_http_exporter.ts @@ -57,6 +57,7 @@ export interface SimpleOTLPHttpLogExporterConfig { export class SimpleOTLPHttpLogExporter { private readonly config: SimpleOTLPHttpLogExporterConfig; private jwt: string | null = null; + private jwtExpiresAt = 0; private static readonly FORCE_DOUBLE_KEYS = new Set([ 'transcriptConfidence', @@ -102,7 +103,7 @@ export class SimpleOTLPHttpLogExporter { } private async ensureJwt(): Promise { - if (this.jwt) return; + if (this.jwt && Date.now() < this.jwtExpiresAt) return; const apiKey = process.env.LIVEKIT_API_KEY; const apiSecret = process.env.LIVEKIT_API_SECRET; @@ -114,6 +115,7 @@ export class SimpleOTLPHttpLogExporter { const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' }); token.addObservabilityGrant({ write: true }); this.jwt = await token.toJwt(); + this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000; } private buildPayload(records: SimpleLogRecord[]): object { diff --git a/agents/src/telemetry/pino_otel_transport.ts b/agents/src/telemetry/pino_otel_transport.ts index b21514584..0266a0307 100644 --- a/agents/src/telemetry/pino_otel_transport.ts +++ b/agents/src/telemetry/pino_otel_transport.ts @@ -96,6 +96,7 @@ export class PinoCloudExporter { private readonly batchSize: number; private readonly flushIntervalMs: number; private jwt: string | null = null; + private jwtExpiresAt = 0; private pendingLogs: any[] = []; private flushTimer: NodeJS.Timeout | null = null; @@ -172,6 +173,9 @@ export class PinoCloudExporter { await this.sendLogs(logs); } catch (error) { this.pendingLogs = [...logs, ...this.pendingLogs]; + if (this.pendingLogs.length > 10000) { + this.pendingLogs = this.pendingLogs.slice(-10000); + } console.error('[PinoCloudExporter] Failed to flush logs:', error); } } @@ -223,7 +227,7 @@ export class PinoCloudExporter { } private async ensureJwt(): Promise { - if (this.jwt) return; + if (this.jwt && Date.now() < this.jwtExpiresAt) return; const apiKey = process.env.LIVEKIT_API_KEY; const apiSecret = process.env.LIVEKIT_API_SECRET; @@ -235,6 +239,7 @@ export class PinoCloudExporter { const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' }); token.addObservabilityGrant({ write: true }); this.jwt = await token.toJwt(); + this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000; } async shutdown(): Promise { diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 2dc5d4ee1..5383e27b4 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -62,8 +62,8 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => { const sampleRate = buffer[0]!.sampleRate; const channels = buffer[0]!.channels; + let totalDataLength = 0; let samplesPerChannel = 0; - let data = new Int16Array(); for (const frame of buffer) { if (frame.sampleRate !== sampleRate) { @@ -74,10 +74,17 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => { throw new TypeError('channel count mismatch'); } - data = new Int16Array([...data, ...frame.data]); + totalDataLength += frame.data.length; samplesPerChannel += frame.samplesPerChannel; } + const data = new Int16Array(totalDataLength); + let offset = 0; + for (const frame of buffer) { + data.set(frame.data, offset); + offset += frame.data.length; + } + return new AudioFrame(data, sampleRate, channels, samplesPerChannel); } @@ -101,7 +108,7 @@ export class Queue { await once(this.#events, 'put'); } let item = this.items.shift(); - if (typeof item === 'undefined') { + if (item === undefined) { item = await _get(); } return item; diff --git a/agents/src/voice/speech_handle.ts b/agents/src/voice/speech_handle.ts index a3cde5aa6..b1a941b39 100644 --- a/agents/src/voice/speech_handle.ts +++ b/agents/src/voice/speech_handle.ts @@ -41,7 +41,7 @@ export class SpeechHandle { _tasks: Task[] = []; /** @internal */ - _numSteps = 1; + _numSteps: number; /** @internal - OpenTelemetry context for the agent turn span */ _agentTurnContext?: Context; @@ -62,6 +62,7 @@ export class SpeechHandle { public _stepIndex: number, readonly parent?: SpeechHandle, ) { + this._numSteps = _stepIndex; this.doneFut.await.finally(() => { for (const callback of this.doneCallbacks) { callback(this);