Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8138135
LCORE-1572: add conversation compaction and wire it into /v1/query
max-svistunov May 26, 2026
a9ac9bc
LCORE-1572: unit tests for conversation compaction core and /v1/query
max-svistunov May 26, 2026
56a9d88
LCORE-1572: apply conversation compaction in the A2A endpoint
max-svistunov May 26, 2026
518dfb5
LCORE-1572: apply conversation compaction in the streaming_query endp…
max-svistunov May 26, 2026
3854a71
LCORE-1572: tests for the streaming compaction gate
max-svistunov May 26, 2026
91b4fc9
LCORE-1572: apply conversation compaction in the /v1/responses endpoint
max-svistunov May 26, 2026
d173d38
LCORE-1572: tests for /v1/responses compacted-turn storage
max-svistunov May 26, 2026
3202433
LCORE-1572: update spec doc to the as-built compaction design
max-svistunov May 26, 2026
1a78038
LCORE-1572: fix needs_compaction_path docstring (pydocstyle D400)
max-svistunov May 26, 2026
412b7d9
LCORE-1572: build compacted input as typed messages (silence Pydantic…
max-svistunov May 26, 2026
88a1071
LCORE-1572: raise instead of assert on the drained compaction result
max-svistunov May 26, 2026
79dd34d
LCORE-1572: wire persisted recursive fold (R3) via the summary cache
max-svistunov May 26, 2026
cb7414d
LCORE-1572: address CodeRabbit review — list-form input tokens + clar…
max-svistunov May 26, 2026
423e093
LCORE-1572: persist compacted streaming turns with structure (CodeRab…
max-svistunov May 26, 2026
a82f9ff
LCORE-1572: do not initialize the conversation cache when compaction …
max-svistunov May 26, 2026
0a80995
LCORE-1572: address CodeRabbit round 2 (compacted-mode persistence ed…
max-svistunov May 27, 2026
5a708c4
LCORE-1572: document the disable-after-compaction limitation in the s…
max-svistunov May 27, 2026
597ab01
LCORE-1572: document as-built divergences in spec doc (cache source-o…
max-svistunov May 27, 2026
ce8627e
LCORE-1572: fix line-too-long (C0301) in interrupted-turn test docstring
max-svistunov May 27, 2026
d6e0367
LCORE-1572: harden disabled-cache regression test to fail on eager ca…
max-svistunov May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 76 additions & 49 deletions docs/design/conversation-compaction/conversation-compaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ R9
Compaction configuration must be admin-configurable via YAML: threshold ratio, fixed token floor, and buffer zone size.

R10
After compaction, lightspeed-stack injects the summary as a marked item into the existing Llama Stack conversation. When building context for the LLM, lightspeed-stack selects only items from the last summary marker onward. This preserves a single continuous conversation identity in Llama Stack while giving lightspeed-stack control over what the LLM sees.
After compaction, lightspeed-stack builds the LLM input explicitly — the summaries plus the recent verbatim turns plus the new query — and stops passing the Llama Stack `conversation` parameter for that request, because Llama Stack always reloads the full message history when the `conversation` parameter is set (verified empirically on llama-stack 0.6.0; see the Changelog and the spike doc). The summary is still written into the conversation as a marked item so it appears in the Conversations API, but the marker is lightspeed-stack's own boundary bookkeeping, not a Llama Stack selection mechanism. The `conversation_id` is preserved across the whole conversation, and the full history (including pre-compaction turns) remains in the conversation's items for UI/audit. Because the `conversation` parameter is no longer sent in compacted mode, lightspeed-stack appends each completed turn to the conversation itself.

This applies to every endpoint that builds context from a growing conversation and calls the Responses API: `/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses`. (The `/v1/rlsapi` inference path is stateless — no stored conversation — and is therefore out of scope.)

R11
Compaction must be blocking per conversation. If a request triggers compaction, concurrent requests on the same conversation must wait until compaction completes. This prevents race conditions (e.g., two requests both triggering compaction, or a new message being appended mid-compaction).

R12
The streaming endpoint must emit a compaction event (e.g., `{"event": "compaction_started"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests have no mid-request notification mechanism.
The native streaming endpoint (`/v1/streaming_query`) must emit a compaction event (e.g., `{"event": "compaction"}`) before the summarization LLM call begins, so the client can display a progress indicator. Non-streaming requests (`/v1/query`) have no mid-request notification mechanism. The A2A executor compacts inline (it is not a browser SSE stream). `/v1/responses` is the OpenAI-compatible endpoint and compacts **silently** — it does not inject a non-standard event into the OpenAI-format stream, preserving wire compatibility.

# Use Cases

Expand Down Expand Up @@ -95,28 +97,32 @@ User Query → lightspeed-stack
1. Resolve model, system prompt, tools
2. Build input (query + RAG + attachments)
3. Acquire per-conversation lock (blocks concurrent requests)
4. Estimate total tokens (tiktoken): system + history + new query
5. If compaction needed (tokens > threshold):
a. Emit compaction event on streaming endpoint
b. Retrieve conversation history from Llama Stack
4. Estimate total tokens (tiktoken): system + (summaries + recent items) + new query
5. If compaction needed (tokens > threshold) OR a prior summary marker exists:
a. Emit compaction event (native streaming endpoint only)
b. Retrieve conversation items from Llama Stack
c. Split into "old" (summarize) and "recent" (keep)
— degrading guard: reduce recent turns if they exceed token budget
d. Summarize old turns → inject summary as marked item into conversation
e. Store summary chunk in conversation cache
6. Build context: select items from last summary marker onward + new query
7. Call Llama Stack Responses API with conversation parameter
(Llama Stack loads items from marker onward)
d. Summarize old turns → write summary as a marked item into conversation
6. Build EXPLICIT input: [summary markers] + [recent items after last marker] + new query
7. Call Llama Stack Responses API WITHOUT the conversation parameter
(so Llama Stack does not reload the full history)
Llama Stack
8. Processes conversation (summary marker + recent turns + new query)
8. Processes exactly the explicit input
lightspeed-stack
9. Response stored in same conversation (continuous history)
10. Update conversation cache
11. Release per-conversation lock
12. Return QueryResponse with context_status="summarized" (or "full")
9. Append the completed turn to the conversation items (continuous history,
same conversation_id) — Llama Stack did not auto-store it (no conversation param)
10. Release per-conversation lock
11. Return response (context_status="summarized" when 1573 lands; "full" otherwise)
```

Note: when no prior summary exists and the request is below the threshold,
none of step 5–9 applies — the request takes the normal path with the
`conversation` parameter, byte-for-byte unchanged. The explicit-input path is
entered only for conversations that are actually being (or have been) compacted.

## Token estimation

Add tiktoken as a dependency. Create `src/utils/token_estimator.py`:
Expand Down Expand Up @@ -221,11 +227,11 @@ A conversation may have multiple summary chunks (one per compaction event). All

## Changed request flow after compaction

After compaction, lightspeed-stack injects the summary as a marked conversation item into the existing Llama Stack conversation. The summary item has a recognizable marker (e.g., metadata tag or content prefix) so that lightspeed-stack can identify it when loading history.
After compaction, lightspeed-stack writes the summary as a marked conversation item (a message whose text begins with a recognizable sentinel) so it appears in the Conversations API and serves as lightspeed-stack's own boundary marker.

When building context for subsequent requests, lightspeed-stack fetches conversation items and selects only those from the last summary marker onward. The `conversation` parameter continues to be used — Llama Stack still manages the conversation. lightspeed-stack just controls *which items* form the LLM context.
When building context for a compacted conversation, lightspeed-stack fetches the conversation items, collects every marker's summary, takes the items after the last marker as the recent verbatim buffer, and sends `[summaries] + [recent items] + [new query]` as **explicit input**, **without** the `conversation` parameter. This is necessary because Llama Stack reloads the *full* stored message history whenever the `conversation` parameter is set — there is no marker-based selection hook (verified empirically; see the Changelog). Each completed turn is then appended back to the conversation items by lightspeed-stack, since Llama Stack no longer auto-stores it.

This preserves a single continuous conversation identity. The user sees one conversation in the UI, and the Conversations API returns the full history including summary items.
This preserves a single continuous conversation identity. The `conversation_id` never changes, the user sees one conversation in the UI, and the Conversations API returns the full history including the summary marker items.

## API response changes

Expand Down Expand Up @@ -291,36 +297,36 @@ Add `compaction` field to the root `Configuration` class.
| `src/utils/compaction.py` | New module: summarization logic, partitioning, additive summary management |
| `src/models/config.py` | Add `CompactionConfiguration` (near `ConversationHistoryConfiguration`) |
| `src/configuration.py` | Add `compaction_configuration` property to `AppConfig` singleton |
| `src/utils/responses.py` | Modify `prepare_responses_params()` — insert compaction check (see below) |
| `src/app/endpoints/query.py` | No changes needed — compaction happens inside `prepare_responses_params()` |
| `src/app/endpoints/streaming_query.py` | No changes needed — same function is used |
| `src/models/responses.py` | Add `context_status` field to `QueryResponse` and `StreamingQueryResponse` |
| `src/cache/` (all backends) | Extend schema for `ConversationSummary` storage |

## Insertion point in `responses.py`

The compaction hook goes in `prepare_responses_params()`. Its signature:

``` python
async def prepare_responses_params(
client: AsyncLlamaStackClient,
query_request: QueryRequest,
user_conversation: Optional[UserConversation],
...
) -> ResponsesApiParams:
```

At the insertion point (after line 297), the following are available:

- `client` — Llama Stack client (can fetch conversation history)
- `llama_stack_conv_id` — the conversation ID
- `model` — selected model (e.g., `"openai/gpt-4o-mini"`)
- `system_prompt` — resolved system prompt
- `tools` — prepared tool list
- `input_text` — the user's query with RAG context
- `user_conversation` — DB metadata including `message_count`

After compaction, the summary is injected as a conversation item in Llama Stack. When building the next request, lightspeed-stack fetches items from the conversation, filters to only those after the last summary marker, and passes them as input alongside the `conversation` parameter. The `conversation` parameter is still used — the conversation identity is preserved.
| `src/utils/conversation_compaction.py` | New module: `apply_compaction()` / `apply_compaction_blocking()`, `needs_compaction_path()`, marker helpers, per-conversation lock |
| `src/models/common/responses/responses_api_params.py` | `omit_conversation` flag — drops the `conversation` parameter from the request body in compacted mode |
| `src/app/endpoints/query.py` | Call `apply_compaction_blocking()` after preparing params; store the turn in compacted mode |
| `src/app/endpoints/streaming_query.py` | Compaction-aware SSE path that emits the `compaction` event before summarizing (R12) |
| `src/app/endpoints/a2a.py` | Inline compaction (no SSE event); store the turn on `response.completed` |
| `src/app/endpoints/responses.py` | Silent compaction (OpenAI-compatible); store the turn via `_append_previous_response_turn` |
| `src/models/responses.py` (now relocated) | `context_status` field — deferred to LCORE-1573 |
| `src/cache/` (all backends) | `ConversationSummary` storage — LCORE-1571 |

## How compaction is invoked

Rather than a hook buried inside `prepare_responses_params()`, compaction is a
reusable unit in `src/utils/conversation_compaction.py` that each endpoint
calls after its params are prepared:

- `apply_compaction_blocking(client, params, inference_config, compaction_config)`
returns a `CompactionResult` (possibly-rewritten params, a `summarized`
flag, and the `original_input`). Non-streaming `/v1/query`, A2A, and
`/v1/responses` use this.
- `apply_compaction(..., emit_events=True)` is the async-generator variant that
yields a `CompactionStartedEvent` before the summarization LLM call; the
native `/v1/streaming_query` SSE path uses it to satisfy R12.
- `needs_compaction_path(...)` is a cheap pre-stream predicate (no LLM, no lock)
that the streaming endpoint uses to route only actually-compacting
conversations through the in-stream path, leaving every other request on the
unchanged flow.

When compaction is active, the endpoint builds explicit input, the
`conversation` parameter is omitted (via `ResponsesApiParams.omit_conversation`),
and the completed turn is appended to the conversation items afterward.

## Fetching conversation history

Expand Down Expand Up @@ -369,6 +375,27 @@ Compaction adds latency only on the trigger turn. In PoC testing, compaction tur
- **Smaller model for summarization**: Allow configuring a cheaper model for the summarization call. Current design uses the same model for simplicity.
- **UI compaction indicator**: The `context_status` response field and the streaming compaction event (R12) provide the data. Coordinate with the UI team on how to display it.

# Changelog

**2026-05-26 — R10 redesign (Option A) during LCORE-1572 implementation.**
A live experiment on the deployed llama-stack 0.6.0 showed that passing the
`conversation` parameter to the Responses API always reloads the *full* stored
message history, with no marker-based selection hook. The original R10
(inject a marker, keep the `conversation` parameter, "select from the marker
onward") therefore cannot limit what the model sees. R10, R12, the
architecture flow, and the implementation guidance were revised to the
explicit-input approach: in compacted mode lightspeed-stack builds the input
itself and omits the `conversation` parameter, preserving `conversation_id`
and the full item history while controlling the LLM context. This restores the
spike's *original* Decision 6 recommendation (which a later spike edit had
changed to the marker approach). The summary cache (Decision 8 / LCORE-1571)
becomes a parallel persistence layer; the runtime boundary is the marker item
in Llama Stack. Compaction was also confirmed to apply to four endpoints —
`/v1/query`, `/v1/streaming_query`, the A2A executor, and `/v1/responses` —
not the two originally listed; `/v1/responses` compacts silently to preserve
OpenAI-API wire compatibility. Evidence and full reasoning: the spike doc
(`conversation-compaction-spike.md`).

# Appendix A: PoC Evidence

A proof-of-concept was built and tested.
Expand Down
53 changes: 48 additions & 5 deletions src/app/endpoints/a2a.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Handler for A2A (Agent-to-Agent) protocol endpoints using Responses API."""

# pylint: disable=too-many-lines

import asyncio
import json
import uuid
Expand Down Expand Up @@ -32,7 +34,7 @@
from llama_stack_api.openai_responses import (
OpenAIResponseObjectStream,
)
from llama_stack_client import APIConnectionError
from llama_stack_client import APIConnectionError, AsyncLlamaStackClient
from starlette.responses import Response, StreamingResponse

from a2a_storage import A2AContextStore, A2AStorageFactory
Expand All @@ -45,13 +47,18 @@
from constants import MEDIA_TYPE_EVENT_STREAM
from log import get_logger
from models.api.requests import QueryRequest
from models.common.responses.types import ResponseInput
from models.config import Action
from utils.conversation_compaction import (
apply_compaction_blocking,
store_compacted_turn,
)
from utils.mcp_headers import McpHeaders, mcp_headers_dependency
from utils.responses import (
extract_text_from_response_item,
prepare_responses_params,
)
from utils.suid import normalize_conversation_id
from utils.suid import normalize_conversation_id, to_llama_stack_conversation_id
from version import __version__

logger = get_logger(__name__)
Expand Down Expand Up @@ -336,6 +343,19 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
store=True,
request_headers=self.request_headers,
)
# Compact the conversation if it is approaching the context window
# limit. A2A is not a browser SSE stream, so no progress event is
# emitted; the blocking variant summarizes inline before the call.
# No conversation cache is passed: the A2A executor has no resolved
# user_id for the (user_id, conversation_id) cache key, so A2A runs
# in marker-only mode (additive summaries, no persisted fold).
compaction = await apply_compaction_blocking(
client,
responses_params,
configuration.inference,
configuration.compaction,
)
responses_params = compaction.params
# Stream response from LLM using the Responses API
stream = await client.responses.create(**responses_params.model_dump())
except APIConnectionError as e:
Expand Down Expand Up @@ -392,9 +412,16 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
)
)

# Process stream using generator and aggregator pattern
# Process stream using generator and aggregator pattern. In compacted
# mode the conversation parameter is not sent, so the turn is stored
# explicitly once the response completes (see _convert_stream_to_events).
async for a2a_event in self._convert_stream_to_events(
stream, task_id, context_id, conversation_id
stream,
task_id,
context_id,
conversation_id,
client,
compaction.original_input if compaction.compacted else None,
):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
aggregator.process_event(a2a_event)
await event_queue.enqueue_event(a2a_event)
Expand All @@ -414,12 +441,14 @@ async def _process_task_streaming( # pylint: disable=too-many-locals
final=True,
)

async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals
async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-positional-arguments
self,
stream: AsyncIterator[OpenAIResponseObjectStream],
task_id: str,
context_id: str,
conversation_id: Optional[str],
client: Optional[AsyncLlamaStackClient] = None,
compacted_original_input: Optional[ResponseInput] = None,
) -> AsyncIterator[Any]:
"""Convert Responses API stream chunks to A2A events.

Expand Down Expand Up @@ -508,6 +537,20 @@ async def _convert_stream_to_events( # pylint: disable=too-many-branches,too-ma

if response_obj:
output = getattr(response_obj, "output", [])
# In compacted mode the conversation parameter was not sent,
# so persist this turn ourselves to keep the recent-turn
# buffer and audit history intact for the next request.
if (
compacted_original_input is not None
and client is not None
and conversation_id
):
await store_compacted_turn(
client,
to_llama_stack_conversation_id(conversation_id),
compacted_original_input,
output,
)
a2a_parts = _convert_responses_content_to_a2a_parts(output)
if not a2a_parts and final_text:
a2a_parts = [Part(root=TextPart(text=final_text))]
Expand Down
Loading
Loading