From e2ff3aefe62aadf8eb8a13c5981b1e0f61b277b4 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 15 Apr 2026 14:05:53 -0700 Subject: [PATCH 1/5] save --- agents/src/functional/index.ts | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 agents/src/functional/index.ts diff --git a/agents/src/functional/index.ts b/agents/src/functional/index.ts new file mode 100644 index 000000000..e69de29bb From 38399d843a9448b6af9684a15caaaa16a336af8a Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 15 Apr 2026 17:08:31 -0700 Subject: [PATCH 2/5] functional agent first draft --- agents/package.json | 24 +- .../src/functional/create_agent_template.ts | 419 ++++++++++++++++++ agents/src/functional/index.ts | 20 + agents/src/functional/types.ts | 236 ++++++++++ agents/src/utils.test.ts | 310 ++++++++++++- agents/src/utils.ts | 94 +++- agents/src/voice/audio_recognition.ts | 2 +- .../voice/audio_recognition_handoff.test.ts | 2 +- examples/src/functional_agent.ts | 119 +++++ 9 files changed, 1215 insertions(+), 11 deletions(-) create mode 100644 agents/src/functional/create_agent_template.ts create mode 100644 agents/src/functional/types.ts create mode 100644 examples/src/functional_agent.ts diff --git a/agents/package.json b/agents/package.json index b66cea054..b11d65709 100644 --- a/agents/package.json +++ b/agents/package.json @@ -6,13 +6,25 @@ "require": "dist/index.cjs", "types": "dist/index.d.ts", "exports": { - "import": { - "types": "./dist/index.d.ts", - "default": "./dist/index.js" + ".": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } }, - "require": { - "types": "./dist/index.d.cts", - "default": "./dist/index.cjs" + "./functional": { + "import": { + "types": "./dist/functional/index.d.ts", + "default": "./dist/functional/index.js" + }, + "require": { + "types": "./dist/functional/index.d.cts", + "default": "./dist/functional/index.cjs" + } } }, "author": "LiveKit", diff --git a/agents/src/functional/create_agent_template.ts b/agents/src/functional/create_agent_template.ts new file mode 100644 index 000000000..d1aa84006 --- /dev/null +++ b/agents/src/functional/create_agent_template.ts @@ -0,0 +1,419 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; +import type { ReadonlyChatContext } from '../llm/chat_context.js'; +import type { ChatContext, ChatMessage, LLM, RealtimeModel } from '../llm/index.js'; +import { type ChatChunk, type ToolContext, isFunctionTool, tool } from '../llm/index.js'; +import type { JSONObject, ToolExecuteFunction, ToolInputSchema } from '../llm/tool_context.js'; +import type { STT, SpeechEvent } from '../stt/index.js'; +import type { TTS } from '../tts/index.js'; +import { asyncIterableToReadableStream, readableStreamToAsyncIterable } from '../utils.js'; +import type { VAD } from '../vad.js'; +import { Agent, type AgentOptions, type ModelSettings } from '../voice/agent.js'; +import type { AgentSession } from '../voice/agent_session.js'; +import type { TimedString } from '../voice/io.js'; +import type { TurnHandlingOptions } from '../voice/turn_config/turn_handling.js'; +import { AGENT_TEMPLATE_ID, AgentContextNotReadyError } from './types.js'; +import type { + AgentBuilderContext, + AgentTemplate, + AgentTemplateConfigureOptions, + LLMNodeFn, + RealtimeAudioOutputNodeFn, + STTNodeFn, + TTSNodeFn, + ToolInput, + TranscriptionNodeFn, +} from './types.js'; + +// --------------------------------------------------------------------------- +// Template registry +// --------------------------------------------------------------------------- + +let nextTemplateId = 0; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const templateRegistry = new Map>(); + +/** @internal */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function getTemplateById(id: number): AgentTemplate | undefined { + return templateRegistry.get(id); +} + +// --------------------------------------------------------------------------- +// Accumulated builder state +// --------------------------------------------------------------------------- + +type _BuilderState = { + configured: boolean; + configureOptions: AgentTemplateConfigureOptions | null; + tools: ToolContext; + onEnterCb: (() => Promise) | null; + onExitCb: (() => Promise) | null; + onUserTurnCompletedCb: ((chatCtx: ChatContext, newMessage: ChatMessage) => Promise) | null; + sttNodeFn: STTNodeFn | null; + llmNodeFn: LLMNodeFn | null; + ttsNodeFn: TTSNodeFn | null; + transcriptionNodeFn: TranscriptionNodeFn | null; + realtimeAudioOutputNodeFn: RealtimeAudioOutputNodeFn | null; +}; + +// --------------------------------------------------------------------------- +// _FunctionalAgent (private subclass of voice.Agent) +// --------------------------------------------------------------------------- + +class _FunctionalAgent extends Agent { + readonly #onEnterCb: (() => Promise) | null; + readonly #onExitCb: (() => Promise) | null; + readonly #onUserTurnCompletedCb: + | ((chatCtx: ChatContext, newMessage: ChatMessage) => Promise) + | null; + readonly #sttNodeFn: STTNodeFn | null; + readonly #llmNodeFn: LLMNodeFn | null; + readonly #ttsNodeFn: TTSNodeFn | null; + readonly #transcriptionNodeFn: TranscriptionNodeFn | null; + readonly #realtimeAudioOutputNodeFn: RealtimeAudioOutputNodeFn | null; + + constructor(options: AgentOptions, state: _BuilderState) { + super(options); + this.#onEnterCb = state.onEnterCb; + this.#onExitCb = state.onExitCb; + this.#onUserTurnCompletedCb = state.onUserTurnCompletedCb; + this.#sttNodeFn = state.sttNodeFn; + this.#llmNodeFn = state.llmNodeFn; + this.#ttsNodeFn = state.ttsNodeFn; + this.#transcriptionNodeFn = state.transcriptionNodeFn; + this.#realtimeAudioOutputNodeFn = state.realtimeAudioOutputNodeFn; + } + + override async onEnter(): Promise { + if (this.#onEnterCb) { + await this.#onEnterCb(); + } + } + + override async onExit(): Promise { + if (this.#onExitCb) { + await this.#onExitCb(); + } + } + + override async onUserTurnCompleted(chatCtx: ChatContext, newMessage: ChatMessage): Promise { + if (this.#onUserTurnCompletedCb) { + await this.#onUserTurnCompletedCb(chatCtx, newMessage); + } + } + + override async sttNode( + audio: ReadableStream, + modelSettings: ModelSettings, + ): Promise | null> { + if (!this.#sttNodeFn) { + return Agent.default.sttNode(this, audio, modelSettings); + } + const asyncAudio = readableStreamToAsyncIterable(audio); + const result = await this.#sttNodeFn(asyncAudio, modelSettings); + return result ? asyncIterableToReadableStream(result) : null; + } + + override async llmNode( + chatCtx: ChatContext, + toolCtx: ToolContext, + modelSettings: ModelSettings, + ): Promise | null> { + if (!this.#llmNodeFn) { + return Agent.default.llmNode(this, chatCtx, toolCtx, modelSettings); + } + const result = await this.#llmNodeFn(chatCtx, toolCtx, modelSettings); + return result ? asyncIterableToReadableStream(result) : null; + } + + override async ttsNode( + text: ReadableStream, + modelSettings: ModelSettings, + ): Promise | null> { + if (!this.#ttsNodeFn) { + return Agent.default.ttsNode(this, text, modelSettings); + } + const asyncText = readableStreamToAsyncIterable(text); + const result = await this.#ttsNodeFn(asyncText, modelSettings); + return result ? asyncIterableToReadableStream(result) : null; + } + + override async transcriptionNode( + text: ReadableStream, + modelSettings: ModelSettings, + ): Promise | null> { + if (!this.#transcriptionNodeFn) { + return Agent.default.transcriptionNode(this, text, modelSettings); + } + const asyncText = readableStreamToAsyncIterable(text); + const result = await this.#transcriptionNodeFn(asyncText, modelSettings); + return result ? asyncIterableToReadableStream(result) : null; + } + + override async realtimeAudioOutputNode( + audio: ReadableStream, + modelSettings: ModelSettings, + ): Promise | null> { + if (!this.#realtimeAudioOutputNodeFn) { + return Agent.default.realtimeAudioOutputNode(this, audio, modelSettings); + } + const asyncAudio = readableStreamToAsyncIterable(audio); + const result = await this.#realtimeAudioOutputNodeFn(asyncAudio, modelSettings); + return result ? asyncIterableToReadableStream(result) : null; + } +} + +// --------------------------------------------------------------------------- +// Runtime getter helper +// --------------------------------------------------------------------------- + +function runtimeGetter( + agentRef: { current: Agent | null }, + propertyName: string, + accessor: (agent: Agent) => T, +): T { + if (!agentRef.current) { + throw new AgentContextNotReadyError(propertyName); + } + return accessor(agentRef.current); +} + +// --------------------------------------------------------------------------- +// createAgentTemplate +// --------------------------------------------------------------------------- + +export function createAgentTemplate( + builder: (ctx: AgentBuilderContext, props: Props) => void, +): AgentTemplate { + const templateId = nextTemplateId++; + + const factory = ((...args: Props extends void ? [] : [props: Props]): Agent => { + const props = args[0] as Props; + const agentRef: { current: Agent | null } = { current: null }; + + const state: _BuilderState = { + configured: false, + configureOptions: null, + tools: {}, + onEnterCb: null, + onExitCb: null, + onUserTurnCompletedCb: null, + sttNodeFn: null, + llmNodeFn: null, + ttsNodeFn: null, + transcriptionNodeFn: null, + realtimeAudioOutputNodeFn: null, + }; + + // --- Build the context object --- + + const ctx: AgentBuilderContext = { + configure(options: AgentTemplateConfigureOptions): void { + if (state.configured) { + throw new Error('ctx.configure() can only be called once per agent definition.'); + } + state.configured = true; + state.configureOptions = options; + }, + + tool(name: string, toolInput: ToolInput) { + if (name in state.tools) { + throw new Error( + `Tool '${name}' is already registered. Each tool must have a unique name.`, + ); + } + if (isFunctionTool(toolInput)) { + state.tools[name] = toolInput; + } else if (toolInput.parameters) { + state.tools[name] = tool({ + description: toolInput.description, + parameters: toolInput.parameters as ToolInputSchema, + execute: toolInput.execute as ToolExecuteFunction, + flags: toolInput.flags, + }); + } else { + state.tools[name] = tool({ + description: toolInput.description, + execute: toolInput.execute as ToolExecuteFunction>, + flags: toolInput.flags, + }); + } + }, + + onEnter(callback: () => Promise): void { + state.onEnterCb = callback; + }, + onExit(callback: () => Promise): void { + state.onExitCb = callback; + }, + onUserTurnCompleted( + callback: (chatCtx: ChatContext, newMessage: ChatMessage) => Promise, + ): void { + state.onUserTurnCompletedCb = callback; + }, + + sttNode(fn: STTNodeFn): void { + state.sttNodeFn = fn; + }, + llmNode(fn: LLMNodeFn): void { + state.llmNodeFn = fn; + }, + ttsNode(fn: TTSNodeFn): void { + state.ttsNodeFn = fn; + }, + transcriptionNode(fn: TranscriptionNodeFn): void { + state.transcriptionNodeFn = fn; + }, + realtimeAudioOutputNode(fn: RealtimeAudioOutputNodeFn): void { + state.realtimeAudioOutputNodeFn = fn; + }, + + async defaultSttNode( + audio: AsyncIterable, + modelSettings: ModelSettings, + ): Promise> { + const agent = runtimeGetter(agentRef, 'defaultSttNode', (a) => a); + const stream = await Agent.default.sttNode( + agent, + asyncIterableToReadableStream(audio), + modelSettings, + ); + if (!stream) { + return (async function* () {})(); + } + return readableStreamToAsyncIterable(stream); + }, + + async defaultLlmNode( + chatCtx: ChatContext, + toolCtx: ToolContext, + modelSettings: ModelSettings, + ): Promise> { + const agent = runtimeGetter(agentRef, 'defaultLlmNode', (a) => a); + const stream = await Agent.default.llmNode(agent, chatCtx, toolCtx, modelSettings); + if (!stream) { + return (async function* () {})(); + } + return readableStreamToAsyncIterable(stream); + }, + + async defaultTtsNode( + text: AsyncIterable, + modelSettings: ModelSettings, + ): Promise> { + const agent = runtimeGetter(agentRef, 'defaultTtsNode', (a) => a); + const stream = await Agent.default.ttsNode( + agent, + asyncIterableToReadableStream(text), + modelSettings, + ); + if (!stream) { + return (async function* () {})(); + } + return readableStreamToAsyncIterable(stream); + }, + + async defaultTranscriptionNode( + text: AsyncIterable, + modelSettings: ModelSettings, + ): Promise> { + const agent = runtimeGetter(agentRef, 'defaultTranscriptionNode', (a) => a); + const stream = await Agent.default.transcriptionNode( + agent, + asyncIterableToReadableStream(text), + modelSettings, + ); + if (!stream) { + return (async function* () {})(); + } + return readableStreamToAsyncIterable(stream); + }, + + async defaultRealtimeAudioOutputNode( + audio: AsyncIterable, + modelSettings: ModelSettings, + ): Promise> { + const agent = runtimeGetter(agentRef, 'defaultRealtimeAudioOutputNode', (a) => a); + const stream = await Agent.default.realtimeAudioOutputNode( + agent, + asyncIterableToReadableStream(audio), + modelSettings, + ); + if (!stream) { + return (async function* () {})(); + } + return readableStreamToAsyncIterable(stream); + }, + + // Runtime getters + get session(): AgentSession { + return runtimeGetter(agentRef, 'session', (a) => a.session); + }, + get chatCtx(): ReadonlyChatContext { + return runtimeGetter(agentRef, 'chatCtx', (a) => a.chatCtx); + }, + get toolCtx(): ToolContext { + return runtimeGetter(agentRef, 'toolCtx', (a) => a.toolCtx); + }, + get id(): string { + return runtimeGetter(agentRef, 'id', (a) => a.id); + }, + get instructions(): string { + return runtimeGetter(agentRef, 'instructions', (a) => a.instructions); + }, + get stt(): STT | undefined { + return runtimeGetter(agentRef, 'stt', (a) => a.stt); + }, + get llm(): LLM | RealtimeModel | undefined { + return runtimeGetter(agentRef, 'llm', (a) => a.llm); + }, + get tts(): TTS | undefined { + return runtimeGetter(agentRef, 'tts', (a) => a.tts); + }, + get vad(): VAD | undefined { + return runtimeGetter(agentRef, 'vad', (a) => a.vad); + }, + get turnHandling(): Partial | undefined { + return runtimeGetter(agentRef, 'turnHandling', (a) => a.turnHandling); + }, + get minConsecutiveSpeechDelay(): number | undefined { + return runtimeGetter( + agentRef, + 'minConsecutiveSpeechDelay', + (a) => a.minConsecutiveSpeechDelay, + ); + }, + }; + + // --- Run the builder --- + builder(ctx, props); + + // --- Validate & construct --- + if (!state.configureOptions) { + throw new Error( + 'ctx.configure() must be called during agent definition to provide at least `instructions`.', + ); + } + + const agentOptions: AgentOptions = { + ...state.configureOptions, + id: `functional_agent_${templateId}`, + tools: Object.keys(state.tools).length > 0 ? state.tools : undefined, + }; + + const agent = new _FunctionalAgent(agentOptions, state); + agentRef.current = agent; + + return agent; + }) as AgentTemplate; + + Object.defineProperty(factory, AGENT_TEMPLATE_ID, { value: templateId, writable: false }); + templateRegistry.set(templateId, factory); + + return factory; +} diff --git a/agents/src/functional/index.ts b/agents/src/functional/index.ts index e69de29bb..16628a337 100644 --- a/agents/src/functional/index.ts +++ b/agents/src/functional/index.ts @@ -0,0 +1,20 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +export { createAgentTemplate } from './create_agent_template.js'; + +export { + AGENT_TEMPLATE_ID, + AgentContextNotReadyError, + type AgentBuilderContext, + type AgentTemplate, + type AgentTemplateConfigureOptions, + type LLMNodeFn, + type RealtimeAudioOutputNodeFn, + type STTNodeFn, + type ToolInput, + type TranscriptionNodeFn, + type TTSNodeFn, +} from './types.js'; + +export { asyncIterableToReadableStream, readableStreamToAsyncIterable } from '../utils.js'; diff --git a/agents/src/functional/types.ts b/agents/src/functional/types.ts new file mode 100644 index 000000000..069638550 --- /dev/null +++ b/agents/src/functional/types.ts @@ -0,0 +1,236 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import type { Throws } from '@livekit/throws-transformer/throws'; +import type { LLMModels, STTModelString, TTSModelString } from '../inference/index.js'; +import type { ReadonlyChatContext } from '../llm/chat_context.js'; +import type { + ChatChunk, + ChatContext, + ChatMessage, + FunctionTool, + LLM, + RealtimeModel, +} from '../llm/index.js'; +import { type ToolContext } from '../llm/index.js'; +import type { + InferToolInput, + JSONObject, + ToolExecuteFunction, + ToolInputSchema, + ToolOptions, +} from '../llm/tool_context.js'; +import type { STT, SpeechEvent } from '../stt/index.js'; +import type { TTS } from '../tts/index.js'; +import type { VAD } from '../vad.js'; +import type { Agent, ModelSettings } from '../voice/agent.js'; +import type { AgentSession } from '../voice/agent_session.js'; +import type { TimedString } from '../voice/io.js'; +import type { TurnHandlingOptions } from '../voice/turn_config/turn_handling.js'; + +// --------------------------------------------------------------------------- +// Errors +// --------------------------------------------------------------------------- + +/** + * Thrown when a runtime-only property on {@link AgentBuilderContext} is accessed + * during the agent definition callback, before the agent instance exists. + */ +export class AgentContextNotReadyError extends Error { + constructor(propertyName: string) { + super( + `ctx.${propertyName} is not available during agent definition. ` + + 'It can only be accessed inside hook callbacks (onEnter, onExit, onUserTurnCompleted) ' + + 'or tool execute functions.', + ); + this.name = 'AgentContextNotReadyError'; + } +} + +// --------------------------------------------------------------------------- +// Template identity +// --------------------------------------------------------------------------- + +export const AGENT_TEMPLATE_ID = Symbol.for('AGENT_TEMPLATE_ID'); + +// --------------------------------------------------------------------------- +// Pipeline node function types (user-facing, AsyncIterable-based) +// --------------------------------------------------------------------------- + +export type STTNodeFn = ( + audio: AsyncIterable, + modelSettings: ModelSettings, +) => Promise | null>; + +export type LLMNodeFn = ( + chatCtx: ChatContext, + toolCtx: ToolContext, + modelSettings: ModelSettings, +) => Promise | null>; + +export type TTSNodeFn = ( + text: AsyncIterable, + modelSettings: ModelSettings, +) => Promise | null>; + +export type TranscriptionNodeFn = ( + text: AsyncIterable, + modelSettings: ModelSettings, +) => Promise | null>; + +export type RealtimeAudioOutputNodeFn = ( + audio: AsyncIterable, + modelSettings: ModelSettings, +) => Promise | null>; + +// --------------------------------------------------------------------------- +// Configure options (subset of AgentOptions, minus id and tools) +// --------------------------------------------------------------------------- + +export interface AgentTemplateConfigureOptions { + instructions: string; + chatCtx?: ChatContext; + stt?: STT | STTModelString; + vad?: VAD; + llm?: LLM | RealtimeModel | LLMModels; + tts?: TTS | TTSModelString; + turnHandling?: TurnHandlingOptions; + minConsecutiveSpeechDelay?: number; + useTtsAlignedTranscript?: boolean; +} + +// --------------------------------------------------------------------------- +// Tool input: either a pre-built FunctionTool or inline { description, ... } +// --------------------------------------------------------------------------- + +export interface InlineToolDefinition< + Schema extends ToolInputSchema = ToolInputSchema, + Result = unknown, +> { + description: string; + parameters?: Schema; + execute: ToolExecuteFunction, unknown, Result>; + flags?: number; +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ToolInput = FunctionTool | InlineToolDefinition; + +// --------------------------------------------------------------------------- +// AgentBuilderContext +// --------------------------------------------------------------------------- + +export interface AgentBuilderContext { + /** Configure agent models and pipeline settings. Must be called exactly once. */ + configure(options: AgentTemplateConfigureOptions): void; + + /** Register a pre-built `FunctionTool` (from `llm.tool(...)`). */ + tool

