Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8138135
LCORE-1572: add conversation compaction and wire it into /v1/query
max-svistunov May 26, 2026
a9ac9bc
LCORE-1572: unit tests for conversation compaction core and /v1/query
max-svistunov May 26, 2026
56a9d88
LCORE-1572: apply conversation compaction in the A2A endpoint
max-svistunov May 26, 2026
518dfb5
LCORE-1572: apply conversation compaction in the streaming_query endp…
max-svistunov May 26, 2026
3854a71
LCORE-1572: tests for the streaming compaction gate
max-svistunov May 26, 2026
91b4fc9
LCORE-1572: apply conversation compaction in the /v1/responses endpoint
max-svistunov May 26, 2026
d173d38
LCORE-1572: tests for /v1/responses compacted-turn storage
max-svistunov May 26, 2026
3202433
LCORE-1572: update spec doc to the as-built compaction design
max-svistunov May 26, 2026
1a78038
LCORE-1572: fix needs_compaction_path docstring (pydocstyle D400)
max-svistunov May 26, 2026
412b7d9
LCORE-1572: build compacted input as typed messages (silence Pydantic…
max-svistunov May 26, 2026
88a1071
LCORE-1572: raise instead of assert on the drained compaction result
max-svistunov May 26, 2026
79dd34d
LCORE-1572: wire persisted recursive fold (R3) via the summary cache
max-svistunov May 26, 2026
cb7414d
LCORE-1572: address CodeRabbit review — list-form input tokens + clar…
max-svistunov May 26, 2026
423e093
LCORE-1572: persist compacted streaming turns with structure (CodeRab…
max-svistunov May 26, 2026
a82f9ff
LCORE-1572: do not initialize the conversation cache when compaction …
max-svistunov May 26, 2026
0a80995
LCORE-1572: address CodeRabbit round 2 (compacted-mode persistence ed…
max-svistunov May 27, 2026
5a708c4
LCORE-1572: document the disable-after-compaction limitation in the s…
max-svistunov May 27, 2026
597ab01
LCORE-1572: document as-built divergences in spec doc (cache source-o…
max-svistunov May 27, 2026
ce8627e
LCORE-1572: fix line-too-long (C0301) in interrupted-turn test docstring
max-svistunov May 27, 2026
d6e0367
LCORE-1572: harden disabled-cache regression test to fail on eager ca…
max-svistunov May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 100 additions & 50 deletions docs/design/conversation-compaction/conversation-compaction.md

Large diffs are not rendered by default.

53 changes: 48 additions & 5 deletions src/app/endpoints/a2a.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Handler for A2A (Agent-to-Agent) protocol endpoints using Responses API."""

# pylint: disable=too-many-lines

import asyncio
import json
import uuid
Expand Down Expand Up @@ -32,7 +34,7 @@
from llama_stack_api.openai_responses import (
OpenAIResponseObjectStream,
)
from llama_stack_client import APIConnectionError
from llama_stack_client import APIConnectionError, AsyncLlamaStackClient
from starlette.responses import Response, StreamingResponse

from a2a_storage import A2AContextStore, A2AStorageFactory
Expand All @@ -45,13 +47,18 @@
from constants import MEDIA_TYPE_EVENT_STREAM
from log import get_logger
from models.api.requests import QueryRequest
from models.common.responses.types import ResponseInput
from models.config import Action
from utils.conversation_compaction import (
apply_compaction_blocking,
store_compacted_turn,
)
from utils.mcp_headers import McpHeaders, mcp_headers_dependency
from utils.responses import (
extract_text_from_response_item,
prepare_responses_params,
)
from utils.suid import normalize_conversation_id
from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id
from version import __version__

logger = get_logger(__name__)
Expand Down Expand Up @@ -336,6 +343,19 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
store=True,
request_headers=self.request_headers,
)
# Compact the conversation if it is approaching the context window
# limit. A2A is not a browser SSE stream, so no progress event is
# emitted; the blocking variant summarizes inline before the call.
# No conversation cache is passed: the A2A executor has no resolved
# user_id for the (user_id, conversation_id) cache key, so A2A runs
# in marker-only mode (additive summaries, no persisted fold).
compaction = await apply_compaction_blocking(
client,
responses_params,
configuration.inference,
configuration.compaction,
)
responses_params = compaction.params
# Stream response from LLM using the Responses API
stream = await client.responses.create(**responses_params.model_dump())
except APIConnectionError as e:
Expand Down Expand Up @@ -392,9 +412,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
)
)

# Process stream using generator and aggregator pattern
# Process stream using generator and aggregator pattern. In compacted
# mode the conversation parameter is not sent, so the turn is stored
# explicitly once the response completes (see _convert_stream_to_events).
async for a2a_event in self._convert_stream_to_events(
stream, task_id, context_id, conversation_id
stream,
task_id,
context_id,
conversation_id,
client,
compaction.original_input if compaction.compacted else None,
):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
aggregator.process_event(a2a_event)
await event_queue.enqueue_event(a2a_event)
Expand All @@ -414,12 +441,14 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
final=True,
)

async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals
async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-positional-arguments
self,
stream: AsyncIterator[OpenAIResponseObjectStream],
task_id: str,
context_id: str,
conversation_id: Optional[str],
client: Optional[AsyncLlamaStackClient] = None,
compacted_original_input: Optional[ResponseInput] = None,
) -> AsyncIterator[Any]:
"""Convert Responses API stream chunks to A2A events.

Expand Down Expand Up @@ -508,6 +537,20 @@ async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-ma

if response_obj:
output = getattr(response_obj, "output", [])
# In compacted mode the conversation parameter was not sent,
# so persist this turn ourselves to keep the recent-turn
# buffer and audit history intact for the next request.
if (
compacted_original_input is not None
and client is not None
and conversation_id
):
await store_compacted_turn(
client,
to_llama_stack_conversation_id(conversation_id),
compacted_original_input,
output,
)
a2a_parts = _convert_responses_content_to_a2a_parts(output)
if not a2a_parts and final_text:
a2a_parts = [Part(root=TextPart(text=final_text))]
Expand Down
50 changes: 48 additions & 2 deletions src/app/endpoints/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@
from models.api.responses.successful import QueryResponse
from models.common.moderation import ShieldModerationResult
from models.common.responses.responses_api_params import ResponsesApiParams
from models.common.responses.types import ResponseInput
from models.common.turn_summary import TurnSummary
from models.config import Action
from utils.conversation_compaction import (
apply_compaction_blocking,
configured_conversation_cache,
store_compacted_turn,
)
from utils.conversations import append_turn_items_to_conversation
from utils.endpoints import (
check_configuration_loaded,
Expand Down Expand Up @@ -196,6 +202,20 @@ async def query_endpoint_handler(
inline_rag_context=inline_rag_context.context_text,
)

# Compact the conversation if it is approaching the context window limit.
# When compaction is active, params carry explicit input and the
# conversation parameter is dropped (lightspeed-stack owns the context).
compaction = await apply_compaction_blocking(
client,
responses_params,
configuration.inference,
configuration.compaction,
cache=configured_conversation_cache(),
user_id=user_id,
skip_user_id_check=_skip_userid_check,
)
responses_params = compaction.params

# Handle Azure token refresh if needed
if (
responses_params.model.startswith("azure")
Expand All @@ -207,7 +227,11 @@ async def query_endpoint_handler(

# Retrieve response using Responses API
turn_summary = await retrieve_response(
client, responses_params, moderation_result, endpoint_path
client,
responses_params,
moderation_result,
endpoint_path,
original_input=compaction.original_input if compaction.compacted else None,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if moderation_result.decision == "passed":
Expand Down Expand Up @@ -282,6 +306,7 @@ async def retrieve_response(
responses_params: ResponsesApiParams,
moderation_result: ShieldModerationResult,
endpoint_path: str = "",
original_input: Optional[ResponseInput] = None,
) -> TurnSummary:
"""
Retrieve response from LLMs and agents.
Expand All @@ -294,17 +319,28 @@ async def retrieve_response(
client: The AsyncLlamaStackClient to use for the request.
responses_params: The Responses API parameters.
moderation_result: The moderation result.
endpoint_path: The request path, for metrics/telemetry.
original_input: Set only in compacted mode (LCORE-1572). It is the new
user query before the explicit-input rewrite. When provided, the
turn is appended to the conversation here, because the conversation
parameter is no longer passed to Llama Stack and so the turn is not
stored automatically.

Returns:
-------
TurnSummary: Summary of the LLM response content
"""
response: Optional[OpenAIResponseObject] = None
# In compacted mode, the new turn must be stored against the original user
# query, not the explicit summaries-plus-recent input we send to inference.
turn_input = (
original_input if original_input is not None else responses_params.input
)
if moderation_result.decision == "blocked":
await append_turn_items_to_conversation(
client,
responses_params.conversation,
responses_params.input,
turn_input,
[moderation_result.refusal_response],
)
return TurnSummary(
Expand All @@ -331,6 +367,16 @@ async def retrieve_response(
error_response = handle_known_apistatus_errors(e, responses_params.model)
raise HTTPException(**error_response.model_dump()) from e

# In compacted mode, store the completed turn ourselves (the conversation
# parameter was not sent, so Llama Stack did not persist it).
if original_input is not None:
await store_compacted_turn(
client,
responses_params.conversation,
original_input,
response.output,
)

vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools)
rag_id_mapping = configuration.rag_id_mapping
return build_turn_summary(
Expand Down
64 changes: 60 additions & 4 deletions src/app/endpoints/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@
from models.common.moderation import ShieldModerationBlocked
from models.common.responses.contexts import ResponsesContext
from models.common.responses.responses_api_params import ResponsesApiParams
from models.common.responses.types import ResponseInput
from models.common.turn_summary import TurnSummary
from models.config import Action
from utils.conversation_compaction import (
apply_compaction_blocking,
configured_conversation_cache,
)
from utils.conversations import append_turn_items_to_conversation
from utils.endpoints import (
check_configuration_loaded,
Expand Down Expand Up @@ -225,10 +230,18 @@ async def _persist_blocked_response_turn(
"""
if api_params.store:
moderation_result = cast(ShieldModerationBlocked, context.moderation_result)
# In compacted mode the conversation parameter was dropped and
# api_params.input is the explicit-input rewrite, so persist the turn
# against the original user input instead (LCORE-1572).
user_input = (
context.compacted_original_input
if context.compacted_original_input is not None
else api_params.input
)
await append_turn_items_to_conversation(
client=context.client,
conversation_id=api_params.conversation,
user_input=api_params.input,
user_input=user_input,
llm_output=[moderation_result.refusal_response],
)

Expand All @@ -238,14 +251,30 @@ async def _append_previous_response_turn(
context: ResponsesContext,
output: Sequence[OpenAIResponseOutput],
) -> None:
"""Append response output when continuing from a previous response id.
"""Append the completed turn when Llama Stack did not store it automatically.

Llama Stack stores the turn itself only when the conversation parameter is
sent. Two cases bypass that and require an explicit append: continuing from
a ``previous_response_id``, and conversation compaction (LCORE-1572), where
the conversation parameter is dropped in favor of explicit input. In the
compaction case the turn is stored against the original user input (before
the explicit-input rewrite), carried on the context.

Args:
api_params: Responses API parameters containing conversation details.
context: Request-scoped Responses API context.
output: Final output items from the Responses API object.
"""
if api_params.store and api_params.previous_response_id:
if not api_params.store:
return
if context.compacted_original_input is not None:
await append_turn_items_to_conversation(
context.client,
api_params.conversation,
context.compacted_original_input,
output,
)
elif api_params.previous_response_id:
await append_turn_items_to_conversation(
context.client,
api_params.conversation,
Expand Down Expand Up @@ -356,7 +385,7 @@ async def responses_endpoint_handler(
check_configuration_loaded(configuration)
started_at = datetime.now(UTC)
rh_identity_context = get_rh_identity_context(request)
user_id, _, _, token = auth
user_id, _, skip_userid_check, token = auth

await check_mcp_auth(configuration, mcp_headers, token, request.headers)

Expand Down Expand Up @@ -455,6 +484,32 @@ async def responses_endpoint_handler(
)

api_params = ResponsesApiParams.model_validate(updated_request.model_dump())

# Compact the conversation if it is approaching the context window limit.
# /v1/responses is OpenAI-compatible, so compaction is silent (no custom SSE
# event): summarization happens before the response is created, and the turn
# is appended explicitly afterward (the conversation parameter is dropped).
# Only stateful single-conversation requests are eligible.
compacted_original_input: Optional[ResponseInput] = None
if (
configuration.compaction.enabled
and api_params.store
and api_params.conversation
and not api_params.previous_response_id
):
compaction = await apply_compaction_blocking(
client,
api_params,
configuration.inference,
configuration.compaction,
cache=configured_conversation_cache(),
user_id=user_id,
skip_user_id_check=skip_userid_check,
)
api_params = compaction.params
if compaction.compacted:
compacted_original_input = compaction.original_input
Comment thread
coderabbitai[bot] marked this conversation as resolved.

context = ResponsesContext(
client=client,
auth=auth,
Expand All @@ -468,6 +523,7 @@ async def responses_endpoint_handler(
user_agent=_get_user_agent(request),
endpoint_path=endpoint_path,
generate_topic_summary=updated_request.generate_topic_summary,
compacted_original_input=compacted_original_input,
)
response_handler = (
handle_streaming_response
Expand Down
Loading
Loading