feat(stdlib): streaming event types, events() iterator, and OTEL bridge (#902)#958
feat(stdlib): streaming event types, events() iterator, and OTEL bridge (#902)#958planetf1 wants to merge 24 commits into
Conversation
Adds an async cancel_generation() method that cancels in-progress _generate and _generate_extra tasks, drains the internal async queue to release any blocked put() calls, closes the open telemetry span, and sets _computed=True so the MOT is immediately usable. Required by the stream_with_chunking() orchestrator (generative-computing#901) for clean early-exit when a streaming requirement returns "fail". Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
…enerative-computing#901) Adds stream_with_chunking() — the core streaming orchestration primitive that consumes a ModelOutputThunk's async stream via a background asyncio.Task, applies a ChunkingStrategy to produce semantic chunks, and runs stream_validate() in parallel across all requirements at each chunk boundary. Key behaviours: - Early exit: if any requirement returns "fail" during streaming, generation is cancelled immediately via cancel_generation() and StreamChunkingResult.completed is set to False. - Final validation: after natural completion, validate() is called on all non-failed requirements. - Clone-per-call: requirements are cloned (copy(req)) before each invocation; originals are never mutated. - String aliases: "sentence", "word", "paragraph" map to the corresponding ChunkingStrategy subclasses. StreamChunkingResult exposes: - astream() — async iterator yielding individual validated chunks - acomplete() — await full completion including final validation - as_thunk — wrap full_text as a computed ModelOutputThunk - completed, full_text, final_validations, streaming_failures Re-exports StreamChunkingResult and stream_with_chunking from mellea.stdlib for day-to-day use. Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds test/stdlib/test_streaming.py with 9 unit tests covering: - Normal completion: validate() called at stream end, completed=True - Early exit on "fail": completed=False, streaming_failures populated - Clone isolation: originals never mutated across retries - quick_check_backend routing: validation uses alternate backend - Deadlock prevention: early exit with asyncio.wait_for timeout - as_thunk correctness: value=full_text, raises before acomplete() - astream() yields individual chunks (not accumulated text) - No requirements: streams without validation StreamingMockBackend subclasses Backend and feeds a fixed response string into a MOT queue char-by-char via asyncio.create_task, following the create_manual_mock_thunk() pattern from test_astream_mock.py. Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds docs/examples/streaming/streaming_chunking.py demonstrating stream_with_chunking() end-to-end: defining a custom stream_validate() override, consuming chunks via astream(), and awaiting acomplete() to inspect final_validations and streaming_failures. Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Fixes [no_class_args] CI failure — the docs build-and-validate checker requires __init__ parameters to be documented in the class docstring (not __init__) per Option C convention. Assisted-by: Claude Code
Fixes second [no_raises] CI failure — stream_with_chunking raises ValueError for unknown chunking aliases but had no Raises: section. Assisted-by: Claude Code
…e call Aligns the orchestrator with the chunk-at-a-time design set out in the generative-computing#891 epic and generative-computing#900 spec. Previously _orchestrate_streaming passed the full accumulated text to stream_validate, and called it once per batch of new chunks rather than once per chunk. This masked the design intent of the chunking strategy and forced stateful requirements into the self._seen_len workaround. Behaviour changes: - stream_validate is called once per complete chunk produced by the chunking strategy (not once per astream() iteration) - The call receives that single chunk (not the accumulated text) - Multiple chunks from one astream() iteration are validated in order; early exit on a "fail" prevents later chunks in the same batch from being validated or emitted - On early exit, the failing chunk is no longer emitted to the consumer; consumers inspect StreamChunkingResult.streaming_failures instead (previous behaviour emitted whatever the current batch contained) Test changes: - FailAfterWordsReq now maintains a running word count on self, since each stream_validate call sees a one-word chunk rather than the growing accumulation - New test_stream_validate_receives_individual_chunks asserts the per-chunk contract directly by capturing the cloned requirement and checking the chunks it saw Docstring updated to describe the per-chunk contract, the in-order validation of a batch, the non-emission of failing chunks, and the MOT single-consumer constraint. Assisted-by: Claude Code
Two documentation fixes following the per-chunk semantics correction: - streaming_chunking.py: MaxSentencesReq previously counted sentence-end punctuation in the chunk, which worked under the old accumulated-text behaviour but returns at most 1 per sentence under delta semantics. Rewritten to increment self._count once per chunk -- the canonical pattern for a requirement that needs context beyond a single chunk. - stream_with_chunking docstring: add a Note that chunks are emitted to the consumer only after every active validator returns for that chunk. A slow stream_validate (e.g. an LLM-based one) therefore adds latency to every chunk. The invariant preserved is that the consumer never sees unvalidated content; a concurrent-emission fast path may be added in future if a concrete use case calls for it. Assisted-by: Claude Code
ChunkingStrategy.split() withholds the trailing fragment by design (generative-computing#899). Previously the orchestrator discarded it — it appeared in full_text and the final validate() saw it, but it was never yielded to astream() consumers and never seen by stream_validate. For a response that did not end in a chunk terminator (e.g. "Sentence one. Sentence two." with no trailing whitespace under SentenceChunker), the last sentence silently bypassed streaming validation. Adds ChunkingStrategy.flush(accumulated_text) -> list[str]: - Default in the ABC returns [] (backward-compatible — external chunkers retain the old discard behaviour until they opt in). - SentenceChunker, WordChunker, ParagraphChunker each override to return the withheld trailing fragment as a single-element list. _orchestrate_streaming calls chunking.flush(accumulated) after the main loop (only when the stream ended naturally, not on early exit — a cancelled stream's trailing fragment is by definition incomplete). Each flushed chunk goes through the same stream_validate / emit path as regular chunks, so the "no unvalidated content reaches the consumer" invariant extends to the trailing fragment, and a fail on the fragment still records a streaming failure and skips final validate(). Tests: - 13 new chunker tests covering the default-discard behaviour and each built-in's flush logic (empty input, fragment-present, already- terminated cases). - test_trailing_fragment_is_flushed_to_consumer: stream_validate sees the fragment and astream yields it. - test_early_exit_on_trailing_fragment: fail on the flushed fragment propagates to streaming_failures and skips final validation. Assisted-by: Claude Code
Addresses issues raised by independent review on top of PR generative-computing#942. Orchestrator (mellea/stdlib/streaming.py): - except Exception now calls mot.cancel_generation() before surfacing the exception to the consumer — previously the backend producer was left running, eventually blocking on mot._async_queue (maxsize=20). Cleanup failures are logged via MelleaLogger.warning with a TODO(generative-computing#902) marker; generative-computing#902 replaces the log with a proper ErrorEvent. - RuntimeError catch in the astream() loop now re-raises unless mot.is_computed() is true, so only the documented "already computed" race is swallowed. - astream() docstring now states the single-consumer contract explicitly; a second iteration blocks on an empty queue with no sentinel to deliver. - as_thunk docstring now flags the early-exit case: cancel_generation forces is_computed=True without running post_processing(), so generation.usage and related telemetry fields may be None. Chunker (mellea/stdlib/chunking.py): - SentenceChunker.flush switches from .strip() to .rstrip() with a comment explaining why: the loop's lstrip has already removed leading whitespace, and trailing whitespace on a sentence fragment is non-semantic (consistent with split() returning sentences without trailing whitespace). - ParagraphChunker.flush adds a docstring noting the deliberate asymmetry: paragraph fragments are returned byte-for-byte because internal whitespace (e.g. trailing \n of a list item) can be semantically meaningful. Tests (test/stdlib/test_streaming.py): - test_stream_validate_receives_individual_chunks now uses exact- match on the captured chunk list, which directly regresses if someone reverts to accumulated-text semantics. - test_multiple_chunks_in_one_batch_with_mid_batch_fail: response fed as one large token so split() yields 4 sentences at once; verifies chunk 1 emits, chunk 2 fails (not emitted), chunks 3 and 4 are neither validated nor emitted. - test_cancel_generation_invoked_on_fail: spies on ModelOutputThunk.cancel_generation and asserts it was called on the "fail" early-exit path. - test_exception_in_stream_validate_cancels_generation: a requirement that raises must cause cancel_generation to run and the exception to surface via astream()/acomplete() without hanging. Telemetry observability (orchestrator-level spans, metrics, span events) remains deferred to generative-computing#902 per the epic, which now has the acceptance criteria updated to cover event emission, the OTEL bridge, and the ErrorEvent type that will replace the MelleaLogger stopgap. Assisted-by: Claude Code
Three items from the second independent review:
cancel_generation(error=) — accept an optional Exception parameter.
When the orchestrator enters the except Exception path, it now passes
the caught exception to cancel_generation() so the backend telemetry
span records the real cause via set_span_error instead of a generic
RuntimeError("Generation cancelled"). The original exception still
surfaces to the consumer via astream()/acomplete(); this is purely an
OTEL accuracy fix. Backward-compatible: the default None preserves the
previous "Generation cancelled" message for the normal fail path.
stream_with_chunking docstring — the "After the stream ends (naturally
or via early exit), validate() is called" wording overstated behaviour.
The orchestrator actually skips final validate() on early exit
(test_early_exit_on_fail verifies final_validations == []). Docstring
now correctly says final validate() runs only on natural completion.
test_exception_in_stream_validate_cancels_generation docstring — the
test fails on chunk 1 so the queue never actually fills; it verifies
the cancel-on-exception path and the no-hang guarantee but does not
directly prove the worst-case "producer blocked on full queue"
scenario. Docstring now states what it actually covers and points at
test/core/ for the cancel_generation drain logic.
Assisted-by: Claude Code
The Docs CI docstring quality gate [no_class_args]-equivalent check requires every documented method with typed params to have an Args section and a Returns section matching the return annotation. SentenceChunker.flush, WordChunker.flush, and ParagraphChunker.flush all took accumulated_text and returned list[str] without the sections. Add both to each override, documenting each flush's specific semantics (rstrip for sentences, whitespace-split trailing fragment for words, byte-for-byte for paragraphs). Assisted-by: Claude Code
- _orchestrate_streaming: add cancel_generation() in finally block so the backend producer is stopped even on external CancelledError (BaseException bypasses except Exception, leaving _generate hung on a full queue) - cancel_generation: replace .get + del on _telemetry_span with .pop to prevent KeyError if two coroutines race before _computed is set - Example and test doubles: add super().__init__() to Requirement subclasses so description/validation_fn/_output are always initialised - docs/examples: fix pytest tier marker integration → e2e (Ollama example must be e2e per MARKERS_GUIDE; all peer examples use e2e) - test_quick_check_backend_routing: capture clone via __copy__ intercept and assert all seen_backends are val_backend, not just clone-isolation check Assisted-by: Claude Code
…bridge (generative-computing#902) Adds eight typed event dataclasses (StreamEvent base + ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, RetryEvent, CompletedEvent, ErrorEvent) with auto-populated timestamps. Wires event emission into _orchestrate_streaming: - ChunkEvent emitted per chunk passed to the consumer - QuickCheckEvent after each stream_validate batch (pass or fail) - StreamingDoneEvent when the raw stream ends naturally - FullValidationEvent after the post-stream validate() calls complete - ErrorEvent replaces the MelleaLogger.warning stopgap in the except branch (removes TODO(generative-computing#902) marker) - CompletedEvent in the finally block, guaranteed on every exit path Adds StreamChunkingResult.events() single-consumer async iterator backed by an independent queue — can be consumed concurrently with astream(). Wraps the orchestrator in trace_application("stream_with_chunking") to open an OTEL application span for the full orchestration lifetime. Calls record_requirement_check, record_requirement_failure, record_sampling_outcome, and record_error at the appropriate emission points. Uses set_span_error on early-exit fail and on unhandled exceptions. Exports all eight event types from mellea.stdlib.__init__. Assisted-by: Claude Code
…mputing#902) Ten new tests for the Wave 4 additions: - test_stream_event_types_have_auto_timestamp: all seven event types auto-populate timestamp on construction - test_event_emission_order_happy_path: full sequence (ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, CompletedEvent) in correct order on a two-sentence generation - test_streaming_done_event_carries_full_text: StreamingDoneEvent.full_text matches result.full_text - test_event_emission_on_early_exit: no StreamingDoneEvent or FullValidationEvent, QuickCheckEvent(passed=False) present, CompletedEvent(success=False) - test_error_event_on_stream_validate_exception: ErrorEvent emitted with correct exception_type and detail, no log warning - test_record_requirement_check_called_per_chunk: metric helper called once per sentence chunk - test_record_requirement_failure_called_on_fail: called with requirement class name and reason string - test_record_sampling_outcome_success: called with "stream_with_chunking" and success=True - test_record_sampling_outcome_failure_on_early_exit: called with success=False - test_concurrent_astream_and_events: astream() and events() consumed concurrently via asyncio.gather without interference 26 tests total, all passing. Assisted-by: Claude Code
…ative-computing#902) use-async-and-streaming.md: new "Streaming with per-chunk validation" section covering stream_with_chunking() motivation, a minimal example with MaxSentencesReq, the stream_validate tri-state table, and both consumption patterns (astream() and events()) with a match dispatch example. Notes single-consumer discipline and concurrent usage. requirements-system.md: new "Streaming validation" section explaining stream_validate() as the streaming counterpart to validate(), the tri-state PartialValidationResult semantics, state isolation via per-clone copy, and a cross-link to the how-to page. Assisted-by: Claude Code
…omputing#902) Replaces the astream() chunk loop with an events() loop using structural pattern matching. Shows all six emitted event types: ChunkEvent, QuickCheckEvent (pass and fail variants), StreamingDoneEvent, FullValidationEvent, and CompletedEvent. Updates the module docstring to describe the events() consumption pattern. Assisted-by: Claude Code
…g#902) The streaming/ directory (introduced in Wave 3) was missing from docs/docs/examples/index.md, causing the CI examples-catalogue check to fail. Add an entry under Core concepts alongside async/. Assisted-by: Claude Code
…es (generative-computing#902) The docstring quality gate (--fail-on-quality) requires Args: sections in class docstrings for constructor parameters. Dataclass fields are constructor parameters, so they need Args:, not Attributes:. The seven event subclasses (ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, RetryEvent, CompletedEvent, ErrorEvent) previously used Attributes: which the auditor could not resolve to __init__ params. StreamEvent keeps Attributes: for `timestamp` because it is init=False and does not appear as a constructor parameter. Assisted-by: Claude Code
…tive-computing#902) - Fix QuickCheckEvent.passed to reflect per-chunk result (was using cumulative failed_indices set, causing false negatives on all chunks after the first failure) - Replace synthetic RuntimeError objects in early-exit set_span_error calls with set_span_status_error helper (no phantom exception events in OTEL traces); add set_span_status_error to mellea/telemetry/tracing.py - Reorder result.completed = False to top of except block so the flag is set before ErrorEvent is enqueued (consistent consumer observation) - Update acomplete() Raises: docstring to reflect that Exception types surface via astream(), only BaseException propagates directly - Add events() docstring note that events() itself never raises - Add _event_queue comment noting unconditional production / opt-in consumption - Add StreamEvent docstring note for subclassers on init=False fields - Add RetryEvent "not emitted in v1" comment in __init__.__all__ - Fix test: move import time to module level in test_streaming.py - Add docstring to test_unknown_chunking_alias_raises_value_error - Rewrite how-to streaming section to lead with events() as primary API; demote astream()-only example to secondary; add case _: pass fallback to all match event: blocks Assisted-by: Claude Code
…enerative-computing#902) Consistent with the how-to doc; covers RetryEvent and any future types. Assisted-by: Claude Code
…omputing#902) Three new streaming examples alongside the existing streaming_chunking.py: - word_chunking.py: WordChunker alias — forbidden-word detection at the highest granularity; O(1) set check per token, early exit on first bad word - paragraph_chunking.py: ParagraphChunker alias — per-paragraph word-count gate; validates entire \n\n-delimited blocks, useful for structure/length checks that require full paragraph context - custom_chunking.py: ChunkingStrategy subclass — LineChunker splitting on single \n; validates numbered-list output line-by-line; demonstrates split()+flush() extension pattern All three verified running against granite4:micro (Ollama local). Assisted-by: Claude Code
…lly guard, telemetry pop race fix, super().__init__() in test doubles, e2e marker, ValueError test) - Set result.completed=False in finally block before cancel_generation() so external CancelledError (BaseException, bypasses except Exception) does not leave result.completed=True and emit a misleading CompletedEvent/metric - Add regression test test_cancelled_task_sets_completed_false (27th test); documents Python 3.12 C Task cancellation-before-start behaviour and the asyncio.sleep(0) scheduling requirement - Document O(n) re-scan cost in ChunkingStrategy class docstring and split() Args; note copy()-cloning constraint for stateful subclasses Assisted-by: Claude Code
…generative-computing#902) - Fix misleading StreamEvent docstring (init=False ordering explanation) - Fix events() docstring: QuickCheckEvent fires before ChunkEvent, not after - Add _events_consumed guard to events() for single-consumer enforcement - Move StreamingDoneEvent emission to before flush loop (token stream is done regardless of flush validation outcome) - Guard FullValidationEvent/final_validations list with list() copy to prevent aliasing between result attribute and event payload - Add cancelled-task guard in acomplete() to avoid CancelledError from task.exception() on externally-cancelled tasks - Switch terminal finally bookkeeping to put_nowait() to eliminate await points and guarantee _done.set() runs under pending CancelledError - Add mot.is_computed() guard in except block to avoid double-cancel - Remove inline comment from __all__ RetryEvent entry - Fix word_chunking.py example: preserve original-case word list for LLM prompt - Add test for no-requirements path omitting FullValidationEvent - Fix test: assert QuickCheckEvent precedes ChunkEvent within each pair Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
|
PR failure is known issue with HF/intrinsics changes. (one fix is here) |
|
@nrfulton are you able to take a look. I have another or stacked behind this one. With the final batch of changes |
|
this is based on #942 - will revisit this pr that is resolved and approved (red X is just the pipeline issue from last week and should clear on next push) |
jakelorocco
left a comment
There was a problem hiding this comment.
I'll take another look when the underlying review gets closed out but this seems good to go.
Yes - once the part 3 PR is merged I will rebase this one and re-work accordingly - didn't want to waste time on it before that. Good to hear the overall 'shape' looks ok. |
|
PR beneath is stack now merged. Working on rebasing/verifying |
|
closing - to be replaced by #1095 |
Misc PR
Type of PR
Description
Part of epic #891 · Wave 4 of 4 (final wave — nothing further is planned after this PR).
Adds the streaming event-type vocabulary, wires the OTEL bridge into the orchestrator, replaces the
MelleaLogger.warningstopgap withErrorEvent, and ships all narrative docs for the streaming validation epic.What changed
mellea/stdlib/streaming.py— eight typed event dataclasses (StreamEventbase +ChunkEvent,QuickCheckEvent,StreamingDoneEvent,FullValidationEvent,RetryEvent,CompletedEvent,ErrorEvent) with auto-populatedtimestamp.StreamChunkingResult.events()single-consumer async iterator backed by an independent queue — can be consumed concurrently withastream(). Event emission wired into_orchestrate_streamingat every lifecycle point. Orchestrator wrapped intrace_application("stream_with_chunking")for an OTEL application span across the full orchestration lifetime. Metric helper calls at each emission point (record_requirement_check,record_requirement_failure,record_sampling_outcome,record_error).ErrorEventreplaces theMelleaLogger.warningstopgap in theexcept Exceptionbranch.QuickCheckEvent.passedreflects per-chunk result (not cumulative state). Early-exit paths useset_span_status_errorrather than a phantomRuntimeErrorto mark the OTEL span as failed.result.completed = Falseset before any event is enqueued in the exception path.mellea/telemetry/tracing.py— newset_span_status_error()helper: sets ERROR status on a span without recording a phantom exception event (use this for validation failures where no exception was actually raised).mellea/stdlib/__init__.py— exports all eight event types.docs/docs/how-to/use-async-and-streaming.md— new "Streaming with per-chunk validation" section: leads withevents()as the primary API with a fullmatchdispatch example includingcase _: passfallback;astream()shown as a secondary alternative for raw-chunk access;stream_validatetri-state table.docs/docs/concepts/requirements-system.md— new "Streaming validation" section:stream_validate()semantics, tri-statePartialValidationResult, state isolation via per-clone copy, cross-link to how-to page.docs/examples/streaming/streaming_chunking.py— updated to useevents()API withmatchdispatch on all six emitted event types;case _: passfallback forRetryEventand future types.Testing
test/stdlib/test_streaming.py— 12 new tests (28 total, all pass): event timestamp construction, event emission order on happy path (including intra-pair QuickCheck-before-Chunk ordering),StreamingDoneEvent.full_textcontent, early-exit event sequence,ErrorEventonstream_validateexception,record_requirement_checkcall count,record_requirement_failureargs,record_sampling_outcomeon success and on early-exit fail, concurrentastream()+events()consumption, no-requirements path omitsFullValidationEvent, single-consumer guard raises on second drain.Attribution
Stacked PR
Prerequisite: #942 (Wave 3 —
stream_with_chunking(),StreamChunkingResultskeleton, sentence chunker flush).This PR stacks on
feat/901-stream-with-chunking. The diff againstmainincludes Wave 3 changes from that PR. When reviewing, focus on the eleven commits added here:What to ignore (Wave 3, covered by #942)
mellea/core/base.py—cancel_generationadditionmellea/stdlib/chunking.py—flush()implementation and flush override overloadsmellea/stdlib/streaming.py—_orchestrate_streamingorchestrator,StreamChunkingResultclass skeleton,stream_with_chunking()entry pointtest/stdlib/test_streaming.py— the 16 orchestrator tests from Wave 3