diff --git a/.changeset/blaze-plugin.md b/.changeset/blaze-plugin.md new file mode 100644 index 000000000..bd3915a6c --- /dev/null +++ b/.changeset/blaze-plugin.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents-plugin-blaze": patch +--- + +Fix Blaze plugin review issues and align LLM request route/query params with the Blaze voicebot-call API. \ No newline at end of file diff --git a/plugins/blaze/README.md b/plugins/blaze/README.md new file mode 100644 index 000000000..d691280e3 --- /dev/null +++ b/plugins/blaze/README.md @@ -0,0 +1,43 @@ +# @livekit/agents-plugin-blaze + +LiveKit Agent Framework plugin for Blaze AI services: + +- **STT (Speech-to-Text)**: `POST /v1/stt/transcribe` (batch only) +- **TTS (Text-to-Speech)**: `POST /v1/tts/realtime` (streaming PCM) +- **LLM (Conversational AI)**: `POST /v1/voicebot-call/{botId}/chat-conversion-stream` (SSE streaming) + +## Install + +```bash +npm i @livekit/agents-plugin-blaze +``` + +## Quick start + +```ts +import { STT, TTS, LLM } from '@livekit/agents-plugin-blaze'; + +// Reads BLAZE_* env vars by default +const stt = new STT({ language: 'vi' }); +const tts = new TTS({ speakerId: 'speaker-1' }); +const llm = new LLM({ botId: 'my-chatbot-id' }); +``` + +## Environment variables + +```bash +# Required for authenticated deployments +export BLAZE_API_URL=https://api.blaze.vn +export BLAZE_API_TOKEN=your-bearer-token + +# Optional timeouts +export BLAZE_STT_TIMEOUT=30000 +export BLAZE_TTS_TIMEOUT=60000 +export BLAZE_LLM_TIMEOUT=60000 +``` + +## Notes + +- STT streaming is **not** supported (the plugin throws if `stream()` is called). +- LLM supports SSE streaming; `system/developer` messages are **skipped** — the Blaze backend loads the voicebot prompt from its own database, so re-sending them would double-prompt. 
+ diff --git a/plugins/blaze/api-extractor.json b/plugins/blaze/api-extractor.json new file mode 100644 index 000000000..32c90f0fa --- /dev/null +++ b/plugins/blaze/api-extractor.json @@ -0,0 +1,5 @@ +{ + "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", + "extends": "../../api-extractor-shared.json", + "mainEntryPointFilePath": "./dist/index.d.ts" +} diff --git a/plugins/blaze/package.json b/plugins/blaze/package.json new file mode 100644 index 000000000..0c550787a --- /dev/null +++ b/plugins/blaze/package.json @@ -0,0 +1,52 @@ +{ + "name": "@livekit/agents-plugin-blaze", + "version": "0.1.0", + "description": "Blaze AI plugin for LiveKit Node Agents (STT, TTS, LLM)", + "main": "dist/index.js", + "require": "dist/index.cjs", + "types": "dist/index.d.ts", + "exports": { + "import": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "require": { + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" + } + }, + "author": "LiveKit", + "type": "module", + "repository": "git@github.com:livekit/agents-js.git", + "license": "Apache-2.0", + "files": [ + "dist", + "src", + "README.md" + ], + "scripts": { + "build": "tsup --onSuccess \"pnpm build:types\"", + "build:types": "tsc --declaration --emitDeclarationOnly && node ../../scripts/copyDeclarationOutput.js", + "clean": "rm -rf dist", + "clean:build": "pnpm clean && pnpm build", + "lint": "eslint -f unix \"src/**/*.{ts,js}\"", + "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript", + "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose" + }, + "devDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/agents-plugins-test": "workspace:*", + "@livekit/rtc-node": "catalog:", + "@microsoft/api-extractor": "^7.35.0", + "@types/ws": "^8.5.0", + "tsup": "^8.3.5", + "typescript": "^5.0.0" + }, + "dependencies": { + "ws": "^8.18.0" + }, + 
"peerDependencies": { + "@livekit/agents": "workspace:*", + "@livekit/rtc-node": "catalog:" + } +} diff --git a/plugins/blaze/src/config.test.ts b/plugins/blaze/src/config.test.ts new file mode 100644 index 000000000..e2b118009 --- /dev/null +++ b/plugins/blaze/src/config.test.ts @@ -0,0 +1,80 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { buildAuthHeaders, resolveConfig } from './config.js'; + +describe('resolveConfig', () => { + beforeEach(() => { + // Clear env vars before each test + delete process.env.BLAZE_API_URL; + delete process.env.BLAZE_API_TOKEN; + delete process.env.BLAZE_STT_TIMEOUT; + delete process.env.BLAZE_TTS_TIMEOUT; + delete process.env.BLAZE_LLM_TIMEOUT; + }); + + afterEach(() => { + delete process.env.BLAZE_API_URL; + delete process.env.BLAZE_API_TOKEN; + }); + + it('uses defaults when no config or env vars provided', () => { + const cfg = resolveConfig(); + expect(cfg.apiUrl).toBe('https://api.blaze.vn'); + expect(cfg.authToken).toBe(''); + expect(cfg.sttTimeout).toBe(30000); + expect(cfg.ttsTimeout).toBe(60000); + expect(cfg.llmTimeout).toBe(60000); + }); + + it('uses env vars when provided', () => { + process.env.BLAZE_API_URL = 'http://api.example.com'; + process.env.BLAZE_API_TOKEN = 'test-token'; + const cfg = resolveConfig(); + expect(cfg.apiUrl).toBe('http://api.example.com'); + expect(cfg.authToken).toBe('test-token'); + }); + + it('config values override env vars', () => { + process.env.BLAZE_API_URL = 'http://env.example.com'; + const cfg = resolveConfig({ apiUrl: 'http://config.example.com' }); + expect(cfg.apiUrl).toBe('http://config.example.com'); + }); + + it('timeout env vars are parsed as numbers', () => { + process.env.BLAZE_STT_TIMEOUT = '15000'; + process.env.BLAZE_TTS_TIMEOUT = '45000'; + const cfg = resolveConfig(); + expect(cfg.sttTimeout).toBe(15000); + expect(cfg.ttsTimeout).toBe(45000); + 
}); + + it('falls back to default timeout when env var is not a valid number', () => { + process.env.BLAZE_STT_TIMEOUT = 'abc'; + process.env.BLAZE_TTS_TIMEOUT = ''; + process.env.BLAZE_LLM_TIMEOUT = '-500'; + const cfg = resolveConfig(); + expect(cfg.sttTimeout).toBe(30000); // fallback + expect(cfg.ttsTimeout).toBe(60000); // fallback (empty string) + expect(cfg.llmTimeout).toBe(60000); // fallback (negative value) + }); + + it('falls back to default timeout when env var is zero', () => { + process.env.BLAZE_STT_TIMEOUT = '0'; + const cfg = resolveConfig(); + expect(cfg.sttTimeout).toBe(30000); // 0 is not a valid timeout + }); +}); + +describe('buildAuthHeaders', () => { + it('returns empty object when no token', () => { + const headers = buildAuthHeaders(''); + expect(headers).toEqual({}); + }); + + it('returns Authorization header when token provided', () => { + const headers = buildAuthHeaders('my-token'); + expect(headers).toEqual({ Authorization: 'Bearer my-token' }); + }); +}); diff --git a/plugins/blaze/src/config.ts b/plugins/blaze/src/config.ts new file mode 100644 index 000000000..2344a38bc --- /dev/null +++ b/plugins/blaze/src/config.ts @@ -0,0 +1,101 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Blaze Configuration Module + * + * Provides centralized configuration for Blaze AI services. + * All services (STT, TTS, LLM) route through a single gateway URL. + * Service-specific configuration (language, speaker, etc.) comes from the + * voicebot ID and is passed as constructor options to each plugin. 
+ * + * Values are resolved in priority order: + * Explicit options -\> BlazeConfig -\> Environment variables -\> Defaults + * + * Environment Variables (prefix: BLAZE_): + * BLAZE_API_URL - Base URL for all Blaze services + * BLAZE_API_TOKEN - Bearer token for authentication + * BLAZE_STT_TIMEOUT - STT timeout in ms (default: 30000) + * BLAZE_TTS_TIMEOUT - TTS timeout in ms (default: 60000) + * BLAZE_LLM_TIMEOUT - LLM timeout in ms (default: 60000) + */ + +/** Configuration for Blaze AI services. */ +export interface BlazeConfig { + /** Base URL for all Blaze API services. Default: https://api.blaze.vn */ + apiUrl?: string; + /** Bearer token for API authentication. */ + authToken?: string; + /** STT request timeout in milliseconds. Default: 30000 */ + sttTimeout?: number; + /** TTS request timeout in milliseconds. Default: 60000 */ + ttsTimeout?: number; + /** LLM request timeout in milliseconds. Default: 60000 */ + llmTimeout?: number; +} + +/** Resolved configuration with all values populated. */ +export interface ResolvedBlazeConfig { + apiUrl: string; + authToken: string; + sttTimeout: number; + ttsTimeout: number; + llmTimeout: number; +} + +/** Parse a timeout env var, falling back to a default if the value is missing or non-numeric. */ +function parseTimeoutEnv(envVal: string | undefined, defaultMs: number): number { + if (!envVal) return defaultMs; + const n = Number(envVal); + return Number.isFinite(n) && n > 0 ? n : defaultMs; +} + +/** Resolve configuration from options, environment variables, and defaults. */ +export function resolveConfig(config?: BlazeConfig): ResolvedBlazeConfig { + return { + apiUrl: config?.apiUrl ?? process.env['BLAZE_API_URL'] ?? 'https://api.blaze.vn', + authToken: config?.authToken ?? process.env['BLAZE_API_TOKEN'] ?? '', + sttTimeout: config?.sttTimeout ?? parseTimeoutEnv(process.env['BLAZE_STT_TIMEOUT'], 30000), + ttsTimeout: config?.ttsTimeout ?? 
parseTimeoutEnv(process.env['BLAZE_TTS_TIMEOUT'], 60000), + llmTimeout: config?.llmTimeout ?? parseTimeoutEnv(process.env['BLAZE_LLM_TIMEOUT'], 60000), + }; +} + +/** Build Authorization header value if token is provided. */ +export function buildAuthHeaders(authToken: string): Record<string, string> { + if (!authToken) return {}; + return { Authorization: `Bearer ${authToken}` }; +} + +/** Maximum number of retry attempts for transient failures. */ +export const MAX_RETRY_COUNT = 3; + +/** Base delay in milliseconds for exponential backoff. */ +export const RETRY_BASE_DELAY_MS = 2000; + +/** Sleep for the given number of milliseconds. */ +export function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * Error thrown for non-retryable HTTP errors (4xx client errors). + * `isRetryableError` returns false for this type, preventing pointless retries. + */ +export class BlazeHttpError extends Error { + readonly status: number; + constructor(status: number, message: string) { + super(message); + this.name = 'BlazeHttpError'; + this.status = status; + } +} + +/** Check if an error is retryable (not an intentional abort or client error). */ +export function isRetryableError(err: unknown): boolean { + if (err instanceof DOMException && err.name === 'AbortError') return false; + // 4xx client errors are deterministic failures — retrying won't help + if (err instanceof BlazeHttpError && err.status < 500) return false; + return true; +} diff --git a/plugins/blaze/src/index.ts b/plugins/blaze/src/index.ts new file mode 100644 index 000000000..dd505bdd9 --- /dev/null +++ b/plugins/blaze/src/index.ts @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * `@livekit/agents-plugin-blaze` + * + * LiveKit Agent Framework plugin for Blaze AI services (STT, TTS, LLM). 
+ * + * @example + * ```typescript + * import { STT, TTS, LLM } from '@livekit/agents-plugin-blaze'; + * + * // Create plugins (reads BLAZE_* env vars automatically) + * const stt = new STT({ language: 'vi' }); + * const tts = new TTS({ speakerId: 'speaker-1' }); + * const llm = new LLM({ botId: 'my-chatbot' }); + * + * // Or with shared configuration + * import type { BlazeConfig } from '@livekit/agents-plugin-blaze'; + * const config: BlazeConfig = { apiUrl: 'http://gateway:8080', authToken: 'tok' }; + * const stt2 = new STT({ config, language: 'vi' }); + * ``` + */ +import { Plugin } from '@livekit/agents'; + +export { STT } from './stt.js'; +export type { STTOptions } from './stt.js'; + +export { TTS, ChunkedStream, SynthesizeStream } from './tts.js'; +export type { TTSOptions } from './tts.js'; + +export { LLM, LLMStream } from './llm.js'; +export type { LLMOptions, BlazeDemographics } from './llm.js'; + +export type { BlazeConfig } from './config.js'; + +export type { + BlazeTTSModel, + BlazeLanguage, + BlazeAudioFormat, + BlazeGender, + BlazeDemographics as BlazeDemographicsModel, + BlazeSTTResponse, + BlazeChatMessage, + BlazeLLMData, +} from './models.js'; + +class BlazePlugin extends Plugin { + constructor() { + super({ + title: 'Blaze', + version: '0.1.0', + package: '@livekit/agents-plugin-blaze', + }); + } +} + +Plugin.registerPlugin(new BlazePlugin()); diff --git a/plugins/blaze/src/llm.test.ts b/plugins/blaze/src/llm.test.ts new file mode 100644 index 000000000..a4f8ffa5b --- /dev/null +++ b/plugins/blaze/src/llm.test.ts @@ -0,0 +1,463 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { initializeLogger } from '../../agents/src/log.js'; +import { LLM } from './llm.js'; + +// LLMStream base class initializes a logger on construction. +// Without this call all chat() calls throw "logger not initialized". 
+beforeAll(() => { + initializeLogger({ pretty: false, level: 'silent' }); +}); + +/** Create a minimal ChatContext mock for testing. */ +function makeChatCtx(messages: Array<{ role: string; text: string }>) { + return { + items: messages.map((m) => ({ + role: m.role, + textContent: m.text, + type: 'message', + })), + }; +} + +/** Build an SSE response body from an array of string chunks. */ +function makeSseBody(chunks: string[], format: 'sse' | 'raw' = 'sse'): ReadableStream { + const encoder = new TextEncoder(); + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + let line: string; + if (format === 'sse') { + line = `data: ${JSON.stringify({ content: chunk })}\n\n`; + } else { + line = `${JSON.stringify({ content: chunk })}\n`; + } + controller.enqueue(encoder.encode(line)); + } + controller.enqueue(encoder.encode('data: [DONE]\n\n')); + controller.close(); + }, + }); +} + +describe('LLM', () => { + it('throws when botId is not provided', () => { + // @ts-expect-error Testing invalid usage + expect(() => new LLM({ apiUrl: 'http://llm:8080' })).toThrow('botId is required'); + }); + + it('creates with valid botId', () => { + const llmInstance = new LLM({ botId: 'test-bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + expect(llmInstance.label()).toBe('blaze.LLM'); + }); + + it('updateOptions does not throw', () => { + const llmInstance = new LLM({ botId: 'test-bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + expect(() => + llmInstance.updateOptions({ deepSearch: true, agenticSearch: true }), + ).not.toThrow(); + }); + + it('updateOptions applies apiUrl to subsequent requests', async () => { + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + vi.stubGlobal('fetch', fetchMock); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://old-url:8080' }); + llmInstance.updateOptions({ apiUrl: 'http://new-url:9090' }); + + const ctx = makeChatCtx([{ role: 
'user', text: 'hi' }]); + const stream = llmInstance.chat({ chatCtx: ctx as never }); + llmInstance.on('error', () => {}); + for await (const _ of stream) { /* consume */ } + + const [url] = fetchMock.mock.calls[0] as [string]; + expect(url).toContain('http://new-url:9090'); + expect(url).not.toContain('old-url'); + + vi.unstubAllGlobals(); + }); + + describe('chat() streaming', () => { + let fetchMock: ReturnType; + + beforeEach(() => { + fetchMock = vi.fn(); + vi.stubGlobal('fetch', fetchMock); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it('sends correct request to chat endpoint', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['Hello', ' world']), + }); + + const llmInstance = new LLM({ + botId: 'my-bot', + authToken: 'test-token', + apiUrl: 'http://llm:8080', + }); + const ctx = makeChatCtx([{ role: 'user', text: 'Hi' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(fetchMock).toHaveBeenCalledOnce(); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toContain('/v1/voicebot-call/my-bot/chat-conversion-stream'); + expect(url).toContain('is_voice_call=true'); + expect(url).toContain('use_tool_based=false'); + expect(init.method).toBe('POST'); + expect(init.headers).toMatchObject({ + 'Content-Type': 'application/json', + Authorization: 'Bearer test-token', + }); + + const body = JSON.parse(init.body as string) as Array<{ role: string; content: string }>; + expect(body).toEqual([{ role: 'user', content: 'Hi' }]); + }); + + it('yields content chunks from SSE stream', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['Xin ', 'chào', '!']), + }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'Chào' }]); + + const stream = llmInstance.chat({ 
chatCtx: ctx as never }); + const texts: string[] = []; + for await (const chunk of stream) { + if (chunk.delta?.content) texts.push(chunk.delta.content); + } + + expect(texts).toEqual(['Xin ', 'chào', '!']); + }); + + it('handles alternative SSE data format (text field)', async () => { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('data: {"text": "hello"}\n\n')); + controller.enqueue(encoder.encode('data: [DONE]\n\n')); + controller.close(); + }, + }); + + fetchMock.mockResolvedValue({ ok: true, body }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const texts: string[] = []; + for await (const chunk of stream) { + if (chunk.delta?.content) texts.push(chunk.delta.content); + } + + expect(texts).toEqual(['hello']); + }); + + it('handles delta.text SSE format', async () => { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('data: {"delta": {"text": "world"}}\n\n')); + controller.enqueue(encoder.encode('data: [DONE]\n\n')); + controller.close(); + }, + }); + + fetchMock.mockResolvedValue({ ok: true, body }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const texts: string[] = []; + for await (const chunk of stream) { + if (chunk.delta?.content) texts.push(chunk.delta.content); + } + + expect(texts).toEqual(['world']); + }); + + it('emits final usage chunk', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['hi']), + }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx 
= makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const allChunks = []; + for await (const chunk of stream) allChunks.push(chunk); + + const usageChunk = allChunks.find((c) => c.usage !== undefined); + expect(usageChunk).toBeDefined(); + expect(usageChunk?.usage?.completionTokens).toBeGreaterThan(0); + }); + + it('includes deepSearch and agenticSearch query params when enabled', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = new LLM({ + botId: 'bot', + authToken: 'tok', + apiUrl: 'http://llm:8080', + deepSearch: true, + agenticSearch: true, + demographics: { gender: 'female', age: 30 }, + }); + const ctx = makeChatCtx([{ role: 'user', text: 'search' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + for await (const _ of stream) { + /* consume */ + } + + const firstCall = fetchMock.mock.calls[0]; + expect(firstCall).toBeDefined(); + const url = firstCall![0] as string; + expect(url).toContain('deep_search=true'); + expect(url).toContain('agentic_search=true'); + expect(url).toContain('gender=female'); + expect(url).toContain('age=30'); + }); + + it('converts system role messages to user context', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([ + { role: 'system', text: 'You are a helpful assistant.' }, + { role: 'user', text: 'Hello' }, + ]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + for await (const _ of stream) { + /* consume */ + } + + const firstCall = fetchMock.mock.calls[0]; + expect(firstCall).toBeDefined(); + const body = JSON.parse((firstCall![1] as RequestInit).body as string) as Array<{ + role: string; + content: string; + }>; + // System messages are SKIPPED — Blaze chatapp loads the prompt from DB. 
+ // Only the user message should appear. + expect(body).toHaveLength(1); + expect(body[0]).toEqual({ role: 'user', content: 'Hello' }); + }); + it('merges system/developer messages into one', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([ + { role: 'system', text: 'You are a helpful assistant.' }, + { role: 'user', text: 'Hello' }, + { role: 'developer', text: 'Be concise.' }, + ]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + for await (const _ of stream) { + /* consume */ + } + + const firstCall = fetchMock.mock.calls[0]; + expect(firstCall).toBeDefined(); + const body = JSON.parse((firstCall![1] as RequestInit).body as string) as Array<{ + role: string; + content: string; + }>; + // system & developer messages are both SKIPPED — only the user message is sent. + expect(body).toHaveLength(1); + expect(body[0]).toEqual({ role: 'user', content: 'Hello' }); + }); + it('handles raw JSON lines (non-SSE fallback format)', async () => { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + start(controller) { + // Raw JSON without "data: " prefix + controller.enqueue(encoder.encode('{"content": "raw"}\n')); + controller.enqueue(encoder.encode('{"content": " json"}\n')); + controller.enqueue(encoder.encode('data: [DONE]\n\n')); + controller.close(); + }, + }); + + fetchMock.mockResolvedValue({ ok: true, body }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const texts: string[] = []; + for await (const chunk of stream) { + if (chunk.delta?.content) texts.push(chunk.delta.content); + } + + expect(texts).toEqual(['raw', ' json']); + }); + + it('stops parsing after [DONE] even when data 
arrives in same chunk', async () => { + const encoder = new TextEncoder(); + const body = new ReadableStream({ + start(controller) { + // [DONE] and a spurious data line arrive in the same chunk + controller.enqueue( + encoder.encode( + 'data: {"content": "valid"}\n\ndata: [DONE]\n\ndata: {"content": "after-done"}\n\n', + ), + ); + controller.close(); + }, + }); + + fetchMock.mockResolvedValue({ ok: true, body }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + const texts: string[] = []; + for await (const chunk of stream) { + if (chunk.delta?.content) texts.push(chunk.delta.content); + } + + // 'after-done' must NOT appear — parser must stop at [DONE] + expect(texts).toEqual(['valid']); + }); + + it('sends request even when server returns an error status', async () => { + // Verify the request is correctly formed; error should propagate via event. + fetchMock.mockResolvedValue({ + ok: false, + status: 429, + text: async () => 'Rate Limited', + }); + + const llmInstance = new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'hi' }]); + const stream = llmInstance.chat({ chatCtx: ctx as never }); + + // The base class _mainTaskImpl emits errors on the LLM instance, then + // rethrows. The rethrow propagates as an unhandled rejection from the + // fire-and-forget startSoon task — suppress it for test isolation. + const suppress = () => {}; + process.on('unhandledRejection', suppress); + + let capturedError: Error | undefined; + llmInstance.on('error', ({ error }: { error: Error }) => { + capturedError = error; + }); + + // Drain the stream — iterator ends normally; errors propagate via event. + for await (const _ of stream) { /* consume */ } + + // Flush pending microtasks so the rejection fires while our handler is active. 
+ await new Promise((r) => setTimeout(r, 0)); + process.off('unhandledRejection', suppress); + + expect(capturedError?.message).toContain('429'); + expect(fetchMock).toHaveBeenCalledOnce(); + const [url] = fetchMock.mock.calls[0] as [string]; + expect(url).toContain('/v1/voicebot-call/bot/chat-conversion-stream'); + }); + + it('captures options at chat creation time', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = new LLM({ + botId: 'bot', + authToken: 'old-token', + apiUrl: 'http://llm:8080', + deepSearch: true, + demographics: { gender: 'female', age: 30 }, + }); + const ctx = makeChatCtx([{ role: 'user', text: 'hi' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + llmInstance.updateOptions({ + authToken: 'new-token', + deepSearch: false, + demographics: { gender: 'male', age: 99 }, + }); + + for await (const _ of stream) { + /* consume */ + } + + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toContain('deep_search=true'); + expect(url).toContain('gender=female'); + expect(url).toContain('age=30'); + expect(url).not.toContain('gender=male'); + expect(init.headers).toMatchObject({ Authorization: 'Bearer old-token' }); + }); + + it('sends use_tool_based=true when enableTools is set', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = new LLM({ + botId: 'bot', + authToken: 'tok', + apiUrl: 'http://llm:8080', + enableTools: true, + }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + for await (const _ of stream) { /* consume */ } + + const [url] = fetchMock.mock.calls[0] as [string]; + expect(url).toContain('use_tool_based=true'); + }); + + it('sends use_tool_based=false by default', async () => { + fetchMock.mockResolvedValue({ + ok: true, + body: makeSseBody(['ok']), + }); + + const llmInstance = 
new LLM({ botId: 'bot', authToken: 'tok', apiUrl: 'http://llm:8080' }); + const ctx = makeChatCtx([{ role: 'user', text: 'test' }]); + + const stream = llmInstance.chat({ chatCtx: ctx as never }); + for await (const _ of stream) { /* consume */ } + + const [url] = fetchMock.mock.calls[0] as [string]; + expect(url).toContain('use_tool_based=false'); + }); + }); +}); diff --git a/plugins/blaze/src/llm.ts b/plugins/blaze/src/llm.ts new file mode 100644 index 000000000..e14067f1b --- /dev/null +++ b/plugins/blaze/src/llm.ts @@ -0,0 +1,375 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Blaze LLM Plugin for LiveKit Voice Agent (Node.js) + * + * LLM plugin interfacing with Blaze chatbot service. + * + * API Endpoint: POST `/v1/voicebot-call/{botId}/chat-conversion-stream` + * Input: JSON array of `{ role, content }` messages + * Output: SSE stream: `data: {"content": "..."}` then `data: [DONE]` + */ +import { randomUUID } from 'node:crypto'; +import { + DEFAULT_API_CONNECT_OPTIONS, + llm, + APIError, + APIStatusError, + APIConnectionError, +} from '@livekit/agents'; +import type { APIConnectOptions } from '@livekit/agents'; +import { + type BlazeConfig, + type ResolvedBlazeConfig, + buildAuthHeaders, + resolveConfig, +} from './config.js'; +import type { BlazeChatMessage, BlazeLLMData } from './models.js'; + +// ChatContext and ChatMessage are in the llm namespace +type ChatContext = llm.ChatContext; +type ChatMessage = llm.ChatMessage; + +/** Demographics for personalization. */ +export interface BlazeDemographics { + gender?: 'male' | 'female' | 'unknown'; + age?: number; +} + +/** Options for the Blaze LLM plugin. */ +export interface LLMOptions { + /** Blaze chatbot identifier (required). */ + botId: string; + /** + * Base URL for the LLM service. + * Falls back to config.apiUrl → BLAZE_API_URL env var. + */ + apiUrl?: string; + /** Bearer token for authentication. Falls back to BLAZE_API_TOKEN env var. 
*/ + authToken?: string; + /** Enable deep search mode. Default: false */ + deepSearch?: boolean; + /** Enable agentic search mode. Default: false */ + agenticSearch?: boolean; + /** + * Enable tool/function calling (`use_tool_based` query param). + * When false the Blaze backend uses a simpler response path. Default: false + */ + enableTools?: boolean; + /** User demographics for personalization. */ + demographics?: BlazeDemographics; + /** Request timeout in milliseconds. Default: 60000 */ + timeout?: number; + /** Centralized configuration object. */ + config?: BlazeConfig; +} + +interface ResolvedLLMOptions { + botId: string; + apiUrl: string; + authToken: string; + deepSearch: boolean; + agenticSearch: boolean; + enableTools: boolean; + demographics?: BlazeDemographics; + timeout: number; +} + +function snapshotLLMOptions(opts: ResolvedLLMOptions): ResolvedLLMOptions { + return { + ...opts, + demographics: opts.demographics ? { ...opts.demographics } : undefined, + }; +} + +function resolveLLMOptions(opts: LLMOptions): ResolvedLLMOptions { + if (!opts.botId) { + throw new Error('Blaze LLM: botId is required'); + } + const cfg: ResolvedBlazeConfig = resolveConfig(opts.config); + return { + botId: opts.botId, + apiUrl: opts.apiUrl ?? cfg.apiUrl, + authToken: opts.authToken ?? cfg.authToken, + deepSearch: opts.deepSearch ?? false, + agenticSearch: opts.agenticSearch ?? false, + enableTools: opts.enableTools ?? false, + demographics: opts.demographics, + timeout: opts.timeout ?? cfg.llmTimeout, + }; +} + +/** + * Convert ChatContext items to Blaze API message format. + * Only processes ChatMessage items (skips FunctionCall, FunctionCallOutput, etc.) + * + * System/developer messages are SKIPPED because the Blaze chatapp already + * loads the voicebot prompt from the database and applies voice/chat mode + * extraction. Sending them again would cause double-prompting (2x tokens) + * and format conflicts (chat-mode template leaking into voice responses). 
 + */ +function convertMessages(chatCtx: ChatContext): BlazeChatMessage[] { + const messages: BlazeChatMessage[] = []; + + for (const item of chatCtx.items) { + // Only process ChatMessage items (type guard) + if (!('role' in item) || !('textContent' in item)) continue; + const msg = item as ChatMessage; + const text = msg.textContent; + if (!text) continue; + + const role = msg.role; + // Skip system/developer — chatapp loads prompt from DB + if (role === 'system' || role === 'developer') { + continue; + } else if (role === 'user') { + messages.push({ role: 'user', content: text }); + } else if (role === 'assistant') { + // Strip <img> tags — only meaningful for TTS/rendering, not for LLM context + const clean = text.replace(/<img[^<]*<\/img>/gi, '').trim(); + if (clean) { + messages.push({ role: 'assistant', content: clean }); + } + } + } + + return messages; +} + +/** + * Extract text content from SSE data in various formats. + */ +function extractContent(data: Record<string, unknown>): string | null { + if (typeof data.content === 'string') return data.content; + if (typeof data.text === 'string') return data.text; + if (data.delta && typeof (data.delta as Record<string, unknown>).text === 'string') { + return (data.delta as Record<string, unknown>).text as string; + } + return null; +} + +/** + * Blaze LLM Stream - async iterator that yields ChatChunk from SSE response. + * + * Failures surface as APIStatusError / APIConnectionError; no internal retry is performed. 
+ */ +export class BlazeLLMStream extends llm.LLMStream { + label = 'blaze.LLMStream'; + readonly #opts: ResolvedLLMOptions; + + constructor( + llmInstance: BlazeLLM, + opts: ResolvedLLMOptions, + chatCtx: ChatContext, + connOptions: APIConnectOptions, + ) { + super(llmInstance, { chatCtx, connOptions }); + this.#opts = opts; + } + + protected async run(): Promise { + const requestId = randomUUID(); + const messages = convertMessages(this.chatCtx); + + // Build URL with query params + const url = new URL( + `${this.#opts.apiUrl}/v1/voicebot-call/${this.#opts.botId}/chat-conversion-stream`, + ); + url.searchParams.set('is_voice_call', 'true'); + url.searchParams.set('agent_stream', 'true'); + url.searchParams.set('use_tool_based', this.#opts.enableTools ? 'true' : 'false'); + if (this.#opts.deepSearch) url.searchParams.set('deep_search', 'true'); + if (this.#opts.agenticSearch) url.searchParams.set('agentic_search', 'true'); + if (this.#opts.demographics?.gender) + url.searchParams.set('gender', this.#opts.demographics.gender); + if (this.#opts.demographics?.age !== undefined) { + url.searchParams.set('age', String(this.#opts.demographics.age)); + } + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), this.#opts.timeout); + const signal = AbortSignal.any([this.abortController.signal, controller.signal]); + + try { + const response = await fetch(url.toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...buildAuthHeaders(this.#opts.authToken), + }, + body: JSON.stringify(messages), + signal, + }); + + if (!response.ok) { + const errorText = await response.text().catch(() => 'unknown error'); + throw new APIStatusError({ + message: `Blaze LLM error ${response.status}: ${errorText}`, + options: { statusCode: response.status }, + }); + } + + if (!response.body) { + throw new APIConnectionError({ message: 'Blaze LLM: response body is null' }); + } + + // Parse SSE stream + const reader = 
response.body.getReader(); + const decoder = new TextDecoder(); + let lineBuffer = ''; + let completionTokens = 0; + let streamDone = false; + + try { + while (!streamDone) { + const { done, value } = await reader.read(); + if (done) break; + if (signal.aborted) break; + + lineBuffer += decoder.decode(value, { stream: true }); + + // Process all complete lines + const lines = lineBuffer.split('\n'); + lineBuffer = lines.pop() ?? ''; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let rawData: string; + + if (trimmed.startsWith('data: ')) { + rawData = trimmed.slice(6); + } else { + // Raw JSON line (non-SSE format fallback) + rawData = trimmed; + } + + if (rawData === '[DONE]') { + streamDone = true; + break; + } + + let parsed: Record; + try { + parsed = JSON.parse(rawData) as Record; + } catch { + // Skip non-JSON lines (comments, keep-alives, etc.) + continue; + } + + const content = extractContent( + parsed as BlazeLLMData as unknown as Record, + ); + if (content) { + completionTokens++; + this.queue.put({ + id: requestId, + delta: { + role: 'assistant', + content, + }, + }); + } + } + } + } finally { + reader.releaseLock(); + } + + // Emit final chunk with usage stats (approximate) + this.queue.put({ + id: requestId, + usage: { + completionTokens, + promptTokens: 0, + promptCachedTokens: 0, + totalTokens: completionTokens, + }, + }); + } catch (err) { + if (err instanceof APIError) throw err; + if (err instanceof DOMException && err.name === 'AbortError') { + throw new APIConnectionError({ message: `Blaze LLM request aborted: ${err.message}` }); + } + throw new APIConnectionError({ + message: `Blaze LLM connection error: ${err instanceof Error ? err.message : String(err)}`, + }); + } finally { + clearTimeout(timeoutId); + } + } + + // Required abstract method from base class + get label_(): string { + return 'blaze.LLMStream'; + } +} + +/** + * Blaze LLM Plugin. 
+ * + * Interfaces with the Blaze chatbot service for conversational AI. + * Supports SSE streaming for low-latency responses. + * + * @example + * ```typescript + * import { LLM } from '@livekit/agents-plugin-blaze'; + * + * const llm = new LLM({ botId: 'my-chatbot' }); + * // Or with shared config: + * const llm = new LLM({ + * botId: 'my-chatbot', + * config: { apiUrl: 'https://api.blaze.vn', authToken: 'tok' } + * }); + * ``` + */ +export class BlazeLLM extends llm.LLM { + #opts: ResolvedLLMOptions; + + constructor(opts: LLMOptions) { + super(); + this.#opts = resolveLLMOptions(opts); + } + + label(): string { + return 'blaze.LLM'; + } + + /** + * Update LLM options at runtime. + */ + updateOptions(opts: Partial>): void { + if (opts.apiUrl !== undefined) this.#opts.apiUrl = opts.apiUrl; + if (opts.authToken !== undefined) this.#opts.authToken = opts.authToken; + if (opts.deepSearch !== undefined) this.#opts.deepSearch = opts.deepSearch; + if (opts.agenticSearch !== undefined) this.#opts.agenticSearch = opts.agenticSearch; + if (opts.enableTools !== undefined) this.#opts.enableTools = opts.enableTools; + if (opts.demographics !== undefined) this.#opts.demographics = opts.demographics; + if (opts.timeout !== undefined) this.#opts.timeout = opts.timeout; + } + + chat({ + chatCtx, + connOptions, + }: { + chatCtx: ChatContext; + toolCtx?: unknown; + connOptions?: APIConnectOptions; + parallelToolCalls?: boolean; + toolChoice?: unknown; + extraKwargs?: Record; + }): BlazeLLMStream { + return new BlazeLLMStream( + this, + snapshotLLMOptions(this.#opts), + chatCtx, + connOptions ?? DEFAULT_API_CONNECT_OPTIONS, + ); + } +} + +// Export with conventional names +export { BlazeLLM as LLM, BlazeLLMStream as LLMStream }; diff --git a/plugins/blaze/src/models.ts b/plugins/blaze/src/models.ts new file mode 100644 index 000000000..36dfec368 --- /dev/null +++ b/plugins/blaze/src/models.ts @@ -0,0 +1,51 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Type definitions for Blaze AI models and options. + */ + +/** Available TTS model identifiers. */ +export type BlazeTTSModel = 'v1_5_pro' | 'v2_pro' | string; // Allow custom model names + +/** Supported language codes. */ +export type BlazeLanguage = + | 'vi' // Vietnamese (default) + | 'en' // English + | 'zh' // Chinese + | 'ja' // Japanese + | 'ko' // Korean + | string; // Allow any IETF language tag + +/** Audio format for TTS output. */ +export type BlazeAudioFormat = 'pcm' | 'mp3' | 'wav'; + +/** Gender values for demographics. */ +export type BlazeGender = 'male' | 'female' | 'unknown'; + +/** User demographics for personalization. */ +export interface BlazeDemographics { + gender?: BlazeGender; + age?: number; +} + +/** Blaze STT API response. */ +export interface BlazeSTTResponse { + transcription: string; + confidence: number; + is_final?: boolean; + language?: string; +} + +/** Blaze chatbot message format. */ +export interface BlazeChatMessage { + role: 'user' | 'assistant'; + content: string; +} + +/** Blaze LLM SSE data formats. */ +export type BlazeLLMData = + | { content: string } // Format 1: primary + | { text: string } // Format 2: fallback + | { delta: { text: string } }; // Format 3: delta diff --git a/plugins/blaze/src/stt.test.ts b/plugins/blaze/src/stt.test.ts new file mode 100644 index 000000000..938fdd10e --- /dev/null +++ b/plugins/blaze/src/stt.test.ts @@ -0,0 +1,390 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { STT } from './stt.js'; + +type AnyFn = (...args: unknown[]) => unknown; +type STTWithRecognize = STT & { _recognize: AnyFn }; + +/** Create a minimal PCM frame mock. 
*/ +function makePcmFrame(samples = 160, sampleRate = 16000, channels = 1) { + return { + data: new Int16Array(samples), + sampleRate, + channels, + samplesPerChannel: samples, + }; +} + +describe('STT', () => { + it('has correct label', () => { + const sttInstance = new STT({ authToken: 'test', apiUrl: 'http://stt:8080' }); + expect(sttInstance.label).toBe('blaze.STT'); + }); + + it('throws when stream() is called', () => { + const sttInstance = new STT({ authToken: 'test', apiUrl: 'http://stt:8080' }); + expect(() => sttInstance.stream()).toThrow('Blaze STT does not support streaming recognition'); + }); + + it('updateOptions changes language without throwing', () => { + const sttInstance = new STT({ authToken: 'test', apiUrl: 'http://stt:8080', language: 'vi' }); + expect(() => sttInstance.updateOptions({ language: 'en' })).not.toThrow(); + }); + + it('updateOptions applies apiUrl to subsequent requests', async () => { + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + json: async () => ({ transcription: 'hello', confidence: 0.9 }), + }); + vi.stubGlobal('fetch', fetchMock); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://old-url:8080', + }) as STTWithRecognize; + sttInstance.updateOptions({ apiUrl: 'http://new-url:9090' }); + + const frame = makePcmFrame(); + await sttInstance._recognize([frame]); + + const [url] = fetchMock.mock.calls[0] as [string]; + expect(url).toContain('http://new-url:9090'); + expect(url).not.toContain('old-url'); + + vi.unstubAllGlobals(); + }); + + describe('_recognize with mocked fetch', () => { + let fetchMock: ReturnType; + + beforeEach(() => { + fetchMock = vi.fn(); + vi.stubGlobal('fetch', fetchMock); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it('sends correct request to transcribe endpoint', async () => { + fetchMock.mockResolvedValue({ + ok: true, + json: async () => ({ transcription: 'hello world', confidence: 0.95 }), + }); + + const sttInstance = new STT({ + authToken: 
'test-token', + apiUrl: 'http://stt:8080', + language: 'vi', + }) as STTWithRecognize; + const frame = makePcmFrame(); + await sttInstance._recognize([frame]); + + expect(fetchMock).toHaveBeenCalledOnce(); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toContain('/v1/stt/transcribe'); + expect(url).toContain('language=vi'); + expect(url).toContain('enable_segments=false'); + expect(init.method).toBe('POST'); + expect(init.headers).toMatchObject({ Authorization: 'Bearer test-token' }); + }); + + it('returns FINAL_TRANSCRIPT with transcription text', async () => { + fetchMock.mockResolvedValue({ + ok: true, + json: async () => ({ transcription: 'xin chào', confidence: 0.99 }), + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + language: 'vi', + }) as STTWithRecognize; + const frame = makePcmFrame(); + const event = await sttInstance._recognize([frame]); + const ev = event as { + type: number; + alternatives: Array<{ text: string; confidence: number; language: string }>; + }; + + expect(ev.type).toBe(2); // SpeechEventType.FINAL_TRANSCRIPT = 2 + expect(ev.alternatives[0]!.text).toBe('xin chào'); + expect(ev.alternatives[0]!.confidence).toBe(0.99); + expect(ev.alternatives[0]!.language).toBe('vi'); + }); + + it('applies normalization rules to transcription', async () => { + fetchMock.mockResolvedValue({ + ok: true, + json: async () => ({ transcription: 'AI is great', confidence: 0.9 }), + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + normalizationRules: { AI: 'trí tuệ nhân tạo' }, + }) as STTWithRecognize; + + const frame = makePcmFrame(); + const event = await sttInstance._recognize([frame]); + const ev = event as { alternatives: Array<{ text: string }> }; + expect(ev.alternatives[0]!.text).toBe('trí tuệ nhân tạo is great'); + }); + + it('returns event with no alternatives for empty audio', async () => { + const sttInstance = new STT({ + authToken: 
'tok', + apiUrl: 'http://stt:8080', + }) as STTWithRecognize; + // Empty frame: 0 samples + const emptyFrame = makePcmFrame(0); + const event = await sttInstance._recognize([emptyFrame]); + const ev = event as { type: number; alternatives?: unknown[] }; + + expect(ev.type).toBe(2); // FINAL_TRANSCRIPT + expect(ev.alternatives).toBeUndefined(); + expect(fetchMock).not.toHaveBeenCalled(); + }); + + it('throws on HTTP error response', async () => { + fetchMock.mockResolvedValue({ + ok: false, + status: 400, + text: async () => 'Bad Request', + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + }) as STTWithRecognize; + const frame = makePcmFrame(); + + await expect(sttInstance._recognize([frame])).rejects.toThrow('Blaze STT error 400'); + }, 20000); + + it('uses language from options in URL', async () => { + fetchMock.mockResolvedValue({ + ok: true, + json: async () => ({ transcription: 'hello', confidence: 1.0 }), + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + language: 'en', + }) as STTWithRecognize; + await sttInstance._recognize([makePcmFrame()]); + + const firstCall = fetchMock.mock.calls[0]; + expect(firstCall).toBeDefined(); + const [url] = firstCall! 
as [string]; + expect(url).toContain('language=en'); + }); + + it('sends a valid WAV file with correct RIFF header', async () => { + let capturedBody: FormData | undefined; + fetchMock.mockImplementation(async (_url: unknown, init: RequestInit) => { + capturedBody = init.body as FormData; + return { ok: true, json: async () => ({ transcription: '', confidence: 1.0 }) }; + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + }) as STTWithRecognize; + const frame = makePcmFrame(160, 16000, 1); // 160 samples, 16kHz, mono + await sttInstance._recognize([frame]); + + // Extract the WAV Blob from FormData + expect(capturedBody).toBeDefined(); + const wavBlob = capturedBody!.get('audio_file') as Blob; + expect(wavBlob).toBeInstanceOf(Blob); + + const arrayBuffer = await wavBlob.arrayBuffer(); + const buf = Buffer.from(arrayBuffer); + + // WAV RIFF header is 44 bytes + PCM data + // 160 samples × 2 bytes (Int16) = 320 bytes PCM + expect(buf.length).toBe(44 + 320); + + // Verify RIFF header fields + expect(buf.toString('ascii', 0, 4)).toBe('RIFF'); + expect(buf.toString('ascii', 8, 12)).toBe('WAVE'); + expect(buf.toString('ascii', 12, 16)).toBe('fmt '); + expect(buf.readUInt32LE(16)).toBe(16); // Subchunk1 size (PCM) + expect(buf.readUInt16LE(20)).toBe(1); // Audio format (PCM = 1) + expect(buf.readUInt16LE(22)).toBe(1); // Channels (mono) + expect(buf.readUInt32LE(24)).toBe(16000); // Sample rate + expect(buf.readUInt16LE(34)).toBe(16); // Bits per sample + expect(buf.toString('ascii', 36, 40)).toBe('data'); + expect(buf.readUInt32LE(40)).toBe(320); // Data chunk size + }); + + it('applies longer normalization rules first for deterministic results', async () => { + fetchMock.mockResolvedValue({ + ok: true, + // Input has 'A' (short) and 'AB' (long, overlaps with 'A') + json: async () => ({ transcription: 'A AB', confidence: 0.9 }), + }); + + const sttInstance = new STT({ + authToken: 'tok', + apiUrl: 'http://stt:8080', + 
normalizationRules: { + A: 'X', // shorter (length 1) + AB: 'Y', // longer (length 2) — must be applied first + }, + }) as STTWithRecognize; + + const event = await sttInstance._recognize([makePcmFrame()]); + const ev = event as { alternatives: Array<{ text: string }> }; + // Longer-first: 'AB'→'Y' gives 'A Y', then 'A'→'X' gives 'X Y' + // Shorter-first: 'A'→'X' gives 'X XB', then 'AB' not found → 'X XB' (wrong) + expect(ev.alternatives[0]!.text).toBe('X Y'); + }); + }); + + describe('frame accumulation', () => { + let fetchMock: ReturnType; + + beforeEach(() => { + fetchMock = vi.fn(); + vi.stubGlobal('fetch', fetchMock); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + function emptyFetchResponse() { + return { ok: true, json: async () => ({ transcription: '', confidence: 0.0 }) }; + } + + function textFetchResponse(text: string) { + return { ok: true, json: async () => ({ transcription: text, confidence: 0.95 }) }; + } + + it('empty STT response buffers PCM and returns SpeechData with empty text', async () => { + fetchMock.mockResolvedValue(emptyFetchResponse()); + + const sttInstance = new STT({ authToken: 'tok', apiUrl: 'http://stt:8080' }) as STTWithRecognize & { + _pendingPcm: Buffer; + _pendingEmptyCount: number; + }; + + const frame = makePcmFrame(160); // 160 samples = 320 bytes PCM + const event = await sttInstance._recognize([frame]) as { + type: number; + alternatives?: Array<{ text: string; confidence: number }>; + }; + + // Should return SpeechData with empty text (not undefined alternatives) + expect(event.type).toBe(2); // FINAL_TRANSCRIPT + expect(event.alternatives).toBeDefined(); + expect(event.alternatives![0]!.text).toBe(''); + expect(event.alternatives![0]!.confidence).toBe(0.0); + }); + + it('buffers PCM from empty result and prepends on next call', async () => { + // First call: empty result → buffer + fetchMock.mockResolvedValueOnce(emptyFetchResponse()); + // Second call: capture body size + let capturedWavSize = 0; + 
fetchMock.mockImplementationOnce(async (_url: unknown, init: RequestInit) => { + const fd = init.body as FormData; + const blob = fd.get('audio_file') as Blob; + capturedWavSize = (await blob.arrayBuffer()).byteLength; + return textFetchResponse('xin chao'); + }); + + const sttInstance = new STT({ authToken: 'tok', apiUrl: 'http://stt:8080' }) as STTWithRecognize; + const frame = makePcmFrame(160, 16000, 1); // 320 bytes PCM each + + await sttInstance._recognize([frame]); // first: empty → buffer + await sttInstance._recognize([frame]); // second: prepend + submit + + // WAV = 44 header + (320 pending + 320 new) = 44 + 640 + expect(capturedWavSize).toBe(44 + 640); + }); + + it('successful result clears pending buffer', async () => { + fetchMock.mockResolvedValueOnce(emptyFetchResponse()); + fetchMock.mockResolvedValueOnce(textFetchResponse('xin chao')); + + const sttInstance = new STT({ authToken: 'tok', apiUrl: 'http://stt:8080' }) as STTWithRecognize; + const frame = makePcmFrame(160); + + await sttInstance._recognize([frame]); // empty → buffer + + // After success, third call should send only single frame (no pending) + let capturedWavSize = 0; + fetchMock.mockImplementationOnce(async (_url: unknown, init: RequestInit) => { + const fd = init.body as FormData; + const blob = fd.get('audio_file') as Blob; + capturedWavSize = (await blob.arrayBuffer()).byteLength; + return textFetchResponse('hello'); + }); + + const result2 = await sttInstance._recognize([frame]); // success → clear pending + expect((result2 as { alternatives: Array<{ text: string }> }).alternatives[0]!.text).toBe('xin chao'); + + await sttInstance._recognize([frame]); // third: should be single frame only + expect(capturedWavSize).toBe(44 + 320); // no pending prepended + }); + + it('discards buffer after maxPendingSegments consecutive empties', async () => { + // 3 empties → buffered; 4th empty → discard + fetchMock.mockResolvedValue(emptyFetchResponse()); + + const sttInstance = new STT({ 
authToken: 'tok', apiUrl: 'http://stt:8080' }) as STTWithRecognize; + const frame = makePcmFrame(160); + + for (let i = 0; i < 3; i++) { + await sttInstance._recognize([frame]); + } + // After 3 calls: pendingPcm should be non-empty + // Send 4th empty: count exceeds maxPendingSegments (3) + await sttInstance._recognize([frame]); + + // After discard: next call should send only single frame (320 bytes PCM) + let capturedWavSize = 0; + fetchMock.mockImplementationOnce(async (_url: unknown, init: RequestInit) => { + const fd = init.body as FormData; + const blob = fd.get('audio_file') as Blob; + capturedWavSize = (await blob.arrayBuffer()).byteLength; + return textFetchResponse('hello'); + }); + + await sttInstance._recognize([frame]); + expect(capturedWavSize).toBe(44 + 320); // no pending after discard + }); + + it('discards buffer when duration exceeds maxPendingDuration', async () => { + fetchMock.mockResolvedValue(emptyFetchResponse()); + + const sttInstance = new STT({ authToken: 'tok', apiUrl: 'http://stt:8080' }) as STTWithRecognize; + + // At 16kHz, 16-bit, mono: 5s = 5 * 16000 * 2 = 160000 bytes + // Use a large frame whose PCM > 160000 bytes + const largeSamples = 80500; // 161000 bytes > 160000 + const largeFrame = makePcmFrame(largeSamples); + + await sttInstance._recognize([largeFrame]); + + // After discard: next call should send only single frame + let capturedWavSize = 0; + fetchMock.mockImplementationOnce(async (_url: unknown, init: RequestInit) => { + const fd = init.body as FormData; + const blob = fd.get('audio_file') as Blob; + capturedWavSize = (await blob.arrayBuffer()).byteLength; + return textFetchResponse('hello'); + }); + + const smallFrame = makePcmFrame(160); + await sttInstance._recognize([smallFrame]); + expect(capturedWavSize).toBe(44 + 320); // only smallFrame, no pending + }); + }); +}); diff --git a/plugins/blaze/src/stt.ts b/plugins/blaze/src/stt.ts new file mode 100644 index 000000000..ababd2156 --- /dev/null +++ 
b/plugins/blaze/src/stt.ts @@ -0,0 +1,338 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Blaze STT Plugin for LiveKit Voice Agent (Node.js) + * + * Speech-to-Text plugin interfacing with Blaze transcription service. + * + * API Endpoint: POST `/v1/stt/transcribe` + * Input: WAV audio file (FormData), query params: language, enable_segments + * Output: `{ transcription: string, confidence: number }` + */ +import type { AudioBuffer } from '@livekit/agents'; +import { mergeFrames, stt } from '@livekit/agents'; +import type { AudioFrame } from '@livekit/rtc-node'; +import { + type BlazeConfig, + BlazeHttpError, + MAX_RETRY_COUNT, + RETRY_BASE_DELAY_MS, + type ResolvedBlazeConfig, + buildAuthHeaders, + isRetryableError, + resolveConfig, + sleep, +} from './config.js'; +import type { BlazeSTTResponse } from './models.js'; + +/** Options for the Blaze STT plugin. */ +export interface STTOptions { + /** + * Base URL for the STT service. + * Falls back to config.apiUrl → BLAZE_API_URL env var. + */ + apiUrl?: string; + /** Language code for transcription. Default: "vi" */ + language?: string; + /** Bearer token for authentication. Falls back to BLAZE_API_TOKEN env var. */ + authToken?: string; + /** + * Dictionary of text replacements applied to transcription output. + * Keys are search strings, values are replacements. + * Example: `{ "AI": "trí tuệ nhân tạo" }` + */ + normalizationRules?: Record; + /** Request timeout in milliseconds. Default: 30000 */ + timeout?: number; + /** Centralized configuration object. */ + config?: BlazeConfig; +} + +interface ResolvedSTTOptions { + apiUrl: string; + language: string; + authToken: string; + normalizationRules?: Record; + timeout: number; +} + +function resolveSTTOptions(opts: STTOptions): ResolvedSTTOptions { + const cfg: ResolvedBlazeConfig = resolveConfig(opts.config); + return { + apiUrl: opts.apiUrl ?? cfg.apiUrl, + language: opts.language ?? 
'vi', + authToken: opts.authToken ?? cfg.authToken, + normalizationRules: opts.normalizationRules, + timeout: opts.timeout ?? cfg.sttTimeout, + }; +} + +/** + * Blaze Speech-to-Text Plugin. + * + * Converts audio to text using the Blaze transcription service. + * Supports batch recognition only (no real-time streaming). + * Includes retry logic with exponential backoff for transient failures. + * + * @example + * ```typescript + * import { STT } from '@livekit/agents-plugin-blaze'; + * + * const stt = new STT({ language: 'vi' }); + * // Or with shared config: + * const stt = new STT({ config: { apiUrl: 'https://api.blaze.vn', authToken: 'tok' } }); + * ``` + */ +export class STT extends stt.STT { + label = 'blaze.STT'; + #opts: ResolvedSTTOptions; + + // Frame accumulation: buffer PCM from empty STT segments so short + // leading fragments (hesitant speech) are prepended to the next segment. + #pendingPcm: Buffer = Buffer.alloc(0); + #pendingEmptyCount: number = 0; + #lastRecognizeTime: number = 0; + + // Safety limits (mirrors Python defaults) + readonly #maxPendingDuration: number = 5.0; // seconds of buffered audio + readonly #maxPendingSegments: number = 3; // consecutive empty segments + readonly #pendingIdleTimeout: number = 10.0; // auto-clear after idle gap (s) + + constructor(opts: STTOptions = {}) { + super({ streaming: false, interimResults: false, alignedTranscript: false }); + this.#opts = resolveSTTOptions(opts); + } + + /** + * Update STT options at runtime. 
+ */ + updateOptions(opts: Partial>): void { + if (opts.apiUrl !== undefined) this.#opts.apiUrl = opts.apiUrl; + if (opts.language !== undefined) this.#opts.language = opts.language; + if (opts.authToken !== undefined) this.#opts.authToken = opts.authToken; + if (opts.normalizationRules !== undefined) + this.#opts.normalizationRules = opts.normalizationRules; + if (opts.timeout !== undefined) this.#opts.timeout = opts.timeout; + } + + async _recognize(buffer: AudioBuffer, abortSignal?: AbortSignal): Promise { + // 1. Merge all audio frames into one + const frame = mergeFrames(buffer); + + // 2. Extract raw PCM from the merged frame (new segment only) + const segmentPcm = Buffer.from( + frame.data.buffer, + frame.data.byteOffset, + frame.data.byteLength, + ); + + // 3. Auto-clear stale pending buffer if too much time has elapsed + const now = Date.now() / 1000; // seconds + if (this.#pendingPcm.length > 0 && this.#lastRecognizeTime > 0) { + const idleGap = now - this.#lastRecognizeTime; + if (idleGap > this.#pendingIdleTimeout) { + this.#pendingPcm = Buffer.alloc(0); + this.#pendingEmptyCount = 0; + } + } + this.#lastRecognizeTime = now; + + // 4. Prepend buffered PCM from previous empty segments + const pcmData = + this.#pendingPcm.length > 0 ? Buffer.concat([this.#pendingPcm, segmentPcm]) : segmentPcm; + + // 5. Handle fully empty audio (no sound at all) + if (pcmData.byteLength === 0) { + return { + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: undefined, + }; + } + + // 6. Convert PCM to WAV format + const wavBuffer = this.#createWavFromPcm(pcmData, frame.sampleRate, frame.channels); + + // 7. Build FormData for multipart upload + const formData = new FormData(); + const wavBytes = Uint8Array.from(wavBuffer); + const wavBlob = new Blob([wavBytes], { type: 'audio/wav' }); + formData.append('audio_file', wavBlob, 'audio.wav'); + + // 8. 
Build request URL with query params + const url = new URL(`${this.#opts.apiUrl}/v1/stt/transcribe`); + url.searchParams.set('language', this.#opts.language); + url.searchParams.set('enable_segments', 'false'); + url.searchParams.set('enable_refinement', 'false'); + + // 9. Make request with retry logic for transient failures + let result: BlazeSTTResponse | undefined; + + for (let attempt = 0; attempt <= MAX_RETRY_COUNT; attempt++) { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), this.#opts.timeout); + const signal = abortSignal + ? AbortSignal.any([abortSignal, controller.signal]) + : controller.signal; + + try { + const response = await fetch(url.toString(), { + method: 'POST', + headers: buildAuthHeaders(this.#opts.authToken), + body: formData, + signal, + }); + + // Retry on 5xx server errors + if (response.status >= 500 && attempt < MAX_RETRY_COUNT) { + await sleep(RETRY_BASE_DELAY_MS * 2 ** attempt); + continue; + } + + if (!response.ok) { + const errorText = await response.text().catch(() => 'unknown error'); + throw new BlazeHttpError(response.status, `Blaze STT error ${response.status}: ${errorText}`); + } + + // 10. Parse response + result = (await response.json()) as BlazeSTTResponse; + break; // Success + } catch (err) { + if (attempt < MAX_RETRY_COUNT && isRetryableError(err)) { + await sleep(RETRY_BASE_DELAY_MS * 2 ** attempt); + continue; + } + throw err; + } finally { + clearTimeout(timeoutId); + } + } + + if (!result) { + throw new Error('Blaze STT: all retry attempts failed'); + } + + const rawText = result.transcription ?? ''; + const text = this.#applyNormalizationRules(rawText); + const confidence = result.confidence ?? 1.0; + + // 11. Frame accumulation logic + if (!text.trim()) { + // Empty result — decide whether to buffer or discard + this.#pendingEmptyCount++; + + const bytesPerSample = 2 * frame.channels; // 16-bit PCM + const segmentDuration = + frame.sampleRate && bytesPerSample + ? 
segmentPcm.byteLength / (frame.sampleRate * bytesPerSample) + : 0; + const pendingDuration = + this.#pendingPcm.length > 0 && frame.sampleRate && bytesPerSample + ? this.#pendingPcm.byteLength / (frame.sampleRate * bytesPerSample) + : 0; + const totalPendingDuration = pendingDuration + segmentDuration; + + if ( + this.#pendingEmptyCount <= this.#maxPendingSegments && + totalPendingDuration <= this.#maxPendingDuration + ) { + // Buffer combined PCM for next call + this.#pendingPcm = pcmData; + } else { + // Safety limit reached — discard buffer + this.#pendingPcm = Buffer.alloc(0); + this.#pendingEmptyCount = 0; + } + + return { + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + { + text: '', + language: this.#opts.language as stt.SpeechData['language'], + startTime: 0, + endTime: 0, + confidence: 0.0, + }, + ], + }; + } + + // Got real text — clear pending buffer + this.#pendingPcm = Buffer.alloc(0); + this.#pendingEmptyCount = 0; + + return { + type: stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + { + text, + language: this.#opts.language as stt.SpeechData['language'], + startTime: 0, + endTime: 0, + confidence, + }, + ], + }; + } + + stream(): stt.SpeechStream { + throw new Error( + 'Blaze STT does not support streaming recognition. ' + + 'Use _recognize() for batch transcription.', + ); + } + + /** + * Create a WAV file buffer from an AudioFrame (PCM 16-bit signed). + * Follows the same 44-byte RIFF header pattern as the OpenAI STT plugin. + */ + #createWav(frame: AudioFrame): Buffer { + const pcm = Buffer.from(frame.data.buffer, frame.data.byteOffset, frame.data.byteLength); + return this.#createWavFromPcm(pcm, frame.sampleRate, frame.channels); + } + + /** + * Create a WAV file buffer from raw PCM bytes + audio metadata. + * Used when pending PCM is prepended to the current segment. 
+ */ + #createWavFromPcm(pcm: Buffer, sampleRate: number, channels: number): Buffer { + const bitsPerSample = 16; + const byteRate = (sampleRate * channels * bitsPerSample) / 8; + const blockAlign = (channels * bitsPerSample) / 8; + + const header = Buffer.alloc(44); + header.write('RIFF', 0); + header.writeUInt32LE(36 + pcm.byteLength, 4); + header.write('WAVE', 8); + header.write('fmt ', 12); + header.writeUInt32LE(16, 16); // Subchunk1 size (PCM = 16) + header.writeUInt16LE(1, 20); // Audio format (1 = PCM) + header.writeUInt16LE(channels, 22); + header.writeUInt32LE(sampleRate, 24); + header.writeUInt32LE(byteRate, 28); + header.writeUInt16LE(blockAlign, 32); + header.writeUInt16LE(bitsPerSample, 34); + header.write('data', 36); + header.writeUInt32LE(pcm.byteLength, 40); + + return Buffer.concat([header, pcm]); + } + + /** + * Apply case-sensitive string replacements to transcribed text. + */ + #applyNormalizationRules(text: string): string { + const rules = this.#opts.normalizationRules; + if (!rules) return text; + let result = text; + // Apply longer patterns first for more deterministic results. + const entries = Object.entries(rules).sort((a, b) => b[0].length - a[0].length); + for (const [from, to] of entries) { + if (!from) continue; + result = result.replaceAll(from, to); + } + return result; + } +} diff --git a/plugins/blaze/src/tts.test.ts b/plugins/blaze/src/tts.test.ts new file mode 100644 index 000000000..92e25d5cd --- /dev/null +++ b/plugins/blaze/src/tts.test.ts @@ -0,0 +1,234 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { TTS } from './tts.js'; + +describe('TTS', () => { + beforeEach(() => { + // Default fetch stub for tests that construct streams without consuming them. 
+ vi.stubGlobal( + 'fetch', + vi.fn().mockResolvedValue({ + ok: true, + body: new ReadableStream({ + start(controller) { + controller.close(); + }, + }), + }), + ); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it('has correct label', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + expect(ttsInstance.label).toBe('blaze.TTS'); + }); + + it('reports correct sampleRate', () => { + const ttsInstance = new TTS({ + authToken: 'test', + apiUrl: 'http://tts:8080', + sampleRate: 22050, + }); + expect(ttsInstance.sampleRate).toBe(22050); + }); + + it('uses default sampleRate of 24000', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + expect(ttsInstance.sampleRate).toBe(24000); + }); + + it('has mono channel (numChannels=1)', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + expect(ttsInstance.numChannels).toBe(1); + }); + + it('supports streaming capability', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + expect(ttsInstance.capabilities.streaming).toBe(true); + }); + + it('updateOptions does not throw', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + expect(() => ttsInstance.updateOptions({ language: 'en', speakerId: 'voice-2' })).not.toThrow(); + }); + + it('synthesize() returns a ChunkedStream', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + // Register a no-op error handler to prevent unhandled error events + ttsInstance.on('error', () => {}); + const stream = ttsInstance.synthesize('Hello world'); + expect(stream.label).toBe('blaze.ChunkedStream'); + expect(stream.inputText).toBe('Hello world'); + }); + + it('stream() returns a SynthesizeStream', () => { + const ttsInstance = new TTS({ authToken: 'test', apiUrl: 'http://tts:8080' }); + // Register a no-op error handler to prevent unhandled error events 
+    ttsInstance.on('error', () => {});
+    const stream = ttsInstance.stream();
+    expect(stream.label).toBe('blaze.SynthesizeStream');
+  });
+
+  describe('ChunkedStream synthesis', () => {
+    let fetchMock: ReturnType<typeof vi.fn>;
+
+    beforeEach(() => {
+      fetchMock = vi.fn();
+      vi.stubGlobal('fetch', fetchMock);
+    });
+
+    afterEach(() => {
+      vi.unstubAllGlobals();
+    });
+
+    it('sends correct FormData fields to TTS endpoint', async () => {
+      // Create a PCM audio response (16-bit samples at 24kHz)
+      // For simplicity, use a small buffer representing a few samples
+      const pcmSamples = new Int16Array(2400); // 100ms of silence at 24kHz
+      const pcmBuffer = Buffer.from(pcmSamples.buffer);
+
+      // Create a ReadableStream that yields the PCM data
+      const readable = new ReadableStream({
+        start(controller) {
+          controller.enqueue(new Uint8Array(pcmBuffer));
+          controller.close();
+        },
+      });
+
+      fetchMock.mockResolvedValue({
+        ok: true,
+        body: readable,
+      });
+
+      const ttsInstance = new TTS({
+        authToken: 'test-token',
+        apiUrl: 'http://tts:8080',
+        language: 'vi',
+        speakerId: 'speaker-1',
+        model: 'v2_pro',
+      });
+
+      const stream = ttsInstance.synthesize('hello');
+
+      // Consume the stream
+      const frames = [];
+      for await (const audio of stream) {
+        frames.push(audio);
+      }
+
+      expect(fetchMock).toHaveBeenCalledOnce();
+      const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit];
+      expect(url).toBe('http://tts:8080/v1/tts/realtime');
+      expect(init.method).toBe('POST');
+      expect(init.headers).toMatchObject({ Authorization: 'Bearer test-token' });
+
+      // Verify FormData contains required fields
+      const body = init.body as FormData;
+      expect(body.get('query')).toBe('hello');
+      expect(body.get('language')).toBe('vi');
+      expect(body.get('audio_format')).toBe('pcm');
+      expect(body.get('speaker_id')).toBe('speaker-1');
+      expect(body.get('normalization')).toBe('no');
+      expect(body.get('model')).toBe('v2_pro');
+
+      // Should have emitted at least one frame
+
expect(frames.length).toBeGreaterThan(0); + // Last frame should have final=true + expect(frames[frames.length - 1]!.final).toBe(true); + }); + + it('applies normalization rules before synthesis', async () => { + const pcmSamples = new Int16Array(2400); + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array(Buffer.from(pcmSamples.buffer))); + controller.close(); + }, + }); + + fetchMock.mockResolvedValue({ ok: true, body: readable }); + + const ttsInstance = new TTS({ + authToken: 'tok', + apiUrl: 'http://tts:8080', + normalizationRules: { $: 'đô la' }, + }); + + const stream = ttsInstance.synthesize('100$'); + for await (const _ of stream) { + /* consume */ + } + + const firstCall = fetchMock.mock.calls[0]; + expect(firstCall).toBeDefined(); + const body = (firstCall![1] as RequestInit).body as FormData; + expect(body.get('query')).toBe('100đô la'); + }); + + it('builds correct FormData for a minimal synthesis request', async () => { + // Keep this test deterministic: return an empty successful audio stream. 
+ const readable = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + fetchMock.mockResolvedValue({ ok: true, body: readable }); + + const ttsInstance = new TTS({ authToken: 'tok', apiUrl: 'http://tts:8080' }); + const stream = ttsInstance.synthesize('test text'); + for await (const _ of stream) { + // consume stream + } + + expect(fetchMock).toHaveBeenCalledOnce(); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toBe('http://tts:8080/v1/tts/realtime'); + expect(init.method).toBe('POST'); + + const body = init.body as FormData; + expect(body.get('query')).toBe('test text'); + expect(body.get('audio_format')).toBe('pcm'); + expect(body.get('normalization')).toBe('no'); + }); + + it('captures options at stream creation time', async () => { + const readable = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + fetchMock.mockResolvedValue({ ok: true, body: readable }); + + const ttsInstance = new TTS({ + authToken: 'old-token', + apiUrl: 'http://tts:8080', + language: 'vi', + speakerId: 'speaker-old', + }); + + const stream = ttsInstance.synthesize('hello'); + ttsInstance.updateOptions({ + authToken: 'new-token', + language: 'en', + speakerId: 'speaker-new', + }); + + for await (const _ of stream) { + // consume stream + } + + const [_url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + const body = init.body as FormData; + + expect(init.headers).toMatchObject({ Authorization: 'Bearer old-token' }); + expect(body.get('language')).toBe('vi'); + expect(body.get('speaker_id')).toBe('speaker-old'); + }); + }); +}); diff --git a/plugins/blaze/src/tts.ts b/plugins/blaze/src/tts.ts new file mode 100644 index 000000000..b40ad7605 --- /dev/null +++ b/plugins/blaze/src/tts.ts @@ -0,0 +1,849 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Blaze TTS Plugin for LiveKit Voice Agent (Node.js) + * + * Text-to-Speech plugin interfacing with Blaze TTS service. + * + * Streaming Mode (SynthesizeStream): + * WebSocket Endpoint: ws(s)://gateway/v1/tts/realtime + * Protocol: + * 1. Connect - receive type: "successful-connection" + * 2. Send token/strategy - receive type: "successful-authentication" + * 3. Send event: "speech-start" with params + * 4. Send query: "..." (one or more batches) + * 5. Send event: "speech-end" + * 6. Receive: JSON control msgs + binary PCM frames + * + * One-shot Mode (ChunkedStream): + * HTTP Endpoint: POST /v1/tts/realtime + * Input: FormData (query, language, audio_format, speaker_id, normalization, model) + * Output: Streaming raw PCM audio + */ +import { AudioByteStream, tts, APIStatusError, APIConnectionError } from '@livekit/agents'; +import type { APIConnectOptions } from '@livekit/agents'; +import type { AudioFrame } from '@livekit/rtc-node'; +import WebSocket from 'ws'; +import { + type BlazeConfig, + type ResolvedBlazeConfig, + buildAuthHeaders, + resolveConfig, +} from './config.js'; + +// ──────────────────────────────────────────────── +// Sentence boundary regex +// ──────────────────────────────────────────────── + +const SENTENCE_END_RE = /(?:\n\n+|\n|[.!?;:。!?;:](?:\s|$))/g; + +// ──────────────────────────────────────────────── +// Audio helpers +// ──────────────────────────────────────────────── + +/** + * Apply linear fade-in and/or fade-out to PCM16-LE audio. 
+ */ +function applyPcm16Fade( + pcm: Buffer, + fadeSamples: number, + fadeIn: boolean, + fadeOut: boolean, +): Buffer { + if (!pcm.length || (!fadeIn && !fadeOut)) return pcm; + const sampleCount = Math.floor(pcm.length / 2); + if (sampleCount <= 0) return pcm; + const usable = Math.min(fadeSamples, Math.floor(sampleCount / 2)); + if (usable <= 0) return pcm; + + const result = Buffer.from(pcm); + const view = new DataView(result.buffer, result.byteOffset, result.byteLength); + + if (fadeIn) { + for (let i = 0; i < usable; i++) { + const offset = i * 2; + const sample = view.getInt16(offset, true); + view.setInt16(offset, Math.round(sample * (i / usable)), true); + } + } + if (fadeOut) { + for (let i = 0; i < usable; i++) { + const offset = (sampleCount - usable + i) * 2; + const sample = view.getInt16(offset, true); + view.setInt16(offset, Math.round(sample * ((usable - i) / usable)), true); + } + } + return result; +} + +/** + * Generate silence buffer (PCM16 zeros). + */ +function generateSilence(sampleRate: number, durationMs: number): Buffer { + const numSamples = Math.floor((sampleRate * durationMs) / 1000); + return Buffer.alloc(numSamples * 2); +} + +// ──────────────────────────────────────────────── +// Batching helpers +// ──────────────────────────────────────────────── + +function wordCount(s: string): number { + return (s.match(/\S+/g) || []).length; +} + +interface BatchSplitOpts { + minChars: number; + targetChars: number; + maxChars: number; + force: boolean; + isFirstBatch: boolean; +} + +/** + * Find the optimal split position in accumulated text for TTS batching. + * Returns the string index to split at, or null if no split is ready yet. 
+ */ +function findBatchSplit(text: string, opts: BatchSplitOpts): number | null { + if (!text.trim()) return null; + const hardLimit = Math.min(text.length, opts.maxChars); + + // Find all sentence-end positions within the limit + const positions: number[] = []; + SENTENCE_END_RE.lastIndex = 0; + let m: RegExpExecArray | null; + while ((m = SENTENCE_END_RE.exec(text.slice(0, hardLimit))) !== null) { + positions.push(m.index + m[0].length); + } + + // First batch: prioritize word count for faster first audio + if (opts.isFirstBatch) { + for (const pos of positions) { + if (wordCount(text.slice(0, pos)) >= 4) return pos; + } + } + + // Hard limit reached — must split + if (text.length >= opts.maxChars) { + if (positions.length > 0) return positions[positions.length - 1]!; + return safeSplitOnWhitespace(text, opts.maxChars, opts.minChars); + } + + // Enough text accumulated — prefer boundary around target size + if (text.length >= opts.minChars && positions.length > 0) { + if (text.length >= opts.targetChars) { + for (const pos of positions) { + if (pos >= opts.targetChars) return pos; + } + } + const candidates = positions.filter((p) => p >= opts.minChars); + if (candidates.length > 0) return candidates[candidates.length - 1]!; + } + + // Force flush: send whatever we have + if (opts.force) { + if (positions.length > 0) return positions[positions.length - 1]!; + return safeSplitOnWhitespace(text, text.length, 1); + } + + return null; +} + +function safeSplitOnWhitespace(text: string, preferredIdx: number, floorIdx: number): number { + let idx = Math.min(Math.max(preferredIdx, 1), text.length); + const floor = Math.max(1, Math.min(floorIdx, idx)); + while (idx > floor && !/\s/.test(text[idx - 1] ?? '')) { + idx--; + } + if (idx <= floor) return preferredIdx; + while (idx < text.length && /\s/.test(text[idx] ?? 
'')) {
+    idx++;
+  }
+  return idx;
+}
+
+function normalizeBatchText(text: string): string {
+  let result = text.replace(/\n{2,}/g, '\n');
+  result = result.replace(/[ \t]{2,}/g, ' ');
+  return result;
+}
+
+// ────────────────────────────────────────────────
+// Normalization rules
+// ────────────────────────────────────────────────
+
+/**
+ * Apply string replacement normalization rules to text before synthesis.
+ */
+function applyNormalizationRules(text: string, rules?: Record<string, string>): string {
+  if (!rules) return text;
+  let result = text;
+  // Apply longer patterns first for more deterministic results.
+  const entries = Object.entries(rules).sort((a, b) => b[0].length - a[0].length);
+  for (const [from, to] of entries) {
+    if (!from) continue;
+    result = result.replaceAll(from, to);
+  }
+  return result;
+}
+
+// ────────────────────────────────────────────────
+// TTS Options
+// ────────────────────────────────────────────────
+
+/** Options for the Blaze TTS plugin. */
+export interface TTSOptions {
+  /**
+   * Base URL for the TTS service.
+   * Falls back to config.apiUrl → BLAZE_API_URL env var.
+   */
+  apiUrl?: string;
+  /** Language code. Default: "vi" */
+  language?: string;
+  /** Speaker/voice identifier. Default: "default" */
+  speakerId?: string;
+  /** Bearer token for authentication. Falls back to BLAZE_API_TOKEN env var. */
+  authToken?: string;
+  /** TTS model identifier. Default: "v1_5_pro" */
+  model?: string;
+  /** Audio output format: 'pcm' | 'mp3' | 'wav'. Default: 'pcm' */
+  audioFormat?: string;
+  /** Audio playback speed multiplier. Default: '1' */
+  audioSpeed?: string;
+  /** Audio quality (bitrate for mp3). Default: 32 */
+  audioQuality?: number;
+  /** Output sample rate in Hz. Default: 24000 */
+  sampleRate?: number;
+  /**
+   * Dictionary of text replacements applied before synthesis.
+   * Keys are search strings, values are replacements.
+   * Example: `{ "$": "đô la", "%": "phần trăm" }`
+   */
+  normalizationRules?: Record<string, string>;
+  /** Minimum chars before first batch can be sent. Default: 100 */
+  batchMinChars?: number;
+  /** Target chars per batch. Default: 200 */
+  batchTargetChars?: number;
+  /** Maximum chars per batch (hard limit). Default: 350 */
+  batchMaxChars?: number;
+  /** Max wait time (ms) before force-flushing a batch. Default: 450 */
+  batchMaxWaitMs?: number;
+  /** Silence duration between TTS segments (ms). Default: 150 */
+  interSentenceSilenceMs?: number;
+  /** Request timeout in milliseconds. Default: 60000 */
+  timeout?: number;
+  /** Centralized configuration object. */
+  config?: BlazeConfig;
+}
+
+interface ResolvedTTSOptions {
+  apiUrl: string;
+  language: string;
+  speakerId: string;
+  authToken: string;
+  model: string;
+  audioFormat: string;
+  audioSpeed: string;
+  audioQuality: number;
+  sampleRate: number;
+  normalizationRules?: Record<string, string>;
+  batchMinChars: number;
+  batchTargetChars: number;
+  batchMaxChars: number;
+  batchMaxWaitMs: number;
+  interSentenceSilenceMs: number;
+  timeout: number;
+  wsUrl: string;
+}
+
+function resolveTTSOptions(opts: TTSOptions): ResolvedTTSOptions {
+  const cfg: ResolvedBlazeConfig = resolveConfig(opts.config);
+  const apiUrl = opts.apiUrl ?? cfg.apiUrl;
+  const wsBase = apiUrl.replace('https://', 'wss://').replace('http://', 'ws://');
+
+  let audioFormat = (opts.audioFormat ?? 'pcm').trim().toLowerCase();
+  if (!['pcm', 'mp3', 'wav'].includes(audioFormat)) audioFormat = 'pcm';
+
+  return {
+    apiUrl,
+    language: opts.language ?? 'vi',
+    speakerId: opts.speakerId ?? 'default',
+    authToken: opts.authToken ?? cfg.authToken,
+    model: opts.model ?? 'v1_5_pro',
+    audioFormat,
+    audioSpeed: opts.audioSpeed ?? '1',
+    audioQuality: opts.audioQuality ?? 32,
+    sampleRate: opts.sampleRate ?? 24000,
+    normalizationRules: opts.normalizationRules,
+    batchMinChars: opts.batchMinChars ?? 100,
+    batchTargetChars: opts.batchTargetChars ??
200,
+    batchMaxChars: opts.batchMaxChars ?? 350,
+    batchMaxWaitMs: opts.batchMaxWaitMs ?? 450,
+    interSentenceSilenceMs: opts.interSentenceSilenceMs ?? 150,
+    timeout: opts.timeout ?? cfg.ttsTimeout,
+    wsUrl: `${wsBase}/v1/tts/realtime`,
+  };
+}
+
+function snapshotTTSOptions(opts: ResolvedTTSOptions): ResolvedTTSOptions {
+  return {
+    ...opts,
+    normalizationRules: opts.normalizationRules ? { ...opts.normalizationRules } : undefined,
+  };
+}
+
+// ────────────────────────────────────────────────
+// WebSocket helpers
+// ────────────────────────────────────────────────
+
+function openWebSocket(url: string): Promise<WebSocket> {
+  return new Promise((resolve, reject) => {
+    const ws = new WebSocket(url);
+    ws.binaryType = 'nodebuffer';
+    const onOpen = () => {
+      ws.off('error', onError);
+      resolve(ws);
+    };
+    const onError = (err: Error) => {
+      ws.off('open', onOpen);
+      reject(
+        new APIConnectionError({
+          message: `Blaze TTS failed to connect to WebSocket: ${err.message}`,
+        }),
+      );
+    };
+    ws.once('open', onOpen);
+    ws.once('error', onError);
+  });
+}
+
+function waitForWsTextMessage(ws: WebSocket): Promise<string> {
+  return new Promise((resolve, reject) => {
+    const cleanup = () => {
+      ws.off('message', onMessage);
+      ws.off('error', onError);
+      ws.off('close', onClose);
+    };
+    const onMessage = (data: Buffer | string) => {
+      cleanup();
+      resolve(typeof data === 'string' ?
data : data.toString());
+    };
+    const onError = (err: Error) => {
+      cleanup();
+      reject(
+        new APIConnectionError({
+          message: `Blaze TTS WebSocket error: ${err.message}`,
+        }),
+      );
+    };
+    const onClose = () => {
+      cleanup();
+      reject(new APIConnectionError({ message: 'Blaze TTS WebSocket closed unexpectedly' }));
+    };
+    ws.on('message', onMessage);
+    ws.on('error', onError);
+    ws.on('close', onClose);
+  });
+}
+
+// ────────────────────────────────────────────────
+// HTTP-based one-shot synthesis (for ChunkedStream)
+// ────────────────────────────────────────────────
+
+/**
+ * Fetch PCM audio from Blaze TTS HTTP API and emit frames via the queue.
+ * Used by ChunkedStream for one-shot synthesis.
+ */
+async function synthesizeAudio(
+  text: string,
+  opts: ResolvedTTSOptions,
+  requestId: string,
+  segmentId: string,
+  queue: { put: (item: tts.SynthesizedAudio) => void },
+  abortSignal: AbortSignal,
+): Promise<void> {
+  const normalized = applyNormalizationRules(text, opts.normalizationRules);
+
+  const formData = new FormData();
+  formData.append('query', normalized);
+  formData.append('language', opts.language);
+  formData.append('audio_format', opts.audioFormat);
+  formData.append('speaker_id', opts.speakerId);
+  formData.append('normalization', 'no');
+  formData.append('model', opts.model);
+
+  const controller = new AbortController();
+  const timeoutId = setTimeout(() => controller.abort(), opts.timeout);
+  const signal = AbortSignal.any([abortSignal, controller.signal]);
+
+  let response: Response;
+  try {
+    response = await fetch(`${opts.apiUrl}/v1/tts/realtime`, {
+      method: 'POST',
+      headers: buildAuthHeaders(opts.authToken),
+      body: formData,
+      signal,
+    });
+
+    if (!response.ok) {
+      const errorText = await response.text().catch(() => 'unknown error');
+      throw new APIStatusError({
+        message: `Blaze TTS error ${response.status}: ${errorText}`,
+        options: { statusCode: response.status },
+      });
+    }
+
+    if (!response.body) {
+      throw new APIConnectionError({
message: 'Blaze TTS: response body is null' }); + } + + const bstream = new AudioByteStream(opts.sampleRate, 1); + const reader = response.body.getReader(); + let pendingFrame: AudioFrame | undefined; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (signal.aborted) break; + + const chunk = value.buffer.slice(value.byteOffset, value.byteOffset + value.byteLength); + for (const frame of bstream.write(chunk)) { + if (pendingFrame !== undefined) { + queue.put({ requestId, segmentId, frame: pendingFrame, final: false }); + } + pendingFrame = frame; + } + } + + for (const frame of bstream.flush()) { + if (pendingFrame !== undefined) { + queue.put({ requestId, segmentId, frame: pendingFrame, final: false }); + } + pendingFrame = frame; + } + } finally { + reader.releaseLock(); + } + + if (pendingFrame !== undefined) { + queue.put({ requestId, segmentId, frame: pendingFrame, final: true }); + } + } finally { + clearTimeout(timeoutId); + } +} + +// ──────────────────────────────────────────────── +// ChunkedStream: one-shot synthesis via HTTP POST +// ──────────────────────────────────────────────── + +/** + * One-shot TTS stream: synthesizes a complete text segment and returns audio frames. 
+ */
+export class ChunkedStream extends tts.ChunkedStream {
+  label = 'blaze.ChunkedStream';
+  readonly #opts: ResolvedTTSOptions;
+
+  constructor(
+    text: string,
+    ttsInstance: TTS,
+    opts: ResolvedTTSOptions,
+    connOptions?: APIConnectOptions,
+    abortSignal?: AbortSignal,
+  ) {
+    super(text, ttsInstance, connOptions, abortSignal);
+    this.#opts = opts;
+  }
+
+  protected async run(): Promise<void> {
+    const requestId = crypto.randomUUID();
+    await synthesizeAudio(
+      this.inputText,
+      this.#opts,
+      requestId,
+      requestId,
+      this.queue,
+      this.abortSignal,
+    );
+  }
+}
+
+// ────────────────────────────────────────────────
+// SynthesizeStream: WebSocket streaming with batching
+// ────────────────────────────────────────────────
+
+const TIMEOUT_SENTINEL = Symbol('TIMEOUT');
+
+/**
+ * Streaming TTS: opens a persistent WebSocket connection and streams text
+ * in optimally-sized batches for low first-audio latency.
+ *
+ * Text tokens from the LLM are accumulated and split at sentence boundaries
+ * using the configurable batch size parameters. Audio is received concurrently
+ * from the TTS service, with inter-sentence silence injection and PCM fade
+ * applied per segment.
+ */
+export class SynthesizeStream extends tts.SynthesizeStream {
+  label = 'blaze.SynthesizeStream';
+  readonly #opts: ResolvedTTSOptions;
+
+  constructor(ttsInstance: TTS, opts: ResolvedTTSOptions, connOptions?: APIConnectOptions) {
+    super(ttsInstance, connOptions);
+    this.#opts = opts;
+  }
+
+  protected async run(): Promise<void> {
+    const opts = this.#opts;
+    const requestId = crypto.randomUUID();
+    const segmentId = requestId;
+    const fadeSamples = Math.max(1, Math.floor(opts.sampleRate * 0.008));
+    const silenceBuf = generateSilence(opts.sampleRate, opts.interSentenceSilenceMs);
+
+    // --- Open WebSocket and perform handshake ---
+    let ws: WebSocket;
+    try {
+      ws = await openWebSocket(opts.wsUrl);
+    } catch (err) {
+      throw new APIConnectionError({
+        message: `Blaze TTS: failed to connect to ${opts.wsUrl}: ${err}`,
+      });
+    }
+
+    try {
+      // Wait for connection acknowledgment
+      const connMsg = await waitForWsTextMessage(ws);
+      const connData = JSON.parse(connMsg) as Record<string, unknown>;
+      if (connData.type !== 'successful-connection') {
+        throw new APIConnectionError({
+          message: `Blaze TTS: unexpected connection response: ${connMsg}`,
+        });
+      }
+
+      // Authenticate
+      ws.send(JSON.stringify({ token: opts.authToken, strategy: 'livekit' }));
+      const authMsg = await waitForWsTextMessage(ws);
+      const authData = JSON.parse(authMsg) as Record<string, unknown>;
+      if (authData.type !== 'successful-authentication') {
+        throw new APIConnectionError({
+          message: `Blaze TTS: authentication failed: ${authMsg}`,
+        });
+      }
+
+      // Send speech-start with TTS parameters
+      ws.send(
+        JSON.stringify({
+          event: 'speech-start',
+          language: opts.language,
+          speaker_id: opts.speakerId,
+          model: opts.model,
+          audio_format: opts.audioFormat,
+          audio_speed: opts.audioSpeed,
+          audio_quality: String(opts.audioQuality),
+          normalization: 'no',
+        }),
+      );
+
+      // --- Set up concurrent audio reader (event-driven) ---
+      const bstream = new AudioByteStream(opts.sampleRate, 1);
+      let pendingFrame: AudioFrame | undefined;
+      let
hasPrevSegment = false;
+      let speechEnded = false;
+
+      let audioReaderResolve!: () => void;
+      let audioReaderReject!: (err: Error) => void;
+      const audioReaderDone = new Promise<void>((resolve, reject) => {
+        audioReaderResolve = resolve;
+        audioReaderReject = reject;
+      });
+      // Prevent transient unhandledRejection before we await audioReaderDone later.
+      audioReaderDone.catch(() => {});
+
+      const emitFrame = (frame: AudioFrame, isFinal: boolean) => {
+        this.queue.put({ requestId, segmentId, frame, final: isFinal });
+      };
+
+      ws.on('message', (data: Buffer | string, isBinary: boolean) => {
+        try {
+          if (isBinary) {
+            // Binary audio data
+            const buf = data as Buffer;
+            const chunk = new Uint8Array(buf).buffer;
+            for (const frame of bstream.write(chunk)) {
+              if (pendingFrame !== undefined) {
+                emitFrame(pendingFrame, false);
+              }
+              pendingFrame = frame;
+            }
+          } else {
+            // JSON control message
+            const msg = JSON.parse(typeof data === 'string' ? data : data.toString()) as Record<
+              string,
+              string
+            >;
+            const status = msg.status ?? msg.type ??
''; + + if (status === 'started-byte-stream') { + // New TTS segment starting — inject inter-sentence silence + if (hasPrevSegment && silenceBuf.length > 0) { + const silenceChunk = new Uint8Array(silenceBuf).buffer; + for (const frame of bstream.write(silenceChunk)) { + if (pendingFrame !== undefined) { + emitFrame(pendingFrame, false); + } + pendingFrame = frame; + } + } + } else if (status === 'finished-byte-stream') { + hasPrevSegment = true; + } else if (status === 'speech-end') { + speechEnded = true; + // Flush remaining buffered audio + for (const frame of bstream.flush()) { + if (pendingFrame !== undefined) { + emitFrame(pendingFrame, false); + } + pendingFrame = frame; + } + // Emit last frame as final + if (pendingFrame !== undefined) { + emitFrame(pendingFrame, true); + pendingFrame = undefined; + } + audioReaderResolve(); + } else if (status === 'failed-request' || status === 'error') { + audioReaderReject(new APIConnectionError({ + message: `Blaze TTS error: ${msg.message ?? status}`, + })); + } + } + } catch (err) { + audioReaderReject(err instanceof APIConnectionError ? err : new APIConnectionError({ + message: `Blaze TTS stream error: ${err instanceof Error ? 
err.message : String(err)}`,
+          }));
+        }
+      });
+
+      ws.on('error', (err: Error) => {
+        if (!speechEnded) {
+          audioReaderReject(
+            new APIConnectionError({
+              message: `Blaze TTS WebSocket error: ${err.message}`,
+            }),
+          );
+        }
+      });
+
+      ws.on('close', () => {
+        if (!speechEnded) {
+          // Unexpected close — flush what we have
+          for (const frame of bstream.flush()) {
+            if (pendingFrame !== undefined) {
+              emitFrame(pendingFrame, false);
+            }
+            pendingFrame = frame;
+          }
+          if (pendingFrame !== undefined) {
+            emitFrame(pendingFrame, true);
+            pendingFrame = undefined;
+          }
+          audioReaderResolve();
+        }
+      });
+
+      // --- Text batching loop ---
+      let textBuf = '';
+      let batchCount = 0;
+      let inputDone = false;
+
+      const sendQuery = (text: string) => {
+        const normalized = applyNormalizationRules(text, opts.normalizationRules);
+        const cleaned = normalizeBatchText(normalized);
+        if (!cleaned.trim()) return;
+        batchCount++;
+        ws.send(JSON.stringify({ query: cleaned }));
+      };
+
+      const drainBatches = (force: boolean) => {
+        while (textBuf.length > 0) {
+          const idx = findBatchSplit(textBuf, {
+            minChars: opts.batchMinChars,
+            targetChars: opts.batchTargetChars,
+            maxChars: opts.batchMaxChars,
+            force,
+            isFirstBatch: batchCount === 0,
+          });
+          if (idx === null) break;
+          const chunk = textBuf.slice(0, idx);
+          textBuf = textBuf.slice(idx);
+          if (!chunk.trim()) continue;
+          if (chunk.trim().length < 8 && !force) {
+            textBuf = chunk + textBuf;
+            break;
+          }
+          sendQuery(chunk);
+        }
+      };
+
+      // Read input tokens with batch timeout support.
+      // We manually iterate to support timeout-based flushing.
+      const inputIter = this.input[Symbol.asyncIterator]();
+      let pendingNext: Promise<
+        IteratorResult<string | typeof tts.SynthesizeStream.FLUSH_SENTINEL>
+      > | null = null;
+
+      while (!inputDone) {
+        if (this.abortSignal.aborted) break;
+
+        if (!pendingNext) {
+          pendingNext = inputIter.next();
+        }
+
+        // Race between next token and batch timeout. Always clear the timeout
+        // when the token path wins to avoid orphaned timers.
+        let batchTimeoutId: ReturnType<typeof setTimeout> | undefined;
+        const timeoutPromise = new Promise<typeof TIMEOUT_SENTINEL>((resolve) => {
+          batchTimeoutId = setTimeout(() => resolve(TIMEOUT_SENTINEL), opts.batchMaxWaitMs);
+        });
+
+        let result:
+          | IteratorResult<string | typeof tts.SynthesizeStream.FLUSH_SENTINEL>
+          | typeof TIMEOUT_SENTINEL;
+        try {
+          result = await Promise.race([
+            pendingNext.then(
+              (r) => r as IteratorResult<string | typeof tts.SynthesizeStream.FLUSH_SENTINEL>,
+            ),
+            timeoutPromise,
+          ]);
+        } finally {
+          if (batchTimeoutId !== undefined) {
+            clearTimeout(batchTimeoutId);
+          }
+        }
+
+        if (result === TIMEOUT_SENTINEL) {
+          // Timeout — flush accumulated text if we have enough for first batch
+          if (textBuf.trim() && batchCount === 0 && wordCount(textBuf) >= 4) {
+            sendQuery(textBuf);
+            textBuf = '';
+          } else {
+            drainBatches(false);
+          }
+          continue;
+        }
+
+        pendingNext = null; // Consumed
+
+        if (result.done) {
+          inputDone = true;
+          break;
+        }
+
+        const item = result.value;
+        if (item === tts.SynthesizeStream.FLUSH_SENTINEL) {
+          drainBatches(true);
+          continue;
+        }
+
+        textBuf += item;
+        drainBatches(false);
+      }
+
+      // Flush any remaining text
+      if (textBuf.trim()) {
+        sendQuery(textBuf);
+        textBuf = '';
+      }
+
+      // End speech session
+      ws.send(JSON.stringify({ event: 'speech-end' }));
+
+      // Wait for all audio to be received
+      await audioReaderDone;
+
+      // Signal end of stream to framework
+      this.queue.put(tts.SynthesizeStream.END_OF_STREAM);
+    } finally {
+      if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
+        ws.close();
+      }
+    }
+  }
+}
+
+// ────────────────────────────────────────────────
+// TTS Plugin
+// ────────────────────────────────────────────────
+
+/**
+ * Blaze Text-to-Speech Plugin.
+ *
+ * Converts text to speech using the Blaze TTS service.
+ * Supports both one-shot synthesis (ChunkedStream) via HTTP and
+ * streaming synthesis (SynthesizeStream) via WebSocket with text batching.
+ *
+ * @example
+ * ```typescript
+ * import { TTS } from '@livekit/agents-plugin-blaze';
+ *
+ * const tts = new TTS({ speakerId: 'speaker-1', language: 'vi' });
+ * // Or with shared config and batching options:
+ * const tts = new TTS({
+ *   config: { apiUrl: 'http://tts:8080', authToken: 'tok' },
+ *   batchMinChars: 80,
+ *   batchTargetChars: 150,
+ *   interSentenceSilenceMs: 200,
+ * });
+ * ```
+ */
+export class TTS extends tts.TTS {
+  label = 'blaze.TTS';
+  #opts: ResolvedTTSOptions;
+
+  constructor(opts: TTSOptions = {}) {
+    const resolved = resolveTTSOptions(opts);
+    super(resolved.sampleRate, 1, { streaming: true });
+    this.#opts = resolved;
+  }
+
+  /**
+   * Update TTS options at runtime.
+   */
+  updateOptions(opts: Partial<Omit<TTSOptions, 'config'>>): void {
+    if (opts.language !== undefined) this.#opts.language = opts.language;
+    if (opts.speakerId !== undefined) this.#opts.speakerId = opts.speakerId;
+    if (opts.authToken !== undefined) this.#opts.authToken = opts.authToken;
+    if (opts.model !== undefined) this.#opts.model = opts.model;
+    if (opts.audioFormat !== undefined) this.#opts.audioFormat = opts.audioFormat;
+    if (opts.audioSpeed !== undefined) this.#opts.audioSpeed = opts.audioSpeed;
+    if (opts.audioQuality !== undefined) this.#opts.audioQuality = opts.audioQuality;
+    if (opts.timeout !== undefined) this.#opts.timeout = opts.timeout;
+    if (opts.normalizationRules !== undefined)
+      this.#opts.normalizationRules = opts.normalizationRules;
+    if (opts.batchMinChars !== undefined) this.#opts.batchMinChars = opts.batchMinChars;
+    if (opts.batchTargetChars !== undefined) this.#opts.batchTargetChars = opts.batchTargetChars;
+    if (opts.batchMaxChars !== undefined) this.#opts.batchMaxChars = opts.batchMaxChars;
+    if (opts.batchMaxWaitMs !== undefined) this.#opts.batchMaxWaitMs = opts.batchMaxWaitMs;
+    if (opts.interSentenceSilenceMs !== undefined)
+      this.#opts.interSentenceSilenceMs = opts.interSentenceSilenceMs;
+    // Recompute WS URL if apiUrl changed
+    if (opts.apiUrl !== undefined)
{ + this.#opts.apiUrl = opts.apiUrl; + const wsBase = opts.apiUrl.replace('https://', 'wss://').replace('http://', 'ws://'); + this.#opts.wsUrl = `${wsBase}/v1/tts/realtime`; + } + } + + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): ChunkedStream { + return new ChunkedStream(text, this, snapshotTTSOptions(this.#opts), connOptions, abortSignal); + } + + stream(options?: { connOptions?: APIConnectOptions }): SynthesizeStream { + return new SynthesizeStream(this, snapshotTTSOptions(this.#opts), options?.connOptions); + } +} diff --git a/plugins/blaze/tsconfig.json b/plugins/blaze/tsconfig.json new file mode 100644 index 000000000..661b42094 --- /dev/null +++ b/plugins/blaze/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "rootDir": "./src", + "declarationDir": "./dist", + "outDir": "./dist" + }, + "typedocOptions": { + "name": "plugins/agents-plugin-blaze", + "entryPointStrategy": "resolve", + "entryPoints": ["src/index.ts"] + } +} diff --git a/plugins/blaze/tsup.config.ts b/plugins/blaze/tsup.config.ts new file mode 100644 index 000000000..8ca20961f --- /dev/null +++ b/plugins/blaze/tsup.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from 'tsup'; + +import defaults from '../../tsup.config'; + +export default defineConfig({ + ...defaults, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c8a126114..4c772aea4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -436,6 +436,27 @@ importers: specifier: ^5.0.0 version: 5.4.5 + plugins/blaze: + devDependencies: + '@livekit/agents': + specifier: workspace:* + version: link:../../agents + '@livekit/agents-plugins-test': + specifier: workspace:* + version: link:../test + '@livekit/rtc-node': + specifier: 'catalog:' + version: 0.13.24 + '@microsoft/api-extractor': + specifier: ^7.35.0 + version: 7.43.7(@types/node@22.19.1) + tsup: + specifier: ^8.3.5 + version: 
8.4.0(@microsoft/api-extractor@7.43.7(@types/node@22.19.1))(postcss@8.5.6)(tsx@4.21.0)(typescript@5.9.3) + typescript: + specifier: ^5.0.0 + version: 5.9.3 + plugins/cartesia: dependencies: ws: diff --git a/turbo.json b/turbo.json index 064ba79d6..21d779571 100644 --- a/turbo.json +++ b/turbo.json @@ -12,6 +12,11 @@ "BASETEN_API_KEY", "BASETEN_MODEL_ENDPOINT", "BASETEN_STT_MODEL_ID", + "BLAZE_API_URL", + "BLAZE_API_TOKEN", + "BLAZE_STT_TIMEOUT", + "BLAZE_TTS_TIMEOUT", + "BLAZE_LLM_TIMEOUT", "CARTESIA_API_KEY", "CAL_API_KEY", "CEREBRAS_API_KEY",