Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions agents/src/tts/fallback_adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class MockSynthesizeStream extends SynthesizeStream {
constructor(
private mockTts: MockTTS,
private shouldFail: boolean,
private emitAudio: boolean,
connOptions?: APIConnectOptions,
) {
super(mockTts, connOptions);
Expand All @@ -37,6 +38,7 @@ class MockSynthesizeStream extends SynthesizeStream {
for await (const data of this.input) {
if (this.abortController.signal.aborted) break;
if (data === SynthesizeStream.FLUSH_SENTINEL) continue;
if (!this.emitAudio) continue;
this.queue.put({
requestId: 'mock-req',
segmentId: 'mock-seg',
Expand All @@ -53,6 +55,7 @@ class MockChunkedStream extends ChunkedStream {
private mockTts: MockTTS,
text: string,
private shouldFail: boolean,
private emitAudio: boolean,
connOptions?: APIConnectOptions,
) {
super(text, mockTts, connOptions);
Expand All @@ -61,6 +64,7 @@ class MockChunkedStream extends ChunkedStream {
if (this.shouldFail) {
throw new APIError('mock TTS failed immediately');
}
if (!this.emitAudio) return;
this.queue.put({
requestId: 'mock-req',
segmentId: 'mock-seg',
Expand All @@ -73,18 +77,19 @@ class MockChunkedStream extends ChunkedStream {
class MockTTS extends TTS {
label: string;
shouldFail = false;
emitAudio = true;

constructor(label: string, sampleRate: number = SAMPLE_RATE) {
super(sampleRate, 1, { streaming: true });
this.label = label;
}

synthesize(text: string, connOptions?: APIConnectOptions): ChunkedStream {
return new MockChunkedStream(this, text, this.shouldFail, connOptions);
return new MockChunkedStream(this, text, this.shouldFail, this.emitAudio, connOptions);
}

stream(options?: { connOptions?: APIConnectOptions }): SynthesizeStream {
return new MockSynthesizeStream(this, this.shouldFail, options?.connOptions);
return new MockSynthesizeStream(this, this.shouldFail, this.emitAudio, options?.connOptions);
}
}

Expand Down Expand Up @@ -190,6 +195,46 @@ describe('TTS FallbackAdapter', () => {
await adapter.close();
});

it('should not mark the primary unavailable when closed before any audio is received', async () => {
// Regression: abort before first audio frame previously raised
// APIConnectionError, causing markUnAvailable on the primary.
const primary = new MockTTS('primary');
primary.emitAudio = false;
const secondary = new MockTTS('secondary');
secondary.emitAudio = false;
const adapter = new FallbackAdapter({
ttsInstances: [primary, secondary],
maxRetryPerTTS: 0,
recoveryDelayMs: 60_000,
});

const stream = adapter.stream();
stream.updateInputStream(
new ReadableStream<string>({
start(controller) {
controller.enqueue('hello world');
},
}),
);

const iterate = (async () => {
for await (const event of stream) {
if (event === SynthesizeStream.END_OF_STREAM) break;
}
})();

await new Promise((r) => setTimeout(r, 50));
stream.close();
await iterate;
// Allow mainTask to fully unwind both TTS instances.
await new Promise((r) => setTimeout(r, 200));

expect(adapter.status[0]!.available).toBe(true);
expect(adapter.status[1]!.available).toBe(true);

await adapter.close();
});

it('should fall back in the non-streaming (synthesize) path with mismatched sample rates', async () => {
// FallbackChunkedStream has the same phantom-flush vulnerability as
// FallbackSynthesizeStream: when the primary's sample rate differs from
Expand Down
6 changes: 6 additions & 0 deletions agents/src/tts/fallback_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ class FallbackSynthesizeStream extends SynthesizeStream {
// Silent failures must trigger fallback. See `sawRawAudio` above for
// why we don't check `audioPushed` here.
if (!sawRawAudio) {
// Abort before first audio frame is an interruption, not a provider failure.
if (this.abortController.signal.aborted) {
this.queue.put(SynthesizeStream.END_OF_STREAM);
await readInputLLMStream.catch(() => {});
return;
}
throw new APIConnectionError({
message: 'TTS stream completed but no audio was received',
});
Expand Down