Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion examples/avatar_agents/audio_wave/agent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
from dotenv import load_dotenv

from livekit import api, rtc
from livekit.agents import Agent, AgentServer, AgentSession, JobContext, cli, inference
from livekit.agents import (
Agent,
AgentServer,
AgentSession,
ConversationItemAddedEvent,
JobContext,
cli,
inference,
)
from livekit.agents.voice.avatar import DataStreamAudioOutput
from livekit.agents.voice.io import PlaybackFinishedEvent, PlaybackStartedEvent
from livekit.agents.voice.room_io import ATTRIBUTE_PUBLISH_ON_BEHALF
Expand Down Expand Up @@ -103,6 +111,14 @@ def on_playback_started(ev: PlaybackStartedEvent) -> None:
extra={"created_at": ev.created_at},
)

@session.on("conversation_item_added")
def on_conversation_item_added(ev: ConversationItemAddedEvent) -> None:
if ev.item.type == "message" and ev.item.role == "assistant":
logger.info(
"agent response metrics",
extra={"metrics": ev.item.metrics},
)

await session.generate_reply(instructions="say hello to the user")


Expand Down
10 changes: 10 additions & 0 deletions livekit-agents/livekit/agents/llm/chat_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ class MetricsReport(TypedDict, total=False):
Assistant `ChatMessage` only
"""

playback_latency: float
"""Delay between forwarding the first audio frame and the `AudioOutput` reporting
playback started. Near-zero for the default room output (self-reported when the frame
is pushed to the track, so it doesn't account for network delivery to the client);
meaningful when a remote avatar worker is in the chain and reports playback via
the `lk.playback_started` RPC.

Assistant `ChatMessage` only
"""
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Is it included inside the e2e_latency?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's included inside the e2e_latency


e2e_latency: float
"""Time from when the user finished speaking to when the agent began responding

Expand Down
49 changes: 44 additions & 5 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from collections.abc import AsyncIterable, Coroutine, Sequence
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any

from opentelemetry import context as otel_context, trace
Expand Down Expand Up @@ -2188,16 +2189,22 @@ async def _read_text() -> AsyncIterable[str]:
forward_text_task: asyncio.Task[Any] | None = None
started_speaking_at: float | None = None
stopped_speaking_at: float | None = None
started_forwarding_at: float | None = None

def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
"""
Callback to update the agent state when the first frame is captured:
1. _AudioOutput.first_frame_fut (float)
2. _TextOutput.first_text_fut (None)
"""
nonlocal started_speaking_at
nonlocal started_speaking_at, started_forwarding_at
try:
started_speaking_at = fut.result() or time.time()
started_forwarding_at = (
audio_out.started_forwarding_at
if audio_out and audio_out.started_forwarding_at is not None
else started_speaking_at
)
except BaseException:
return

Expand Down Expand Up @@ -2310,6 +2317,11 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
assistant_metrics["started_speaking_at"] = started_speaking_at
assistant_metrics["stopped_speaking_at"] = stopped_speaking_at

if started_forwarding_at is not None:
assistant_metrics["playback_latency"] = (
started_speaking_at - started_forwarding_at
)

msg = self._agent._chat_ctx.add_message(
role="assistant",
content=forwarded_text,
Expand Down Expand Up @@ -2519,16 +2531,22 @@ async def _read_text(

started_speaking_at: float | None = None
stopped_speaking_at: float | None = None
started_forwarding_at: float | None = None

def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
"""
Callback to update the agent state when the first frame is captured:
1. _AudioOutput.first_frame_fut (float)
2. _TextOutput.first_text_fut (None)
"""
nonlocal started_speaking_at
nonlocal started_speaking_at, started_forwarding_at
try:
started_speaking_at = fut.result() or time.time()
started_forwarding_at = (
audio_out.started_forwarding_at
if audio_out and audio_out.started_forwarding_at is not None
else started_speaking_at
)
except BaseException:
return

Expand All @@ -2539,6 +2557,7 @@ def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
early_metrics["llm_node_ttft"] = llm_gen_data.ttft
if tts_gen_data and tts_gen_data.ttfb is not None:
early_metrics["tts_node_ttfb"] = tts_gen_data.ttfb
early_metrics["playback_latency"] = started_speaking_at - started_forwarding_at
if user_metrics and "stopped_speaking_at" in user_metrics:
early_metrics["e2e_latency"] = (
started_speaking_at - user_metrics["stopped_speaking_at"]
Expand Down Expand Up @@ -2623,6 +2642,11 @@ def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None:
assistant_metrics["started_speaking_at"] = started_speaking_at
assistant_metrics["stopped_speaking_at"] = stopped_speaking_at

if started_forwarding_at is not None:
assistant_metrics["playback_latency"] = (
started_speaking_at - started_forwarding_at
)

if user_metrics and "stopped_speaking_at" in user_metrics:
e2e_latency = started_speaking_at - user_metrics["stopped_speaking_at"]
assistant_metrics["e2e_latency"] = e2e_latency
Expand Down Expand Up @@ -2983,16 +3007,24 @@ async def _realtime_generation_task_impl(

started_speaking_at: float | None = None
stopped_speaking_at: float | None = None
started_forwarding_at: float | None = None

def _on_first_frame(fut: asyncio.Future[float] | asyncio.Future[None]) -> None:
def _on_first_frame(
fut: asyncio.Future[float] | asyncio.Future[None], audio_out: _AudioOutput | None = None
) -> None:
"""
Callback to update the agent state when the first frame is captured:
1. _AudioOutput.first_frame_fut (float)
2. _TextOutput.first_text_fut (None)
"""
nonlocal started_speaking_at
nonlocal started_speaking_at, started_forwarding_at
try:
started_speaking_at = fut.result() or time.time()
started_forwarding_at = (
audio_out.started_forwarding_at
if audio_out and audio_out.started_forwarding_at is not None
else started_speaking_at
)
except BaseException:
return

Expand Down Expand Up @@ -3095,7 +3127,9 @@ async def _read_messages(
tts_output=realtime_audio_result,
)
forward_tasks.append(forward_task)
audio_out.first_frame_fut.add_done_callback(_on_first_frame)
audio_out.first_frame_fut.add_done_callback(
partial(_on_first_frame, audio_out=audio_out)
)

# text output
tr_node = self._agent.transcription_node(tr_text_input, model_settings)
Expand Down Expand Up @@ -3197,6 +3231,11 @@ def _create_assistant_message(
assistant_metrics["started_speaking_at"] = started_speaking_at
assistant_metrics["stopped_speaking_at"] = stopped_speaking_at

if started_forwarding_at is not None:
assistant_metrics["playback_latency"] = (
started_speaking_at - started_forwarding_at
)

msg = llm.ChatMessage(
role="assistant",
content=[forwarded_text],
Expand Down
4 changes: 4 additions & 0 deletions livekit-agents/livekit/agents/voice/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ class _AudioOutput:
first_frame_fut: asyncio.Future[float]
"""Future that will be set with the timestamp of the first frame's capture"""

started_forwarding_at: float | None = None

def _resolve_first_frame_fut(self, ev: io.PlaybackStartedEvent) -> None:
if not self.first_frame_fut.done():
self.first_frame_fut.set_result(ev.created_at)
Expand Down Expand Up @@ -410,6 +412,8 @@ async def _audio_forwarding_task(

async for frame in tts_output:
out.audio.append(frame)
if out.started_forwarding_at is None:
out.started_forwarding_at = time.time()

if (
not out.first_frame_fut.done()
Expand Down
Loading