Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 102 additions & 66 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from collections.abc import AsyncIterable, Coroutine, Sequence
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

from opentelemetry import context as otel_context, trace

Expand Down Expand Up @@ -130,6 +130,7 @@ class _PreemptiveGeneration:
@dataclass
class _PausedSpeechInfo:
    """Snapshot of a speech that was paused on a suspected false interruption."""

    # the speech whose audio playout was paused
    handle: SpeechHandle
    # speech_handle.num_steps at pause time; compared against the handle's
    # current step to detect that a newer generation superseded the paused one
    generation_step: int
    # agent state captured at the first pause, restored if the interruption
    # turns out to be false
    agent_state: AgentState
    # seconds before the false-interruption timer fires for this pause
    timeout: float

Expand Down Expand Up @@ -1611,14 +1612,18 @@ def _interrupt_by_audio_activity(
and not self._current_speech.interrupted
and self._current_speech.allow_interruptions
):
current_speech = self._current_speech
current_generation_playout_active = current_speech.current_generation_playout_active

# reset the false interruption timer
if self._false_interruption_timer:
self._false_interruption_timer.cancel()
self._false_interruption_timer = None

# only interrupt if not already interrupting
if (
self._audio_recognition
current_generation_playout_active
and self._audio_recognition
and not self._audio_recognition._endpointing.overlapping
and self._session.agent_state == "speaking"
):
Expand All @@ -1630,7 +1635,7 @@ def _interrupt_by_audio_activity(
assert (timeout := interruption_options["false_interruption_timeout"]) is not None
assert (audio_output := self._session.output.audio) is not None

self._update_paused_speech(self._current_speech, timeout)
self._update_paused_speech(current_speech, timeout)
audio_output.pause()
self._session._update_agent_state("listening")
if self._audio_recognition:
Expand All @@ -1643,7 +1648,7 @@ def _interrupt_by_audio_activity(
if self._rt_session is not None:
self._rt_session.interrupt()

self._current_speech.interrupt()
current_speech.interrupt()

# region recognition hooks

Expand Down Expand Up @@ -2110,6 +2115,42 @@ def _on_pipeline_reply_done(self, _: asyncio.Task[None]) -> None:
if self.interruption_enabled:
self._restore_interruption_by_audio_activity()

def _on_generation_playout_started(
    self, *, speech_handle: SpeechHandle, started_at: float
) -> None:
    """Handle the start of audio playout for the current generation.

    Marks the handle's generation as actively playing, moves the session to
    the "speaking" state, and notifies audio recognition so user speech can
    be disambiguated from agent speech.

    Args:
        speech_handle: Speech whose current generation just started playing.
        started_at: Timestamp (time.time()) of the first played audio frame.
    """
    # flag the handle so interruption logic knows playout is in progress
    speech_handle._mark_current_generation_playout_started()
    self._session._update_agent_state(
        "speaking",
        start_time=started_at,
        otel_context=speech_handle._agent_turn_context,
    )
    if self._audio_recognition:
        self._audio_recognition.on_start_of_agent_speech(started_at=started_at)
    if self.interruption_enabled:
        # suppress audio-activity interruption until playout actually allows it
        self._interruption_by_audio_activity_enabled = False

def _on_generation_playout_finished(
    self,
    *,
    speech_handle: SpeechHandle,
    next_state: Literal["thinking", "listening"],
    ignore_user_transcript_until: float | None = None,
    notify_audio_recognition: bool = True,
    restore_interruption_by_audio_activity: bool = True,
) -> None:
    """Handle the end of audio playout for the current generation.

    Clears the handle's playout-active flag and, if the session is still in
    the "speaking" state, transitions it to ``next_state`` and performs the
    requested follow-up notifications.

    Args:
        speech_handle: Speech whose current generation finished playing.
        next_state: Agent state to transition to ("thinking" or "listening").
        ignore_user_transcript_until: Timestamp until which user transcripts
            are ignored by audio recognition; defaults to now when unset.
        notify_audio_recognition: Whether to signal end-of-agent-speech to
            audio recognition.
        restore_interruption_by_audio_activity: Whether to re-enable
            interruption by audio activity.
    """
    speech_handle._mark_current_generation_playout_finished()

    # if another path already moved the state (e.g. an interruption), do
    # nothing further
    if self._session.agent_state != "speaking":
        return

    self._session._update_agent_state(next_state)
    if notify_audio_recognition and self._audio_recognition:
        # NOTE(review): `or` treats an explicit 0.0 as unset and falls back
        # to time.time() — confirm callers never pass 0.0 intentionally
        self._audio_recognition.on_end_of_agent_speech(
            ignore_user_transcript_until=ignore_user_transcript_until or time.time()
        )
    if restore_interruption_by_audio_activity and self.interruption_enabled:
        self._restore_interruption_by_audio_activity()

@utils.log_exceptions(logger=logger)
async def _tts_task(
self,
Expand Down Expand Up @@ -2201,15 +2242,10 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
except BaseException:
return

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
otel_context=speech_handle._agent_turn_context,
self._on_generation_playout_started(
speech_handle=speech_handle,
started_at=started_speaking_at,
)
if self._audio_recognition:
self._audio_recognition.on_start_of_agent_speech(started_at=started_speaking_at)
if self.interruption_enabled:
self._interruption_by_audio_activity_enabled = False

audio_out: _AudioOutput | None = None
tts_gen_data: _TTSGenerationData | None = None
Expand Down Expand Up @@ -2319,14 +2355,11 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
speech_handle._item_added([msg])
self._session._conversation_item_added(msg)

if self._session.agent_state == "speaking":
self._session._update_agent_state("listening")
if self._audio_recognition:
self._audio_recognition.on_end_of_agent_speech(
ignore_user_transcript_until=time.time()
)
if self.interruption_enabled:
self._restore_interruption_by_audio_activity()
self._on_generation_playout_finished(
speech_handle=speech_handle,
next_state="listening",
ignore_user_transcript_until=time.time(),
)

if audio_out is not None and not audio_out.first_frame_fut.done():
audio_out.first_frame_fut.cancel()
Expand Down Expand Up @@ -2544,18 +2577,11 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
started_speaking_at - user_metrics["stopped_speaking_at"]
)
self._session._early_assistant_metrics = early_metrics

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
otel_context=speech_handle._agent_turn_context,
self._on_generation_playout_started(
speech_handle=speech_handle,
started_at=started_speaking_at,
)

if self._audio_recognition:
self._audio_recognition.on_start_of_agent_speech(started_at=started_speaking_at)
if self.interruption_enabled:
self._interruption_by_audio_activity_enabled = False

audio_out: _AudioOutput | None = None
if audio_output is not None:
assert tts_gen_data is not None
Expand Down Expand Up @@ -2679,15 +2705,18 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text)

if not speech_handle.interrupted and len(tool_output.output) > 0:
self._session._update_agent_state("thinking")
elif self._session.agent_state == "speaking":
self._session._update_agent_state("listening")
if self._audio_recognition:
self._audio_recognition.on_end_of_agent_speech(
ignore_user_transcript_until=time.time()
)
if self.interruption_enabled:
self._restore_interruption_by_audio_activity()
self._on_generation_playout_finished(
speech_handle=speech_handle,
next_state="thinking",
notify_audio_recognition=False,
restore_interruption_by_audio_activity=False,
)
else:
self._on_generation_playout_finished(
speech_handle=speech_handle,
next_state="listening",
ignore_user_transcript_until=time.time(),
)

if audio_out is not None and not audio_out.first_frame_fut.done():
audio_out.first_frame_fut.cancel()
Expand Down Expand Up @@ -2719,7 +2748,7 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
extra={"speech_id": speech_handle.id},
)

