diff --git a/.changeset/few-buttons-draw.md b/.changeset/few-buttons-draw.md new file mode 100644 index 000000000..1641b12ba --- /dev/null +++ b/.changeset/few-buttons-draw.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": minor +--- + +feat(stt): add FakeSTT test harness for FallbackAdapter diff --git a/agents/src/stt/fallback_adapter.test.ts b/agents/src/stt/fallback_adapter.test.ts index 2517317ed..1c230ff91 100644 --- a/agents/src/stt/fallback_adapter.test.ts +++ b/agents/src/stt/fallback_adapter.test.ts @@ -5,131 +5,9 @@ import type { EventEmitter } from 'node:events'; import { beforeAll, describe, expect, it, vi } from 'vitest'; import { APIConnectionError, APIError } from '../_exceptions.js'; import { initializeLogger } from '../log.js'; -import type { APIConnectOptions } from '../types.js'; import { FallbackAdapter } from './fallback_adapter.js'; -import { - STT, - type STTCapabilities, - type SpeechEvent, - SpeechEventType, - SpeechStream, -} from './stt.js'; - -type Step = - | { kind: 'event'; event: SpeechEvent } - | { kind: 'error'; error: Error; recoverable?: boolean } - | { kind: 'end' }; - -class MockSpeechStream extends SpeechStream { - label: string; - private program: Step[]; - private parent: MockSTT; - private drainsInput: boolean; - constructor( - parent: MockSTT, - program: Step[], - connOptions?: APIConnectOptions, - drainsInput = false, - ) { - super(parent, undefined, connOptions); - this.label = `${parent.label}.stream`; - this.program = program; - this.parent = parent; - this.drainsInput = drainsInput; - } - protected async run(): Promise { - for (const step of this.program) { - if (step.kind === 'event') { - this.queue.put(step.event); - } else if (step.kind === 'error') { - this.parent.emit('error', { - type: 'stt_error', - timestamp: Date.now(), - label: this.parent.label, - error: step.error, - recoverable: step.recoverable ?? false, - }); - throw step.error; - } else if (step.kind === 'end') { - break; - } - } - // Optionally block on input like a real provider. run() only returns once - // endInput() is called on this child, so this is what exposes the - // "child elected after forwarder EOF never gets endInput()" hang. - if (this.drainsInput) { - for await (const _ of this.input) { - /* noop */ - } - } - } -} - -interface MockSTTOptions { - label: string; - program: Step[]; - streamProgram?: Step[]; - streamDrainsInput?: boolean; - capabilities?: Partial; -} - -class MockSTT extends STT { - label: string; - private recognizeProgram: Step[]; - private streamProgram: Step[]; - private streamDrainsInput: boolean; - constructor(opts: MockSTTOptions) { - super({ - streaming: opts.capabilities?.streaming ?? true, - interimResults: opts.capabilities?.interimResults ?? true, - diarization: opts.capabilities?.diarization ?? false, - }); - this.label = opts.label; - this.recognizeProgram = opts.program; - this.streamProgram = opts.streamProgram ?? opts.program; - this.streamDrainsInput = opts.streamDrainsInput ?? false; - } - override async recognize( - frame: Parameters[0], - abortSignal?: AbortSignal, - ): Promise { - return super.recognize(frame, abortSignal); - } - protected async _recognize(): Promise { - for (const step of this.recognizeProgram) { - if (step.kind === 'event') return step.event; - if (step.kind === 'error') throw step.error; - } - return { type: SpeechEventType.FINAL_TRANSCRIPT }; - } - override stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { - return new MockSpeechStream( - this, - this.streamProgram, - options?.connOptions, - this.streamDrainsInput, - ); - } -} - -const finalEvent: SpeechEvent = { - type: SpeechEventType.FINAL_TRANSCRIPT, - alternatives: [ - { - language: 'en', - text: 'hello world', - startTime: 0, - endTime: 1, - confidence: 0.99, - }, - ], - requestId: 'req-1', -}; - -const emptyFinalEvent: SpeechEvent = { - type: SpeechEventType.FINAL_TRANSCRIPT, - alternatives: [{ language: 'en', text: '', startTime: 0, endTime: 1, confidence: 0.99 }], -}; +import type { STT, SpeechEvent } from './stt.js'; +import { FakeSTT, RecognizeSentinel, emptyAudioFrame } from './testing/fake_stt.js'; describe('FallbackAdapter', () => { beforeAll(() => { @@ -144,10 +22,9 @@ describe('FallbackAdapter', () => { }); it('throws if a non-streaming STT is provided without a VAD', () => { - const nonStreaming = new MockSTT({ + const nonStreaming = new FakeSTT({ label: 'non-streaming', - program: [{ kind: 'end' }], - capabilities: { streaming: false }, + capabilities: { streaming: false, interimResults: false }, }); expect(() => new FallbackAdapter({ sttInstances: [nonStreaming] })).toThrow( /do not support streaming/, @@ -155,8 +32,8 @@ describe('FallbackAdapter', () => { }); it('exposes provided instances and telephony-tuned defaults', () => { - const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); - const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); + const a = new FakeSTT({ label: 'a' }); + const b = new FakeSTT({ label: 'b' }); const adapter = new FallbackAdapter({ sttInstances: [a, b] }); expect(adapter.sttInstances).toHaveLength(2); expect(adapter.sttInstances[0]).toBe(a); @@ -168,98 +45,106 @@ describe('FallbackAdapter', () => { }); it('reports streaming=true even when capabilities are mixed (via StreamAdapter wrap)', () => { - // All-streaming case: we can verify streaming=true without needing a VAD. - const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const a = new FakeSTT({ label: 'a' }); const adapter = new FallbackAdapter({ sttInstances: [a] }); expect(adapter.capabilities.streaming).toBe(true); }); it('_recognize falls through to the next instance on error', async () => { - const boom = new APIConnectionError({ message: 'primary down' }); - const primary = new MockSTT({ label: 'primary', program: [{ kind: 'error', error: boom }] }); - const fallback = new MockSTT({ - label: 'fallback', - program: [{ kind: 'event', event: finalEvent }], + const primary = new FakeSTT({ + label: 'primary', + fakeException: new APIConnectionError({ message: 'primary down' }), }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); - const emptyFrame = {} as Parameters[0]; - const result = await adapter.recognize(emptyFrame); - expect(result).toEqual(finalEvent); + const result = await adapter.recognize(emptyAudioFrame()); + expect(result.alternatives?.[0]?.text).toBe('hello world'); expect(adapter.status[0]?.available).toBe(false); expect(adapter.status[1]?.available).toBe(true); + + // Observability: each STT saw exactly one recognize() attempt. + expect((await primary.recognizeCh.next()).value).toBeInstanceOf(RecognizeSentinel); + expect((await fallback.recognizeCh.next()).value).toBeInstanceOf(RecognizeSentinel); }); it('_recognize throws APIConnectionError when every instance fails', async () => { const boom = new APIConnectionError({ message: 'down' }); - const a = new MockSTT({ label: 'a', program: [{ kind: 'error', error: boom }] }); - const b = new MockSTT({ label: 'b', program: [{ kind: 'error', error: boom }] }); + const a = new FakeSTT({ label: 'a', fakeException: boom }); + const b = new FakeSTT({ label: 'b', fakeException: boom }); const adapter = new FallbackAdapter({ sttInstances: [a, b] }); - const emptyFrame = {} as Parameters[0]; - await expect(adapter.recognize(emptyFrame)).rejects.toThrow(/all STTs failed/); + await expect(adapter.recognize(emptyAudioFrame())).rejects.toThrow(/all STTs failed/); expect(adapter.status[0]?.available).toBe(false); expect(adapter.status[1]?.available).toBe(false); }); it('_recognize treats non-APIError failures as fallback-worthy too', async () => { - const a = new MockSTT({ - label: 'a', - program: [{ kind: 'error', error: new Error('anything') }], - }); - const b = new MockSTT({ - label: 'b', - program: [{ kind: 'event', event: finalEvent }], - }); + const a = new FakeSTT({ label: 'a', fakeException: new Error('anything') }); + const b = new FakeSTT({ label: 'b', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [a, b] }); - const emptyFrame = {} as Parameters[0]; - const result = await adapter.recognize(emptyFrame); - expect(result).toEqual(finalEvent); + const result = await adapter.recognize(emptyAudioFrame()); + expect(result.alternatives?.[0]?.text).toBe('hello world'); expect(adapter.status[0]?.available).toBe(false); }); it("emits 'stt_availability_changed' with { stt, available } when marking unavailable", async () => { - const boom = new APIError('primary down'); - const primary = new MockSTT({ label: 'primary', program: [{ kind: 'error', error: boom }] }); - const fallback = new MockSTT({ - label: 'fallback', - program: [{ kind: 'event', event: finalEvent }], + const primary = new FakeSTT({ + label: 'primary', + fakeException: new APIError('primary down'), }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); const handler = vi.fn(); (adapter as unknown as EventEmitter).on('stt_availability_changed', handler); - const emptyFrame = {} as Parameters[0]; - await adapter.recognize(emptyFrame); + await adapter.recognize(emptyAudioFrame()); expect(handler).toHaveBeenCalledWith({ stt: primary, available: false }); }); - it('recognize recovery probe flips an instance back to available on success', async () => { - // Primary fails, fallback succeeds. The background recovery probe for the - // primary re-runs recognize() — with our MockSTT program, the second - // invocation still errors, so it stays unavailable. Swap program mid-test - // to simulate recovery. - const primary = new MockSTT({ + it('recognize recovery probe flips an instance back to available once it succeeds', async () => { + // Port of Python's `test_stt_recover`. Primary starts broken, fallback + // works. After the first recognize() marks primary unavailable, flipping + // primary to success via updateOptions should let the background + // recovery task mark it available again on the next recognize() call + // (recovery is scheduled inside _recognize for every unavailable STT). + const primary = new FakeSTT({ label: 'primary', - program: [{ kind: 'error', error: new APIError('transient') }], - }); - const fallback = new MockSTT({ - label: 'fallback', - program: [{ kind: 'event', event: finalEvent }], + fakeException: new APIConnectionError({ message: 'primary down' }), }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); - const emptyFrame = {} as Parameters[0]; - await adapter.recognize(emptyFrame); - expect(adapter.status[0]?.available).toBe(false); + const availabilityEvents: Array<{ stt: STT; available: boolean }> = []; + (adapter as unknown as EventEmitter).on( + 'stt_availability_changed', + (ev: { stt: STT; available: boolean }) => { + availabilityEvents.push(ev); + }, + ); - // Give the background recovery task a chance to run (then confirm it - // correctly stays marked unavailable since primary's program still errors). - await new Promise((r) => setTimeout(r, 20)); + await adapter.recognize(emptyAudioFrame()); expect(adapter.status[0]?.available).toBe(false); + expect(adapter.status[1]?.available).toBe(true); + + // Flip primary to success and trigger another recognize — that kicks + // off a fresh recovery task for primary. + primary.updateOptions({ fakeException: null, fakeTranscript: 'recovered' }); + await adapter.recognize(emptyAudioFrame()); + + // Recovery runs asynchronously; poll briefly for the flip. + const deadline = Date.now() + 500; + while (Date.now() < deadline && !adapter.status[0]?.available) { + await new Promise((r) => setTimeout(r, 5)); + } + expect(adapter.status[0]?.available).toBe(true); + expect(availabilityEvents.map((e) => ({ stt: e.stt.label, available: e.available }))).toEqual([ + { stt: 'primary', available: false }, + { stt: 'primary', available: true }, + ]); }); it('recognize emits exactly one metrics_collected event (no double-count)', async () => { @@ -268,24 +153,20 @@ describe('FallbackAdapter', () => { // also emits metrics — and those child metrics are forwarded onto the // adapter. Without a recognize() override, consumers see two stt_metrics // events per call and RECOGNITION_USAGE is double-counted. - const primary = new MockSTT({ - label: 'primary', - program: [{ kind: 'event', event: finalEvent }], - }); + const primary = new FakeSTT({ label: 'primary', fakeTranscript: 'hello' }); const adapter = new FallbackAdapter({ sttInstances: [primary] }); const received: unknown[] = []; adapter.on('metrics_collected', (m) => received.push(m)); - const emptyFrame = {} as Parameters[0]; - await adapter.recognize(emptyFrame); + await adapter.recognize(emptyAudioFrame()); expect(received).toHaveLength(1); }); it('forwards metrics_collected events from every child instance', () => { - const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); - const b = new MockSTT({ label: 'b', program: [{ kind: 'end' }] }); + const a = new FakeSTT({ label: 'a' }); + const b = new FakeSTT({ label: 'b' }); const adapter = new FallbackAdapter({ sttInstances: [a, b] }); const received: unknown[] = []; @@ -307,7 +188,7 @@ describe('FallbackAdapter', () => { }); it('close detaches the forwarders so orphan events stop flowing through', async () => { - const a = new MockSTT({ label: 'a', program: [{ kind: 'end' }] }); + const a = new FakeSTT({ label: 'a' }); const adapter = new FallbackAdapter({ sttInstances: [a] }); const received: unknown[] = []; @@ -327,13 +208,6 @@ describe('FallbackAdapter', () => { expect(received).toHaveLength(0); }); - - it('recovery probe marks an STT available when it yields a non-empty FINAL_TRANSCRIPT', () => { - // Direct-unit test of the probe guard: an empty-transcript FINAL event - // should not satisfy the recovery condition. - expect(emptyFinalEvent.alternatives?.[0]?.text).toBe(''); - expect(finalEvent.alternatives?.[0]?.text).toBe('hello world'); - }); }); describe('FallbackSpeechStream (streaming path)', () => { @@ -343,16 +217,8 @@ describe('FallbackSpeechStream (streaming path)', () => { }); it('forwards events from the primary without triggering fallback when it succeeds', async () => { - const primary = new MockSTT({ - label: 'primary', - program: [], - streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], - }); - const fallback = new MockSTT({ - label: 'fallback', - program: [], - streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], - }); + const primary = new FakeSTT({ label: 'primary', fakeTranscript: 'hello world' }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); const availabilityChanges: Array<{ stt: STT; available: boolean }> = []; @@ -369,23 +235,18 @@ describe('FallbackSpeechStream (streaming path)', () => { const events: SpeechEvent[] = []; for await (const ev of stream) events.push(ev); - expect(events).toEqual([finalEvent]); + expect(events.map((e) => e.alternatives?.[0]?.text)).toEqual(['hello world']); expect(availabilityChanges).toEqual([]); expect(adapter.status[0]?.available).toBe(true); expect(adapter.status[1]?.available).toBe(true); }); it('stream switches to the secondary provider when the primary errors', async () => { - const primary = new MockSTT({ + const primary = new FakeSTT({ label: 'primary', - program: [], - streamProgram: [{ kind: 'error', error: new APIError('primary down') }], - }); - const fallback = new MockSTT({ - label: 'fallback', - program: [], - streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], + fakeException: new APIError('primary down'), }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback], maxRetryPerSTT: 0, // no retries — primary fails once, move on @@ -405,28 +266,21 @@ describe('FallbackSpeechStream (streaming path)', () => { const events: SpeechEvent[] = []; for await (const ev of stream) events.push(ev); - expect(events).toEqual([finalEvent]); + expect(events.map((e) => e.alternatives?.[0]?.text)).toEqual(['hello world']); expect(availabilityChanges).toContainEqual({ stt: primary, available: false }); expect(adapter.status[0]?.available).toBe(false); expect(adapter.status[1]?.available).toBe(true); + + // Both providers saw a stream attempt via the observability channel. + expect((await primary.streamCh.next()).done).toBe(false); + expect((await fallback.streamCh.next()).done).toBe(false); }); it('stream marks every instance unavailable when all children fail', async () => { const err = new APIError('down'); - const a = new MockSTT({ - label: 'a', - program: [], - streamProgram: [{ kind: 'error', error: err }], - }); - const b = new MockSTT({ - label: 'b', - program: [], - streamProgram: [{ kind: 'error', error: err }], - }); - const adapter = new FallbackAdapter({ - sttInstances: [a, b], - maxRetryPerSTT: 0, - }); + const a = new FakeSTT({ label: 'a', fakeException: err }); + const b = new FakeSTT({ label: 'b', fakeException: err }); + const adapter = new FallbackAdapter({ sttInstances: [a, b], maxRetryPerSTT: 0 }); // Adapter's base SpeechStream.mainTask re-throws after emitting 'error'; // swallow to keep the test harness quiet. @@ -449,17 +303,12 @@ describe('FallbackSpeechStream (streaming path)', () => { // The fallback child elected afterwards never receives endInput(), so // a provider whose run() drains input hangs forever. Guard: on election // after the forwarder has finished, immediately end the child's input. - const primary = new MockSTT({ + // FakeRecognizeStream drains input by default, matching a real provider. + const primary = new FakeSTT({ label: 'primary', - program: [], - streamProgram: [{ kind: 'error', error: new APIError('primary down') }], - }); - const fallback = new MockSTT({ - label: 'fallback', - program: [], - streamProgram: [{ kind: 'event', event: finalEvent }, { kind: 'end' }], - streamDrainsInput: true, + fakeException: new APIError('primary down'), }); + const fallback = new FakeSTT({ label: 'fallback', fakeTranscript: 'hello world' }); const adapter = new FallbackAdapter({ sttInstances: [primary, fallback], maxRetryPerSTT: 0, @@ -479,7 +328,7 @@ describe('FallbackSpeechStream (streaming path)', () => { const outcome = await Promise.race([collect.then(() => 'ok' as const), timeout]); expect(outcome).toBe('ok'); - expect(events).toEqual([finalEvent]); + expect(events.map((e) => e.alternatives?.[0]?.text)).toEqual(['hello world']); expect(adapter.status[0]?.available).toBe(false); expect(adapter.status[1]?.available).toBe(true); }); diff --git a/agents/src/stt/index.ts b/agents/src/stt/index.ts index 77b200f2a..5b0517546 100644 --- a/agents/src/stt/index.ts +++ b/agents/src/stt/index.ts @@ -18,3 +18,4 @@ export { type STTCallbacks, type STTCapabilities, } from './stt.js'; +export * as testing from './testing/index.js'; diff --git a/agents/src/stt/testing/fake_stt.ts b/agents/src/stt/testing/fake_stt.ts new file mode 100644 index 000000000..4a4b44a24 --- /dev/null +++ b/agents/src/stt/testing/fake_stt.ts @@ -0,0 +1,337 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Reusable fake STT for exercising the STT pipeline in tests without talking + * to a real provider. Ported from the Python harness at + * `livekit-agents/tests/fake_stt.py` so JS tests can achieve the same + * coverage shape (scripted failures, mid-test behaviour flips, timed interim + * + final transcript emission). + */ +import type { AudioFrame } from '@livekit/rtc-node'; +import { asLanguageCode } from '../../language.js'; +import type { APIConnectOptions } from '../../types.js'; +import { AsyncIterableQueue, type AudioBuffer, delay } from '../../utils.js'; +import { + STT, + type STTCapabilities, + type SpeechEvent, + SpeechEventType, + SpeechStream, +} from '../stt.js'; + +/** + * Describes a scheduled speech turn. `startTime`/`endTime`/`sttDelay` are in + * milliseconds and keyed to the moment the first audio frame is pushed into + * the stream — not wall-clock. `sttDelay` is the provider's transcription + * lag; the stream emits an interim result halfway through and a final result + * at `endTime + sttDelay`. + * + * Python uses seconds here — multiply Python fixtures by 1000 when porting. + */ +// Ref: python tests/fake_stt.py - 29-34 lines +export interface FakeUserSpeech { + startTime: number; + endTime: number; + transcript: string; + sttDelay: number; +} + +/** Scale every timing field by `factor` — useful for speeding up tests. */ +// Ref: python tests/fake_stt.py - 36-41 lines +export function speedUpFakeUserSpeech(speech: FakeUserSpeech, factor: number): FakeUserSpeech { + return { + ...speech, + startTime: speech.startTime / factor, + endTime: speech.endTime / factor, + sttDelay: speech.sttDelay / factor, + }; +} + +/** Marker posted to {@link FakeSTT.recognizeCh} each time `recognize()` runs. */ +// Ref: python tests/fake_stt.py - 25-26 lines +export class RecognizeSentinel {} + +export interface FakeSTTOptions { + label?: string; + fakeException?: Error | null; + fakeTranscript?: string | null; + fakeTimeoutMs?: number | null; + fakeUserSpeeches?: FakeUserSpeech[] | null; + fakeRequireAudio?: boolean; + capabilities?: Partial; +} + +type UpdateOptions = Partial< + Pick +>; + +/** + * Configurable stand-in for a real {@link STT}. Knobs mirror Python's + * `FakeSTT`: inject exceptions, scripted transcripts, connection timeouts, + * or a full sequence of {@link FakeUserSpeech} turns. + * + * Observability: every call to `recognize()` posts to {@link recognizeCh} + * and every call to `stream()` posts the new stream to {@link streamCh}, so + * tests can assert on attempt counts directly instead of inferring them. + * + * @example + * ```ts + * const primary = new FakeSTT({ fakeException: new APIConnectionError('down') }); + * const fallback = new FakeSTT({ fakeTranscript: 'hello world' }); + * const adapter = new FallbackAdapter({ sttInstances: [primary, fallback] }); + * const ev = await adapter.recognize(frame); + * assert(ev.alternatives[0].text === 'hello world'); + * assert((await primary.recognizeCh.next()).value instanceof RecognizeSentinel); + * ``` + */ +// Ref: python tests/fake_stt.py - 44-147 lines +export class FakeSTT extends STT { + label: string; + + private _fakeException: Error | null; + private _fakeTranscript: string | null; + private _fakeTimeoutMs: number | null; + private _fakeUserSpeeches: FakeUserSpeech[] | null; + private _fakeRequireAudio: boolean; + + private _recognizeCh = new AsyncIterableQueue(); + private _streamCh = new AsyncIterableQueue(); + private _fakeUserSpeechesDone: Promise; + private _resolveFakeUserSpeechesDone!: () => void; + + constructor(opts: FakeSTTOptions = {}) { + super({ + streaming: opts.capabilities?.streaming ?? true, + interimResults: opts.capabilities?.interimResults ?? false, + diarization: opts.capabilities?.diarization ?? false, + alignedTranscript: opts.capabilities?.alignedTranscript ?? false, + }); + this.label = opts.label ?? 'fake-stt'; + this._fakeException = opts.fakeException ?? null; + this._fakeTranscript = opts.fakeTranscript ?? null; + this._fakeTimeoutMs = opts.fakeTimeoutMs ?? null; + this._fakeRequireAudio = opts.fakeRequireAudio ?? false; + + let speeches = opts.fakeUserSpeeches ?? null; + if (speeches && speeches.length > 0) { + speeches = [...speeches].sort((a, b) => a.startTime - b.startTime); + for (let i = 0; i < speeches.length - 1; i++) { + if (speeches[i]!.endTime > speeches[i + 1]!.startTime) { + throw new Error('fake user speeches overlap'); + } + } + } + this._fakeUserSpeeches = speeches; + + this._fakeUserSpeechesDone = new Promise((resolve) => { + this._resolveFakeUserSpeechesDone = resolve; + }); + } + + /** Replace one or more fake knobs mid-test (e.g. flip from error to success). */ + // Ref: python tests/fake_stt.py - 74-88 lines + updateOptions(opts: UpdateOptions): void { + if ('fakeException' in opts) this._fakeException = opts.fakeException ?? null; + if ('fakeTranscript' in opts) this._fakeTranscript = opts.fakeTranscript ?? null; + if ('fakeTimeoutMs' in opts) this._fakeTimeoutMs = opts.fakeTimeoutMs ?? null; + } + + /** Channel: one sentinel per `recognize()` invocation. */ + // Ref: python tests/fake_stt.py - 90-92 lines + get recognizeCh(): AsyncIterableQueue { + return this._recognizeCh; + } + + /** Channel: one stream instance per `stream()` invocation. */ + // Ref: python tests/fake_stt.py - 94-96 lines + get streamCh(): AsyncIterableQueue { + return this._streamCh; + } + + // Ref: python tests/fake_stt.py - 98-100 lines + get fakeUserSpeeches(): FakeUserSpeech[] | null { + return this._fakeUserSpeeches; + } + + /** Resolves once the scheduled `fake_user_speeches` have all been emitted. */ + // Ref: python tests/fake_stt.py - 102-104 lines + get fakeUserSpeechesDone(): Promise { + return this._fakeUserSpeechesDone; + } + + /** @internal Read-only state snapshot for the stream to consult. */ + get _state(): Readonly<{ + fakeException: Error | null; + fakeTranscript: string | null; + fakeTimeoutMs: number | null; + fakeUserSpeeches: FakeUserSpeech[] | null; + fakeRequireAudio: boolean; + }> { + return { + fakeException: this._fakeException, + fakeTranscript: this._fakeTranscript, + fakeTimeoutMs: this._fakeTimeoutMs, + fakeUserSpeeches: this._fakeUserSpeeches, + fakeRequireAudio: this._fakeRequireAudio, + }; + } + + /** @internal Called from the stream once it has finished the scheduled speeches. */ + _markFakeUserSpeechesDone(): void { + this._resolveFakeUserSpeechesDone(); + } + + // Ref: python tests/fake_stt.py - 106-124 lines + protected async _recognize(_frame: AudioBuffer): Promise { + if (this._fakeTimeoutMs !== null) { + await delay(this._fakeTimeoutMs); + } + if (this._fakeException !== null) { + throw this._fakeException; + } + return { + type: SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [ + { + text: this._fakeTranscript ?? '', + language: asLanguageCode(''), + startTime: 0, + endTime: 0, + confidence: 1, + }, + ], + }; + } + + // Ref: python tests/fake_stt.py - 126-134 lines + override async recognize(frame: AudioBuffer, abortSignal?: AbortSignal): Promise { + this._recognizeCh.put(new RecognizeSentinel()); + return super.recognize(frame, abortSignal); + } + + // Ref: python tests/fake_stt.py - 136-147 lines + override stream(options?: { connOptions?: APIConnectOptions }): FakeRecognizeStream { + const stream = new FakeRecognizeStream(this, options?.connOptions); + this._streamCh.put(stream); + return stream; + } +} + +/** + * Stream returned by {@link FakeSTT.stream}. Exposes an `attempt` counter and + * `sendFakeTranscript()` so tests can inject interim/final events at will. + */ +// Ref: python tests/fake_stt.py - 150-227 lines +export class FakeRecognizeStream extends SpeechStream { + label: string; + + private _attempt = 0; + private _fakeStt: FakeSTT; + + // Ref: python tests/fake_stt.py - 151-160 lines + constructor(stt: FakeSTT, connOptions?: APIConnectOptions) { + super(stt, undefined, connOptions); + this._fakeStt = stt; + this.label = `${stt.label}.stream`; + } + + // Ref: python tests/fake_stt.py - 162-164 lines + get attempt(): number { + return this._attempt; + } + + /** Push a synthetic INTERIM or FINAL event onto the output queue. */ + // Ref: python tests/fake_stt.py - 166-174 lines + sendFakeTranscript(transcript: string, isFinal = true): void { + this.queue.put({ + type: isFinal ? SpeechEventType.FINAL_TRANSCRIPT : SpeechEventType.INTERIM_TRANSCRIPT, + alternatives: [ + { + text: transcript, + language: asLanguageCode(''), + startTime: 0, + endTime: 0, + confidence: 1, + }, + ], + }); + } + + // Ref: python tests/fake_stt.py - 176-202 lines + protected async run(): Promise { + this._attempt += 1; + const state = this._fakeStt._state; + + if (state.fakeTimeoutMs !== null) { + await delay(state.fakeTimeoutMs); + } + + if (state.fakeRequireAudio) { + // Emit a transcript only after we've both received audio frames and + // seen a flush — matches the Python fake's shape for providers that + // block on real audio. + let gotAudio = false; + for await (const data of this.input) { + if (data === SpeechStream.FLUSH_SENTINEL) { + if (gotAudio && state.fakeTranscript !== null) { + this.sendFakeTranscript(state.fakeTranscript); + } + gotAudio = false; + } else { + gotAudio = true; + } + } + } else { + if (state.fakeTranscript !== null) { + this.sendFakeTranscript(state.fakeTranscript); + } + await this.fakeUserSpeechTask(); + // Drain remaining input until EOF so the stream terminates cleanly. + for await (const _ of this.input) { + /* noop */ + } + } + + if (state.fakeException !== null) { + throw state.fakeException; + } + } + + // Ref: python tests/fake_stt.py - 204-227 lines + private async fakeUserSpeechTask(): Promise { + const speeches = this._fakeStt._state.fakeUserSpeeches; + if (!speeches || speeches.length === 0) return; + + // Anchor the clock to the first frame the caller pushes. + const first = await this.input.next(); + if (first.done) return; + + // Elapsed time in milliseconds since the first pushed frame. + const startHrt = process.hrtime.bigint(); + const elapsed = (): number => Number(process.hrtime.bigint() - startHrt) / 1e6; + + for (const speech of speeches) { + const interimAt = speech.endTime + speech.sttDelay * 0.5; + if (elapsed() < interimAt) await delay(interimAt - elapsed()); + const interim = speech.transcript.split(/\s+/).slice(0, 2).join(' '); + this.sendFakeTranscript(interim, false); + + const finalAt = speech.endTime + speech.sttDelay; + if (elapsed() < finalAt) await delay(finalAt - elapsed()); + this.sendFakeTranscript(speech.transcript, true); + } + + this._fakeStt._markFakeUserSpeechesDone(); + } +} + +/** Convenience: a zero-length audio frame suitable for `recognize()` calls. */ +export function emptyAudioFrame(): AudioBuffer { + // `recognize()` only reads the frame for its audio duration. Empty is + // valid for callers that only care about the exception/transcript path. + return [] as unknown as AudioBuffer; +} + +export type { AudioFrame }; diff --git a/agents/src/stt/testing/index.ts b/agents/src/stt/testing/index.ts new file mode 100644 index 000000000..99f2e398f --- /dev/null +++ b/agents/src/stt/testing/index.ts @@ -0,0 +1,13 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +export { + emptyAudioFrame, + FakeRecognizeStream, + FakeSTT, + type FakeSTTOptions, + type FakeUserSpeech, + RecognizeSentinel, + speedUpFakeUserSpeech, +} from './fake_stt.js';