From df624a84b7b245b48db3c74c46fe980eb48b23cc Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 27 Apr 2026 10:05:37 +0000 Subject: [PATCH 1/9] feat(liveavatar): port plugin from python with video_quality param Ports the Python `livekit-plugins-liveavatar` plugin into agents-js as `@livekit/agents-plugin-liveavatar`, including the new `videoQuality` parameter from livekit/agents#5552. The plugin mirrors the Python implementation: it brings up a LiveAvatar streaming session, opens the realtime websocket, captures the agent's audio output through a queue-based AudioOutput, resamples to 24 kHz mono, and forwards base64-encoded chunks (~600 ms first chunk, ~1 s subsequent) to the LiveAvatar service. Inbound websocket events drive playback start/finish notifications back into the AgentSession. Also exports `voice.AudioOutput` (and its companion types) from `@livekit/agents` so plugin authors can subclass the abstract audio sink. Refs: livekit/agents#5552 https://claude.ai/code/session_01DE5pBrf3y1bFgLTK8NDTkB --- .changeset/liveavatar-plugin.md | 10 + agents/src/voice/index.ts | 8 +- plugins/liveavatar/README.md | 81 ++++ plugins/liveavatar/package.json | 53 +++ plugins/liveavatar/src/api.ts | 202 +++++++++ plugins/liveavatar/src/avatar.ts | 661 ++++++++++++++++++++++++++++++ plugins/liveavatar/src/index.ts | 19 + plugins/liveavatar/src/log.ts | 7 + plugins/liveavatar/tsconfig.json | 15 + plugins/liveavatar/tsup.config.ts | 9 + pnpm-lock.yaml | 31 ++ turbo.json | 3 + 12 files changed, 1098 insertions(+), 1 deletion(-) create mode 100644 .changeset/liveavatar-plugin.md create mode 100644 plugins/liveavatar/README.md create mode 100644 plugins/liveavatar/package.json create mode 100644 plugins/liveavatar/src/api.ts create mode 100644 plugins/liveavatar/src/avatar.ts create mode 100644 plugins/liveavatar/src/index.ts create mode 100644 plugins/liveavatar/src/log.ts create mode 100644 plugins/liveavatar/tsconfig.json create mode 100644 plugins/liveavatar/tsup.config.ts diff --git a/.changeset/liveavatar-plugin.md b/.changeset/liveavatar-plugin.md new file mode 100644 index 000000000..fc1c185c2 --- /dev/null +++ b/.changeset/liveavatar-plugin.md @@ -0,0 +1,10 @@ +--- +'@livekit/agents-plugin-liveavatar': minor +'@livekit/agents': minor +--- + +Port the `liveavatar` plugin from the Python `livekit-agents` repo, including the new `videoQuality` parameter from livekit/agents#5552. + +The new `@livekit/agents-plugin-liveavatar` package adds a LiveAvatar `AvatarSession` that mirrors the Python plugin: it brings up a LiveAvatar streaming session, opens the realtime websocket, captures the agent's audio output through a queue-based `AudioOutput`, resamples to 24 kHz mono, and forwards base64-encoded chunks (~600 ms first chunk, ~1 s subsequent) to the LiveAvatar service. Inbound websocket events drive playback start/finish notifications back into the `AgentSession`. + +Also exports `voice.AudioOutput` (and its companion `AudioOutputCapabilities` / `PlaybackFinishedEvent` / `PlaybackStartedEvent` types) from `@livekit/agents` so plugin authors can subclass the abstract audio sink. diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index b9b3a62e7..808ac88c1 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -22,7 +22,13 @@ export { RoomSessionTransport, } from './remote_session.js'; export * from './events.js'; -export { type TimedString } from './io.js'; +export { + AudioOutput, + type AudioOutputCapabilities, + type PlaybackFinishedEvent, + type PlaybackStartedEvent, + type TimedString, +} from './io.js'; export * from './report.js'; export * from './room_io/index.js'; export { RunContext } from './run_context.js'; diff --git a/plugins/liveavatar/README.md b/plugins/liveavatar/README.md new file mode 100644 index 000000000..934062f7c --- /dev/null +++ b/plugins/liveavatar/README.md @@ -0,0 +1,81 @@ + + +# LiveAvatar plugin for LiveKit Agents + +Support for [LiveAvatar](https://www.liveavatar.com) interactive avatars. + +This is the JS/TS port of the Python `livekit-plugins-liveavatar` plugin. See [https://docs.livekit.io/agents/integrations/avatar/](https://docs.livekit.io/agents/integrations/avatar/) for more information. + +## Installation + +```bash +npm install @livekit/agents-plugin-liveavatar +``` + +or + +```bash +pnpm add @livekit/agents-plugin-liveavatar +``` + +## Pre-requisites + +Create a developer API key from the LiveAvatar dashboard and set the `LIVEAVATAR_API_KEY` environment variable with it: + +```bash +export LIVEAVATAR_API_KEY= +``` + +## Usage + +```typescript +import { AvatarSession } from '@livekit/agents-plugin-liveavatar'; +import { AgentSession } from '@livekit/agents'; + +const avatarSession = new AvatarSession({ + avatarId: 'your-avatar-id', // or via LIVEAVATAR_AVATAR_ID + apiKey: process.env.LIVEAVATAR_API_KEY, + videoQuality: 'high', // optional: 'very_high' | 'high' | 'medium' | 'low' +}); + +await avatarSession.start(agentSession, room, { + livekitUrl: process.env.LIVEKIT_URL, + livekitApiKey: process.env.LIVEKIT_API_KEY, + livekitApiSecret: process.env.LIVEKIT_API_SECRET, +}); +``` + +## API + +### `AvatarSession` + +#### Constructor Options + +- `avatarId?: string` — The LiveAvatar avatar id. Falls back to `LIVEAVATAR_AVATAR_ID`. +- `apiUrl?: string` — Override the LiveAvatar API base URL. +- `apiKey?: string` — Your LiveAvatar API key. Falls back to `LIVEAVATAR_API_KEY`. +- `isSandbox?: boolean` — Use the LiveAvatar sandbox (1 minute connection limit). Defaults to `false`. +- `videoQuality?: 'very_high' | 'high' | 'medium' | 'low'` — Avatar video quality requested from the service. When omitted, the LiveAvatar service decides. +- `avatarParticipantIdentity?: string` — Identity for the avatar participant. Defaults to `'liveavatar-avatar-agent'`. +- `avatarParticipantName?: string` — Display name for the avatar participant. Defaults to `'liveavatar-avatar-agent'`. +- `connOptions?: APIConnectOptions` — API retry/timeout options. + +#### Methods + +##### `start(agentSession, room, options?)` + +Starts the avatar session, brings up a LiveAvatar streaming session, opens the realtime websocket, and routes the agent's audio output through to the avatar. + +**StartOptions:** + +- `livekitUrl?: string` — Falls back to `LIVEKIT_URL`. +- `livekitApiKey?: string` — Falls back to `LIVEKIT_API_KEY`. +- `livekitApiSecret?: string` — Falls back to `LIVEKIT_API_SECRET`. + +## License + +Apache 2.0 diff --git a/plugins/liveavatar/package.json b/plugins/liveavatar/package.json new file mode 100644 index 000000000..c4ec1d029 --- /dev/null +++ b/plugins/liveavatar/package.json @@ -0,0 +1,53 @@ +{ + "name": "@livekit/agents-plugin-liveavatar", + "version": "1.3.0", + "description": "LiveAvatar avatar plugin for LiveKit Node Agents", + "main": "dist/index.js", + "require": "dist/index.cjs", + "types": "dist/index.d.ts", + "exports": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } + }, + "author": "LiveKit", + "type": "module", + "repository": "git@github.com:livekit/agents-js.git", + "license": "Apache-2.0", + "files": [ + "dist", + "src", + "README.md" + ], + "scripts": { + "build": "tsup --onSuccess \"pnpm build:types\"", + "build:types": "tsc --declaration --emitDeclarationOnly && node ../../scripts/copyDeclarationOutput.js", + "clean": "rm -rf dist", + "clean:build": "pnpm clean && pnpm build", + "lint": "eslint -f unix \"src/**/*.{ts,js}\"", + "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript", + "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose" + }, + "devDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:", + "@microsoft/api-extractor": "^7.35.0", + "@types/ws": "catalog:", + "pino": "^8.19.0", + "tsup": "^8.3.5", + "typescript": "^5.0.0" + }, + "dependencies": { + "livekit-server-sdk": "^2.13.3", + "ws": "catalog:" + }, + "peerDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:" + } +} diff --git a/plugins/liveavatar/src/api.ts b/plugins/liveavatar/src/api.ts new file mode 100644 index 000000000..e206be634 --- /dev/null +++ b/plugins/liveavatar/src/api.ts @@ -0,0 +1,202 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type APIConnectOptions, + APIConnectionError, + APIStatusError, + DEFAULT_API_CONNECT_OPTIONS, +} from '@livekit/agents'; +import { log } from './log.js'; + +export const DEFAULT_API_URL = 'https://api.liveavatar.com/v1/sessions'; + +// Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 41 line +export type VideoQuality = 'very_high' | 'high' | 'medium' | 'low'; + +/** + * Exception thrown when the LiveAvatar plugin or the LiveAvatar service errors. + */ +export class LiveAvatarException extends Error { + constructor(message: string) { + super(message); + this.name = 'LiveAvatarException'; + } +} + +export interface CreateStreamingSessionOptions { + livekitUrl: string; + livekitToken: string; + roomName: string; + avatarId: string; + isSandbox?: boolean; + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 62 line + videoQuality?: VideoQuality | null; +} + +export interface SessionResponse { + data: { + session_id: string; + session_token: string; + [key: string]: unknown; + }; + code: number; + [key: string]: unknown; +} + +export interface StartSessionResponse { + data: { + ws_url: string; + [key: string]: unknown; + }; + code: number; + [key: string]: unknown; +} + +export interface StopSessionResponse { + data: Record; + code: number; + [key: string]: unknown; +} + +export interface LiveAvatarAPIOptions { + apiKey?: string; + apiUrl?: string; + connOptions?: APIConnectOptions; +} + +/** + * Thin client for the LiveAvatar HTTP API. + * + * Mirrors `livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py`. + */ +export class LiveAvatarAPI { + private apiKey: string; + private apiUrl: string; + private connOptions: APIConnectOptions; + + #logger = log(); + + constructor(options: LiveAvatarAPIOptions = {}) { + const apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; + if (!apiKey) { + throw new LiveAvatarException('api_key or LIVEAVATAR_API_KEY must be set'); + } + this.apiKey = apiKey; + this.apiUrl = options.apiUrl || DEFAULT_API_URL; + this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; + } + + /** + * Create a new streaming session, returning the session id and session token. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 51-87 lines + */ + async createStreamingSession(opts: CreateStreamingSessionOptions): Promise { + const livekitConfig = { + livekit_room: opts.roomName, + livekit_url: opts.livekitUrl, + livekit_client_token: opts.livekitToken, + }; + + const payload: Record = { + mode: 'LITE', + avatar_id: opts.avatarId, + is_sandbox: opts.isSandbox ?? false, + livekit_config: livekitConfig, + }; + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 75-76 lines + if (opts.videoQuality != null) { + payload.video_quality = opts.videoQuality; + } + + const headers = { + accept: 'application/json', + 'content-type': 'application/json', + 'X-API-KEY': this.apiKey, + }; + return (await this.post('/token', payload, headers)) as SessionResponse; + } + + /** + * Start a previously created streaming session. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 92-97 lines + */ + async startStreamingSession( + sessionId: string, + sessionToken: string, + ): Promise { + const payload = { session_id: sessionId }; + const headers = { + 'content-type': 'application/json', + Authorization: `Bearer ${sessionToken}`, + }; + return (await this.post('/start', payload, headers)) as StartSessionResponse; + } + + /** + * Stop a running streaming session. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 99-107 lines + */ + async stopStreamingSession( + sessionId: string, + sessionToken: string, + ): Promise { + const payload = { session_id: sessionId, reason: 'USER_DISCONNECTED' }; + const headers = { + 'content-type': 'application/json', + Authorization: `Bearer ${sessionToken}`, + }; + return (await this.post('/stop', payload, headers)) as StopSessionResponse; + } + + /** + * POST helper with the same retry/backoff semantics as the Python plugin. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 109-138 lines + */ + private async post( + endpoint: string, + payload: Record, + headers: Record, + ): Promise { + const url = this.apiUrl + endpoint; + const maxRetry = this.connOptions.maxRetry; + for (let i = 0; i < maxRetry; i++) { + try { + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(this.connOptions.timeoutMs), + }); + if (!response.ok) { + const text = await response.text(); + throw new APIStatusError({ + message: `Server returned an error for ${url}: ${response.status}`, + options: { statusCode: response.status, body: { error: text } }, + }); + } + return await response.json(); + } catch (e) { + if (e instanceof APIStatusError && !e.retryable) { + throw e; + } + this.#logger.warn( + { error: String(e), url, attempt: i }, + `API request to ${url} failed on attempt ${i}`, + ); + } + + if (i < maxRetry - 1) { + await new Promise((resolve) => setTimeout(resolve, this.connOptions.retryIntervalMs)); + } + } + throw new APIConnectionError({ + message: `Failed to call LiveAvatar API after ${maxRetry} retries`, + }); + } +} diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts new file mode 100644 index 000000000..88840bcc9 --- /dev/null +++ b/plugins/liveavatar/src/avatar.ts @@ -0,0 +1,661 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type APIConnectOptions, + APIConnectionError, + DEFAULT_API_CONNECT_OPTIONS, + Future, + getJobContext, + shortuuid, + stream as streamNs, + voice, +} from '@livekit/agents'; +import { type AudioFrame, AudioResampler, type Room } from '@livekit/rtc-node'; +import type { VideoGrant } from 'livekit-server-sdk'; +import { AccessToken } from 'livekit-server-sdk'; +import { type RawData, WebSocket } from 'ws'; +import { LiveAvatarAPI, LiveAvatarException, type VideoQuality } from './api.js'; +import { log } from './log.js'; + +const ATTRIBUTE_PUBLISH_ON_BEHALF = 'lk.publish_on_behalf'; +const SAMPLE_RATE = 24000; +const KEEP_ALIVE_INTERVAL_MS = 60_000; +const FIRST_CHUNK_THRESHOLD_MS = 600; +const SUBSEQUENT_CHUNK_THRESHOLD_MS = 1_000; +const AVATAR_AGENT_IDENTITY = 'liveavatar-avatar-agent'; +const AVATAR_AGENT_NAME = 'liveavatar-avatar-agent'; + +/** + * Sentinel pushed onto the audio queue to mark the end of an agent speech segment. + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_types.py (AudioSegmentEnd) + */ +class AudioSegmentEnd {} + +type AudioQueueItem = AudioFrame | AudioSegmentEnd; + +/** + * AudioOutput that captures agent speech frames into a queue for the LiveAvatar + * websocket forwarder to consume. Mirrors Python's `QueueAudioOutput` in spirit, + * but with a single queue + segment-end sentinel rather than a typed queue. + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_queue_io.py + */ +class QueueAudioOutput extends voice.AudioOutput { + private channel: streamNs.StreamChannel = streamNs.createStreamChannel(); + private startedSegment = false; + + constructor(sampleRate: number) { + super(sampleRate, undefined, { pause: false }); + } + + /** Returns the underlying readable stream of audio frames + segment-end sentinels. */ + stream(): ReturnType['stream']> { + return this.channel.stream(); + } + + override async captureFrame(frame: AudioFrame): Promise { + await super.captureFrame(frame); + this.startedSegment = true; + if (!this.channel.closed) { + await this.channel.write(frame); + } + } + + override flush(): void { + super.flush(); + if (this.startedSegment && !this.channel.closed) { + // Best-effort write — the consumer will drain on its own loop. + void this.channel.write(new AudioSegmentEnd()); + this.startedSegment = false; + } + } + + override clearBuffer(): void { + this.emit('clear_buffer'); + this.startedSegment = false; + } + + async aclose(): Promise { + if (!this.channel.closed) { + await this.channel.close(); + } + } + + // Convenience helpers exposed for the AvatarSession driver. + notifyPlaybackStarted(createdAt: number = Date.now()): void { + this.onPlaybackStarted(createdAt); + } + + notifyPlaybackFinished(playbackPosition: number, interrupted: boolean): void { + this.onPlaybackFinished({ playbackPosition, interrupted }); + } +} + +/** + * Options for configuring an AvatarSession. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 47-58 lines + */ +export interface AvatarSessionOptions { + /** The LiveAvatar avatar id. Falls back to the `LIVEAVATAR_AVATAR_ID` env var. */ + avatarId?: string; + /** Override the LiveAvatar API base URL. */ + apiUrl?: string; + /** LiveAvatar API key. Falls back to the `LIVEAVATAR_API_KEY` env var. */ + apiKey?: string; + /** When true, use the LiveAvatar sandbox (1 minute connection limit). */ + isSandbox?: boolean; + /** Avatar video quality. When omitted, the LiveAvatar service decides. */ + videoQuality?: VideoQuality; + /** Identity for the avatar participant. Defaults to `liveavatar-avatar-agent`. */ + avatarParticipantIdentity?: string; + /** Display name for the avatar participant. Defaults to `liveavatar-avatar-agent`. */ + avatarParticipantName?: string; + /** API retry/timeout options. */ + connOptions?: APIConnectOptions; +} + +/** + * Optional LiveKit credentials for {@link AvatarSession.start}; falls back to env vars. + */ +export interface StartOptions { + livekitUrl?: string; + livekitApiKey?: string; + livekitApiSecret?: string; +} + +/** + * A LiveAvatar interactive avatar session. + * + * This class manages the connection between a LiveKit agent and a LiveAvatar avatar: + * it brings up a LiveAvatar streaming session, opens the realtime websocket, captures + * the agent's audio output, and forwards it (resampled, base64-encoded) to the avatar + * service. Inbound websocket events drive playback start/finish notifications back into + * the agent session so the speech handle can complete correctly. + * + * @example + * ```typescript + * const avatar = new AvatarSession({ + * avatarId: 'your-avatar-id', + * apiKey: process.env.LIVEAVATAR_API_KEY, + * videoQuality: 'high', + * }); + * await avatar.start(agentSession, room); + * ``` + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py + */ +export class AvatarSession { + private avatarId: string | null; + private apiUrl?: string; + private apiKey: string; + private isSandbox: boolean; + private videoQuality: VideoQuality | null; + private avatarParticipantIdentity: string; + private avatarParticipantName: string; + private connOptions: APIConnectOptions; + + private api: LiveAvatarAPI; + private sessionId: string | null = null; + private sessionToken: string | null = null; + private wsUrl: string | null = null; + + private audioBuffer?: QueueAudioOutput; + private msgChannel?: streamNs.StreamChannel>; + private msgChannelClosed = false; + private mainTaskPromise?: Promise; + private agentSession?: voice.AgentSession; + private room?: Room; + private localParticipantIdentity = ''; + + private audioResampler: AudioResampler | null = null; + private resamplerInputRate: number | null = null; + + private audioPlaying = false; + private avatarSpeaking = false; + private avatarInterrupted = false; + private playbackPosition = 0; + private sessionConnectedFuture: Future = new Future(); + private chunkInterrupted = false; + private closing = false; + + #logger = log(); + + constructor(options: AvatarSessionOptions = {}) { + this.avatarId = options.avatarId ?? process.env.LIVEAVATAR_AVATAR_ID ?? null; + this.apiUrl = options.apiUrl; + this.apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; + this.isSandbox = options.isSandbox ?? false; + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 75 line + this.videoQuality = options.videoQuality ?? null; + this.avatarParticipantIdentity = options.avatarParticipantIdentity || AVATAR_AGENT_IDENTITY; + this.avatarParticipantName = options.avatarParticipantName || AVATAR_AGENT_NAME; + this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; + + this.api = new LiveAvatarAPI({ + apiKey: this.apiKey, + apiUrl: this.apiUrl, + connOptions: this.connOptions, + }); + } + + /** + * Start the avatar session and wire it into the agent session's audio output. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 89-178 lines + */ + async start( + agentSession: voice.AgentSession, + room: Room, + options: StartOptions = {}, + ): Promise { + this.agentSession = agentSession; + this.room = room; + + const livekitUrl = options.livekitUrl || process.env.LIVEKIT_URL; + const livekitApiKey = options.livekitApiKey || process.env.LIVEKIT_API_KEY; + const livekitApiSecret = options.livekitApiSecret || process.env.LIVEKIT_API_SECRET; + if (!livekitUrl || !livekitApiKey || !livekitApiSecret) { + throw new LiveAvatarException( + 'livekit_url, livekit_api_key, and livekit_api_secret must be set', + ); + } + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 109-115 lines + try { + const jobCtx = getJobContext(); + this.localParticipantIdentity = jobCtx.agent?.identity || ''; + if (!this.localParticipantIdentity && room.localParticipant) { + this.localParticipantIdentity = room.localParticipant.identity; + } + } catch { + if (!room.isConnected || !room.localParticipant) { + throw new LiveAvatarException('failed to get local participant identity'); + } + this.localParticipantIdentity = room.localParticipant.identity; + } + if (!this.localParticipantIdentity) { + throw new LiveAvatarException('failed to get local participant identity'); + } + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 117-128 lines + const at = new AccessToken(livekitApiKey, livekitApiSecret, { + identity: this.avatarParticipantIdentity, + name: this.avatarParticipantName, + }); + at.kind = 'agent'; + at.addGrant({ roomJoin: true, room: room.name } as VideoGrant); + at.attributes = { [ATTRIBUTE_PUBLISH_ON_BEHALF]: this.localParticipantIdentity }; + const livekitToken = await at.toJwt(); + + this.#logger.debug('starting avatar session'); + + if (!this.avatarId) { + throw new LiveAvatarException('avatar_id must be set'); + } + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 135-145 lines + const sessionConfig = await this.api.createStreamingSession({ + livekitUrl, + livekitToken, + roomName: room.name ?? '', + avatarId: this.avatarId, + isSandbox: this.isSandbox, + videoQuality: this.videoQuality, + }); + this.sessionId = sessionConfig.data.session_id; + this.sessionToken = sessionConfig.data.session_token; + this.#logger.info({ sessionId: this.sessionId }, 'LiveAvatar session created'); + + if (!this.sessionId || !this.sessionToken) { + throw new LiveAvatarException('LiveAvatar session creation returned no session id/token'); + } + + const startData = await this.api.startStreamingSession(this.sessionId, this.sessionToken); + this.wsUrl = startData.data.ws_url; + this.#logger.info('LiveAvatar streaming session started'); + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 158-164 lines + this.msgChannel = streamNs.createStreamChannel>(); + agentSession.on(voice.AgentSessionEventTypes.AgentStateChanged, (ev) => { + if (ev.newState === 'idle') { + this.sendEvent({ type: 'agent.stop_listening', event_id: shortuuid() }); + } + }); + agentSession.on(voice.AgentSessionEventTypes.Close, () => { + this.closeMsgChannel(); + }); + + // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 166-173 lines + this.audioBuffer = new QueueAudioOutput(SAMPLE_RATE); + this.audioBuffer.on('clear_buffer', () => this.onClearBuffer()); + agentSession.output.audio = this.audioBuffer; + + this.mainTaskPromise = this.mainTask(); + + // Best-effort cleanup on job shutdown. + try { + const jobCtx = getJobContext(); + jobCtx.addShutdownCallback(async () => { + await this.aclose(); + }); + } catch { + // No active job context — caller is expected to manage lifecycle. + } + } + + /** + * Stop the avatar session, drain queues, and close the websocket. + */ + async aclose(): Promise { + this.closing = true; + this.closeMsgChannel(); + if (this.audioBuffer) { + await this.audioBuffer.aclose(); + } + if (this.mainTaskPromise) { + try { + await this.mainTaskPromise; + } catch { + // logged in mainTask + } + } + } + + /** + * Send an event over the websocket message queue. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 207-209 lines + */ + sendEvent(msg: Record): void { + if (!this.msgChannel || this.msgChannelClosed) return; + void this.msgChannel.write(msg).catch(() => { + // channel closed — drop the event + }); + } + + private closeMsgChannel(): void { + if (this.msgChannel && !this.msgChannelClosed) { + this.msgChannelClosed = true; + void this.msgChannel.close().catch(() => { + // ignore double-close + }); + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 180-196 lines + */ + private onClearBuffer(): void { + this.chunkInterrupted = true; + const wasPlaying = this.audioPlaying; + this.audioPlaying = false; + + if (wasPlaying && this.audioBuffer) { + this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, true); + if (this.avatarSpeaking) { + this.sendEvent({ type: 'agent.interrupt', event_id: shortuuid() }); + } + this.playbackPosition = 0; + } + } + + /** + * Resample frame to {@link SAMPLE_RATE} mono. Mirrors the lazy resampler swap + * in the Python plugin: when the input rate changes we discard the old resampler. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 198-216 lines + */ + private *resampleAudio(frame: AudioFrame): IterableIterator { + if (this.audioResampler && this.resamplerInputRate !== frame.sampleRate) { + this.audioResampler.close(); + this.audioResampler = null; + this.resamplerInputRate = null; + } + if (!this.audioResampler && (frame.sampleRate !== SAMPLE_RATE || frame.channels !== 1)) { + this.audioResampler = new AudioResampler(frame.sampleRate, SAMPLE_RATE, 1); + this.resamplerInputRate = frame.sampleRate; + } + if (this.audioResampler) { + yield* this.audioResampler.push(frame); + } else { + yield frame; + } + } + + /** + * Main task: opens the websocket and runs forward/send/recv/keep-alive loops. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 218-308 lines + */ + private async mainTask(): Promise { + if (!this.wsUrl) { + throw new LiveAvatarException('ws_url not set'); + } + const ws = new WebSocket(this.wsUrl); + await new Promise((resolve, reject) => { + ws.once('open', resolve); + ws.once('error', reject); + }); + + let resetKeepAlive = () => {}; + const closingResolver = new Future(); + + const forwardAudio = async (): Promise => { + if (!this.audioBuffer) return; + await this.sessionConnectedFuture.await; + + let chunkBuf: Uint8Array[] = []; + let chunkDurationMs = 0; + let isFirstChunk = true; + + const flushChunk = () => { + if (chunkBuf.length === 0) return; + const total = chunkBuf.reduce((acc, c) => acc + c.length, 0); + const merged = new Uint8Array(total); + let offset = 0; + for (const c of chunkBuf) { + merged.set(c, offset); + offset += c.length; + } + const encoded = Buffer.from(merged).toString('base64'); + this.sendEvent({ type: 'agent.speak', event_id: shortuuid(), audio: encoded }); + this.playbackPosition += chunkDurationMs / 1000; + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = false; + }; + + const discardChunk = () => { + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = true; + }; + + const reader = this.audioBuffer.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (this.chunkInterrupted) { + this.chunkInterrupted = false; + discardChunk(); + } + + if (value instanceof AudioSegmentEnd) { + flushChunk(); + this.sendEvent({ type: 'agent.speak_end', event_id: shortuuid() }); + this.sendEvent({ type: 'agent.start_listening', event_id: shortuuid() }); + isFirstChunk = true; + continue; + } + + if (!this.audioPlaying) { + this.audioPlaying = true; + } + for (const resampled of this.resampleAudio(value)) { + chunkBuf.push(new Uint8Array(resampled.data.buffer)); + const frameDurationMs = (resampled.samplesPerChannel / resampled.sampleRate) * 1000; + chunkDurationMs += frameDurationMs; + const thresholdMs = isFirstChunk + ? FIRST_CHUNK_THRESHOLD_MS + : SUBSEQUENT_CHUNK_THRESHOLD_MS; + if (chunkDurationMs >= thresholdMs) { + flushChunk(); + } + } + } + } finally { + reader.releaseLock(); + } + }; + + const sendTask = async (): Promise => { + if (!this.msgChannel) return; + const reader = this.msgChannel.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + try { + ws.send(JSON.stringify(value)); + resetKeepAlive(); + } catch (e) { + this.#logger.warn({ error: String(e) }, 'failed to send LiveAvatar event'); + break; + } + } + } finally { + reader.releaseLock(); + try { + ws.close(); + } catch { + // ignore + } + this.closing = true; + closingResolver.resolve(); + } + }; + + const recvTask = async (): Promise => { + const messages: Promise[] = []; + const queue: RawData[] = []; + let waiter: ((m: RawData) => void) | null = null; + + ws.on('message', (data: RawData) => { + if (waiter) { + const w = waiter; + waiter = null; + w(data); + } else { + queue.push(data); + } + }); + const closedFuture = new Future(); + ws.on('close', () => closedFuture.resolve()); + ws.on('error', () => closedFuture.resolve()); + + const nextMessage = (): Promise => + new Promise((resolve) => { + if (queue.length > 0) { + resolve(queue.shift()!); + return; + } + waiter = (m: RawData) => resolve(m); + void closedFuture.await.then(() => { + if (waiter) { + waiter = null; + resolve(null); + } + }); + }); + + while (true) { + const msg = await nextMessage(); + if (msg === null) { + if (this.closing) return; + if (this.isSandbox) { + this.#logger.warn('The LiveAvatar Sandbox connection surpassed the 1 minute limit'); + return; + } + throw new APIConnectionError({ message: 'LiveAvatar connection closed unexpectedly.' }); + } + let parsed: { type?: string; state?: string }; + try { + parsed = JSON.parse(msg.toString()) as { type?: string; state?: string }; + } catch { + continue; + } + switch (parsed.type) { + case 'session.state_updated': + this.#logger.debug({ state: parsed.state }, 'LiveAvatar session state'); + if (parsed.state === 'connected') { + if (!this.sessionConnectedFuture.done) { + this.sessionConnectedFuture.resolve(); + } + } + break; + case 'agent.speak_interrupted': + this.handleAgentSpeakInterrupted(); + break; + case 'agent.speak_ended': + this.handleAgentSpeakEnded(); + break; + case 'agent.speak_started': + this.handleAgentSpeakStarted(); + break; + default: + this.#logger.debug({ type: parsed.type }, 'Unhandled LiveAvatar event'); + } + } + // unreachable + void messages; + }; + + const keepAliveTask = async (): Promise => { + await this.sessionConnectedFuture.await; + let timer: NodeJS.Timeout | null = null; + const tick = () => { + if (this.closing) return; + this.sendEvent({ type: 'session.keep_alive', event_id: shortuuid() }); + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + }; + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + resetKeepAlive = () => { + if (timer) clearTimeout(timer); + if (!this.closing) { + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + } + }; + await closingResolver.await; + if (timer) clearTimeout(timer); + }; + + try { + await Promise.race([forwardAudio(), sendTask(), recvTask(), keepAliveTask()]); + } catch (e) { + this.#logger.warn({ error: String(e) }, 'LiveAvatar main task error'); + } finally { + this.closing = true; + closingResolver.resolve(); + + try { + if (this.sessionId && this.sessionToken) { + const data = await this.api.stopStreamingSession(this.sessionId, this.sessionToken); + if (data.code <= 200) { + this.#logger.info({ sessionId: this.sessionId }, 'LiveAvatar session stopped'); + } + } + } catch (e) { + this.#logger.warn({ error: String(e) }, 'Failed to stop LiveAvatar session'); + } + + if (this.audioBuffer) { + await this.audioBuffer.aclose(); + } + if (this.audioResampler) { + try { + this.audioResampler.close(); + } catch { + // ignore + } + this.audioResampler = null; + } + try { + ws.close(); + } catch { + // ignore + } + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 310-311 lines + */ + private handleAgentSpeakInterrupted(): void { + this.avatarInterrupted = true; + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 313-322 lines + */ + private handleAgentSpeakEnded(): void { + this.avatarSpeaking = false; + if (!this.avatarInterrupted && this.audioBuffer) { + this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, false); + this.playbackPosition = 0; + this.audioPlaying = false; + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 324-327 lines + */ + private handleAgentSpeakStarted(): void { + this.avatarSpeaking = true; + this.avatarInterrupted = false; + this.audioBuffer?.notifyPlaybackStarted(); + } +} diff --git a/plugins/liveavatar/src/index.ts b/plugins/liveavatar/src/index.ts new file mode 100644 index 000000000..46567f780 --- /dev/null +++ b/plugins/liveavatar/src/index.ts @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { Plugin } from '@livekit/agents'; + +export * from './api.js'; +export * from './avatar.js'; + +class LiveAvatarPlugin extends Plugin { + constructor() { + super({ + title: 'liveavatar', + version: __PACKAGE_VERSION__, + package: __PACKAGE_NAME__, + }); + } +} + +Plugin.registerPlugin(new LiveAvatarPlugin()); diff --git a/plugins/liveavatar/src/log.ts b/plugins/liveavatar/src/log.ts new file mode 100644 index 000000000..08acfaf3e --- /dev/null +++ b/plugins/liveavatar/src/log.ts @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { log as agentsLog } from '@livekit/agents'; +import type { Logger } from 'pino'; + +export const log = (): Logger => agentsLog().child({ plugin: 'liveavatar' }); diff --git a/plugins/liveavatar/tsconfig.json b/plugins/liveavatar/tsconfig.json new file mode 100644 index 000000000..46d415de9 --- /dev/null +++ b/plugins/liveavatar/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "rootDir": "./src", + "declarationDir": "./dist", + "outDir": "./dist" + }, + "typedocOptions": { + "name": "plugins/agents-plugin-liveavatar", + "entryPointStrategy": "resolve", + "readme": "none", + "entryPoints": ["src/index.ts"] + } +} diff --git a/plugins/liveavatar/tsup.config.ts b/plugins/liveavatar/tsup.config.ts new file mode 100644 index 000000000..46011fa8c --- /dev/null +++ b/plugins/liveavatar/tsup.config.ts @@ -0,0 +1,9 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { defineConfig } from 'tsup'; +import defaults from '../../tsup.config.js'; + +export default defineConfig({ + ...defaults, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ae1210ae1..8d0cca7ef 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -742,6 +742,37 @@ importers: specifier: ^5.0.0 version: 5.9.3 + plugins/liveavatar: + dependencies: + livekit-server-sdk: + specifier: ^2.13.3 + version: 2.14.1 + ws: + specifier: 'catalog:' + version: 8.20.0 + devDependencies: + '@livekit/agents': + specifier: workspace:* + version: link:../../agents + '@livekit/rtc-node': + specifier: 'catalog:' + version: 0.13.25 + '@microsoft/api-extractor': + specifier: ^7.35.0 + version: 7.43.7(@types/node@22.19.1) + '@types/ws': + specifier: 'catalog:' + version: 8.5.10 + pino: + specifier: ^8.19.0 + version: 8.21.0 + tsup: + specifier: ^8.3.5 + version: 8.4.0(@microsoft/api-extractor@7.43.7(@types/node@22.19.1))(postcss@8.5.9)(tsx@4.21.0)(typescript@5.9.3) + typescript: + specifier: ^5.0.0 + version: 5.9.3 + plugins/livekit: dependencies: '@huggingface/hub': diff --git a/turbo.json b/turbo.json index 113bda809..4742bab09 100644 --- a/turbo.json +++ b/turbo.json @@ -27,6 +27,9 @@ "LEMONSLICE_API_KEY", "LEMONSLICE_API_URL", "LEMONSLICE_IMAGE_URL", + "LIVEAVATAR_API_KEY", + "LIVEAVATAR_API_URL", + "LIVEAVATAR_AVATAR_ID", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "LIVEKIT_INFERENCE_API_KEY", From 40ba52c74b86eb9b8cdbde1fa21c8160f3d96d54 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 27 Apr 2026 22:14:41 -0700 Subject: [PATCH 2/9] Update plugins/liveavatar/README.md Co-authored-by: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> --- plugins/liveavatar/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/liveavatar/README.md b/plugins/liveavatar/README.md index 934062f7c..6462d6527 100644 --- a/plugins/liveavatar/README.md +++ b/plugins/liveavatar/README.md @@ -8,7 +8,7 @@ SPDX-License-Identifier: Apache-2.0 Support for [LiveAvatar](https://www.liveavatar.com) interactive avatars. -This is the JS/TS port of the Python `livekit-plugins-liveavatar` plugin. See [https://docs.livekit.io/agents/integrations/avatar/](https://docs.livekit.io/agents/integrations/avatar/) for more information. +This is the JS/TS port of the Python `livekit-plugins-liveavatar` plugin. See [https://docs.livekit.io/agents/models/avatar/plugins/liveavatar/](https://docs.livekit.io/agents/models/avatar/plugins/liveavatar/) for more information. ## Installation From e71d5e8724e1dbfc105a54637dfed669a8590523 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 27 Apr 2026 22:15:37 -0700 Subject: [PATCH 3/9] Update .changeset/liveavatar-plugin.md Co-authored-by: Tina Nguyen <72938484+tinalenguyen@users.noreply.github.com> --- .changeset/liveavatar-plugin.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/liveavatar-plugin.md b/.changeset/liveavatar-plugin.md index fc1c185c2..9ad039c66 100644 --- a/.changeset/liveavatar-plugin.md +++ b/.changeset/liveavatar-plugin.md @@ -1,5 +1,5 @@ --- -'@livekit/agents-plugin-liveavatar': minor +'@livekit/agents-plugin-liveavatar': patch '@livekit/agents': minor --- From af8b8edb172f4992b4897e3cdd2bbe34fa2d8843 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 05:24:28 +0000 Subject: [PATCH 4/9] review: lift QueueAudioOutput to core, harden mainTask, fix retry semantics Addresses review feedback on #1324: - Move `QueueAudioOutput` and `AudioSegmentEnd` from the plugin into `agents/src/voice/avatar/queue_io.ts` so they're reusable by other avatar plugins (mirrors Python's `livekit.agents.voice.avatar` layout). The plugin now consumes `voice.QueueAudioOutput` / `voice.AudioSegmentEnd`. - Move the WebSocket open handshake inside `mainTask`'s try/catch/finally so a DNS / TLS / connect failure is routed through the same cleanup path as runtime errors (no more unhandled rejection at spawn time). Also attach a `.catch` at the spawn site as a defense-in-depth net. - Treat `connOptions.maxRetry` as the number of retries on top of the initial attempt: the post loop now uses `i <= maxRetry` so a single attempt still fires when `maxRetry: 0`, matching the convention used by other agents-js plugins (lemonslice, runway). - Mark the `@livekit/agents` changeset bump as `patch` to match the plugin's bump under the project's fixed-versioning policy. https://claude.ai/code/session_01DE5pBrf3y1bFgLTK8NDTkB --- .changeset/liveavatar-plugin.md | 2 +- agents/src/voice/avatar/index.ts | 1 + agents/src/voice/avatar/queue_io.ts | 107 +++++++ plugins/liveavatar/src/api.ts | 10 +- plugins/liveavatar/src/avatar.ts | 443 ++++++++++++---------------- 5 files changed, 310 insertions(+), 253 deletions(-) create mode 100644 agents/src/voice/avatar/queue_io.ts diff --git a/.changeset/liveavatar-plugin.md b/.changeset/liveavatar-plugin.md index 9ad039c66..041677f0d 100644 --- a/.changeset/liveavatar-plugin.md +++ b/.changeset/liveavatar-plugin.md @@ -1,6 +1,6 @@ --- '@livekit/agents-plugin-liveavatar': patch -'@livekit/agents': minor +'@livekit/agents': patch --- Port the `liveavatar` plugin from the Python `livekit-agents` repo, including the new `videoQuality` parameter from livekit/agents#5552. diff --git a/agents/src/voice/avatar/index.ts b/agents/src/voice/avatar/index.ts index f9d316a17..85c42287d 100644 --- a/agents/src/voice/avatar/index.ts +++ b/agents/src/voice/avatar/index.ts @@ -2,3 +2,4 @@ // // SPDX-License-Identifier: Apache-2.0 export * from './datastream_io.js'; +export * from './queue_io.js'; diff --git a/agents/src/voice/avatar/queue_io.ts b/agents/src/voice/avatar/queue_io.ts new file mode 100644 index 000000000..039811a8e --- /dev/null +++ b/agents/src/voice/avatar/queue_io.ts @@ -0,0 +1,107 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import { type StreamChannel, createStreamChannel } from '../../stream/stream_channel.js'; +import { AudioOutput } from '../io.js'; + +/** + * Sentinel value pushed onto the {@link QueueAudioOutput} stream when a segment of agent + * speech has finished (i.e. `flush()` was called). Consumers should treat it as an + * end-of-segment marker and emit the corresponding "speech ended" signal on their + * downstream protocol (websocket, RPC, etc.). + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_types.py (AudioSegmentEnd) + */ +export class AudioSegmentEnd {} + +/** + * One value emitted by {@link QueueAudioOutput.stream}: either an {@link AudioFrame} + * or an {@link AudioSegmentEnd} sentinel marking the end of a speech segment. + */ +export type QueueAudioOutputItem = AudioFrame | AudioSegmentEnd; + +/** + * AudioOutput implementation that buffers agent speech frames into a stream/queue so + * they can be consumed by an external transport (e.g. a custom websocket protocol used + * by an avatar plugin). Frames captured via {@link captureFrame} flow through the + * underlying stream as-is; on {@link flush} an {@link AudioSegmentEnd} sentinel is + * appended; on {@link clearBuffer} a `'clear_buffer'` event is emitted so the consumer + * can drop any in-flight bytes and notify upstream of an interruption. + * + * Mirrors Python's `livekit.agents.voice.avatar.QueueAudioOutput`. + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_queue_io.py + */ +export class QueueAudioOutput extends AudioOutput { + private readonly channel: StreamChannel = + createStreamChannel(); + private startedSegment = false; + + constructor(sampleRate?: number) { + super(sampleRate, undefined, { pause: false }); + } + + /** + * Returns the underlying readable stream of audio frames + end-of-segment sentinels. + * + * Each call returns the same shared stream; do not split between concurrent readers. + */ + stream(): ReturnType['stream']> { + return this.channel.stream(); + } + + /** True once {@link aclose} has been called. */ + get closed(): boolean { + return this.channel.closed; + } + + override async captureFrame(frame: AudioFrame): Promise { + await super.captureFrame(frame); + this.startedSegment = true; + if (!this.channel.closed) { + await this.channel.write(frame); + } + } + + override flush(): void { + super.flush(); + if (this.startedSegment && !this.channel.closed) { + // Best-effort write — the consumer drains on its own loop. + void this.channel.write(new AudioSegmentEnd()).catch(() => { + // channel closed concurrently; safe to drop the sentinel. + }); + this.startedSegment = false; + } + } + + override clearBuffer(): void { + this.emit('clear_buffer'); + this.startedSegment = false; + } + + /** Close the underlying stream so consumers see a graceful end-of-stream. */ + async aclose(): Promise { + if (!this.channel.closed) { + await this.channel.close(); + } + } + + /** + * Convenience wrapper around {@link AudioOutput.onPlaybackStarted} so a remote + * transport can announce "first byte played" without needing access to the + * protected method. + */ + notifyPlaybackStarted(createdAt: number = Date.now()): void { + this.onPlaybackStarted(createdAt); + } + + /** + * Convenience wrapper around {@link AudioOutput.onPlaybackFinished} so a remote + * transport can announce segment completion (or interruption) without needing + * access to the protected method. + */ + notifyPlaybackFinished(playbackPosition: number, interrupted: boolean): void { + this.onPlaybackFinished({ playbackPosition, interrupted }); + } +} diff --git a/plugins/liveavatar/src/api.ts b/plugins/liveavatar/src/api.ts index e206be634..d7005599b 100644 --- a/plugins/liveavatar/src/api.ts +++ b/plugins/liveavatar/src/api.ts @@ -165,7 +165,11 @@ export class LiveAvatarAPI { ): Promise { const url = this.apiUrl + endpoint; const maxRetry = this.connOptions.maxRetry; - for (let i = 0; i < maxRetry; i++) { + // `maxRetry` is the number of retries on top of the initial attempt, so we + // run up to `maxRetry + 1` total attempts. This matches the convention used + // by other agents-js plugins (e.g. lemonslice/runway) and ensures a single + // attempt still fires when callers configure `maxRetry: 0`. + for (let i = 0; i <= maxRetry; i++) { try { const response = await fetch(url, { method: 'POST', @@ -191,12 +195,12 @@ export class LiveAvatarAPI { ); } - if (i < maxRetry - 1) { + if (i < maxRetry) { await new Promise((resolve) => setTimeout(resolve, this.connOptions.retryIntervalMs)); } } throw new APIConnectionError({ - message: `Failed to call LiveAvatar API after ${maxRetry} retries`, + message: `Failed to call LiveAvatar API after ${maxRetry + 1} attempts`, }); } } diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index 88840bcc9..974a9b188 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -26,73 +26,6 @@ const SUBSEQUENT_CHUNK_THRESHOLD_MS = 1_000; const AVATAR_AGENT_IDENTITY = 'liveavatar-avatar-agent'; const AVATAR_AGENT_NAME = 'liveavatar-avatar-agent'; -/** - * Sentinel pushed onto the audio queue to mark the end of an agent speech segment. - * - * Ref: python livekit-agents/livekit/agents/voice/avatar/_types.py (AudioSegmentEnd) - */ -class AudioSegmentEnd {} - -type AudioQueueItem = AudioFrame | AudioSegmentEnd; - -/** - * AudioOutput that captures agent speech frames into a queue for the LiveAvatar - * websocket forwarder to consume. Mirrors Python's `QueueAudioOutput` in spirit, - * but with a single queue + segment-end sentinel rather than a typed queue. - * - * Ref: python livekit-agents/livekit/agents/voice/avatar/_queue_io.py - */ -class QueueAudioOutput extends voice.AudioOutput { - private channel: streamNs.StreamChannel = streamNs.createStreamChannel(); - private startedSegment = false; - - constructor(sampleRate: number) { - super(sampleRate, undefined, { pause: false }); - } - - /** Returns the underlying readable stream of audio frames + segment-end sentinels. */ - stream(): ReturnType['stream']> { - return this.channel.stream(); - } - - override async captureFrame(frame: AudioFrame): Promise { - await super.captureFrame(frame); - this.startedSegment = true; - if (!this.channel.closed) { - await this.channel.write(frame); - } - } - - override flush(): void { - super.flush(); - if (this.startedSegment && !this.channel.closed) { - // Best-effort write — the consumer will drain on its own loop. - void this.channel.write(new AudioSegmentEnd()); - this.startedSegment = false; - } - } - - override clearBuffer(): void { - this.emit('clear_buffer'); - this.startedSegment = false; - } - - async aclose(): Promise { - if (!this.channel.closed) { - await this.channel.close(); - } - } - - // Convenience helpers exposed for the AvatarSession driver. - notifyPlaybackStarted(createdAt: number = Date.now()): void { - this.onPlaybackStarted(createdAt); - } - - notifyPlaybackFinished(playbackPosition: number, interrupted: boolean): void { - this.onPlaybackFinished({ playbackPosition, interrupted }); - } -} - /** * Options for configuring an AvatarSession. * @@ -162,7 +95,7 @@ export class AvatarSession { private sessionToken: string | null = null; private wsUrl: string | null = null; - private audioBuffer?: QueueAudioOutput; + private audioBuffer?: voice.QueueAudioOutput; private msgChannel?: streamNs.StreamChannel>; private msgChannelClosed = false; private mainTaskPromise?: Promise; @@ -289,11 +222,16 @@ export class AvatarSession { }); // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 166-173 lines - this.audioBuffer = new QueueAudioOutput(SAMPLE_RATE); + this.audioBuffer = new voice.QueueAudioOutput(SAMPLE_RATE); this.audioBuffer.on('clear_buffer', () => this.onClearBuffer()); agentSession.output.audio = this.audioBuffer; - this.mainTaskPromise = this.mainTask(); + // Spawn the main task with an attached error handler so a websocket open or + // protocol failure does not surface as an unhandled rejection. The main task + // itself handles its own cleanup in finally. + this.mainTaskPromise = this.mainTask().catch((e) => { + this.#logger.warn({ error: String(e) }, 'LiveAvatar main task failed'); + }); // Best-effort cleanup on job shutdown. try { @@ -394,206 +332,210 @@ export class AvatarSession { if (!this.wsUrl) { throw new LiveAvatarException('ws_url not set'); } - const ws = new WebSocket(this.wsUrl); - await new Promise((resolve, reject) => { - ws.once('open', resolve); - ws.once('error', reject); - }); + let ws: WebSocket | null = null; let resetKeepAlive = () => {}; const closingResolver = new Future(); - const forwardAudio = async (): Promise => { - if (!this.audioBuffer) return; - await this.sessionConnectedFuture.await; - - let chunkBuf: Uint8Array[] = []; - let chunkDurationMs = 0; - let isFirstChunk = true; - - const flushChunk = () => { - if (chunkBuf.length === 0) return; - const total = chunkBuf.reduce((acc, c) => acc + c.length, 0); - const merged = new Uint8Array(total); - let offset = 0; - for (const c of chunkBuf) { - merged.set(c, offset); - offset += c.length; - } - const encoded = Buffer.from(merged).toString('base64'); - this.sendEvent({ type: 'agent.speak', event_id: shortuuid(), audio: encoded }); - this.playbackPosition += chunkDurationMs / 1000; - chunkBuf = []; - chunkDurationMs = 0; - isFirstChunk = false; - }; - - const discardChunk = () => { - chunkBuf = []; - chunkDurationMs = 0; - isFirstChunk = true; - }; - - const reader = this.audioBuffer.stream().getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - if (this.chunkInterrupted) { - this.chunkInterrupted = false; - discardChunk(); - } - - if (value instanceof AudioSegmentEnd) { - flushChunk(); - this.sendEvent({ type: 'agent.speak_end', event_id: shortuuid() }); - this.sendEvent({ type: 'agent.start_listening', event_id: shortuuid() }); - isFirstChunk = true; - continue; + try { + // Open the websocket inside the guarded section so DNS/TLS/network failures + // are routed through the same cleanup path as runtime errors. + const wsRef = new WebSocket(this.wsUrl); + await new Promise((resolve, reject) => { + wsRef.once('open', resolve); + wsRef.once('error', reject); + }); + ws = wsRef; + + const forwardAudio = async (): Promise => { + if (!this.audioBuffer) return; + await this.sessionConnectedFuture.await; + + let chunkBuf: Uint8Array[] = []; + let chunkDurationMs = 0; + let isFirstChunk = true; + + const flushChunk = () => { + if (chunkBuf.length === 0) return; + const total = chunkBuf.reduce((acc, c) => acc + c.length, 0); + const merged = new Uint8Array(total); + let offset = 0; + for (const c of chunkBuf) { + merged.set(c, offset); + offset += c.length; } + const encoded = Buffer.from(merged).toString('base64'); + this.sendEvent({ type: 'agent.speak', event_id: shortuuid(), audio: encoded }); + this.playbackPosition += chunkDurationMs / 1000; + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = false; + }; + + const discardChunk = () => { + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = true; + }; + + const reader = this.audioBuffer.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (this.chunkInterrupted) { + this.chunkInterrupted = false; + discardChunk(); + } - if (!this.audioPlaying) { - this.audioPlaying = true; - } - for (const resampled of this.resampleAudio(value)) { - chunkBuf.push(new Uint8Array(resampled.data.buffer)); - const frameDurationMs = (resampled.samplesPerChannel / resampled.sampleRate) * 1000; - chunkDurationMs += frameDurationMs; - const thresholdMs = isFirstChunk - ? FIRST_CHUNK_THRESHOLD_MS - : SUBSEQUENT_CHUNK_THRESHOLD_MS; - if (chunkDurationMs >= thresholdMs) { + if (value instanceof voice.AudioSegmentEnd) { flushChunk(); + this.sendEvent({ type: 'agent.speak_end', event_id: shortuuid() }); + this.sendEvent({ type: 'agent.start_listening', event_id: shortuuid() }); + isFirstChunk = true; + continue; + } + + if (!this.audioPlaying) { + this.audioPlaying = true; + } + for (const resampled of this.resampleAudio(value)) { + chunkBuf.push(new Uint8Array(resampled.data.buffer)); + const frameDurationMs = (resampled.samplesPerChannel / resampled.sampleRate) * 1000; + chunkDurationMs += frameDurationMs; + const thresholdMs = isFirstChunk + ? FIRST_CHUNK_THRESHOLD_MS + : SUBSEQUENT_CHUNK_THRESHOLD_MS; + if (chunkDurationMs >= thresholdMs) { + flushChunk(); + } } } + } finally { + reader.releaseLock(); } - } finally { - reader.releaseLock(); - } - }; + }; - const sendTask = async (): Promise => { - if (!this.msgChannel) return; - const reader = this.msgChannel.stream().getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; + const sendTask = async (): Promise => { + if (!this.msgChannel) return; + const reader = this.msgChannel.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + try { + wsRef.send(JSON.stringify(value)); + resetKeepAlive(); + } catch (e) { + this.#logger.warn({ error: String(e) }, 'failed to send LiveAvatar event'); + break; + } + } + } finally { + reader.releaseLock(); try { - ws.send(JSON.stringify(value)); - resetKeepAlive(); - } catch (e) { - this.#logger.warn({ error: String(e) }, 'failed to send LiveAvatar event'); - break; + wsRef.close(); + } catch { + // ignore } + this.closing = true; + closingResolver.resolve(); } - } finally { - reader.releaseLock(); - try { - ws.close(); - } catch { - // ignore - } - this.closing = true; - closingResolver.resolve(); - } - }; - - const recvTask = async (): Promise => { - const messages: Promise[] = []; - const queue: RawData[] = []; - let waiter: ((m: RawData) => void) | null = null; - - ws.on('message', (data: RawData) => { - if (waiter) { - const w = waiter; - waiter = null; - w(data); - } else { - queue.push(data); - } - }); - const closedFuture = new Future(); - ws.on('close', () => closedFuture.resolve()); - ws.on('error', () => closedFuture.resolve()); - - const nextMessage = (): Promise => - new Promise((resolve) => { - if (queue.length > 0) { - resolve(queue.shift()!); - return; + }; + + const recvTask = async (): Promise => { + const queue: RawData[] = []; + let waiter: ((m: RawData) => void) | null = null; + + wsRef.on('message', (data: RawData) => { + if (waiter) { + const w = waiter; + waiter = null; + w(data); + } else { + queue.push(data); } - waiter = (m: RawData) => resolve(m); - void closedFuture.await.then(() => { - if (waiter) { - waiter = null; - resolve(null); + }); + const closedFuture = new Future(); + wsRef.on('close', () => closedFuture.resolve()); + wsRef.on('error', () => closedFuture.resolve()); + + const nextMessage = (): Promise => + new Promise((resolve) => { + if (queue.length > 0) { + resolve(queue.shift()!); + return; } + waiter = (m: RawData) => resolve(m); + void closedFuture.await.then(() => { + if (waiter) { + waiter = null; + resolve(null); + } + }); }); - }); - while (true) { - const msg = await nextMessage(); - if (msg === null) { - if (this.closing) return; - if (this.isSandbox) { - this.#logger.warn('The LiveAvatar Sandbox connection surpassed the 1 minute limit'); - return; + while (true) { + const msg = await nextMessage(); + if (msg === null) { + if (this.closing) return; + if (this.isSandbox) { + this.#logger.warn('The LiveAvatar Sandbox connection surpassed the 1 minute limit'); + return; + } + throw new APIConnectionError({ + message: 'LiveAvatar connection closed unexpectedly.', + }); } - throw new APIConnectionError({ message: 'LiveAvatar connection closed unexpectedly.' }); - } - let parsed: { type?: string; state?: string }; - try { - parsed = JSON.parse(msg.toString()) as { type?: string; state?: string }; - } catch { - continue; - } - switch (parsed.type) { - case 'session.state_updated': - this.#logger.debug({ state: parsed.state }, 'LiveAvatar session state'); - if (parsed.state === 'connected') { - if (!this.sessionConnectedFuture.done) { - this.sessionConnectedFuture.resolve(); + let parsed: { type?: string; state?: string }; + try { + parsed = JSON.parse(msg.toString()) as { type?: string; state?: string }; + } catch { + continue; + } + switch (parsed.type) { + case 'session.state_updated': + this.#logger.debug({ state: parsed.state }, 'LiveAvatar session state'); + if (parsed.state === 'connected') { + if (!this.sessionConnectedFuture.done) { + this.sessionConnectedFuture.resolve(); + } } - } - break; - case 'agent.speak_interrupted': - this.handleAgentSpeakInterrupted(); - break; - case 'agent.speak_ended': - this.handleAgentSpeakEnded(); - break; - case 'agent.speak_started': - this.handleAgentSpeakStarted(); - break; - default: - this.#logger.debug({ type: parsed.type }, 'Unhandled LiveAvatar event'); + break; + case 'agent.speak_interrupted': + this.handleAgentSpeakInterrupted(); + break; + case 'agent.speak_ended': + this.handleAgentSpeakEnded(); + break; + case 'agent.speak_started': + this.handleAgentSpeakStarted(); + break; + default: + this.#logger.debug({ type: parsed.type }, 'Unhandled LiveAvatar event'); + } } - } - // unreachable - void messages; - }; - - const keepAliveTask = async (): Promise => { - await this.sessionConnectedFuture.await; - let timer: NodeJS.Timeout | null = null; - const tick = () => { - if (this.closing) return; - this.sendEvent({ type: 'session.keep_alive', event_id: shortuuid() }); - timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); }; - timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); - resetKeepAlive = () => { - if (timer) clearTimeout(timer); - if (!this.closing) { + + const keepAliveTask = async (): Promise => { + await this.sessionConnectedFuture.await; + let timer: NodeJS.Timeout | null = null; + const tick = () => { + if (this.closing) return; + this.sendEvent({ type: 'session.keep_alive', event_id: shortuuid() }); timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); - } + }; + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + resetKeepAlive = () => { + if (timer) clearTimeout(timer); + if (!this.closing) { + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + } + }; + await closingResolver.await; + if (timer) clearTimeout(timer); }; - await closingResolver.await; - if (timer) clearTimeout(timer); - }; - try { await Promise.race([forwardAudio(), sendTask(), recvTask(), keepAliveTask()]); } catch (e) { this.#logger.warn({ error: String(e) }, 'LiveAvatar main task error'); @@ -604,6 +546,7 @@ export class AvatarSession { try { if (this.sessionId && this.sessionToken) { const data = await this.api.stopStreamingSession(this.sessionId, this.sessionToken); + // Mirrors python livekit-plugins-liveavatar/.../avatar.py - 304 line if (data.code <= 200) { this.#logger.info({ sessionId: this.sessionId }, 'LiveAvatar session stopped'); } @@ -623,10 +566,12 @@ export class AvatarSession { } this.audioResampler = null; } - try { - ws.close(); - } catch { - // ignore + if (ws) { + try { + ws.close(); + } catch { + // ignore + } } } } From 651e3de0e509a34354f554336cea3ceed8dd5cf8 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 05:41:58 +0000 Subject: [PATCH 5/9] review: fix onClearBuffer race and stale-frame leak after interrupt Addresses two new Devin findings on #1324: 1. **Race condition in `onClearBuffer` (P-red)**. `super.captureFrame` synchronously increments `playbackSegmentsCount`, but `audioPlaying` was only flipped asynchronously inside `forwardAudio` after a frame was actually drained. An interrupt that landed in that window gated `notifyPlaybackFinished` away, leaving the base class's segment count permanently unbalanced and stalling `waitForPlayout()` forever. Fix: `QueueAudioOutput.clearBuffer` now emits the `'clear_buffer'` event with a `{ wasCapturing }` payload set synchronously from `startedSegment`. The plugin gates `notifyPlaybackFinished` on the payload, eliminating the race. New `QueueAudioOutputClearEvent` type exported alongside `QueueAudioOutput`. 2. **Stale audio frame leak after interrupt (P-yellow)**. After the first frame triggered `discardChunk`, subsequent frames already queued from the interrupted segment fell through and were sent to the avatar, bleeding into the next segment's audio. Fix: introduce an `interruptDraining` flag in the `forwardAudio` loop. Set it whenever `chunkInterrupted` is consumed; drop every frame while it's true; clear it on the next `AudioSegmentEnd` (the interrupted segment has fully drained at that point) so the next segment starts clean. https://claude.ai/code/session_01DE5pBrf3y1bFgLTK8NDTkB --- agents/src/voice/avatar/queue_io.ts | 27 +++++++++++++++++++++--- plugins/liveavatar/src/avatar.ts | 32 +++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/agents/src/voice/avatar/queue_io.ts b/agents/src/voice/avatar/queue_io.ts index 039811a8e..53a74e32e 100644 --- a/agents/src/voice/avatar/queue_io.ts +++ b/agents/src/voice/avatar/queue_io.ts @@ -21,13 +21,30 @@ export class AudioSegmentEnd {} */ export type QueueAudioOutputItem = AudioFrame | AudioSegmentEnd; +/** + * Payload emitted with the {@link QueueAudioOutput} `'clear_buffer'` event. + * + * `wasCapturing` is set synchronously inside {@link QueueAudioOutput.clearBuffer} + * based on whether {@link QueueAudioOutput.captureFrame} had been called for the + * current segment. Consumers should use this — not their own asynchronous + * "is the avatar speaking?" flag — to decide whether to call + * {@link QueueAudioOutput.notifyPlaybackFinished}, otherwise an interrupt that + * lands in the window between `captureFrame` and the consumer's reader can leak + * `playbackSegmentsCount > playbackFinishedCount` and deadlock + * {@link AudioOutput.waitForPlayout}. + */ +export interface QueueAudioOutputClearEvent { + wasCapturing: boolean; +} + /** * AudioOutput implementation that buffers agent speech frames into a stream/queue so * they can be consumed by an external transport (e.g. a custom websocket protocol used * by an avatar plugin). Frames captured via {@link captureFrame} flow through the * underlying stream as-is; on {@link flush} an {@link AudioSegmentEnd} sentinel is - * appended; on {@link clearBuffer} a `'clear_buffer'` event is emitted so the consumer - * can drop any in-flight bytes and notify upstream of an interruption. + * appended; on {@link clearBuffer} a `'clear_buffer'` event is emitted with a + * {@link QueueAudioOutputClearEvent} payload so the consumer can drop any in-flight + * bytes and notify upstream of an interruption. * * Mirrors Python's `livekit.agents.voice.avatar.QueueAudioOutput`. * @@ -76,8 +93,12 @@ export class QueueAudioOutput extends AudioOutput { } override clearBuffer(): void { - this.emit('clear_buffer'); + // Capture the in-flight state synchronously so consumers can race-free decide + // whether to fire `notifyPlaybackFinished` (and avoid leaking the base class's + // `playbackSegmentsCount > playbackFinishedCount` bookkeeping). + const wasCapturing = this.startedSegment; this.startedSegment = false; + this.emit('clear_buffer', { wasCapturing } satisfies QueueAudioOutputClearEvent); } /** Close the underlying stream so consumers see a graceful end-of-stream. */ diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index 974a9b188..92f392a4a 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -223,7 +223,9 @@ export class AvatarSession { // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 166-173 lines this.audioBuffer = new voice.QueueAudioOutput(SAMPLE_RATE); - this.audioBuffer.on('clear_buffer', () => this.onClearBuffer()); + this.audioBuffer.on('clear_buffer', (ev: voice.QueueAudioOutputClearEvent) => + this.onClearBuffer(ev), + ); agentSession.output.audio = this.audioBuffer; // Spawn the main task with an attached error handler so a websocket open or @@ -285,13 +287,20 @@ export class AvatarSession { /** * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 180-196 lines + * + * Gates `notifyPlaybackFinished` on the `wasCapturing` flag carried by the + * `clear_buffer` event (set synchronously inside `QueueAudioOutput.clearBuffer`) + * rather than the asynchronously-set `audioPlaying` field. The async flag races + * with the channel reader: `super.captureFrame` increments + * `playbackSegmentsCount` before `forwardAudio` reads the frame and flips + * `audioPlaying = true`, so an interrupt landing in that window would have + * skipped `notifyPlaybackFinished` and stalled `waitForPlayout()` forever. */ - private onClearBuffer(): void { + private onClearBuffer(ev: voice.QueueAudioOutputClearEvent): void { this.chunkInterrupted = true; - const wasPlaying = this.audioPlaying; this.audioPlaying = false; - if (wasPlaying && this.audioBuffer) { + if (ev.wasCapturing && this.audioBuffer) { this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, true); if (this.avatarSpeaking) { this.sendEvent({ type: 'agent.interrupt', event_id: shortuuid() }); @@ -354,6 +363,10 @@ export class AvatarSession { let chunkBuf: Uint8Array[] = []; let chunkDurationMs = 0; let isFirstChunk = true; + // True between an interrupt and the next AudioSegmentEnd: drops any + // frames that were already queued from the interrupted segment so + // they don't bleed into the next segment's first chunk. + let interruptDraining = false; const flushChunk = () => { if (chunkBuf.length === 0) return; @@ -386,9 +399,13 @@ export class AvatarSession { if (this.chunkInterrupted) { this.chunkInterrupted = false; discardChunk(); + interruptDraining = true; } if (value instanceof voice.AudioSegmentEnd) { + // The interrupted segment has fully drained; resume normal + // operation for the next segment. + interruptDraining = false; flushChunk(); this.sendEvent({ type: 'agent.speak_end', event_id: shortuuid() }); this.sendEvent({ type: 'agent.start_listening', event_id: shortuuid() }); @@ -396,6 +413,13 @@ export class AvatarSession { continue; } + if (interruptDraining) { + // Drop any frame from the interrupted segment that was already + // sitting in the channel; they would otherwise leak into the + // next segment's audio. + continue; + } + if (!this.audioPlaying) { this.audioPlaying = true; } From f518785112ea309c72c2f087c3f7a8874fca475d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 28 Apr 2026 05:59:36 +0000 Subject: [PATCH 6/9] review: write AudioSegmentEnd from clearBuffer to unstick interruptDraining MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Devin's follow-up finding on #1324: the previous fix exposed a downstream bug where the entire next segment's audio would be silently dropped after an interrupt. Repro: TTS speaks → user interrupts → AgentSession calls `audioOutput.clearBuffer()` (which clears `startedSegment`) and then the forward-audio task's `finally` block calls `audioOutput.flush()`. With `startedSegment` already false, the `flush()` no-ops and never writes the `AudioSegmentEnd` sentinel. The avatar plugin's `forwardAudio` loop is now stuck with `interruptDraining=true` (only reset on the next sentinel) and silently drops every frame of the next reply until that segment's own `flush()` writes its boundary. Two-part fix: - `QueueAudioOutput.clearBuffer()` now writes an `AudioSegmentEnd` sentinel itself when `wasCapturing` was true, guaranteeing every in-flight segment gets a closing boundary regardless of whether the producer follows up with a `flush()`. The subsequent `flush()` is a no-op for the sentinel write since `startedSegment` is already false. - The plugin's `onClearBuffer` now early-returns when `wasCapturing` is false, so a `clearBuffer` that arrives after the segment already ended (e.g. after a normal `flush()`) doesn't leak `chunkInterrupted` into the next segment's first frame. https://claude.ai/code/session_01DE5pBrf3y1bFgLTK8NDTkB --- agents/src/voice/avatar/queue_io.ts | 13 +++++++++++++ plugins/liveavatar/src/avatar.ts | 28 ++++++++++++++++++---------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/agents/src/voice/avatar/queue_io.ts b/agents/src/voice/avatar/queue_io.ts index 53a74e32e..ee508f9c5 100644 --- a/agents/src/voice/avatar/queue_io.ts +++ b/agents/src/voice/avatar/queue_io.ts @@ -98,6 +98,19 @@ export class QueueAudioOutput extends AudioOutput { // `playbackSegmentsCount > playbackFinishedCount` bookkeeping). const wasCapturing = this.startedSegment; this.startedSegment = false; + // Always close an in-flight segment with an AudioSegmentEnd sentinel. + // The producer (e.g. the AgentSession forward-audio task) typically calls + // `flush()` *after* `clearBuffer()` once the segment is aborted, but at + // that point `startedSegment` is already `false` and `flush()` is a no-op + // for the sentinel write. Without writing it here, a downstream consumer + // that uses the sentinel as a "drop stale frames" reset signal would + // never see a boundary for the interrupted segment and would silently + // drop the entire next segment's audio. + if (wasCapturing && !this.channel.closed) { + void this.channel.write(new AudioSegmentEnd()).catch(() => { + // channel closed concurrently; safe to drop the sentinel. + }); + } this.emit('clear_buffer', { wasCapturing } satisfies QueueAudioOutputClearEvent); } diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index 92f392a4a..e154ce7bf 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -288,19 +288,27 @@ export class AvatarSession { /** * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 180-196 lines * - * Gates `notifyPlaybackFinished` on the `wasCapturing` flag carried by the - * `clear_buffer` event (set synchronously inside `QueueAudioOutput.clearBuffer`) - * rather than the asynchronously-set `audioPlaying` field. The async flag races - * with the channel reader: `super.captureFrame` increments - * `playbackSegmentsCount` before `forwardAudio` reads the frame and flips - * `audioPlaying = true`, so an interrupt landing in that window would have - * skipped `notifyPlaybackFinished` and stalled `waitForPlayout()` forever. + * Gates everything on the `wasCapturing` flag carried by the `clear_buffer` + * event (set synchronously inside `QueueAudioOutput.clearBuffer`): + * + * 1. `notifyPlaybackFinished` only fires when a segment was actually in + * flight, so the base class's segment-count bookkeeping stays balanced + * even when an interrupt lands in the window between `super.captureFrame` + * incrementing `playbackSegmentsCount` and `forwardAudio` consuming the + * frame. + * 2. `chunkInterrupted` is only flipped when there's an actual segment to + * interrupt. If `wasCapturing` is false (e.g. `clearBuffer` is called + * after `flush` has already written its `AudioSegmentEnd`), setting + * `chunkInterrupted` would otherwise carry over and discard the first + * frame of the *next* segment. */ private onClearBuffer(ev: voice.QueueAudioOutputClearEvent): void { - this.chunkInterrupted = true; this.audioPlaying = false; - - if (ev.wasCapturing && this.audioBuffer) { + if (!ev.wasCapturing) { + return; + } + this.chunkInterrupted = true; + if (this.audioBuffer) { this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, true); if (this.avatarSpeaking) { this.sendEvent({ type: 'agent.interrupt', event_id: shortuuid() }); From 06e68c394ca2e01d9cbd3cb43c2e1f8b25cf94ec Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 29 Apr 2026 13:13:54 +0800 Subject: [PATCH 7/9] fix grant --- agents/src/voice/room_io/_output.ts | 7 +- examples/package.json | 1 + examples/src/liveavatar_avatar.ts | 102 ++++++++++++++++++++++++++++ plugins/liveavatar/src/avatar.ts | 13 +++- 4 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 examples/src/liveavatar_avatar.ts diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 01dd31625..eee18f40e 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -80,7 +80,12 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput { this.trackId = track.sid; }; - protected onLocalTrackPublished = (track: LocalTrackPublication) => { + protected onLocalTrackPublished = (track: LocalTrackPublication | undefined) => { + if (!track) { + this.logger.warn('LocalTrackPublished event without publication payload'); + return; + } + if ( !this.participantIdentity || this.participantIdentity !== this.room.localParticipant?.identity || diff --git a/examples/package.json b/examples/package.json index 4718e0797..b3fa4cec8 100644 --- a/examples/package.json +++ b/examples/package.json @@ -33,6 +33,7 @@ "@livekit/agents-plugin-google": "workspace:*", "@livekit/agents-plugin-inworld": "workspace:*", "@livekit/agents-plugin-lemonslice": "workspace:*", + "@livekit/agents-plugin-liveavatar": "workspace:*", "@livekit/agents-plugin-livekit": "workspace:*", "@livekit/agents-plugin-neuphonic": "workspace:*", "@livekit/agents-plugin-openai": "workspace:*", diff --git a/examples/src/liveavatar_avatar.ts b/examples/src/liveavatar_avatar.ts new file mode 100644 index 000000000..fe502d3b3 --- /dev/null +++ b/examples/src/liveavatar_avatar.ts @@ -0,0 +1,102 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + type JobProcess, + ServerOptions, + cli, + defineAgent, + inference, + log, + metrics, + voice, +} from '@livekit/agents'; +import * as liveavatar from '@livekit/agents-plugin-liveavatar'; +import * as livekit from '@livekit/agents-plugin-livekit'; +import * as silero from '@livekit/agents-plugin-silero'; +import { fileURLToPath } from 'node:url'; + +export default defineAgent({ + prewarm: async (proc: JobProcess) => { + proc.userData.vad = await silero.VAD.load(); + }, + entry: async (ctx: JobContext) => { + const logger = log().child({ example: 'liveavatar_avatar' }); + + const avatarId = process.env.LIVEAVATAR_AVATAR_ID || undefined; + const videoQuality: liveavatar.VideoQuality | undefined = undefined; + + const session = new voice.AgentSession({ + stt: new inference.STT({ + model: 'deepgram/nova-3', + language: 'en', + }), + llm: new inference.LLM({ + model: 'openai/gpt-4.1-mini', + }), + tts: new inference.TTS({ + model: 'cartesia/sonic-3', + voice: '9626c31c-bec5-4cca-baa8-f8ba9e84c8bc', + }), + turnDetection: new livekit.turnDetector.MultilingualModel(), + vad: ctx.proc.userData.vad! as silero.VAD, + voiceOptions: { + preemptiveGeneration: true, + }, + }); + + await ctx.connect(); + + await session.start({ + agent: new voice.Agent({ + instructions: + 'You are a helpful avatar assistant. Keep responses concise, friendly, and natural.', + }), + room: ctx.room, + outputOptions: { + syncTranscription: false, + }, + }); + + const avatar = new liveavatar.AvatarSession({ + avatarId, + videoQuality, + }); + await avatar.start(session, ctx.room); + + session.on(voice.AgentSessionEventTypes.AgentStateChanged, (ev) => { + logger.info({ oldState: ev.oldState, newState: ev.newState }, 'Agent state changed'); + }); + + session.on(voice.AgentSessionEventTypes.UserStateChanged, (ev) => { + logger.info({ oldState: ev.oldState, newState: ev.newState }, 'User state changed'); + }); + + session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (ev) => { + logger.info( + { final: ev.isFinal, transcript: ev.transcript, language: ev.language }, + 'User transcript received', + ); + }); + + session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { + metrics.logMetrics(ev.metrics); + }); + + session.on(voice.AgentSessionEventTypes.Error, (ev) => { + logger.error({ error: ev.error, source: ev.source }, 'Session emitted error'); + }); + + ctx.addShutdownCallback(async () => { + logger.info({ usage: session.usage }, 'Session usage summary'); + }); + + session.generateReply({ + instructions: + 'Greet the user, tell them this is a LiveAvatar example, and ask them to interrupt you mid-sentence.', + }); + }, +}); + +cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index e154ce7bf..166b7d7ac 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -179,11 +179,18 @@ export class AvatarSession { name: this.avatarParticipantName, }); at.kind = 'agent'; - at.addGrant({ roomJoin: true, room: room.name } as VideoGrant); at.attributes = { [ATTRIBUTE_PUBLISH_ON_BEHALF]: this.localParticipantIdentity }; - const livekitToken = await at.toJwt(); + at.addGrant({ + roomJoin: true, + room: room.name, + canPublish: true, + canSubscribe: true, + canPublishData: true, + canUpdateOwnMetadata: true, + canSubscribeMetrics: true, + } as VideoGrant); - this.#logger.debug('starting avatar session'); + const livekitToken = await at.toJwt(); if (!this.avatarId) { throw new LiveAvatarException('avatar_id must be set'); From 4395205788674da3ec603e70aff601fb3d8f660e Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 29 Apr 2026 13:17:17 +0800 Subject: [PATCH 8/9] changes verified --- plugins/liveavatar/src/api.ts | 3 --- plugins/liveavatar/src/avatar.ts | 6 ------ 2 files changed, 9 deletions(-) diff --git a/plugins/liveavatar/src/api.ts b/plugins/liveavatar/src/api.ts index d7005599b..a010662b7 100644 --- a/plugins/liveavatar/src/api.ts +++ b/plugins/liveavatar/src/api.ts @@ -11,7 +11,6 @@ import { log } from './log.js'; export const DEFAULT_API_URL = 'https://api.liveavatar.com/v1/sessions'; -// Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 41 line export type VideoQuality = 'very_high' | 'high' | 'medium' | 'low'; /** @@ -30,7 +29,6 @@ export interface CreateStreamingSessionOptions { roomName: string; avatarId: string; isSandbox?: boolean; - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 62 line videoQuality?: VideoQuality | null; } @@ -106,7 +104,6 @@ export class LiveAvatarAPI { livekit_config: livekitConfig, }; - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 75-76 lines if (opts.videoQuality != null) { payload.video_quality = opts.videoQuality; } diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index 166b7d7ac..18c702e7d 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -121,7 +121,6 @@ export class AvatarSession { this.apiUrl = options.apiUrl; this.apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; this.isSandbox = options.isSandbox ?? false; - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 75 line this.videoQuality = options.videoQuality ?? null; this.avatarParticipantIdentity = options.avatarParticipantIdentity || AVATAR_AGENT_IDENTITY; this.avatarParticipantName = options.avatarParticipantName || AVATAR_AGENT_NAME; @@ -156,7 +155,6 @@ export class AvatarSession { ); } - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 109-115 lines try { const jobCtx = getJobContext(); this.localParticipantIdentity = jobCtx.agent?.identity || ''; @@ -173,7 +171,6 @@ export class AvatarSession { throw new LiveAvatarException('failed to get local participant identity'); } - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 117-128 lines const at = new AccessToken(livekitApiKey, livekitApiSecret, { identity: this.avatarParticipantIdentity, name: this.avatarParticipantName, @@ -196,7 +193,6 @@ export class AvatarSession { throw new LiveAvatarException('avatar_id must be set'); } - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 135-145 lines const sessionConfig = await this.api.createStreamingSession({ livekitUrl, livekitToken, @@ -217,7 +213,6 @@ export class AvatarSession { this.wsUrl = startData.data.ws_url; this.#logger.info('LiveAvatar streaming session started'); - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 158-164 lines this.msgChannel = streamNs.createStreamChannel>(); agentSession.on(voice.AgentSessionEventTypes.AgentStateChanged, (ev) => { if (ev.newState === 'idle') { @@ -228,7 +223,6 @@ export class AvatarSession { this.closeMsgChannel(); }); - // Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 166-173 lines this.audioBuffer = new voice.QueueAudioOutput(SAMPLE_RATE); this.audioBuffer.on('clear_buffer', (ev: voice.QueueAudioOutputClearEvent) => this.onClearBuffer(ev), From 4c99192c55cac187e6fc91fc76a0bd368060273f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 29 Apr 2026 05:18:53 +0000 Subject: [PATCH 9/9] fix: update pnpm-lock.yaml for liveavatar workspace dep in examples CI was failing on all three jobs (Build, Test, Formatting) because each runs `pnpm install --frozen-lockfile` as its first step. Commit 06e68c3 added `@livekit/agents-plugin-liveavatar` to `examples/package.json` but did not update the lockfile, so the frozen install rejected the manifest and cascaded into every downstream step reporting failure. This commit just adds the missing workspace link entry to pnpm-lock.yaml so the lockfile matches `examples/package.json`. https://claude.ai/code/session_01DE5pBrf3y1bFgLTK8NDTkB --- pnpm-lock.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8d0cca7ef..bf64fd7cd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -269,6 +269,9 @@ importers: '@livekit/agents-plugin-lemonslice': specifier: workspace:* version: link:../plugins/lemonslice + '@livekit/agents-plugin-liveavatar': + specifier: workspace:* + version: link:../plugins/liveavatar '@livekit/agents-plugin-livekit': specifier: workspace:* version: link:../plugins/livekit