Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/room-io-frame-processor-ownership.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

fix(room-io): ownership-aware FrameProcessor lifecycle management in `ParticipantAudioInputStream`. Introduces a `processorOwned` flag and an internal `updateProcessor()` helper that only closes the previous processor when the stream owns it, so an externally-provided `FrameProcessor` survives track transitions and is only closed on `close()`. Ported from Python PR [livekit/agents#5467](https://github.com/livekit/agents/pull/5467).
77 changes: 77 additions & 0 deletions agents/src/voice/room_io/_input.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { describe, expect, it, vi } from 'vitest';
import { ParticipantAudioInputStream } from './_input.js';

type InputInternals = {
frameProcessor: { close: () => void } | undefined;
processorOwned: boolean;
updateProcessor: (p: { close: () => void } | undefined) => void;
};

/**
* Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py
*
* Mirrors the Python PR (livekit/agents#5467) lifecycle invariants for the
* subset of behaviors that apply to agents-js — i.e. externally-provided
* FrameProcessor instances. The JS API does not currently accept a
* selector callable, so selector-owned scenarios are N/A here.
*/
describe('ParticipantAudioInputStream processor ownership', () => {
const makeInternal = (
frameProcessor: { close: () => void } | undefined,
processorOwned: boolean,
): InputInternals => {
const target = Object.create(ParticipantAudioInputStream.prototype) as InputInternals;
target.frameProcessor = frameProcessor;
target.processorOwned = processorOwned;
return target;
};

it('updateProcessor(undefined) is a no-op for externally-provided processors', () => {
const external = { close: vi.fn() };
const target = makeInternal(external, false);

target.updateProcessor(undefined);

expect(external.close).not.toHaveBeenCalled();
expect(target.frameProcessor).toBe(external);
expect(target.processorOwned).toBe(false);
});

it('updateProcessor(undefined) closes and clears an owned processor', () => {
const owned = { close: vi.fn() };
const target = makeInternal(owned, true);

target.updateProcessor(undefined);

expect(owned.close).toHaveBeenCalledTimes(1);
expect(target.frameProcessor).toBeUndefined();
expect(target.processorOwned).toBe(false);
});

it('updateProcessor(new) replaces and closes an owned predecessor', () => {
const oldOwned = { close: vi.fn() };
const replacement = { close: vi.fn() };
const target = makeInternal(oldOwned, true);

target.updateProcessor(replacement);

expect(oldOwned.close).toHaveBeenCalledTimes(1);
expect(replacement.close).not.toHaveBeenCalled();
expect(target.frameProcessor).toBe(replacement);
expect(target.processorOwned).toBe(true);
});

it('updateProcessor(same processor) does not close itself', () => {
const proc = { close: vi.fn() };
const target = makeInternal(proc, true);

target.updateProcessor(proc);

expect(proc.close).not.toHaveBeenCalled();
expect(target.frameProcessor).toBe(proc);
expect(target.processorOwned).toBe(true);
});
});
31 changes: 31 additions & 0 deletions agents/src/voice/room_io/_input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ export class ParticipantAudioInputStream extends AudioInput {
private numChannels: number;
private noiseCancellation?: NoiseCancellationOptions;
private frameProcessor?: FrameProcessor<AudioFrame>;
// Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 55-56 lines
// Tracks whether the current frameProcessor was created internally (owned by
// this stream) vs. supplied externally by the caller. JS currently only
// accepts an externally-provided FrameProcessor, so this is always false for
// processors set from the constructor; the flag is retained for structural
// symmetry with Python and to document the invariant.
private processorOwned = false;
private publication: RemoteTrackPublication | null = null;
private participantIdentity: string | null = null;
private currentInputId: string | null = null;
Expand Down Expand Up @@ -55,6 +62,22 @@ export class ParticipantAudioInputStream extends AudioInput {
this.room.on(RoomEvent.TokenRefreshed, this.onTokenRefreshed);
}

// Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 172-181 lines
// Ownership-aware replacement: only close the previous processor when this
// stream owns it. External processors survive stream transitions; only
// `close()` below closes them unconditionally.
private updateProcessor(processor: FrameProcessor<AudioFrame> | undefined) {
if (processor === undefined && !this.processorOwned) {
return;
}
const old = this.frameProcessor;
if (old && old !== processor && this.processorOwned) {
old.close();
}
this.frameProcessor = processor;
this.processorOwned = processor !== undefined;
}

setParticipant(participant: RemoteParticipant | string | null) {
this.logger.debug({ participant }, 'setting participant audio input');
const participantIdentity =
Expand Down Expand Up @@ -130,6 +153,9 @@ export class ParticipantAudioInputStream extends AudioInput {
}

this.publication = null;
// Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - 188 lines
// Ownership-aware teardown: no-op for externally-provided processors.
this.updateProcessor(undefined);
}

private onTrackSubscribed = (
Expand Down Expand Up @@ -190,7 +216,12 @@ export class ParticipantAudioInputStream extends AudioInput {
this.closeStream();
await super.close();

// Ref: python livekit-agents/livekit/agents/voice/room_io/_input.py - aclose unconditionally closes
// close() closes the processor unconditionally, regardless of ownership —
// owned processors are already cleared via closeStream(); this handles the
// externally-provided case.
this.frameProcessor?.close();
this.frameProcessor = undefined;
this.processorOwned = false;
}
}
Loading