Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/docs/concepts/requirements-system.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,30 @@ requirements = [
All requirements are validated after each generation attempt. The repair request lists
every requirement that failed, not just the first one, so the model can address all
issues in a single repair pass.

## Streaming validation

`stream_validate()` is the streaming counterpart to `validate()`. It is called
once per semantic chunk as tokens arrive from the model, before the full output
is available. Requirements that need to detect problems early — too many
sentences, a prohibited keyword in the first paragraph, unexpected JSON
structure mid-output — override `stream_validate()` to express that logic.

`stream_validate()` returns a `PartialValidationResult` with a tri-state `success`
field:

- `"unknown"` — no conclusion yet; the chunk is passed to the consumer and
`validate()` will be called at stream end.
- `"pass"` — the chunk looks valid so far; it is passed to the consumer and
`validate()` is still called at stream end (a streaming pass is informational,
not final).
- `"fail"` — the stream is cancelled immediately; no further chunks reach the
consumer; `validate()` is skipped for this requirement.

State isolation is per-clone: `stream_with_chunking()` copies each requirement
with `copy()` before starting the orchestrator, so the original objects are never
mutated. Requirements that accumulate state across chunks (e.g. a running word
count) should reassign mutable containers rather than mutate in place, since
clones share the original's `__dict__` values at copy time.

> **See also:** [Streaming with per-chunk validation](../how-to/use-async-and-streaming#streaming-with-per-chunk-validation)
3 changes: 2 additions & 1 deletion docs/docs/docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"tutorials/02-streaming-and-async",
"tutorials/03-using-generative-stubs",
"tutorials/04-making-agents-reliable",
"tutorials/05-mifying-legacy-code"
"tutorials/05-mifying-legacy-code",
"tutorials/06-streaming-validation"
]
},
{
Expand Down
1 change: 1 addition & 0 deletions docs/docs/examples/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ to run.
| `context/` | Context inspection, sampling with context trees, parallel context branches |
| `sessions/` | Custom session types and backend selection |
| `async/` | How to utilize basic async capabilities |
| `streaming/` | `stream_with_chunking()` with per-chunk validation, typed event vocabulary, early-exit on fail |

### Data and documents

Expand Down
122 changes: 122 additions & 0 deletions docs/docs/how-to/use-async-and-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,128 @@ asyncio.run(sequential_chat())

For parallel generation, use `SimpleContext`.

## Streaming with per-chunk validation

`stream_with_chunking()` adds per-chunk validation to a streaming generation.
It splits the accumulated text into semantic units (sentences, words, or
paragraphs), calls `stream_validate()` on each chunk in parallel, and can
exit early if any requirement returns `"fail"` — preventing the consumer from
seeing invalid content mid-stream.

The primary way to observe a `stream_with_chunking()` run is via typed
`StreamEvent` objects from `result.events()`:

```python
# Requires: mellea
# Returns: None
import asyncio

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
ChunkEvent,
CompletedEvent,
FullValidationEvent,
QuickCheckEvent,
StreamingDoneEvent,
stream_with_chunking,
)


class MaxSentencesReq(Requirement):
"""Fails if the model generates more than *limit* sentences."""

def __init__(self, limit: int) -> None:
super().__init__()
self._limit = limit
self._count = 0

def format_for_llm(self) -> str:
return f"The response must be at most {self._limit} sentences."

async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
self._count += 1
if self._count > self._limit:
return PartialValidationResult("fail", reason="Too many sentences")
return PartialValidationResult("unknown")

async def validate(
self, backend: Backend, ctx: Context, *, format=None, model_options=None
) -> ValidationResult:
return ValidationResult(result=True)


async def main() -> None:
from mellea.stdlib.session import start_session

m = start_session()
action = Instruction("Write a two-sentence summary of the water cycle.")
req = MaxSentencesReq(limit=3)

result = await stream_with_chunking(
action, m.backend, m.ctx, requirements=[req], chunking="sentence"
)

async for event in result.events():
match event:
case ChunkEvent():
print(f" chunk[{event.chunk_index}]: {event.text!r}")
case QuickCheckEvent(passed=False):
print(f" FAIL at chunk {event.chunk_index}: {event.results}")
case StreamingDoneEvent():
print(f" stream done — {len(event.full_text)} chars")
case FullValidationEvent():
print(f" final: {'pass' if event.passed else 'fail'}")
case CompletedEvent():
print(f" completed — success={event.success}")
case _:
pass # ErrorEvent and other future types

await result.acomplete()
print(f"completed={result.completed}, failures={len(result.streaming_failures)}")


asyncio.run(main())
```

If you only need the raw validated text without event metadata, use
`result.astream()` instead:

```python
result = await stream_with_chunking(
action, m.backend, m.ctx, requirements=[req], chunking="sentence"
)
async for chunk in result.astream():
print(chunk)
await result.acomplete()
```

Both `astream()` (raw chunks) and `events()` are available on the same result
object. They use independent queues, so you can run them concurrently with
`asyncio.gather`. Both are **single-consumer** — a second iteration on either
will block indefinitely.

### The `stream_validate` tri-state

Each call to `stream_validate` returns a `PartialValidationResult` with one of
three values:

| Value | Meaning |
| ----- | ------- |
| `"unknown"` | No conclusion yet — wait for the full output before judging. |
| `"pass"` | This chunk is valid so far (informational; does not skip final `validate()`). |
| `"fail"` | Invalid — cancel the stream immediately and record a streaming failure. |

After a natural stream end, `validate()` is called on every non-`"fail"`
requirement (both `"pass"` and `"unknown"`). This means `"pass"` from
`stream_validate` does **not** replace the final `validate()` call.

> **See also:** [The Requirements System — Streaming validation](../concepts/requirements-system#streaming-validation)

---

**See also:** [Tutorial 02: Streaming and Async](../tutorials/02-streaming-and-async) | [act() and aact()](../how-to/act-and-aact)
Loading
Loading