(name: string, toolInput: FunctionTool): void; + /** Register an inline tool with parameters. */ + tool( + name: string, + toolInput: { + description: string; + parameters: ToolInputSchema; + execute: (args: Input, opts: ToolOptions) => Promise; + flags?: number; + }, + ): void; + /** Register an inline tool without parameters. */ + tool( + name: string, + toolInput: { + description: string; + parameters?: never; + execute: ToolExecuteFunction, unknown, Result>; + flags?: number; + }, + ): void; + + /** Register an `onEnter` lifecycle hook. */ + onEnter(callback: () => Promise): void; + /** Register an `onExit` lifecycle hook. */ + onExit(callback: () => Promise): void; + /** Register an `onUserTurnCompleted` lifecycle hook. */ + onUserTurnCompleted( + callback: (chatCtx: ChatContext, newMessage: ChatMessage) => Promise, + ): void; + + /** Override the STT pipeline node (async generator). */ + sttNode(fn: STTNodeFn): void; + /** Override the LLM pipeline node (async generator). */ + llmNode(fn: LLMNodeFn): void; + /** Override the TTS pipeline node (async generator). */ + ttsNode(fn: TTSNodeFn): void; + /** Override the transcription pipeline node (async generator). */ + transcriptionNode(fn: TranscriptionNodeFn): void; + /** Override the realtime audio output pipeline node (async generator). */ + realtimeAudioOutputNode(fn: RealtimeAudioOutputNodeFn): void; + + /** Call the default STT node implementation and return an AsyncIterable. */ + defaultSttNode( + audio: AsyncIterable, + modelSettings: ModelSettings, + ): Promise>; + /** Call the default LLM node implementation and return an AsyncIterable. */ + defaultLlmNode( + chatCtx: ChatContext, + toolCtx: ToolContext, + modelSettings: ModelSettings, + ): Promise>; + /** Call the default TTS node implementation and return an AsyncIterable. */ + defaultTtsNode( + text: AsyncIterable, + modelSettings: ModelSettings, + ): Promise>; + /** Call the default transcription node implementation and return an AsyncIterable. */ + defaultTranscriptionNode( + text: AsyncIterable, + modelSettings: ModelSettings, + ): Promise>; + /** Call the default realtime audio output node implementation and return an AsyncIterable. */ + defaultRealtimeAudioOutputNode( + audio: AsyncIterable, + modelSettings: ModelSettings, + ): Promise>; + + /** + * Runtime getters — available inside hook callbacks and tool execute functions. + * Throws `AgentContextNotReadyError` if accessed during the agent definition callback. + */ + + /** The agent session. */ + readonly session: Throws; + /** The chat context. */ + readonly chatCtx: Throws; + /** The tool context. */ + readonly toolCtx: Throws; + /** The agent ID. */ + readonly id: Throws; + /** The agent instructions. */ + readonly instructions: Throws; + /** The STT model. */ + readonly stt: Throws; + /** The LLM model. */ + readonly llm: Throws; + /** The TTS model. */ + readonly tts: Throws; + /** The VAD model. */ + readonly vad: Throws; + /** The turn handling options. */ + readonly turnHandling: Throws< + Partial | undefined, + AgentContextNotReadyError + >; + /** The minimum consecutive speech delay. */ + readonly minConsecutiveSpeechDelay: Throws; +} + +// --------------------------------------------------------------------------- +// AgentTemplate +// --------------------------------------------------------------------------- + +export type AgentTemplate = ( + ...args: Props extends void ? [] : [props: Props] +) => Agent & { [AGENT_TEMPLATE_ID]: number }; diff --git a/agents/src/utils.test.ts b/agents/src/utils.test.ts index cefe7a5b2..94400d452 100644 --- a/agents/src/utils.test.ts +++ b/agents/src/utils.test.ts @@ -5,7 +5,17 @@ import { AudioFrame } from '@livekit/rtc-node'; import { ReadableStream } from 'node:stream/web'; import { describe, expect, it } from 'vitest'; import { initializeLogger } from '../src/log.js'; -import { Event, Task, TaskResult, dedent, delay, isPending, resampleStream } from '../src/utils.js'; +import { + Event, + Task, + TaskResult, + asyncIterableToReadableStream, + dedent, + delay, + isPending, + readableStreamToAsyncIterable, + resampleStream, +} from '../src/utils.js'; describe('utils', () => { // initialize logger @@ -823,4 +833,302 @@ world expect(outputFrames).toEqual([]); }); }); + + describe('readableStreamToAsyncIterable', () => { + it('should yield all values from a ReadableStream', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(1); + controller.enqueue(2); + controller.enqueue(3); + controller.close(); + }, + }); + + const result: number[] = []; + for await (const value of readableStreamToAsyncIterable(stream)) { + result.push(value); + } + expect(result).toEqual([1, 2, 3]); + }); + + it('should handle an empty stream', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const result: number[] = []; + for await (const value of readableStreamToAsyncIterable(stream)) { + result.push(value); + } + expect(result).toEqual([]); + }); + + it('should stop when the signal is already aborted', async () => { + const controller = new AbortController(); + controller.abort(); + + const stream = new ReadableStream({ + start(c) { + c.enqueue(1); + c.close(); + }, + }); + + const result: number[] = []; + for await (const value of readableStreamToAsyncIterable(stream, controller.signal)) { + result.push(value); + } + expect(result).toEqual([]); + }); + + it('should stop iteration when signal is aborted mid-stream', async () => { + const ac = new AbortController(); + let enqueueNext: ((v: number) => void) | null = null; + + const stream = new ReadableStream({ + start(controller) { + let n = 0; + enqueueNext = (v: number) => { + n++; + if (n > 10) { + controller.close(); + return; + } + controller.enqueue(v); + }; + }, + }); + + const result: number[] = []; + const iterPromise = (async () => { + for await (const value of readableStreamToAsyncIterable(stream, ac.signal)) { + result.push(value); + } + })(); + + enqueueNext!(1); + enqueueNext!(2); + await delay(10); + ac.abort(); + await iterPromise; + + expect(result).toEqual([1, 2]); + }); + + it('should handle stream errors by propagating them', async () => { + let pullCount = 0; + const stream = new ReadableStream({ + pull(controller) { + pullCount++; + if (pullCount === 1) { + controller.enqueue(1); + } else { + controller.error(new Error('stream broke')); + } + }, + }); + + const result: number[] = []; + await expect(async () => { + for await (const value of readableStreamToAsyncIterable(stream)) { + result.push(value); + } + }).rejects.toThrow('stream broke'); + expect(result).toEqual([1]); + }); + + it('should release the reader lock after iteration completes', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(42); + controller.close(); + }, + }); + + const result: number[] = []; + for await (const value of readableStreamToAsyncIterable(stream)) { + result.push(value); + } + + const reader = stream.getReader(); + const { done } = await reader.read(); + expect(done).toBe(true); + reader.releaseLock(); + }); + + it('should release the reader lock after abort', async () => { + const ac = new AbortController(); + let enqueue: ((v: number) => void) | null = null; + + const stream = new ReadableStream({ + start(controller) { + enqueue = (v) => controller.enqueue(v); + }, + }); + + const iterPromise = (async () => { + for await (const _ of readableStreamToAsyncIterable(stream, ac.signal)) { + // consume + } + })(); + + enqueue!(1); + await delay(10); + ac.abort(); + await iterPromise; + + const reader = stream.getReader(); + reader.releaseLock(); + }); + }); + + describe('asyncIterableToReadableStream', () => { + it('should produce all values from an async iterable', async () => { + async function* gen() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const stream = asyncIterableToReadableStream(gen()); + const reader = stream.getReader(); + const result: string[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + result.push(value); + } + reader.releaseLock(); + expect(result).toEqual(['a', 'b', 'c']); + }); + + it('should handle an empty async iterable', async () => { + async function* gen(): AsyncGenerator { + // yields nothing + } + + const stream = asyncIterableToReadableStream(gen()); + const reader = stream.getReader(); + const { done } = await reader.read(); + expect(done).toBe(true); + reader.releaseLock(); + }); + + it('should run generator finally block when stream is cancelled', async () => { + let finallyCalled = false; + + async function* gen() { + try { + yield 1; + yield 2; + await delay(5000); + yield 3; + } finally { + finallyCalled = true; + } + } + + const stream = asyncIterableToReadableStream(gen()); + const reader = stream.getReader(); + + const { value: first } = await reader.read(); + expect(first).toBe(1); + + await reader.cancel(); + expect(finallyCalled).toBe(true); + }); + + it('should stop yielding after stream cancel', async () => { + let yieldCount = 0; + + async function* gen() { + while (true) { + yieldCount++; + yield yieldCount; + await delay(10); + } + } + + const stream = asyncIterableToReadableStream(gen()); + const reader = stream.getReader(); + + await reader.read(); + await reader.read(); + const countBeforeCancel = yieldCount; + await reader.cancel(); + + await delay(50); + expect(yieldCount).toBe(countBeforeCancel); + }); + + it('should propagate generator errors to the stream reader', async () => { + async function* gen() { + yield 1; + throw new Error('generator failed'); + } + + const stream = asyncIterableToReadableStream(gen()); + const reader = stream.getReader(); + + const { value } = await reader.read(); + expect(value).toBe(1); + + await expect(reader.read()).rejects.toThrow('generator failed'); + }); + + it('round-trip: stream → iterable → stream preserves values', async () => { + const original = new ReadableStream({ + start(controller) { + controller.enqueue('x'); + controller.enqueue('y'); + controller.enqueue('z'); + controller.close(); + }, + }); + + const iterable = readableStreamToAsyncIterable(original); + const rebuilt = asyncIterableToReadableStream(iterable); + + const reader = rebuilt.getReader(); + const result: string[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + result.push(value); + } + reader.releaseLock(); + expect(result).toEqual(['x', 'y', 'z']); + }); + + it('round-trip: cancel on rebuilt stream stops the original', async () => { + let finallyCalled = false; + + async function* gen() { + try { + let i = 0; + while (true) { + yield i++; + await delay(10); + } + } finally { + finallyCalled = true; + } + } + + const stream = asyncIterableToReadableStream(gen()); + const iterable = readableStreamToAsyncIterable(stream); + const rebuilt = asyncIterableToReadableStream(iterable); + + const reader = rebuilt.getReader(); + await reader.read(); + await reader.read(); + await reader.cancel(); + + await delay(20); + expect(finallyCalled).toBe(true); + }); + }); }); diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 82c623a6c..682e7f022 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -12,10 +12,14 @@ import { AudioFrame, AudioResampler, RoomEvent } from '@livekit/rtc-node'; import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import { AsyncLocalStorage } from 'node:async_hooks'; import { EventEmitter, once } from 'node:events'; -import type { ReadableStream } from 'node:stream/web'; -import { TransformStream, type TransformStreamDefaultController } from 'node:stream/web'; +import { + ReadableStream, + TransformStream, + type TransformStreamDefaultController, +} from 'node:stream/web'; import { v4 as uuidv4 } from 'uuid'; import { log } from './log.js'; +import { isStreamReaderReleaseError } from './stream/deferred_stream.js'; /** * Recursively expands all nested properties of a type, @@ -826,6 +830,92 @@ export function delay(ms: number, options: DelayOptions = {}): Promise { }); } +/** + * Converts a ReadableStream into an AsyncGenerator. When the stream is cancelled, + * closed, or encounters an error, the generator terminates cleanly. Stream-reader + * release errors (from concurrent close / releaseLock) are swallowed so callers + * can simply `for await` without boilerplate try/catch around reader lifecycle. + * + * If an `AbortSignal` is provided, the generator races each read against the abort + * event — equivalent to the manual `Promise.race([reader.read(), abortPromise])` + * pattern that is otherwise repeated across STT/TTS plugins. + */ +export async function* readableStreamToAsyncIterable( + stream: ReadableStream, + signal?: AbortSignal, +): AsyncGenerator { + if (signal?.aborted) return; + const reader = stream.getReader(); + let streamEnded = false; + try { + if (signal) { + const abortPromise = waitForAbort(signal); + while (true) { + const result = await ThrowsPromise.race([reader.read(), abortPromise]); + if (!result) break; + const { done, value } = result; + if (done) { + streamEnded = true; + break; + } + yield value; + } + } else { + while (true) { + const { done, value } = await reader.read(); + if (done) { + streamEnded = true; + break; + } + yield value; + } + } + } catch (e) { + if (isStreamReaderReleaseError(e)) return; + throw e; + } finally { + try { + if (!streamEnded) { + await reader.cancel(); + } + reader.releaseLock(); + } catch { + // stream cleanup errors are expected during concurrent teardown + } + } +} + +/** + * Wraps an AsyncIterable in a ReadableStream. When the ReadableStream is + * cancelled (e.g. by the framework tearing down the pipeline), the backing + * async iterator is properly returned so that `finally` blocks inside the + * generator execute. + */ +export function asyncIterableToReadableStream(iterable: AsyncIterable): ReadableStream { + const iterator = iterable[Symbol.asyncIterator](); + let cancelled = false; + + return new ReadableStream({ + async pull(controller) { + if (cancelled) return; + try { + const { done, value } = await iterator.next(); + if (done || cancelled) { + if (!cancelled) controller.close(); + } else { + controller.enqueue(value); + } + } catch (err) { + controller.error(err); + } + }, + async cancel() { + cancelled = true; + await iterator.return?.(undefined); + }, + }); +} + export class IdleTimeoutError extends Error { constructor(message = 'idle timeout') { super(message); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index cb2defef4..9ec45d9ed 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -13,7 +13,7 @@ import { trace, } from '@opentelemetry/api'; import type { WritableStreamDefaultWriter } from 'node:stream/web'; -import { ReadableStream } from 'node:stream/web'; +import type { ReadableStream } from 'node:stream/web'; import { isAPIError } from '../_exceptions.js'; import { apiConnectDefaults, intervalForRetry } from '../inference/interruption/defaults.js'; import { InterruptionDetectionError } from '../inference/interruption/errors.js'; diff --git a/agents/src/voice/audio_recognition_handoff.test.ts b/agents/src/voice/audio_recognition_handoff.test.ts index 80a715463..76311ec12 100644 --- a/agents/src/voice/audio_recognition_handoff.test.ts +++ b/agents/src/voice/audio_recognition_handoff.test.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { ReadableStreamDefaultController } from 'node:stream/web'; +import type { ReadableStreamDefaultController } from 'node:stream/web'; import { describe, expect, it, vi } from 'vitest'; import { ChatContext } from '../llm/chat_context.js'; import { initializeLogger } from '../log.js'; diff --git a/examples/src/functional_agent.ts b/examples/src/functional_agent.ts new file mode 100644 index 000000000..bf80c9ab1 --- /dev/null +++ b/examples/src/functional_agent.ts @@ -0,0 +1,119 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + type JobProcess, + ServerOptions, + cli, + defineAgent, + inference, + llm, + log, + metrics, + voice, +} from '@livekit/agents'; +import * as livekit from '@livekit/agents-plugin-livekit'; +import * as silero from '@livekit/agents-plugin-silero'; +import { createAgentTemplate } from '@livekit/agents/functional'; +import { fileURLToPath } from 'node:url'; +import { z } from 'zod'; + +// Standalone tool -- reusable across agents +const getWeather = llm.tool({ + description: 'Get the weather for a given location.', + parameters: z.object({ + location: z.string().describe('The location to get the weather for'), + }), + execute: async ({ location }) => { + return `The weather in ${location} is sunny.`; + }, +}); + +type MyAgentProps = { + vad: silero.VAD; +}; + +const MyAgent = createAgentTemplate((ctx, _props) => { + ctx.configure({ + instructions: "You are a helpful assistant, you can hear the user's message and respond to it.", + }); + + // Register a pre-built tool + ctx.tool('getWeather', getWeather); + + // Register an inline tool without parameters + ctx.tool('getTime', { + description: 'Get the current time.', + execute: async () => { + return `The current time is ${new Date().toLocaleTimeString()}.`; + }, + }); + + // Register an inline tool with parameters — types are fully inferred + ctx.tool('lookupCity', { + description: 'Look up information about a city.', + parameters: z.object({ + city: z.string().describe('The city name'), + includePopulation: z.boolean().optional().describe('Whether to include population data'), + }), + execute: async ({ city, includePopulation }) => { + const info = `${city} is a great place to visit.`; + return includePopulation ? `${info} Population: 1,000,000.` : info; + }, + }); + + ctx.onEnter(async () => { + const logger = log(); + logger.info({ agentId: ctx.id }, 'Agent entered'); + }); + + ctx.onUserTurnCompleted(async (_chatCtx, newMessage) => { + const logger = log(); + logger.info({ message: newMessage }, 'User turn completed'); + }); +}); + +export default defineAgent({ + prewarm: async (proc: JobProcess) => { + proc.userData.vad = await silero.VAD.load(); + }, + entry: async (ctx: JobContext<{ vad: silero.VAD }>) => { + const agent = MyAgent({ vad: ctx.proc.userData.vad }); + + const logger = log(); + + const session = new voice.AgentSession({ + vad: ctx.proc.userData.vad, + stt: new inference.STT({ + model: 'deepgram/nova-3', + language: 'en', + }), + llm: new inference.LLM({ model: 'openai/gpt-4.1-mini' }), + tts: new inference.TTS({ + model: 'cartesia/sonic-3', + voice: '9626c31c-bec5-4cca-baa8-f8ba9e84c8bc', + }), + turnHandling: { + turnDetection: new livekit.turnDetector.MultilingualModel(), + }, + }); + + session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => { + metrics.logMetrics(ev.metrics); + }); + + ctx.addShutdownCallback(async () => { + logger.info({ usage: session.usage }, 'Session usage summary'); + }); + + await session.start({ + agent, + room: ctx.room, + }); + + session.say('Hello, how can I help you today?'); + }, +}); + +cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) })); From 7b655cfd86d21a016e9bcb45168730164483c9f7 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 15 Apr 2026 17:18:31 -0700 Subject: [PATCH 3/5] fix typing --- agents/src/functional/create_agent_template.ts | 5 +++-- agents/src/functional/types.ts | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/agents/src/functional/create_agent_template.ts b/agents/src/functional/create_agent_template.ts index d1aa84006..83884489f 100644 --- a/agents/src/functional/create_agent_template.ts +++ b/agents/src/functional/create_agent_template.ts @@ -222,7 +222,7 @@ export function createAgentTemplate( }, tool(name: string, toolInput: ToolInput) { - if (name in state.tools) { + if (Object.hasOwn(state.tools, name)) { throw new Error( `Tool '${name}' is already registered. Each tool must have a unique name.`, ); @@ -413,7 +413,8 @@ export function createAgentTemplate( }) as AgentTemplate; Object.defineProperty(factory, AGENT_TEMPLATE_ID, { value: templateId, writable: false }); - templateRegistry.set(templateId, factory); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + templateRegistry.set(templateId, factory as AgentTemplate); return factory; } diff --git a/agents/src/functional/types.ts b/agents/src/functional/types.ts index 069638550..e7d4a7473 100644 --- a/agents/src/functional/types.ts +++ b/agents/src/functional/types.ts @@ -231,6 +231,6 @@ export interface AgentBuilderContext { // AgentTemplate // --------------------------------------------------------------------------- -export type AgentTemplate = ( +export type AgentTemplate = (( ...args: Props extends void ? [] : [props: Props] -) => Agent & { [AGENT_TEMPLATE_ID]: number }; +) => Agent) & { [AGENT_TEMPLATE_ID]: number }; From e4297c70085b1ba52ee6fb374986c3ce9b43359f Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 15 Apr 2026 17:20:15 -0700 Subject: [PATCH 4/5] Create unlucky-waves-rhyme.md --- .changeset/unlucky-waves-rhyme.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/unlucky-waves-rhyme.md diff --git a/.changeset/unlucky-waves-rhyme.md b/.changeset/unlucky-waves-rhyme.md new file mode 100644 index 000000000..a6f6256e0 --- /dev/null +++ b/.changeset/unlucky-waves-rhyme.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +feat(agents): add createAgentTemplate functional API for agent definition From 8786a6228146dc2f7fba26bea55bdc36e054dd8f Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Wed, 15 Apr 2026 17:22:17 -0700 Subject: [PATCH 5/5] Update functional_agent.ts --- examples/src/functional_agent.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/src/functional_agent.ts b/examples/src/functional_agent.ts index bf80c9ab1..bfdbe25ca 100644 --- a/examples/src/functional_agent.ts +++ b/examples/src/functional_agent.ts @@ -64,8 +64,9 @@ const MyAgent = createAgentTemplate((ctx, _props) => { }); ctx.onEnter(async () => { - const logger = log(); - logger.info({ agentId: ctx.id }, 'Agent entered'); + ctx.session.generateReply({ + userInput: 'Greet the user', + }); }); ctx.onUserTurnCompleted(async (_chatCtx, newMessage) => { @@ -111,8 +112,6 @@ export default defineAgent({ agent, room: ctx.room, }); - - session.say('Hello, how can I help you today?'); }, });