speech_handle._num_steps += 1
speech_handle._advance_step()

new_calls: list[llm.FunctionCall] = []
new_fnc_outputs: list[llm.FunctionCallOutput] = []
Expand Down Expand Up @@ -2996,15 +3025,10 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
except BaseException:
return

self._session._update_agent_state(
"speaking",
start_time=started_speaking_at,
otel_context=speech_handle._agent_turn_context,
self._on_generation_playout_started(
speech_handle=speech_handle,
started_at=started_speaking_at,
)
if self._audio_recognition:
self._audio_recognition.on_start_of_agent_speech(started_at=started_speaking_at)
if self.interruption_enabled:
self._interruption_by_audio_activity_enabled = False

tasks: list[asyncio.Task[Any]] = []
tees: list[utils.aio.itertools.Tee[Any]] = []
Expand Down Expand Up @@ -3175,16 +3199,14 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
await speech_handle.wait_if_not_interrupted(
[asyncio.ensure_future(audio_output.wait_for_playout())]
)
self._session._update_agent_state("listening")
if self._audio_recognition:
self._audio_recognition.on_end_of_agent_speech(
ignore_user_transcript_until=time.time()
)
if self.interruption_enabled:
self._restore_interruption_by_audio_activity()
current_span.set_attribute(
trace_types.ATTR_SPEECH_INTERRUPTED, speech_handle.interrupted
)

self._on_generation_playout_finished(
speech_handle=speech_handle,
next_state="listening",
ignore_user_transcript_until=time.time(),
)

current_span.set_attribute(trace_types.ATTR_SPEECH_INTERRUPTED, speech_handle.interrupted)

stopped_speaking_at = time.time()

Expand Down Expand Up @@ -3284,7 +3306,7 @@ def _create_assistant_message(
# important: no agent output should be used after this point

if len(tool_output.output) > 0:
speech_handle._num_steps += 1
speech_handle._advance_step()

new_fnc_outputs: list[llm.FunctionCallOutput] = []
generate_tool_reply: bool = False
Expand Down Expand Up @@ -3390,11 +3412,16 @@ def _update_paused_speech(self, speech_handle: SpeechHandle, timeout: float) ->
``agent_state`` captured at first pause is preserved, so the resume
path restores the correct state even across multiple calls.
"""
if self._paused_speech and self._paused_speech.handle is speech_handle:
if (
self._paused_speech
and self._paused_speech.handle is speech_handle
and self._paused_speech.generation_step == speech_handle.num_steps
):
self._paused_speech.timeout = timeout
else:
self._paused_speech = _PausedSpeechInfo(
handle=speech_handle,
generation_step=speech_handle.num_steps,
agent_state=self._session.agent_state,
timeout=timeout,
)
Expand All @@ -3413,25 +3440,34 @@ def _start_false_interruption_timer(self, timeout: float) -> None:
self._false_interruption_timer.cancel()

def _on_false_interruption() -> None:
if self._paused_speech is None or (
self._current_speech and self._current_speech is not self._paused_speech.handle
paused_speech = self._paused_speech
if (
paused_speech is None
or (self._current_speech and self._current_speech is not paused_speech.handle)
or (paused_speech.generation_step != paused_speech.handle.num_steps)
):
# already new speech is scheduled, do nothing
self._paused_speech = None
if (
paused_speech is not None
and (audio_output := self._session.output.audio)
and audio_output.can_pause
):
audio_output.resume()
return

resumed = False
if (
self._session.options.interruption["resume_false_interruption"]
and (audio_output := self._session.output.audio)
and audio_output.can_pause
and not self._paused_speech.handle.done()
and not paused_speech.handle.done()
):
self._session._update_agent_state(
self._paused_speech.agent_state,
otel_context=self._paused_speech.handle._agent_turn_context,
paused_speech.agent_state,
otel_context=paused_speech.handle._agent_turn_context,
)
if self._audio_recognition and self._paused_speech.agent_state == "speaking":
if self._audio_recognition and paused_speech.agent_state == "speaking":
self._audio_recognition.on_start_of_agent_speech(started_at=time.time())
if self.interruption_enabled:
self._interruption_by_audio_activity_enabled = False
Expand Down
15 changes: 15 additions & 0 deletions livekit-agents/livekit/agents/voice/speech_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(
self._done_fut = asyncio.Future[None]()
self._scheduled_fut = asyncio.Future[None]()
self._authorize_event = asyncio.Event()
self._current_generation_playout_active = False

self._generations: list[asyncio.Future[None]] = []

Expand Down Expand Up @@ -131,6 +132,10 @@ def allow_interruptions(self, value: bool) -> None:

self._allow_interruptions = value

@property
def current_generation_playout_active(self) -> bool:
    """Whether audio playout of the current generation is in progress.

    Set via ``_mark_current_generation_playout_started`` and cleared by
    ``_mark_current_generation_playout_finished`` or ``_advance_step``.
    """
    return self._current_generation_playout_active

@property
def chat_items(self) -> list[llm.ChatItem]:
return self._chat_items
Expand Down Expand Up @@ -251,6 +256,16 @@ def _authorize_generation(self) -> None:
self._generations.append(fut)
self._authorize_event.set()

def _mark_current_generation_playout_started(self) -> None:
    # audio playout of the current generation has begun
    self._current_generation_playout_active = True

def _mark_current_generation_playout_finished(self) -> None:
    # audio playout of the current generation has ended
    self._current_generation_playout_active = False

def _advance_step(self) -> None:
    """Move to the next generation step of this speech.

    The previous generation is no longer current, so its playout-active
    flag is cleared alongside the step increment.
    """
    self._num_steps += 1
    self._current_generation_playout_active = False

def _clear_authorization(self) -> None:
self._authorize_event.clear()

Expand Down
Loading