Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions livekit-agents/livekit/agents/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,18 @@ async def _metrics_monitor_task(self, event_aiter: AsyncIterable[ChatChunk]) ->
)

# set gen_ai attributes
self._llm_request_span.set_attributes(
{
trace_types.ATTR_GEN_AI_OPERATION_NAME: "chat",
trace_types.ATTR_GEN_AI_REQUEST_MODEL: self._llm.model,
trace_types.ATTR_GEN_AI_PROVIDER_NAME: self._llm.provider,
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: metrics.prompt_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: metrics.completion_tokens,
},
)
gen_ai_attrs: dict[str, str | int] = {
trace_types.ATTR_GEN_AI_OPERATION_NAME: "chat",
trace_types.ATTR_GEN_AI_REQUEST_MODEL: self._llm.model,
trace_types.ATTR_GEN_AI_PROVIDER_NAME: self._llm.provider,
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: metrics.prompt_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: metrics.completion_tokens,
}
if metrics.prompt_cached_tokens:
gen_ai_attrs[trace_types.ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS] = (
metrics.prompt_cached_tokens
)
self._llm_request_span.set_attributes(gen_ai_attrs)
if completion_start_time:
self._llm_request_span.set_attribute(
trace_types.ATTR_LANGFUSE_COMPLETION_START_TIME, f'"{completion_start_time}"'
Expand Down
15 changes: 7 additions & 8 deletions livekit-agents/livekit/agents/telemetry/trace_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@
ATTR_GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
ATTR_GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"
ATTR_GEN_AI_USAGE_DETAILS_OUTPUT_AUDIO_TOKENS = "gen_ai.usage.details.output_audio_tokens"
# The following two experimental fields have recently been adopted by
# pydantic (https://github.com/open-telemetry/semantic-conventions/issues/1959#issuecomment-3135486259)
# and LangFuse (https://github.com/langfuse/langfuse/pull/13110) to support audio token counting.
ATTR_GEN_AI_USAGE_DETAILS_INPUT_AUDIO_TOKENS = "gen_ai.usage.details.input_audio_tokens" # pydantic proposed https://github.com/open-telemetry/semantic-conventions/issues/1959#issuecomment-3135486259
ATTR_GEN_AI_USAGE_DETAILS_CACHE_AUDIO_READ_TOKENS = "gen_ai.usage.details.cache_audio_read_tokens" # pydantic proposed https://github.com/open-telemetry/semantic-conventions/issues/1959#issuecomment-3135486259

# Unofficial OpenTelemetry GenAI attributes, these are namespaces recognised by LangFuse
# https://langfuse.com/integrations/native/opentelemetry#usage
# but not yet in the official OpenTelemetry specification.
ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS = "gen_ai.usage.input_text_tokens"
ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS = "gen_ai.usage.input_audio_tokens"
ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS = "gen_ai.usage.input_cached_tokens"
ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS = "gen_ai.usage.output_text_tokens"
ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS = "gen_ai.usage.output_audio_tokens"

# OpenTelemetry GenAI event names (for structured logging)
EVENT_GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
Expand Down
34 changes: 27 additions & 7 deletions livekit-agents/livekit/agents/telemetry/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,42 @@ def record_exception(span: trace.Span, exception: Exception) -> None:


def record_realtime_metrics(span: trace.Span, ev: RealtimeModelMetrics) -> None:
"""Set OpenTelemetry GenAI usage attributes for Langfuse-compatible OTEL ingestion.

Uses text token counts for top-level ``gen_ai.usage.input_tokens`` and
``gen_ai.usage.output_tokens`` (from ``input_token_details.text_tokens`` and
``output_token_details.text_tokens``). OpenAI Realtime reports those text counts
inclusive of cached text; cache read for pricing breakdown is
``gen_ai.usage.cache_read.input_tokens`` from ``cached_tokens_details.text_tokens``.
Audio tokens use ``gen_ai.usage.details.*`` keys.
"""
model_name = ev.metadata.model_name if ev.metadata else None
model_provider = ev.metadata.model_provider if ev.metadata else None

cached = ev.input_token_details.cached_tokens_details
cache_read_text = cached.text_tokens if cached else 0
cache_read_audio = cached.audio_tokens if cached else 0

attrs: dict[str, str | int] = {
trace_types.ATTR_GEN_AI_OPERATION_NAME: "chat",
trace_types.ATTR_GEN_AI_PROVIDER_NAME: model_provider or "unknown",
trace_types.ATTR_GEN_AI_REQUEST_MODEL: model_name or "unknown",
trace_types.ATTR_REALTIME_MODEL_METRICS: ev.model_dump_json(),
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: ev.input_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: ev.output_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS: ev.input_token_details.text_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_AUDIO_TOKENS: ev.input_token_details.audio_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS: ev.input_token_details.cached_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS: ev.output_token_details.text_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_AUDIO_TOKENS: ev.output_token_details.audio_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: ev.input_token_details.text_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: ev.output_token_details.text_tokens,
}
if cache_read_text:
attrs[trace_types.ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS] = cache_read_text
if ev.input_token_details.audio_tokens:
attrs[trace_types.ATTR_GEN_AI_USAGE_DETAILS_INPUT_AUDIO_TOKENS] = (
ev.input_token_details.audio_tokens
)
if cache_read_audio:
attrs[trace_types.ATTR_GEN_AI_USAGE_DETAILS_CACHE_AUDIO_READ_TOKENS] = cache_read_audio
if ev.output_token_details.audio_tokens:
attrs[trace_types.ATTR_GEN_AI_USAGE_DETAILS_OUTPUT_AUDIO_TOKENS] = (
ev.output_token_details.audio_tokens
)
if ev.ttft != -1:
completion_start_time = ev.timestamp + ev.ttft
# This attribute is used by LangFuse to calculate "time to first token metric"
Expand Down
18 changes: 9 additions & 9 deletions livekit-agents/livekit/agents/voice/run_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,15 +964,15 @@ async def check_intent(success: bool, reason: str) -> tuple[bool, str]:
current_span.set_attribute(trace_types.ATTR_FUNCTION_TOOL_OUTPUT, reason)

if usage:
current_span.set_attributes(
{
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: usage.prompt_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: usage.completion_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_TEXT_TOKENS: usage.prompt_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TEXT_TOKENS: usage.completion_tokens,
trace_types.ATTR_GEN_AI_USAGE_INPUT_CACHED_TOKENS: usage.prompt_cached_tokens,
}
)
judge_attrs: dict[str, int] = {
trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: usage.prompt_tokens,
trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: usage.completion_tokens,
}
if usage.prompt_cached_tokens:
judge_attrs[trace_types.ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS] = (
usage.prompt_cached_tokens
)
current_span.set_attributes(judge_attrs)

if not success:
self._raise(f"Judgement failed: {reason}")
Expand Down
70 changes: 70 additions & 0 deletions tests/test_record_realtime_metrics_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from __future__ import annotations

from unittest.mock import MagicMock

from livekit.agents.metrics.base import Metadata, RealtimeModelMetrics
from livekit.agents.telemetry import trace_types
from livekit.agents.telemetry.utils import record_realtime_metrics


def test_record_realtime_metrics_sets_langfuse_otel_usage_keys() -> None:
    """record_realtime_metrics emits the Langfuse-recognised OTEL usage attributes."""
    recording_span = MagicMock()
    recording_span.is_recording.return_value = True

    # Build the metrics event from named intermediates for readability.
    cached_details = RealtimeModelMetrics.CachedTokenDetails(
        text_tokens=30,
        audio_tokens=2,
    )
    input_details = RealtimeModelMetrics.InputTokenDetails(
        text_tokens=120,
        audio_tokens=5,
        cached_tokens=32,
        cached_tokens_details=cached_details,
    )
    output_details = RealtimeModelMetrics.OutputTokenDetails(
        text_tokens=40,
        audio_tokens=7,
    )
    ev = RealtimeModelMetrics(
        request_id="req-1",
        timestamp=0.0,
        input_token_details=input_details,
        output_token_details=output_details,
        metadata=Metadata(model_name="gpt-realtime", model_provider="openai"),
    )

    record_realtime_metrics(recording_span, ev)

    recording_span.set_attributes.assert_called_once()
    attrs = recording_span.set_attributes.call_args[0][0]

    # Top-level tokens come from the text counts; audio/cache go to breakdown keys.
    expected = {
        trace_types.ATTR_GEN_AI_USAGE_INPUT_TOKENS: 120,
        trace_types.ATTR_GEN_AI_USAGE_OUTPUT_TOKENS: 40,
        trace_types.ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS: 30,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_INPUT_AUDIO_TOKENS: 5,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_CACHE_AUDIO_READ_TOKENS: 2,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_OUTPUT_AUDIO_TOKENS: 7,
    }
    for key, value in expected.items():
        assert attrs[key] == value


def test_record_realtime_metrics_omits_zero_breakdown() -> None:
    """Zero or absent cache/audio counts must not produce breakdown attributes."""
    recording_span = MagicMock()
    recording_span.is_recording.return_value = True

    ev = RealtimeModelMetrics(
        request_id="req-2",
        timestamp=0.0,
        input_token_details=RealtimeModelMetrics.InputTokenDetails(
            text_tokens=10,
            audio_tokens=0,
            cached_tokens_details=None,
        ),
        output_token_details=RealtimeModelMetrics.OutputTokenDetails(
            text_tokens=3,
            audio_tokens=0,
        ),
    )

    record_realtime_metrics(recording_span, ev)

    attrs = recording_span.set_attributes.call_args[0][0]
    # None of the optional breakdown keys should be present when counts are zero/None.
    absent_keys = (
        trace_types.ATTR_GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_INPUT_AUDIO_TOKENS,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_CACHE_AUDIO_READ_TOKENS,
        trace_types.ATTR_GEN_AI_USAGE_DETAILS_OUTPUT_AUDIO_TOKENS,
    )
    for key in absent_keys:
        assert key not in attrs