Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/stt/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class SpeechEvent:
request_id: str = ""
alternatives: list[SpeechData] = field(default_factory=list)
recognition_usage: RecognitionUsage | None = None
speech_start_time: float | None = None
"""server-reported wall-clock time of speech onset, when the provider sends
a separate speech-start signal carrying onset timing."""


@dataclass
Expand Down
9 changes: 5 additions & 4 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1642,10 +1642,11 @@ def _interrupt_by_audio_activity(

# region recognition hooks

def on_start_of_speech(self, ev: vad.VADEvent | None) -> None:
speech_start_time = time.time()
if ev:
speech_start_time = speech_start_time - ev.speech_duration - ev.inference_duration
def on_start_of_speech(
self,
ev: vad.VADEvent | None,
speech_start_time: float,
) -> None:
self._session._update_user_state("speaking", last_speaking_time=speech_start_time)
if self._audio_recognition:
self._audio_recognition.on_start_of_speech(
Expand Down
15 changes: 9 additions & 6 deletions livekit-agents/livekit/agents/voice/audio_recognition.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class _PreemptiveGenerationInfo:

class RecognitionHooks(Protocol):
def on_interruption(self, ev: inference.OverlappingSpeechEvent) -> None: ...
def on_start_of_speech(self, ev: vad.VADEvent | None) -> None: ...
def on_start_of_speech(self, ev: vad.VADEvent | None, speech_start_time: float) -> None: ...
def on_vad_inference_done(self, ev: vad.VADEvent) -> None: ...
def on_end_of_speech(self, ev: vad.VADEvent | None) -> None: ...
def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -> None: ...
Expand Down Expand Up @@ -852,12 +852,15 @@ async def _on_stt_event(self, ev: stt.SpeechEvent) -> None:
self._run_eou_detection(chat_ctx)

elif ev.type == stt.SpeechEventType.START_OF_SPEECH and self._turn_detection_mode == "stt":
with trace.use_span(self._ensure_user_turn_span()):
self._hooks.on_start_of_speech(None)
# If the plugin provided a server onset timestamp, use it;
# otherwise fall back to message arrival time.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can add a condition where:

self._speech_start_time = ev.speech_start_time if ev.speech_start_time < self._speech_start_time else self._speech_start_time

for the case where the VAD detects activity before the STT does, as well

Copy link
Copy Markdown
Contributor Author

@gsharp-aai gsharp-aai Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open to this! Just want to flag it changes behavior from the current PR. Two shapes:

  1. Fallback only (current PR as-is): _speech_start_time is only set from STT when VAD hasn't already set it. VAD wins when it fires, preserving current behavior.
  2. Earlier of VAD or STT (your suggestion): every STT SOS compares both and picks the earlier onset, even when VAD already fired.

I leaned toward #1 since local VAD's back-date is usually more accurate than the server timestamp (no network delay, no clock skew) plus less of a behavioral change (in relation to what currently exists), but happy to flip to #2 if you think the "STT caught it earlier" case is common enough to trust by default.

Let me know which shape the team prefers!

if self._speech_start_time is None:
self._speech_start_time = ev.speech_start_time or time.time()

with trace.use_span(self._ensure_user_turn_span(start_time=self._speech_start_time)):
self._hooks.on_start_of_speech(None, speech_start_time=self._speech_start_time)

self._speaking = True
if self._speech_start_time is None:
self._speech_start_time = time.time()
self._last_speaking_time = time.time()

if self._end_of_turn_task is not None:
Expand All @@ -872,7 +875,7 @@ async def _on_vad_event(self, ev: vad.VADEvent) -> None:
self._vad_speech_started = True

with trace.use_span(self._ensure_user_turn_span(start_time=speech_start_time)):
self._hooks.on_start_of_speech(ev)
self._hooks.on_start_of_speech(ev, speech_start_time=speech_start_time)

self._speaking = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dataclasses
import json
import os
import time
import weakref
from dataclasses import dataclass
from typing import Literal
Expand Down Expand Up @@ -282,6 +283,11 @@ def __init__(
self._config_update_queue: asyncio.Queue[dict] = asyncio.Queue()
self._session_id: str | None = None
self._expires_at: int | None = None
# Wall-clock time (time.time()) when the first audio frame was sent to
# the server. Used to convert the server's stream-relative timestamp
# (returned in SpeechStarted.timestamp) into a wall-clock time so the
# framework can back-date _speech_start_time on START_OF_SPEECH.
self._stream_wall_start: float | None = None

@property
def session_id(self) -> str | None:
Expand Down Expand Up @@ -356,6 +362,10 @@ def force_endpoint(self) -> None:

async def _run(self) -> None:
"""Run a single websocket connection to AssemblyAI."""
# Reset on each (re)connection — the server's stream-relative timestamps
# restart at 0 with every new WebSocket, so the wall-clock anchor must
# also be re-captured from this connection's first frame.
self._stream_wall_start = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a field called start_time_offset in stt stream that plays a similar role, and it is assigned when the stream is initialized:

stream.start_time_offset = time.time() - _audio_input_started_at

I think we can add a second field stream.start_time so that other STT implementations can use it as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call!

Key consideration is that I believe "server-provided onset timestamp" would be anchored to whatever zero-point that provider defines, which will of course vary by each provider's server-side implementation. Because of that, I was thinking that the framework can't reliably pin a single wall-clock moment that aligns with every provider's "zero" simultaneously (each plugin knows its own server's semantics and should probably own the anchoring moment).

What about putting the field on the base class (shared, discoverable, other plugins can adopt), seeding a framework default at init so plugins that don't override still get some value, and letting each plugin overwrite it at whatever moment corresponds to its own server's zero? The framework can handle resetting it on retries centrally, same pattern as start_time_offset.

Shape:

# base class SpeechStream
self._start_time: float = time.time()   # framework default

@property
def start_time(self) -> float: ...

@start_time.setter
def start_time(self, value: float) -> None: ...
# Plus a reset in _main_task across retries, same pattern as start_time_offset.

What do you think?

Edit: updated to seed a framework default and let plugins overwrite it, instead of leaving it as purely plugin-set.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable. The framework provides a default, and plugins can override it if needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The framework provides a default, and plugins can override it if needed.

Updated PR to reflect this

closing_ws = False

async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
Expand All @@ -378,6 +388,9 @@ async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
frames = audio_bstream.write(data.data.tobytes())

for frame in frames:
if self._stream_wall_start is None:
# Anchor wall-clock time at first audio frame sent.
self._stream_wall_start = time.time()
Comment thread
gsharp-aai marked this conversation as resolved.
Outdated
self._speech_duration += frame.duration
await ws.send_bytes(frame.data.tobytes())

Expand Down Expand Up @@ -518,7 +531,21 @@ def _process_stream_event(self, data: dict) -> None:
return

if message_type == "SpeechStarted":
self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH))
# SpeechStarted can arrive well after actual speech onset. The
# `timestamp` field carries the server VAD's onset time in stream-
# relative ms. Convert to wall-clock by adding _stream_wall_start
# (recorded when the first audio frame was sent) so the framework
# records an accurate _speech_start_time instead of message arrival.
timestamp_ms = data.get("timestamp")
speech_start_time: float | None = None
if timestamp_ms is not None and self._stream_wall_start is not None:
speech_start_time = self._stream_wall_start + timestamp_ms / 1000
self._event_ch.send_nowait(
stt.SpeechEvent(
type=stt.SpeechEventType.START_OF_SPEECH,
speech_start_time=speech_start_time,
)
)
return

if message_type == "Termination":
Expand Down