Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/famous-gifts-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Resolve 4 Detail bugs (maxToolSteps, JWT caching, fallback concurrency, mergeFrames perf)
14 changes: 0 additions & 14 deletions agents/src/llm/fallback_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,6 @@ class FallbackLLMStream extends LLMStream {
extraKwargs: this.extraKwargs,
});

// Listen for error events - child LLMs emit errors via their LLM instance, not the stream
let streamError: Error | undefined;
const errorHandler = (ev: { error: Error }) => {
streamError = ev.error;
};
llm.on('error', errorHandler);

try {
let shouldSetCurrent = !checkRecovery;
for await (const chunk of stream) {
Comment on lines 217 to 219
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Re-raise child LLM failures in fallback stream

tryGenerate now relies only on for await (const chunk of stream) to detect provider failures, but LLMStream reports many failures by emitting llm.on('error') and then closing the output queue (see LLMStream.emitError), so iteration can end normally without throwing. In that case this method returns as success (often with zero/partial chunks), the adapter never marks the provider unavailable, and fallback to the next LLM is skipped. This regresses the core fallback behavior whenever a child stream fails via emitted error events rather than iterator exceptions.

Useful? React with 👍 / 👎.

Expand All @@ -225,11 +218,6 @@ class FallbackLLMStream extends LLMStream {
}
yield chunk;
}

// If an error was emitted but not thrown through iteration, throw it now
if (streamError) {
throw streamError;
}
} catch (error) {
if (error instanceof APIError) {
if (checkRecovery) {
Expand Down Expand Up @@ -257,8 +245,6 @@ class FallbackLLMStream extends LLMStream {
this._log.error({ llm: llm.label(), error }, 'unexpected error, switching to next LLM');
}
throw error;
} finally {
llm.off('error', errorHandler);
}
}

Expand Down
4 changes: 3 additions & 1 deletion agents/src/telemetry/otel_http_exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface SimpleOTLPHttpLogExporterConfig {
export class SimpleOTLPHttpLogExporter {
private readonly config: SimpleOTLPHttpLogExporterConfig;
private jwt: string | null = null;
private jwtExpiresAt = 0;

private static readonly FORCE_DOUBLE_KEYS = new Set([
'transcriptConfidence',
Expand Down Expand Up @@ -102,7 +103,7 @@ export class SimpleOTLPHttpLogExporter {
}

private async ensureJwt(): Promise<void> {
if (this.jwt) return;
if (this.jwt && Date.now() < this.jwtExpiresAt) return;

const apiKey = process.env.LIVEKIT_API_KEY;
const apiSecret = process.env.LIVEKIT_API_SECRET;
Expand All @@ -114,6 +115,7 @@ export class SimpleOTLPHttpLogExporter {
const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' });
token.addObservabilityGrant({ write: true });
this.jwt = await token.toJwt();
this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000;
}

private buildPayload(records: SimpleLogRecord[]): object {
Expand Down
7 changes: 6 additions & 1 deletion agents/src/telemetry/pino_otel_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class PinoCloudExporter {
private readonly batchSize: number;
private readonly flushIntervalMs: number;
private jwt: string | null = null;
private jwtExpiresAt = 0;
private pendingLogs: any[] = [];
private flushTimer: NodeJS.Timeout | null = null;

Expand Down Expand Up @@ -172,6 +173,9 @@ export class PinoCloudExporter {
await this.sendLogs(logs);
} catch (error) {
this.pendingLogs = [...logs, ...this.pendingLogs];
if (this.pendingLogs.length > 10000) {
this.pendingLogs = this.pendingLogs.slice(-10000);
}
console.error('[PinoCloudExporter] Failed to flush logs:', error);
}
}
Expand Down Expand Up @@ -223,7 +227,7 @@ export class PinoCloudExporter {
}

private async ensureJwt(): Promise<void> {
if (this.jwt) return;
if (this.jwt && Date.now() < this.jwtExpiresAt) return;

const apiKey = process.env.LIVEKIT_API_KEY;
const apiSecret = process.env.LIVEKIT_API_SECRET;
Expand All @@ -235,6 +239,7 @@ export class PinoCloudExporter {
const token = new AccessToken(apiKey, apiSecret, { ttl: '6h' });
token.addObservabilityGrant({ write: true });
this.jwt = await token.toJwt();
this.jwtExpiresAt = Date.now() + 5 * 60 * 60 * 1000;
}

async shutdown(): Promise<void> {
Expand Down
13 changes: 10 additions & 3 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => {

const sampleRate = buffer[0]!.sampleRate;
const channels = buffer[0]!.channels;
let totalDataLength = 0;
let samplesPerChannel = 0;
let data = new Int16Array();

for (const frame of buffer) {
if (frame.sampleRate !== sampleRate) {
Expand All @@ -74,10 +74,17 @@ export const mergeFrames = (buffer: AudioBuffer): AudioFrame => {
throw new TypeError('channel count mismatch');
}

data = new Int16Array([...data, ...frame.data]);
totalDataLength += frame.data.length;
samplesPerChannel += frame.samplesPerChannel;
}

const data = new Int16Array(totalDataLength);
let offset = 0;
for (const frame of buffer) {
data.set(frame.data, offset);
offset += frame.data.length;
}

return new AudioFrame(data, sampleRate, channels, samplesPerChannel);
}

Expand All @@ -101,7 +108,7 @@ export class Queue<T> {
await once(this.#events, 'put');
}
let item = this.items.shift();
if (typeof item === 'undefined') {
if (item === undefined) {
item = await _get();
}
return item;
Expand Down
3 changes: 2 additions & 1 deletion agents/src/voice/speech_handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class SpeechHandle {
_tasks: Task<void>[] = [];

/** @internal */
_numSteps = 1;
_numSteps: number;

/** @internal - OpenTelemetry context for the agent turn span */
_agentTurnContext?: Context;
Expand All @@ -62,6 +62,7 @@ export class SpeechHandle {
public _stepIndex: number,
readonly parent?: SpeechHandle,
) {
this._numSteps = _stepIndex;
this.doneFut.await.finally(() => {
for (const callback of this.doneCallbacks) {
callback(this);
Expand Down
Loading