diff --git a/.changeset/liveavatar-plugin.md b/.changeset/liveavatar-plugin.md new file mode 100644 index 000000000..041677f0d --- /dev/null +++ b/.changeset/liveavatar-plugin.md @@ -0,0 +1,10 @@ +--- +'@livekit/agents-plugin-liveavatar': patch +'@livekit/agents': patch +--- + +Port the `liveavatar` plugin from the Python `livekit-agents` repo, including the new `videoQuality` parameter from livekit/agents#5552. + +The new `@livekit/agents-plugin-liveavatar` package adds a LiveAvatar `AvatarSession` that mirrors the Python plugin: it brings up a LiveAvatar streaming session, opens the realtime websocket, captures the agent's audio output through a queue-based `AudioOutput`, resamples to 24 kHz mono, and forwards base64-encoded chunks (~600 ms first chunk, ~1 s subsequent) to the LiveAvatar service. Inbound websocket events drive playback start/finish notifications back into the `AgentSession`. + +Also exports `voice.AudioOutput` (and its companion `AudioOutputCapabilities` / `PlaybackFinishedEvent` / `PlaybackStartedEvent` types) from `@livekit/agents` so plugin authors can subclass the abstract audio sink. diff --git a/agents/src/voice/avatar/index.ts b/agents/src/voice/avatar/index.ts index f9d316a17..85c42287d 100644 --- a/agents/src/voice/avatar/index.ts +++ b/agents/src/voice/avatar/index.ts @@ -2,3 +2,4 @@ // // SPDX-License-Identifier: Apache-2.0 export * from './datastream_io.js'; +export * from './queue_io.js'; diff --git a/agents/src/voice/avatar/queue_io.ts b/agents/src/voice/avatar/queue_io.ts new file mode 100644 index 000000000..ee508f9c5 --- /dev/null +++ b/agents/src/voice/avatar/queue_io.ts @@ -0,0 +1,141 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import { type StreamChannel, createStreamChannel } from '../../stream/stream_channel.js'; +import { AudioOutput } from '../io.js'; + +/** + * Sentinel value pushed onto the {@link QueueAudioOutput} stream when a segment of agent + * speech has finished (i.e. `flush()` was called). Consumers should treat it as an + * end-of-segment marker and emit the corresponding "speech ended" signal on their + * downstream protocol (websocket, RPC, etc.). + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_types.py (AudioSegmentEnd) + */ +export class AudioSegmentEnd {} + +/** + * One value emitted by {@link QueueAudioOutput.stream}: either an {@link AudioFrame} + * or an {@link AudioSegmentEnd} sentinel marking the end of a speech segment. + */ +export type QueueAudioOutputItem = AudioFrame | AudioSegmentEnd; + +/** + * Payload emitted with the {@link QueueAudioOutput} `'clear_buffer'` event. + * + * `wasCapturing` is set synchronously inside {@link QueueAudioOutput.clearBuffer} + * based on whether {@link QueueAudioOutput.captureFrame} had been called for the + * current segment. Consumers should use this — not their own asynchronous + * "is the avatar speaking?" flag — to decide whether to call + * {@link QueueAudioOutput.notifyPlaybackFinished}, otherwise an interrupt that + * lands in the window between `captureFrame` and the consumer's reader can leak + * `playbackSegmentsCount > playbackFinishedCount` and deadlock + * {@link AudioOutput.waitForPlayout}. + */ +export interface QueueAudioOutputClearEvent { + wasCapturing: boolean; +} + +/** + * AudioOutput implementation that buffers agent speech frames into a stream/queue so + * they can be consumed by an external transport (e.g. a custom websocket protocol used + * by an avatar plugin). Frames captured via {@link captureFrame} flow through the + * underlying stream as-is; on {@link flush} an {@link AudioSegmentEnd} sentinel is + * appended; on {@link clearBuffer} a `'clear_buffer'` event is emitted with a + * {@link QueueAudioOutputClearEvent} payload so the consumer can drop any in-flight + * bytes and notify upstream of an interruption. + * + * Mirrors Python's `livekit.agents.voice.avatar.QueueAudioOutput`. + * + * Ref: python livekit-agents/livekit/agents/voice/avatar/_queue_io.py + */ +export class QueueAudioOutput extends AudioOutput { + private readonly channel: StreamChannel = + createStreamChannel(); + private startedSegment = false; + + constructor(sampleRate?: number) { + super(sampleRate, undefined, { pause: false }); + } + + /** + * Returns the underlying readable stream of audio frames + end-of-segment sentinels. + * + * Each call returns the same shared stream; do not split between concurrent readers. + */ + stream(): ReturnType['stream']> { + return this.channel.stream(); + } + + /** True once {@link aclose} has been called. */ + get closed(): boolean { + return this.channel.closed; + } + + override async captureFrame(frame: AudioFrame): Promise { + await super.captureFrame(frame); + this.startedSegment = true; + if (!this.channel.closed) { + await this.channel.write(frame); + } + } + + override flush(): void { + super.flush(); + if (this.startedSegment && !this.channel.closed) { + // Best-effort write — the consumer drains on its own loop. + void this.channel.write(new AudioSegmentEnd()).catch(() => { + // channel closed concurrently; safe to drop the sentinel. + }); + this.startedSegment = false; + } + } + + override clearBuffer(): void { + // Capture the in-flight state synchronously so consumers can race-free decide + // whether to fire `notifyPlaybackFinished` (and avoid leaking the base class's + // `playbackSegmentsCount > playbackFinishedCount` bookkeeping). + const wasCapturing = this.startedSegment; + this.startedSegment = false; + // Always close an in-flight segment with an AudioSegmentEnd sentinel. + // The producer (e.g. the AgentSession forward-audio task) typically calls + // `flush()` *after* `clearBuffer()` once the segment is aborted, but at + // that point `startedSegment` is already `false` and `flush()` is a no-op + // for the sentinel write. Without writing it here, a downstream consumer + // that uses the sentinel as a "drop stale frames" reset signal would + // never see a boundary for the interrupted segment and would silently + // drop the entire next segment's audio. + if (wasCapturing && !this.channel.closed) { + void this.channel.write(new AudioSegmentEnd()).catch(() => { + // channel closed concurrently; safe to drop the sentinel. + }); + } + this.emit('clear_buffer', { wasCapturing } satisfies QueueAudioOutputClearEvent); + } + + /** Close the underlying stream so consumers see a graceful end-of-stream. */ + async aclose(): Promise { + if (!this.channel.closed) { + await this.channel.close(); + } + } + + /** + * Convenience wrapper around {@link AudioOutput.onPlaybackStarted} so a remote + * transport can announce "first byte played" without needing access to the + * protected method. + */ + notifyPlaybackStarted(createdAt: number = Date.now()): void { + this.onPlaybackStarted(createdAt); + } + + /** + * Convenience wrapper around {@link AudioOutput.onPlaybackFinished} so a remote + * transport can announce segment completion (or interruption) without needing + * access to the protected method. + */ + notifyPlaybackFinished(playbackPosition: number, interrupted: boolean): void { + this.onPlaybackFinished({ playbackPosition, interrupted }); + } +} diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index b9b3a62e7..808ac88c1 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -22,7 +22,13 @@ export { RoomSessionTransport, } from './remote_session.js'; export * from './events.js'; -export { type TimedString } from './io.js'; +export { + AudioOutput, + type AudioOutputCapabilities, + type PlaybackFinishedEvent, + type PlaybackStartedEvent, + type TimedString, +} from './io.js'; export * from './report.js'; export * from './room_io/index.js'; export { RunContext } from './run_context.js'; diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 01dd31625..eee18f40e 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -80,7 +80,12 @@ abstract class BaseParticipantTranscriptionOutput extends TextOutput { this.trackId = track.sid; }; - protected onLocalTrackPublished = (track: LocalTrackPublication) => { + protected onLocalTrackPublished = (track: LocalTrackPublication | undefined) => { + if (!track) { + this.logger.warn('LocalTrackPublished event without publication payload'); + return; + } + if ( !this.participantIdentity || this.participantIdentity !== this.room.localParticipant?.identity || diff --git a/examples/package.json b/examples/package.json index 4718e0797..b3fa4cec8 100644 --- a/examples/package.json +++ b/examples/package.json @@ -33,6 +33,7 @@ "@livekit/agents-plugin-google": "workspace:*", "@livekit/agents-plugin-inworld": "workspace:*", "@livekit/agents-plugin-lemonslice": "workspace:*", + "@livekit/agents-plugin-liveavatar": "workspace:*", "@livekit/agents-plugin-livekit": "workspace:*", "@livekit/agents-plugin-neuphonic": "workspace:*", "@livekit/agents-plugin-openai": "workspace:*", diff --git a/examples/src/liveavatar_avatar.ts b/examples/src/liveavatar_avatar.ts new file mode 100644 index 000000000..fe502d3b3 --- /dev/null +++ b/examples/src/liveavatar_avatar.ts @@ -0,0 +1,102 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + type JobProcess, + ServerOptions, + cli, + defineAgent, + inference, + log, + metrics, + voice, +} from '@livekit/agents'; +import * as liveavatar from '@livekit/agents-plugin-liveavatar'; +import * as livekit from '@livekit/agents-plugin-livekit'; +import * as silero from '@livekit/agents-plugin-silero'; +import { fileURLToPath } from 'node:url'; + +export default defineAgent({ + prewarm: async (proc: JobProcess) => { + proc.userData.vad = await silero.VAD.load(); + }, + entry: async (ctx: JobContext) => { + const logger = log().child({ example: 'liveavatar_avatar' }); + + const avatarId = process.env.LIVEAVATAR_AVATAR_ID || undefined; + const videoQuality: liveavatar.VideoQuality | undefined = undefined; + + const session = new voice.AgentSession({ + stt: new inference.STT({ + model: 'deepgram/nova-3', + language: 'en', + }), + llm: new inference.LLM({ + model: 'openai/gpt-4.1-mini', + }), + tts: new inference.TTS({ + model: 'cartesia/sonic-3', + voice: '9626c31c-bec5-4cca-baa8-f8ba9e84c8bc', + }), + turnDetection: new livekit.turnDetector.MultilingualModel(), + vad: ctx.proc.userData.vad! as silero.VAD, + voiceOptions: { + preemptiveGeneration: true, + }, + }); + + await ctx.connect(); + + await session.start({ + agent: new voice.Agent({ + instructions: + 'You are a helpful avatar assistant. Keep responses concise, friendly, and natural.', + }), + room: ctx.room, + outputOptions: { + syncTranscription: false, + }, + }); + + const avatar = new liveavatar.AvatarSession({ + avatarId, + videoQuality, + }); + await avatar.start(session, ctx.room); + + session.on(voice.AgentSessionEventTypes.AgentStateChanged, (ev) => { + logger.info({ oldState: ev.oldState, newState: ev.newState }, 'Agent state changed'); + }); + + session.on(voice.AgentSessionEventTypes.UserStateChanged, (ev) => { + logger.info({ oldState: ev.oldState, newState: ev.newState }, 'User state changed'); + }); + + session.on(voice.AgentSessionEventTypes.UserInputTranscribed, (ev) => { + logger.info( + { final: ev.isFinal, transcript: ev.transcript, language: ev.language }, + 'User transcript received', + ); + }); + + session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { + metrics.logMetrics(ev.metrics); + }); + + session.on(voice.AgentSessionEventTypes.Error, (ev) => { + logger.error({ error: ev.error, source: ev.source }, 'Session emitted error'); + }); + + ctx.addShutdownCallback(async () => { + logger.info({ usage: session.usage }, 'Session usage summary'); + }); + + session.generateReply({ + instructions: + 'Greet the user, tell them this is a LiveAvatar example, and ask them to interrupt you mid-sentence.', + }); + }, +}); + +cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/plugins/liveavatar/README.md b/plugins/liveavatar/README.md new file mode 100644 index 000000000..6462d6527 --- /dev/null +++ b/plugins/liveavatar/README.md @@ -0,0 +1,81 @@ + + +# LiveAvatar plugin for LiveKit Agents + +Support for [LiveAvatar](https://www.liveavatar.com) interactive avatars. + +This is the JS/TS port of the Python `livekit-plugins-liveavatar` plugin. See [https://docs.livekit.io/agents/models/avatar/plugins/liveavatar/](https://docs.livekit.io/agents/models/avatar/plugins/liveavatar/) for more information. + +## Installation + +```bash +npm install @livekit/agents-plugin-liveavatar +``` + +or + +```bash +pnpm add @livekit/agents-plugin-liveavatar +``` + +## Pre-requisites + +Create a developer API key from the LiveAvatar dashboard and set the `LIVEAVATAR_API_KEY` environment variable with it: + +```bash +export LIVEAVATAR_API_KEY= +``` + +## Usage + +```typescript +import { AvatarSession } from '@livekit/agents-plugin-liveavatar'; +import { AgentSession } from '@livekit/agents'; + +const avatarSession = new AvatarSession({ + avatarId: 'your-avatar-id', // or via LIVEAVATAR_AVATAR_ID + apiKey: process.env.LIVEAVATAR_API_KEY, + videoQuality: 'high', // optional: 'very_high' | 'high' | 'medium' | 'low' +}); + +await avatarSession.start(agentSession, room, { + livekitUrl: process.env.LIVEKIT_URL, + livekitApiKey: process.env.LIVEKIT_API_KEY, + livekitApiSecret: process.env.LIVEKIT_API_SECRET, +}); +``` + +## API + +### `AvatarSession` + +#### Constructor Options + +- `avatarId?: string` — The LiveAvatar avatar id. Falls back to `LIVEAVATAR_AVATAR_ID`. +- `apiUrl?: string` — Override the LiveAvatar API base URL. +- `apiKey?: string` — Your LiveAvatar API key. Falls back to `LIVEAVATAR_API_KEY`. +- `isSandbox?: boolean` — Use the LiveAvatar sandbox (1 minute connection limit). Defaults to `false`. +- `videoQuality?: 'very_high' | 'high' | 'medium' | 'low'` — Avatar video quality requested from the service. When omitted, the LiveAvatar service decides. +- `avatarParticipantIdentity?: string` — Identity for the avatar participant. Defaults to `'liveavatar-avatar-agent'`. +- `avatarParticipantName?: string` — Display name for the avatar participant. Defaults to `'liveavatar-avatar-agent'`. +- `connOptions?: APIConnectOptions` — API retry/timeout options. + +#### Methods + +##### `start(agentSession, room, options?)` + +Starts the avatar session, brings up a LiveAvatar streaming session, opens the realtime websocket, and routes the agent's audio output through to the avatar. + +**StartOptions:** + +- `livekitUrl?: string` — Falls back to `LIVEKIT_URL`. +- `livekitApiKey?: string` — Falls back to `LIVEKIT_API_KEY`. +- `livekitApiSecret?: string` — Falls back to `LIVEKIT_API_SECRET`. + +## License + +Apache 2.0 diff --git a/plugins/liveavatar/package.json b/plugins/liveavatar/package.json new file mode 100644 index 000000000..c4ec1d029 --- /dev/null +++ b/plugins/liveavatar/package.json @@ -0,0 +1,53 @@ +{ + "name": "@livekit/agents-plugin-liveavatar", + "version": "1.3.0", + "description": "LiveAvatar avatar plugin for LiveKit Node Agents", + "main": "dist/index.js", + "require": "dist/index.cjs", + "types": "dist/index.d.ts", + "exports": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } + }, + "author": "LiveKit", + "type": "module", + "repository": "git@github.com:livekit/agents-js.git", + "license": "Apache-2.0", + "files": [ + "dist", + "src", + "README.md" + ], + "scripts": { + "build": "tsup --onSuccess \"pnpm build:types\"", + "build:types": "tsc --declaration --emitDeclarationOnly && node ../../scripts/copyDeclarationOutput.js", + "clean": "rm -rf dist", + "clean:build": "pnpm clean && pnpm build", + "lint": "eslint -f unix \"src/**/*.{ts,js}\"", + "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript", + "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose" + }, + "devDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:", + "@microsoft/api-extractor": "^7.35.0", + "@types/ws": "catalog:", + "pino": "^8.19.0", + "tsup": "^8.3.5", + "typescript": "^5.0.0" + }, + "dependencies": { + "livekit-server-sdk": "^2.13.3", + "ws": "catalog:" + }, + "peerDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:" + } +} diff --git a/plugins/liveavatar/src/api.ts b/plugins/liveavatar/src/api.ts new file mode 100644 index 000000000..a010662b7 --- /dev/null +++ b/plugins/liveavatar/src/api.ts @@ -0,0 +1,203 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type APIConnectOptions, + APIConnectionError, + APIStatusError, + DEFAULT_API_CONNECT_OPTIONS, +} from '@livekit/agents'; +import { log } from './log.js'; + +export const DEFAULT_API_URL = 'https://api.liveavatar.com/v1/sessions'; + +export type VideoQuality = 'very_high' | 'high' | 'medium' | 'low'; + +/** + * Exception thrown when the LiveAvatar plugin or the LiveAvatar service errors. + */ +export class LiveAvatarException extends Error { + constructor(message: string) { + super(message); + this.name = 'LiveAvatarException'; + } +} + +export interface CreateStreamingSessionOptions { + livekitUrl: string; + livekitToken: string; + roomName: string; + avatarId: string; + isSandbox?: boolean; + videoQuality?: VideoQuality | null; +} + +export interface SessionResponse { + data: { + session_id: string; + session_token: string; + [key: string]: unknown; + }; + code: number; + [key: string]: unknown; +} + +export interface StartSessionResponse { + data: { + ws_url: string; + [key: string]: unknown; + }; + code: number; + [key: string]: unknown; +} + +export interface StopSessionResponse { + data: Record; + code: number; + [key: string]: unknown; +} + +export interface LiveAvatarAPIOptions { + apiKey?: string; + apiUrl?: string; + connOptions?: APIConnectOptions; +} + +/** + * Thin client for the LiveAvatar HTTP API. + * + * Mirrors `livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py`. + */ +export class LiveAvatarAPI { + private apiKey: string; + private apiUrl: string; + private connOptions: APIConnectOptions; + + #logger = log(); + + constructor(options: LiveAvatarAPIOptions = {}) { + const apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; + if (!apiKey) { + throw new LiveAvatarException('api_key or LIVEAVATAR_API_KEY must be set'); + } + this.apiKey = apiKey; + this.apiUrl = options.apiUrl || DEFAULT_API_URL; + this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; + } + + /** + * Create a new streaming session, returning the session id and session token. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 51-87 lines + */ + async createStreamingSession(opts: CreateStreamingSessionOptions): Promise { + const livekitConfig = { + livekit_room: opts.roomName, + livekit_url: opts.livekitUrl, + livekit_client_token: opts.livekitToken, + }; + + const payload: Record = { + mode: 'LITE', + avatar_id: opts.avatarId, + is_sandbox: opts.isSandbox ?? false, + livekit_config: livekitConfig, + }; + + if (opts.videoQuality != null) { + payload.video_quality = opts.videoQuality; + } + + const headers = { + accept: 'application/json', + 'content-type': 'application/json', + 'X-API-KEY': this.apiKey, + }; + return (await this.post('/token', payload, headers)) as SessionResponse; + } + + /** + * Start a previously created streaming session. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 92-97 lines + */ + async startStreamingSession( + sessionId: string, + sessionToken: string, + ): Promise { + const payload = { session_id: sessionId }; + const headers = { + 'content-type': 'application/json', + Authorization: `Bearer ${sessionToken}`, + }; + return (await this.post('/start', payload, headers)) as StartSessionResponse; + } + + /** + * Stop a running streaming session. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 99-107 lines + */ + async stopStreamingSession( + sessionId: string, + sessionToken: string, + ): Promise { + const payload = { session_id: sessionId, reason: 'USER_DISCONNECTED' }; + const headers = { + 'content-type': 'application/json', + Authorization: `Bearer ${sessionToken}`, + }; + return (await this.post('/stop', payload, headers)) as StopSessionResponse; + } + + /** + * POST helper with the same retry/backoff semantics as the Python plugin. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/api.py - 109-138 lines + */ + private async post( + endpoint: string, + payload: Record, + headers: Record, + ): Promise { + const url = this.apiUrl + endpoint; + const maxRetry = this.connOptions.maxRetry; + // `maxRetry` is the number of retries on top of the initial attempt, so we + // run up to `maxRetry + 1` total attempts. This matches the convention used + // by other agents-js plugins (e.g. lemonslice/runway) and ensures a single + // attempt still fires when callers configure `maxRetry: 0`. + for (let i = 0; i <= maxRetry; i++) { + try { + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(this.connOptions.timeoutMs), + }); + if (!response.ok) { + const text = await response.text(); + throw new APIStatusError({ + message: `Server returned an error for ${url}: ${response.status}`, + options: { statusCode: response.status, body: { error: text } }, + }); + } + return await response.json(); + } catch (e) { + if (e instanceof APIStatusError && !e.retryable) { + throw e; + } + this.#logger.warn( + { error: String(e), url, attempt: i }, + `API request to ${url} failed on attempt ${i}`, + ); + } + + if (i < maxRetry) { + await new Promise((resolve) => setTimeout(resolve, this.connOptions.retryIntervalMs)); + } + } + throw new APIConnectionError({ + message: `Failed to call LiveAvatar API after ${maxRetry + 1} attempts`, + }); + } +} diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts new file mode 100644 index 000000000..18c702e7d --- /dev/null +++ b/plugins/liveavatar/src/avatar.ts @@ -0,0 +1,639 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type APIConnectOptions, + APIConnectionError, + DEFAULT_API_CONNECT_OPTIONS, + Future, + getJobContext, + shortuuid, + stream as streamNs, + voice, +} from '@livekit/agents'; +import { type AudioFrame, AudioResampler, type Room } from '@livekit/rtc-node'; +import type { VideoGrant } from 'livekit-server-sdk'; +import { AccessToken } from 'livekit-server-sdk'; +import { type RawData, WebSocket } from 'ws'; +import { LiveAvatarAPI, LiveAvatarException, type VideoQuality } from './api.js'; +import { log } from './log.js'; + +const ATTRIBUTE_PUBLISH_ON_BEHALF = 'lk.publish_on_behalf'; +const SAMPLE_RATE = 24000; +const KEEP_ALIVE_INTERVAL_MS = 60_000; +const FIRST_CHUNK_THRESHOLD_MS = 600; +const SUBSEQUENT_CHUNK_THRESHOLD_MS = 1_000; +const AVATAR_AGENT_IDENTITY = 'liveavatar-avatar-agent'; +const AVATAR_AGENT_NAME = 'liveavatar-avatar-agent'; + +/** + * Options for configuring an AvatarSession. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 47-58 lines + */ +export interface AvatarSessionOptions { + /** The LiveAvatar avatar id. Falls back to the `LIVEAVATAR_AVATAR_ID` env var. */ + avatarId?: string; + /** Override the LiveAvatar API base URL. */ + apiUrl?: string; + /** LiveAvatar API key. Falls back to the `LIVEAVATAR_API_KEY` env var. */ + apiKey?: string; + /** When true, use the LiveAvatar sandbox (1 minute connection limit). */ + isSandbox?: boolean; + /** Avatar video quality. When omitted, the LiveAvatar service decides. */ + videoQuality?: VideoQuality; + /** Identity for the avatar participant. Defaults to `liveavatar-avatar-agent`. */ + avatarParticipantIdentity?: string; + /** Display name for the avatar participant. Defaults to `liveavatar-avatar-agent`. */ + avatarParticipantName?: string; + /** API retry/timeout options. */ + connOptions?: APIConnectOptions; +} + +/** + * Optional LiveKit credentials for {@link AvatarSession.start}; falls back to env vars. + */ +export interface StartOptions { + livekitUrl?: string; + livekitApiKey?: string; + livekitApiSecret?: string; +} + +/** + * A LiveAvatar interactive avatar session. + * + * This class manages the connection between a LiveKit agent and a LiveAvatar avatar: + * it brings up a LiveAvatar streaming session, opens the realtime websocket, captures + * the agent's audio output, and forwards it (resampled, base64-encoded) to the avatar + * service. Inbound websocket events drive playback start/finish notifications back into + * the agent session so the speech handle can complete correctly. + * + * @example + * ```typescript + * const avatar = new AvatarSession({ + * avatarId: 'your-avatar-id', + * apiKey: process.env.LIVEAVATAR_API_KEY, + * videoQuality: 'high', + * }); + * await avatar.start(agentSession, room); + * ``` + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py + */ +export class AvatarSession { + private avatarId: string | null; + private apiUrl?: string; + private apiKey: string; + private isSandbox: boolean; + private videoQuality: VideoQuality | null; + private avatarParticipantIdentity: string; + private avatarParticipantName: string; + private connOptions: APIConnectOptions; + + private api: LiveAvatarAPI; + private sessionId: string | null = null; + private sessionToken: string | null = null; + private wsUrl: string | null = null; + + private audioBuffer?: voice.QueueAudioOutput; + private msgChannel?: streamNs.StreamChannel>; + private msgChannelClosed = false; + private mainTaskPromise?: Promise; + private agentSession?: voice.AgentSession; + private room?: Room; + private localParticipantIdentity = ''; + + private audioResampler: AudioResampler | null = null; + private resamplerInputRate: number | null = null; + + private audioPlaying = false; + private avatarSpeaking = false; + private avatarInterrupted = false; + private playbackPosition = 0; + private sessionConnectedFuture: Future = new Future(); + private chunkInterrupted = false; + private closing = false; + + #logger = log(); + + constructor(options: AvatarSessionOptions = {}) { + this.avatarId = options.avatarId ?? process.env.LIVEAVATAR_AVATAR_ID ?? null; + this.apiUrl = options.apiUrl; + this.apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; + this.isSandbox = options.isSandbox ?? false; + this.videoQuality = options.videoQuality ?? null; + this.avatarParticipantIdentity = options.avatarParticipantIdentity || AVATAR_AGENT_IDENTITY; + this.avatarParticipantName = options.avatarParticipantName || AVATAR_AGENT_NAME; + this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; + + this.api = new LiveAvatarAPI({ + apiKey: this.apiKey, + apiUrl: this.apiUrl, + connOptions: this.connOptions, + }); + } + + /** + * Start the avatar session and wire it into the agent session's audio output. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 89-178 lines + */ + async start( + agentSession: voice.AgentSession, + room: Room, + options: StartOptions = {}, + ): Promise { + this.agentSession = agentSession; + this.room = room; + + const livekitUrl = options.livekitUrl || process.env.LIVEKIT_URL; + const livekitApiKey = options.livekitApiKey || process.env.LIVEKIT_API_KEY; + const livekitApiSecret = options.livekitApiSecret || process.env.LIVEKIT_API_SECRET; + if (!livekitUrl || !livekitApiKey || !livekitApiSecret) { + throw new LiveAvatarException( + 'livekit_url, livekit_api_key, and livekit_api_secret must be set', + ); + } + + try { + const jobCtx = getJobContext(); + this.localParticipantIdentity = jobCtx.agent?.identity || ''; + if (!this.localParticipantIdentity && room.localParticipant) { + this.localParticipantIdentity = room.localParticipant.identity; + } + } catch { + if (!room.isConnected || !room.localParticipant) { + throw new LiveAvatarException('failed to get local participant identity'); + } + this.localParticipantIdentity = room.localParticipant.identity; + } + if (!this.localParticipantIdentity) { + throw new LiveAvatarException('failed to get local participant identity'); + } + + const at = new AccessToken(livekitApiKey, livekitApiSecret, { + identity: this.avatarParticipantIdentity, + name: this.avatarParticipantName, + }); + at.kind = 'agent'; + at.attributes = { [ATTRIBUTE_PUBLISH_ON_BEHALF]: this.localParticipantIdentity }; + at.addGrant({ + roomJoin: true, + room: room.name, + canPublish: true, + canSubscribe: true, + canPublishData: true, + canUpdateOwnMetadata: true, + canSubscribeMetrics: true, + } as VideoGrant); + + const livekitToken = await at.toJwt(); + + if (!this.avatarId) { + throw new LiveAvatarException('avatar_id must be set'); + } + + const sessionConfig = await this.api.createStreamingSession({ + livekitUrl, + livekitToken, + roomName: room.name ?? '', + avatarId: this.avatarId, + isSandbox: this.isSandbox, + videoQuality: this.videoQuality, + }); + this.sessionId = sessionConfig.data.session_id; + this.sessionToken = sessionConfig.data.session_token; + this.#logger.info({ sessionId: this.sessionId }, 'LiveAvatar session created'); + + if (!this.sessionId || !this.sessionToken) { + throw new LiveAvatarException('LiveAvatar session creation returned no session id/token'); + } + + const startData = await this.api.startStreamingSession(this.sessionId, this.sessionToken); + this.wsUrl = startData.data.ws_url; + this.#logger.info('LiveAvatar streaming session started'); + + this.msgChannel = streamNs.createStreamChannel>(); + agentSession.on(voice.AgentSessionEventTypes.AgentStateChanged, (ev) => { + if (ev.newState === 'idle') { + this.sendEvent({ type: 'agent.stop_listening', event_id: shortuuid() }); + } + }); + agentSession.on(voice.AgentSessionEventTypes.Close, () => { + this.closeMsgChannel(); + }); + + this.audioBuffer = new voice.QueueAudioOutput(SAMPLE_RATE); + this.audioBuffer.on('clear_buffer', (ev: voice.QueueAudioOutputClearEvent) => + this.onClearBuffer(ev), + ); + agentSession.output.audio = this.audioBuffer; + + // Spawn the main task with an attached error handler so a websocket open or + // protocol failure does not surface as an unhandled rejection. The main task + // itself handles its own cleanup in finally. + this.mainTaskPromise = this.mainTask().catch((e) => { + this.#logger.warn({ error: String(e) }, 'LiveAvatar main task failed'); + }); + + // Best-effort cleanup on job shutdown. + try { + const jobCtx = getJobContext(); + jobCtx.addShutdownCallback(async () => { + await this.aclose(); + }); + } catch { + // No active job context — caller is expected to manage lifecycle. + } + } + + /** + * Stop the avatar session, drain queues, and close the websocket. + */ + async aclose(): Promise { + this.closing = true; + this.closeMsgChannel(); + if (this.audioBuffer) { + await this.audioBuffer.aclose(); + } + if (this.mainTaskPromise) { + try { + await this.mainTaskPromise; + } catch { + // logged in mainTask + } + } + } + + /** + * Send an event over the websocket message queue. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 207-209 lines + */ + sendEvent(msg: Record): void { + if (!this.msgChannel || this.msgChannelClosed) return; + void this.msgChannel.write(msg).catch(() => { + // channel closed — drop the event + }); + } + + private closeMsgChannel(): void { + if (this.msgChannel && !this.msgChannelClosed) { + this.msgChannelClosed = true; + void this.msgChannel.close().catch(() => { + // ignore double-close + }); + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 180-196 lines + * + * Gates everything on the `wasCapturing` flag carried by the `clear_buffer` + * event (set synchronously inside `QueueAudioOutput.clearBuffer`): + * + * 1. `notifyPlaybackFinished` only fires when a segment was actually in + * flight, so the base class's segment-count bookkeeping stays balanced + * even when an interrupt lands in the window between `super.captureFrame` + * incrementing `playbackSegmentsCount` and `forwardAudio` consuming the + * frame. + * 2. `chunkInterrupted` is only flipped when there's an actual segment to + * interrupt. If `wasCapturing` is false (e.g. `clearBuffer` is called + * after `flush` has already written its `AudioSegmentEnd`), setting + * `chunkInterrupted` would otherwise carry over and discard the first + * frame of the *next* segment. + */ + private onClearBuffer(ev: voice.QueueAudioOutputClearEvent): void { + this.audioPlaying = false; + if (!ev.wasCapturing) { + return; + } + this.chunkInterrupted = true; + if (this.audioBuffer) { + this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, true); + if (this.avatarSpeaking) { + this.sendEvent({ type: 'agent.interrupt', event_id: shortuuid() }); + } + this.playbackPosition = 0; + } + } + + /** + * Resample frame to {@link SAMPLE_RATE} mono. Mirrors the lazy resampler swap + * in the Python plugin: when the input rate changes we discard the old resampler. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 198-216 lines + */ + private *resampleAudio(frame: AudioFrame): IterableIterator { + if (this.audioResampler && this.resamplerInputRate !== frame.sampleRate) { + this.audioResampler.close(); + this.audioResampler = null; + this.resamplerInputRate = null; + } + if (!this.audioResampler && (frame.sampleRate !== SAMPLE_RATE || frame.channels !== 1)) { + this.audioResampler = new AudioResampler(frame.sampleRate, SAMPLE_RATE, 1); + this.resamplerInputRate = frame.sampleRate; + } + if (this.audioResampler) { + yield* this.audioResampler.push(frame); + } else { + yield frame; + } + } + + /** + * Main task: opens the websocket and runs forward/send/recv/keep-alive loops. + * + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 218-308 lines + */ + private async mainTask(): Promise { + if (!this.wsUrl) { + throw new LiveAvatarException('ws_url not set'); + } + + let ws: WebSocket | null = null; + let resetKeepAlive = () => {}; + const closingResolver = new Future(); + + try { + // Open the websocket inside the guarded section so DNS/TLS/network failures + // are routed through the same cleanup path as runtime errors. + const wsRef = new WebSocket(this.wsUrl); + await new Promise((resolve, reject) => { + wsRef.once('open', resolve); + wsRef.once('error', reject); + }); + ws = wsRef; + + const forwardAudio = async (): Promise => { + if (!this.audioBuffer) return; + await this.sessionConnectedFuture.await; + + let chunkBuf: Uint8Array[] = []; + let chunkDurationMs = 0; + let isFirstChunk = true; + // True between an interrupt and the next AudioSegmentEnd: drops any + // frames that were already queued from the interrupted segment so + // they don't bleed into the next segment's first chunk. + let interruptDraining = false; + + const flushChunk = () => { + if (chunkBuf.length === 0) return; + const total = chunkBuf.reduce((acc, c) => acc + c.length, 0); + const merged = new Uint8Array(total); + let offset = 0; + for (const c of chunkBuf) { + merged.set(c, offset); + offset += c.length; + } + const encoded = Buffer.from(merged).toString('base64'); + this.sendEvent({ type: 'agent.speak', event_id: shortuuid(), audio: encoded }); + this.playbackPosition += chunkDurationMs / 1000; + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = false; + }; + + const discardChunk = () => { + chunkBuf = []; + chunkDurationMs = 0; + isFirstChunk = true; + }; + + const reader = this.audioBuffer.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (this.chunkInterrupted) { + this.chunkInterrupted = false; + discardChunk(); + interruptDraining = true; + } + + if (value instanceof voice.AudioSegmentEnd) { + // The interrupted segment has fully drained; resume normal + // operation for the next segment. + interruptDraining = false; + flushChunk(); + this.sendEvent({ type: 'agent.speak_end', event_id: shortuuid() }); + this.sendEvent({ type: 'agent.start_listening', event_id: shortuuid() }); + isFirstChunk = true; + continue; + } + + if (interruptDraining) { + // Drop any frame from the interrupted segment that was already + // sitting in the channel; they would otherwise leak into the + // next segment's audio. + continue; + } + + if (!this.audioPlaying) { + this.audioPlaying = true; + } + for (const resampled of this.resampleAudio(value)) { + chunkBuf.push(new Uint8Array(resampled.data.buffer)); + const frameDurationMs = (resampled.samplesPerChannel / resampled.sampleRate) * 1000; + chunkDurationMs += frameDurationMs; + const thresholdMs = isFirstChunk + ? FIRST_CHUNK_THRESHOLD_MS + : SUBSEQUENT_CHUNK_THRESHOLD_MS; + if (chunkDurationMs >= thresholdMs) { + flushChunk(); + } + } + } + } finally { + reader.releaseLock(); + } + }; + + const sendTask = async (): Promise => { + if (!this.msgChannel) return; + const reader = this.msgChannel.stream().getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + try { + wsRef.send(JSON.stringify(value)); + resetKeepAlive(); + } catch (e) { + this.#logger.warn({ error: String(e) }, 'failed to send LiveAvatar event'); + break; + } + } + } finally { + reader.releaseLock(); + try { + wsRef.close(); + } catch { + // ignore + } + this.closing = true; + closingResolver.resolve(); + } + }; + + const recvTask = async (): Promise => { + const queue: RawData[] = []; + let waiter: ((m: RawData) => void) | null = null; + + wsRef.on('message', (data: RawData) => { + if (waiter) { + const w = waiter; + waiter = null; + w(data); + } else { + queue.push(data); + } + }); + const closedFuture = new Future(); + wsRef.on('close', () => closedFuture.resolve()); + wsRef.on('error', () => closedFuture.resolve()); + + const nextMessage = (): Promise => + new Promise((resolve) => { + if (queue.length > 0) { + resolve(queue.shift()!); + return; + } + waiter = (m: RawData) => resolve(m); + void closedFuture.await.then(() => { + if (waiter) { + waiter = null; + resolve(null); + } + }); + }); + + while (true) { + const msg = await nextMessage(); + if (msg === null) { + if (this.closing) return; + if (this.isSandbox) { + this.#logger.warn('The LiveAvatar Sandbox connection surpassed the 1 minute limit'); + return; + } + throw new APIConnectionError({ + message: 'LiveAvatar connection closed unexpectedly.', + }); + } + let parsed: { type?: string; state?: string }; + try { + parsed = JSON.parse(msg.toString()) as { type?: string; state?: string }; + } catch { + continue; + } + switch (parsed.type) { + case 'session.state_updated': + this.#logger.debug({ state: parsed.state }, 'LiveAvatar session state'); + if (parsed.state === 'connected') { + if (!this.sessionConnectedFuture.done) { + this.sessionConnectedFuture.resolve(); + } + } + break; + case 'agent.speak_interrupted': + this.handleAgentSpeakInterrupted(); + break; + case 'agent.speak_ended': + this.handleAgentSpeakEnded(); + break; + case 'agent.speak_started': + this.handleAgentSpeakStarted(); + break; + default: + this.#logger.debug({ type: parsed.type }, 'Unhandled LiveAvatar event'); + } + } + }; + + const keepAliveTask = async (): Promise => { + await this.sessionConnectedFuture.await; + let timer: NodeJS.Timeout | null = null; + const tick = () => { + if (this.closing) return; + this.sendEvent({ type: 'session.keep_alive', event_id: shortuuid() }); + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + }; + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + resetKeepAlive = () => { + if (timer) clearTimeout(timer); + if (!this.closing) { + timer = setTimeout(tick, KEEP_ALIVE_INTERVAL_MS); + } + }; + await closingResolver.await; + if (timer) clearTimeout(timer); + }; + + await Promise.race([forwardAudio(), sendTask(), recvTask(), keepAliveTask()]); + } catch (e) { + this.#logger.warn({ error: String(e) }, 'LiveAvatar main task error'); + } finally { + this.closing = true; + closingResolver.resolve(); + + try { + if (this.sessionId && this.sessionToken) { + const data = await this.api.stopStreamingSession(this.sessionId, this.sessionToken); + // Mirrors python livekit-plugins-liveavatar/.../avatar.py - 304 line + if (data.code <= 200) { + this.#logger.info({ sessionId: this.sessionId }, 'LiveAvatar session stopped'); + } + } + } catch (e) { + this.#logger.warn({ error: String(e) }, 'Failed to stop LiveAvatar session'); + } + + if (this.audioBuffer) { + await this.audioBuffer.aclose(); + } + if (this.audioResampler) { + try { + this.audioResampler.close(); + } catch { + // ignore + } + this.audioResampler = null; + } + if (ws) { + try { + ws.close(); + } catch { + // ignore + } + } + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 310-311 lines + */ + private handleAgentSpeakInterrupted(): void { + this.avatarInterrupted = true; + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 313-322 lines + */ + private handleAgentSpeakEnded(): void { + this.avatarSpeaking = false; + if (!this.avatarInterrupted && this.audioBuffer) { + this.audioBuffer.notifyPlaybackFinished(this.playbackPosition, false); + this.playbackPosition = 0; + this.audioPlaying = false; + } + } + + /** + * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py - 324-327 lines + */ + private handleAgentSpeakStarted(): void { + this.avatarSpeaking = true; + this.avatarInterrupted = false; + this.audioBuffer?.notifyPlaybackStarted(); + } +} diff --git a/plugins/liveavatar/src/index.ts b/plugins/liveavatar/src/index.ts new file mode 100644 index 000000000..46567f780 --- /dev/null +++ b/plugins/liveavatar/src/index.ts @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { Plugin } from '@livekit/agents'; + +export * from './api.js'; +export * from './avatar.js'; + +class LiveAvatarPlugin extends Plugin { + constructor() { + super({ + title: 'liveavatar', + version: __PACKAGE_VERSION__, + package: __PACKAGE_NAME__, + }); + } +} + +Plugin.registerPlugin(new LiveAvatarPlugin()); diff --git a/plugins/liveavatar/src/log.ts b/plugins/liveavatar/src/log.ts new file mode 100644 index 000000000..08acfaf3e --- /dev/null +++ b/plugins/liveavatar/src/log.ts @@ -0,0 +1,7 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { log as agentsLog } from '@livekit/agents'; +import type { Logger } from 'pino'; + +export const log = (): Logger => agentsLog().child({ plugin: 'liveavatar' }); diff --git a/plugins/liveavatar/tsconfig.json b/plugins/liveavatar/tsconfig.json new file mode 100644 index 000000000..46d415de9 --- /dev/null +++ b/plugins/liveavatar/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "rootDir": "./src", + "declarationDir": "./dist", + "outDir": "./dist" + }, + "typedocOptions": { + "name": "plugins/agents-plugin-liveavatar", + "entryPointStrategy": "resolve", + "readme": "none", + "entryPoints": ["src/index.ts"] + } +} diff --git a/plugins/liveavatar/tsup.config.ts b/plugins/liveavatar/tsup.config.ts new file mode 100644 index 000000000..46011fa8c --- /dev/null +++ b/plugins/liveavatar/tsup.config.ts @@ -0,0 +1,9 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { defineConfig } from 'tsup'; +import defaults from '../../tsup.config.js'; + +export default defineConfig({ + ...defaults, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ae1210ae1..bf64fd7cd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -269,6 +269,9 @@ importers: '@livekit/agents-plugin-lemonslice': specifier: workspace:* version: link:../plugins/lemonslice + '@livekit/agents-plugin-liveavatar': + specifier: workspace:* + version: link:../plugins/liveavatar '@livekit/agents-plugin-livekit': specifier: workspace:* version: link:../plugins/livekit @@ -742,6 +745,37 @@ importers: specifier: ^5.0.0 version: 5.9.3 + plugins/liveavatar: + dependencies: + livekit-server-sdk: + specifier: ^2.13.3 + version: 2.14.1 + ws: + specifier: 'catalog:' + version: 8.20.0 + devDependencies: + '@livekit/agents': + specifier: workspace:* + version: link:../../agents + '@livekit/rtc-node': + specifier: 'catalog:' + version: 0.13.25 + '@microsoft/api-extractor': + specifier: ^7.35.0 + version: 7.43.7(@types/node@22.19.1) + '@types/ws': + specifier: 'catalog:' + version: 8.5.10 + pino: + specifier: ^8.19.0 + version: 8.21.0 + tsup: + specifier: ^8.3.5 + version: 8.4.0(@microsoft/api-extractor@7.43.7(@types/node@22.19.1))(postcss@8.5.9)(tsx@4.21.0)(typescript@5.9.3) + typescript: + specifier: ^5.0.0 + version: 5.9.3 + plugins/livekit: dependencies: '@huggingface/hub': diff --git a/turbo.json b/turbo.json index 113bda809..4742bab09 100644 --- a/turbo.json +++ b/turbo.json @@ -27,6 +27,9 @@ "LEMONSLICE_API_KEY", "LEMONSLICE_API_URL", "LEMONSLICE_IMAGE_URL", + "LIVEAVATAR_API_KEY", + "LIVEAVATAR_API_URL", + "LIVEAVATAR_AVATAR_ID", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "LIVEKIT_INFERENCE_API_KEY",