diff --git a/.changeset/room-io-frame-processor-ownership.md b/.changeset/room-io-frame-processor-ownership.md new file mode 100644 index 000000000..0d8ad7dde --- /dev/null +++ b/.changeset/room-io-frame-processor-ownership.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +fix(room-io): ownership-aware FrameProcessor lifecycle management in `ParticipantAudioInputStream`. Introduces a `processorOwned` flag and an internal `updateProcessor()` helper that only closes the previous processor when the stream owns it, so an externally-provided `FrameProcessor` survives track transitions and is only closed on `close()`. Ported from Python PR [livekit/agents#5467](https://github.com/livekit/agents/pull/5467). diff --git a/agents/src/voice/room_io/_input.test.ts b/agents/src/voice/room_io/_input.test.ts new file mode 100644 index 000000000..cc1b7beb1 --- /dev/null +++ b/agents/src/voice/room_io/_input.test.ts @@ -0,0 +1,77 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it, vi } from 'vitest'; +import { ParticipantAudioInputStream } from './_input.js'; + +type InputInternals = { + frameProcessor: { close: () => void } | undefined; + processorOwned: boolean; + updateProcessor: (p: { close: () => void } | undefined) => void; +}; + +/** + * Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py + * + * Mirrors the Python PR (livekit/agents#5467) lifecycle invariants for the + * subset of behaviors that apply to agents-js — i.e. externally-provided + * FrameProcessor instances. The JS API does not currently accept a + * selector callable, so selector-owned scenarios are N/A here. + */ +describe('ParticipantAudioInputStream processor ownership', () => { + const makeInternal = ( + frameProcessor: { close: () => void } | undefined, + processorOwned: boolean, + ): InputInternals => { + const target = Object.create(ParticipantAudioInputStream.prototype) as InputInternals; + target.frameProcessor = frameProcessor; + target.processorOwned = processorOwned; + return target; + }; + + it('updateProcessor(undefined) is a no-op for externally-provided processors', () => { + const external = { close: vi.fn() }; + const target = makeInternal(external, false); + + target.updateProcessor(undefined); + + expect(external.close).not.toHaveBeenCalled(); + expect(target.frameProcessor).toBe(external); + expect(target.processorOwned).toBe(false); + }); + + it('updateProcessor(undefined) closes and clears an owned processor', () => { + const owned = { close: vi.fn() }; + const target = makeInternal(owned, true); + + target.updateProcessor(undefined); + + expect(owned.close).toHaveBeenCalledTimes(1); + expect(target.frameProcessor).toBeUndefined(); + expect(target.processorOwned).toBe(false); + }); + + it('updateProcessor(new) replaces and closes an owned predecessor', () => { + const oldOwned = { close: vi.fn() }; + const replacement = { close: vi.fn() }; + const target = makeInternal(oldOwned, true); + + target.updateProcessor(replacement); + + expect(oldOwned.close).toHaveBeenCalledTimes(1); + expect(replacement.close).not.toHaveBeenCalled(); + expect(target.frameProcessor).toBe(replacement); + expect(target.processorOwned).toBe(true); + }); + + it('updateProcessor(same processor) does not close itself', () => { + const proc = { close: vi.fn() }; + const target = makeInternal(proc, true); + + target.updateProcessor(proc); + + expect(proc.close).not.toHaveBeenCalled(); + expect(target.frameProcessor).toBe(proc); + expect(target.processorOwned).toBe(true); + }); +}); diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 6ede89e2f..2b9f8973c 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -24,6 +24,13 @@ export class ParticipantAudioInputStream extends AudioInput { private numChannels: number; private noiseCancellation?: NoiseCancellationOptions; private frameProcessor?: FrameProcessor; + // Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 55-56 lines + // Tracks whether the current frameProcessor was created internally (owned by + // this stream) vs. supplied externally by the caller. JS currently only + // accepts an externally-provided FrameProcessor, so this is always false for + // processors set from the constructor; the flag is retained for structural + // symmetry with Python and to document the invariant. + private processorOwned = false; private publication: RemoteTrackPublication | null = null; private participantIdentity: string | null = null; private currentInputId: string | null = null; @@ -55,6 +62,22 @@ export class ParticipantAudioInputStream extends AudioInput { this.room.on(RoomEvent.TokenRefreshed, this.onTokenRefreshed); } + // Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 172-181 lines + // Ownership-aware replacement: only close the previous processor when this + // stream owns it. External processors survive stream transitions; only + // `close()` below closes them unconditionally. + private updateProcessor(processor: FrameProcessor | undefined) { + if (processor === undefined && !this.processorOwned) { + return; + } + const old = this.frameProcessor; + if (old && old !== processor && this.processorOwned) { + old.close(); + } + this.frameProcessor = processor; + this.processorOwned = processor !== undefined; + } + setParticipant(participant: RemoteParticipant | string | null) { this.logger.debug({ participant }, 'setting participant audio input'); const participantIdentity = @@ -130,6 +153,9 @@ export class ParticipantAudioInputStream extends AudioInput { } this.publication = null; + // Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 188 lines + // Ownership-aware teardown: no-op for externally-provided processors. + this.updateProcessor(undefined); } private onTrackSubscribed = ( @@ -190,7 +216,12 @@ export class ParticipantAudioInputStream extends AudioInput { this.closeStream(); await super.close(); + // Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - aclose unconditionally closes + // close() closes the processor unconditionally, regardless of ownership — + // owned processors are already cleared via closeStream(); this handles the + // externally-provided case. this.frameProcessor?.close(); this.frameProcessor = undefined; + this.processorOwned = false; } }