@@ -38,6 +38,7 @@
stt,
utils,
)
from livekit.agents.stt import SpeechData
from livekit.agents.types import (
NOT_GIVEN,
NotGivenOr,
@@ -72,6 +73,7 @@ class STTOptions:
numerals: bool = False
mip_opt_out: bool = False
tags: NotGivenOr[list[str]] = NOT_GIVEN
final_on_endpoint: bool = False


class STT(stt.STT):
@@ -100,6 +102,7 @@ def __init__(
numerals: bool = False,
mip_opt_out: bool = False,
vad_events: bool = True,
final_on_endpoint: bool = False,
# deprecated
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
) -> None:
@@ -130,6 +133,20 @@ def __init__(
mip_opt_out: Whether to take part in the model improvement program
vad_events: Whether to enable VAD (Voice Activity Detection) events.
When enabled, SpeechStarted events are sent when speech is detected. Defaults to True.
final_on_endpoint: When True, emit ``FINAL_TRANSCRIPT`` only once per utterance,
on ``speech_final=True`` (endpoint detected). Deepgram fires
``is_final=True`` multiple times per utterance as it stabilizes word
batches; those intermediate batches are accumulated internally and
surfaced as ``INTERIM_TRANSCRIPT`` events carrying the cumulative
text, then emitted as a single ``FINAL_TRANSCRIPT`` with the full
utterance when the endpoint fires. Cumulative text from the Deepgram
API (where a new batch's transcript re-includes prior words after the
model revises earlier hypotheses) is detected via a prefix check and
replaces rather than appends, to avoid doubling up overlap. This
makes ``endpointing_ms`` the authoritative control over transcript-
segment boundaries, matching the mental model most consumers expect
for UI/analytics. Trade-off: ``FINAL_TRANSCRIPT`` latency grows by up
to ``endpointing_ms``. Defaults to False to preserve existing behavior.

Raises:
ValueError: If no API key is provided or found in environment variables.
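For illustration, a minimal sketch of opting into this mode from consumer code. Only ``final_on_endpoint`` and ``endpointing_ms`` are taken from this diff; the import path and the remaining constructor defaults follow the plugin's usual usage and are assumed here rather than verified against this branch:

# Sketch only: arguments other than the two shown are left at their defaults,
# and 500 is an illustrative endpointing window, not a recommended value.
from livekit.plugins import deepgram

stt_impl = deepgram.STT(
    endpointing_ms=500,       # silence window that closes an utterance
    final_on_endpoint=True,   # one FINAL_TRANSCRIPT per utterance, on speech_final
)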
@@ -185,6 +202,7 @@ def __init__(
vad_events=vad_events,
tags=_validate_tags(tags) if is_given(tags) else [],
endpoint_url=base_url,
final_on_endpoint=final_on_endpoint,
)
self._session = http_session
self._streams = weakref.WeakSet[SpeechStream]()
@@ -298,6 +316,7 @@ def update_options(
vad_events: NotGivenOr[bool] = NOT_GIVEN,
tags: NotGivenOr[list[str]] = NOT_GIVEN,
endpoint_url: NotGivenOr[str] = NOT_GIVEN,
final_on_endpoint: NotGivenOr[bool] = NOT_GIVEN,
# deprecated
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
) -> None:
@@ -342,6 +361,8 @@ def update_options(
self._opts.tags = _validate_tags(tags)
if is_given(endpoint_url):
self._opts.endpoint_url = endpoint_url
if is_given(final_on_endpoint):
self._opts.final_on_endpoint = final_on_endpoint

for stream in self._streams:
stream.update_options(
@@ -361,6 +382,7 @@ def update_options(
mip_opt_out=mip_opt_out,
vad_events=vad_events,
endpoint_url=endpoint_url,
final_on_endpoint=final_on_endpoint,
)

def _sanitize_options(
@@ -410,6 +432,10 @@ def __init__(

self._request_id = ""
self._reconnect_event = asyncio.Event()
# Accumulator for final_on_endpoint mode: collects text from intermediate
# is_final=True batches so that one utterance produces one FINAL_TRANSCRIPT
# carrying the complete transcript. Reset on each FINAL_TRANSCRIPT emission.
self._pending_final_alt: SpeechData | None = None

def update_options(
self,
@@ -432,6 +458,7 @@ def update_options(
vad_events: NotGivenOr[bool] = NOT_GIVEN,
tags: NotGivenOr[list[str]] = NOT_GIVEN,
endpoint_url: NotGivenOr[str] = NOT_GIVEN,
final_on_endpoint: NotGivenOr[bool] = NOT_GIVEN,
# deprecated
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
) -> None:
@@ -476,6 +503,8 @@ def update_options(
self._opts.tags = _validate_tags(tags)
if is_given(endpoint_url):
self._opts.endpoint_url = endpoint_url
if is_given(final_on_endpoint):
self._opts.final_on_endpoint = final_on_endpoint

self._reconnect_event.set()

@@ -586,6 +615,9 @@ async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
break

self._reconnect_event.clear()
# Reconnecting: drop any partial accumulator so we don't
# splice fragments across a stream boundary.
self._pending_final_alt = None
finally:
await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
tasks_group.cancel()
@@ -694,7 +726,49 @@ def _process_stream_event(self, data: dict) -> None:
start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
self._event_ch.send_nowait(start_event)

if is_final_transcript:
# Deepgram's is_final=True fires multiple times per utterance as it
# stabilizes word batches, and those batches can overlap when the
# model revises earlier text. When final_on_endpoint is True, we
# accumulate the text of intermediate is_final batches and only
# emit FINAL_TRANSCRIPT on speech_final=True so that one spoken
# utterance maps to one final event carrying the full transcript.
if self._opts.final_on_endpoint:
if is_final_transcript:
# Accumulate into a single combined SpeechData.
combined = _merge_speech_data(self._pending_final_alt, alts[0])
combined_alts = [combined] + list(alts[1:])
if is_endpoint:
final_event = stt.SpeechEvent(
type=stt.SpeechEventType.FINAL_TRANSCRIPT,
request_id=request_id,
alternatives=combined_alts,
)
self._event_ch.send_nowait(final_event)
self._pending_final_alt = None
else:
# Emit INTERIM with the cumulative text so downstream
# consumers that overwrite (rather than accumulate) their
# interim buffer still see the full in-progress transcript.
interim_event = stt.SpeechEvent(
type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
request_id=request_id,
alternatives=combined_alts,
)
self._event_ch.send_nowait(interim_event)
self._pending_final_alt = combined
else:
# is_final=False — tentative interim. Prepend any accumulated
# finalized text so the downstream interim buffer shows the
# full utterance-so-far while Deepgram is still refining.
display_alt = _overlay_pending_interim(self._pending_final_alt, alts[0])
display_alts = [display_alt] + list(alts[1:])
interim_event = stt.SpeechEvent(
type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
request_id=request_id,
alternatives=display_alts,
)
self._event_ch.send_nowait(interim_event)
elif is_final_transcript:
final_event = stt.SpeechEvent(
type=stt.SpeechEventType.FINAL_TRANSCRIPT,
request_id=request_id,
@@ -714,6 +788,10 @@ def _process_stream_event(self, data: dict) -> None:
# a non-empty transcript (deepgram doesn't have a SpeechEnded event)
if is_endpoint and self._speaking:
self._speaking = False
# Safety: clear any partial accumulator after speech ends — the
# FINAL branch above normally resets it, but this guards against
# speech_final arriving with no is_final batches accumulated.
self._pending_final_alt = None
Comment on lines 789 to +794
🔴 Accumulated transcripts silently lost when endpoint fires with empty text in final_on_endpoint mode

When final_on_endpoint=True, intermediate is_final=True batches are accumulated in self._pending_final_alt and only emitted as FINAL_TRANSCRIPT when speech_final=True (the endpoint). However, the entire accumulation/emission logic is gated by if len(alts) > 0 and alts[0].text: (line 723), which is falsy when the endpoint batch carries an empty transcript. Deepgram commonly sends speech_final=True with an empty transcript when all words were already finalized in prior batches — exactly the scenario final_on_endpoint is designed for. When this happens, the code skips past all the final_on_endpoint logic, and then the guard at line 789-794 clears self._pending_final_alt = None without ever emitting the accumulated text. The entire utterance's transcript is silently dropped.

Example scenario
  1. DG sends is_final=True, speech_final=False, text="hello" → accumulated in _pending_final_alt
  2. DG sends is_final=True, speech_final=False, text="world" → merged into _pending_final_alt
  3. DG sends is_final=True, speech_final=True, text="" → alts[0].text is falsy, inner block skipped
  4. Line 794: self._pending_final_alt = None → accumulated "hello world" lost, no FINAL_TRANSCRIPT emitted
Prompt for agents
In _process_stream_event, when final_on_endpoint is True and speech_final=True arrives, the accumulated _pending_final_alt must be emitted as a FINAL_TRANSCRIPT even if the current batch's text is empty. The fix should be applied near the endpoint handling block (around line 789). Before clearing _pending_final_alt and emitting END_OF_SPEECH, check if final_on_endpoint is True and _pending_final_alt is not None. If so, emit a FINAL_TRANSCRIPT event using the pending accumulated data. This could be done by moving the endpoint+accumulator flush logic outside the `if len(alts) > 0 and alts[0].text:` guard, or by adding a separate check in the `if is_endpoint and self._speaking:` block that emits the pending data before clearing it.
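A rough sketch of that flush, assuming the names shown in this diff (``_pending_final_alt``, ``_speaking``, ``_event_ch``, ``request_id``, ``is_endpoint``); this is one possible shape, not the actual patch:

# Sketch: flush the accumulator when the endpoint fires, even if this batch
# carried no text, before clearing it and emitting END_OF_SPEECH.
if is_endpoint and self._speaking:
    self._speaking = False
    if self._opts.final_on_endpoint and self._pending_final_alt is not None:
        self._event_ch.send_nowait(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                request_id=request_id,
                alternatives=[self._pending_final_alt],
            )
        )
    self._pending_final_alt = None
    self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))

When the text-bearing branch has already emitted the final on the endpoint, ``_pending_final_alt`` is already ``None``, so this guard does not double-emit.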

self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))

elif data["type"] == "Metadata":
@@ -722,6 +800,76 @@ def _process_stream_event(self, data: dict) -> None:
logger.warning("received unexpected message from deepgram %s", data)


def _merge_speech_data(pending: stt.SpeechData | None, incoming: stt.SpeechData) -> stt.SpeechData:
"""Combine a prior finalized ``SpeechData`` with a new finalized batch.

Deepgram sometimes emits cumulative text (the new batch's ``transcript``
already includes the prior batch's words, e.g. after revising an earlier
hypothesis) and sometimes emits purely new words. We detect the cumulative
case via a prefix check and replace instead of concatenating, to avoid
doubling up the overlapping text.
"""
if pending is None:
return incoming

pending_text = pending.text.strip()
incoming_text = incoming.text.strip()
if not pending_text:
return incoming
if not incoming_text:
return pending

if incoming_text.startswith(pending_text):
combined_text = incoming_text
combined_words = list(incoming.words) if incoming.words else []
else:
combined_text = f"{pending_text} {incoming_text}".strip()
combined_words = (list(pending.words) if pending.words else []) + (
list(incoming.words) if incoming.words else []
)

start_time = pending.start_time if pending.start_time else incoming.start_time
if incoming.start_time:
start_time = min(start_time, incoming.start_time) if start_time else incoming.start_time
Comment on lines +831 to +833
🟡 Falsy check on start_time treats valid 0.0 timestamp as missing

In _merge_speech_data, line 831 uses if pending.start_time to decide whether to use pending.start_time or fall back to incoming.start_time. Since start_time is a float defaulting to 0.0 (stt.py:57), a legitimate start time of 0.0 (the beginning of the audio stream) is treated as falsy/missing, causing the code to incorrectly use incoming.start_time (a later timestamp) instead. The same pattern on line 832-833 (if incoming.start_time, if start_time) has the same issue. The correct check should use is not None or explicit comparison, but since start_time can never be None (it's float), the intent is likely pending.start_time is not None which is always true — so the logic should simply be min(pending.start_time, incoming.start_time).

Suggested change
-    start_time = pending.start_time if pending.start_time else incoming.start_time
-    if incoming.start_time:
-        start_time = min(start_time, incoming.start_time) if start_time else incoming.start_time
+    start_time = min(pending.start_time, incoming.start_time)
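For context, the pitfall in isolation (plain Python, unrelated to the plugin itself):

start_time = 0.0                              # legitimate "beginning of stream"
picked = start_time if start_time else 3.2    # 0.0 is falsy, so picked == 3.2
picked = min(start_time, 3.2)                 # explicit min() keeps 0.0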

confidence = (
(pending.confidence + incoming.confidence) / 2
if pending.confidence and incoming.confidence
else (incoming.confidence or pending.confidence)
)
return dataclasses.replace(
incoming,
text=combined_text,
words=combined_words,
start_time=start_time,
end_time=max(pending.end_time, incoming.end_time),
confidence=confidence,
)
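To make the two merge paths concrete, a small illustrative check; the ``SpeechData`` values are hypothetical, this snippet is not part of the PR, and it assumes the constructor accepts these keyword fields with the remaining ones defaulted:

# Hypothetical inputs: only the text field matters for the paths being shown.
pending = stt.SpeechData(language="en", text="hello there", start_time=0.0, end_time=1.2)
cumulative = stt.SpeechData(language="en", text="hello there friend", start_time=1.2, end_time=1.8)
disjoint = stt.SpeechData(language="en", text="friend", start_time=1.2, end_time=1.8)

assert _merge_speech_data(pending, cumulative).text == "hello there friend"  # prefix -> replace
assert _merge_speech_data(pending, disjoint).text == "hello there friend"    # no prefix -> append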


def _overlay_pending_interim(
pending: stt.SpeechData | None, incoming: stt.SpeechData
) -> stt.SpeechData:
"""Return an interim ``SpeechData`` whose text is the accumulated finalized
prefix plus the current tentative interim. Used in ``final_on_endpoint``
mode so downstream consumers that overwrite their interim buffer still see
the full utterance-so-far between finalized batches.
"""
if pending is None or not pending.text.strip():
return incoming

pending_text = pending.text.strip()
incoming_text = incoming.text.strip()
if not incoming_text:
return dataclasses.replace(incoming, text=pending_text)

display_text = (
incoming_text
if incoming_text.startswith(pending_text)
else f"{pending_text} {incoming_text}".strip()
)
return dataclasses.replace(incoming, text=display_text)


def live_transcription_to_speech_data(
language: str, data: dict, *, is_final: bool, start_time_offset: float
) -> list[stt.SpeechData]: