Add temporalio.contrib.pubsub module #1423

Open
jssmith wants to merge 72 commits into main from contrib/pubsub

Conversation

@jssmith
Contributor

@jssmith jssmith commented Apr 7, 2026

What was changed

Adds temporalio.contrib.pubsub, a reusable pub/sub primitive for streaming data out of Temporal workflows.

Why?

Streaming incremental results from long-running workflows (e.g., AI agent token streams, progress updates) is a common need with no built-in solution. This module provides a correct, reusable implementation so users don't have to roll their own poll/signal/dedup logic.

Checklist

  1. Closes — N/A (new contrib module, no existing issue)

  2. How was this tested:

    • 24 pytest tests in tests/contrib/pubsub/test_pubsub.py covering batching, flush safety, CAN serialization, replay guards, dedup (TTL pruning, truncation), offset-based resumption, max_batch_size, drain, and error handling
    • Demo application
    • Shared with prospective users
  3. Any docs updates needed?

    • Module includes README.md with usage examples and API reference
    • Design docs: DESIGN-v2.md, and addenda covering CAN, dedup, and topic semantics
    • No docs.temporal.io updates yet — will add once the API stabilizes

jssmith and others added 15 commits April 5, 2026 21:33
A workflow mixin (PubSubMixin) that turns any workflow into a pub/sub
broker. Activities and starters publish via batched signals; external
clients subscribe via long-poll updates exposed as an async iterator.

Key design decisions:
- Payloads are opaque bytes for cross-language compatibility
- Topics are plain strings, no hierarchy or prefix matching
- Global monotonic offsets (not per-topic) for simple continuation
- Batching built into PubSubClient with Nagle-like timer + priority flush
- Structured concurrency: no fire-and-forget tasks, trio-compatible
- Continue-as-new support: drain_pubsub() + get_pubsub_state() + validator
  to cleanly drain polls, plus follow_continues on the subscriber side

Module layout:
  _types.py  — PubSubItem, PublishInput, PollInput, PollResult, PubSubState
  _mixin.py  — PubSubMixin (signal, update, query handlers)
  _client.py — PubSubClient (batcher, async iterator, CAN resilience)

9 E2E integration tests covering: activity publish + subscribe, topic
filtering, offset-based replay, interleaved workflow/activity publish,
priority flush, iterator cancellation, context manager flush, concurrent
subscribers, and mixin coexistence with application signals/queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
PubSubState is now a Pydantic model so it survives serialization through
Pydantic-based data converters when embedded in Any-typed fields. Without
this, continue-as-new would fail with "'dict' object has no attribute 'log'"
because Pydantic deserializes Any fields as plain dicts.

Added two CAN tests:
- test_continue_as_new_any_typed_fails: documents that Any-typed fields
  lose PubSubState type information (negative test)
- test_continue_as_new_properly_typed: verifies CAN works with properly
  typed PubSubState | None fields

Simplified subscribe() exception handling: removed the broad except
Exception clause that tried _follow_continue_as_new() on every error.
Now only catches WorkflowUpdateRPCTimeoutOrCancelledError for CAN follow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
README.md: usage-oriented documentation covering workflow mixin, activity
publishing, subscribing, continue-as-new, and cross-language protocol.

flush() safety: items are now removed from the buffer only after the
signal succeeds. Previously, buffer.clear() ran before the signal,
losing items on failure. Added test_flush_retains_items_on_signal_failure.

init_pubsub() guard: publish() and _pubsub_publish signal handler now
check for initialization and raise a clear RuntimeError instead of a
cryptic AttributeError.

PubSubClient.for_workflow() factory: preferred constructor that takes a
Client + workflow_id. Enables follow_continues in subscribe() without
accessing private WorkflowHandle._client. The handle-based constructor
remains for simple cases that don't need CAN following.

activity_pubsub_client() now uses for_workflow() internally with proper
keyword-only typed arguments instead of **kwargs: object.

CAN test timing: replaced asyncio.sleep(2) with assert_eq_eventually
polling for a different run_id, matching sdk-python test patterns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
_pubsub_poll and _pubsub_offset now call _check_initialized() for a
clear RuntimeError instead of cryptic AttributeError when init_pubsub()
is forgotten.

README CAN example now includes the required imports (@dataclass,
workflow) and the @workflow.init decorator.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The poll validator accesses _pubsub_draining, which would AttributeError
if init_pubsub() was never called. Added _check_initialized() guard.

Fixed PubSubState docstring: the field must be typed as PubSubState | None,
not Any. The old docstring incorrectly implied Any-typed fields would work.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
get_pubsub_state() and drain_pubsub() now call _check_initialized().
Previously drain_pubsub() could silently set _pubsub_draining on an
uninitialized instance, which init_pubsub() would then reset to False.

New tests:
- test_max_batch_size: verifies auto-flush when buffer reaches limit,
  using max_cached_workflows=0 to also test replay safety
- test_replay_safety: interleaved workflow/activity publish with
  max_cached_workflows=0, proving the mixin is determinism-safe

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review comments (#@agent: annotations) capture design questions on:
- Topic offset model and information leakage (resolved: global offsets
  with BFF-layer containment, per NATS JetStream model)
- Exactly-once publish delivery (resolved: publisher ID + sequence number
  dedup, per Kafka producer model)
- Flush concurrency (resolved: asyncio.Lock with buffer swap)
- CAN follow behavior, poll rate limiting, activity context detection,
  validator purpose, pyright errors, API ergonomics

DESIGN-ADDENDUM-TOPICS.md: full exploration of per-topic vs global offsets
with industry survey (Kafka, Redis, NATS, PubNub, Google Pub/Sub,
RabbitMQ). Concludes global offsets are correct for workflow-scoped
pub/sub; leakage contained at BFF trust boundary.

DESIGN-ADDENDUM-DEDUP.md: exactly-once delivery via publisher ID +
monotonic sequence number. Workflow dedup state is dict[str, int],
bounded by publisher count. Buffer swap pattern with sequence reuse
on failure. PubSubState carries publisher_sequences through CAN.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Types:
- Remove offset from PubSubItem (global offset is now derived)
- Add publisher_id + sequence to PublishInput for exactly-once dedup
- Add base_offset + publisher_sequences to PubSubState for CAN
- Use Field(default_factory=...) for Pydantic mutable defaults

Mixin:
- Add _pubsub_base_offset for future log truncation support
- Add _pubsub_publisher_sequences for signal deduplication
- Dedup in signal handler: reject if sequence <= last seen
- Poll uses base_offset arithmetic for offset translation
- Class-body type declarations for basedpyright compatibility
- Validator docstring explaining drain/CAN interaction
- Module docstring gives specific init_pubsub() guidance

Client:
- asyncio.Lock + buffer swap for flush concurrency safety
- Publisher ID (uuid) + monotonic sequence for exactly-once delivery
- Sequence advances on failure to prevent data loss when new items
  merge with retry batch (found via Codex review)
- Remove follow_continues param — always follow CAN via describe()
- Configurable poll_interval (default 0.1s) for rate limiting
- Merge activity_pubsub_client() into for_workflow() with auto-detect
- _follow_continue_as_new is async with describe() check

Tests:
- New test_dedup_rejects_duplicate_signal
- Updated flush failure test for new sequence semantics
- All activities use PubSubClient.for_workflow()
- Remove PubSubItem.offset assertions
- poll_interval=0 in test helper for speed

Docs:
- DESIGN-v2.md: consolidated design doc superseding original + addenda
- README.md: updated API reference
- DESIGN-ADDENDUM-DEDUP.md: corrected flush failure semantics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
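The (publisher_id, sequence) dedup described above can be sketched on the workflow side as a plain data structure. This is an illustrative stand-in, not the actual module code; the class and method names are hypothetical.

```python
# Sketch of the workflow-side dedup check: each publisher tags signal
# batches with a monotonically increasing sequence number, and the
# broker drops any batch it has already applied.

class DedupLog:
    def __init__(self) -> None:
        self.items: list[bytes] = []
        # publisher_id -> highest sequence number accepted so far
        self.publisher_sequences: dict[str, int] = {}

    def on_publish(self, publisher_id: str, sequence: int, batch: list[bytes]) -> bool:
        """Append a batch unless it is a duplicate; return True if accepted."""
        last_seen = self.publisher_sequences.get(publisher_id, 0)
        if sequence <= last_seen:
            return False  # retried/redelivered signal: already applied
        self.publisher_sequences[publisher_id] = sequence
        self.items.extend(batch)
        return True
```

Redelivery of the same (publisher_id, sequence) pair is a no-op, which is what makes client-side retries safe.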
Rewrite the client-side dedup algorithm to match the formally verified
TLA+ protocol: failed flushes keep a separate _pending batch and retry
with the same sequence number. Only advance the confirmed sequence on
success. TLC proves NoDuplicates and OrderPreserved for the correct
algorithm, and finds duplicates in the old algorithm.

Add TTL-based pruning of publisher dedup entries during continue-as-new
(default 15 min). Add max_retry_duration (default 600s) to bound client
retries — must be less than publisher_ttl for safety. Both constraints
are formally verified in PubSubDedupTTL.tla.

Add truncate_pubsub() for explicit log prefix truncation. Add
publisher_last_seen timestamps for TTL tracking. Preserve legacy state
without timestamps during upgrade.

API changes: for_workflow→create, flush removed (use priority=True),
poll_interval→poll_cooldown, publisher ID shortened to 16 hex chars.

Includes TLA+ specs (correct, broken, inductive, multi-publisher TTL),
PROOF.md with per-action preservation arguments, scope and limitations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
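The verified client algorithm above (failed flushes keep a separate pending batch and retry with the same sequence number; the confirmed sequence advances only on success) can be sketched as follows. Names like `_buffer`, `_pending`, and `send` are hypothetical stand-ins for the real client internals.

```python
# Sketch of the TLA+-verified flush protocol: stage a batch under a
# fresh sequence, retry the SAME (batch, sequence) on failure, and only
# advance the confirmed sequence once the send is acknowledged.

class Batcher:
    def __init__(self, send) -> None:
        self._send = send            # callable(seq, batch); raises on failure
        self._sequence = 0           # last confirmed sequence
        self._buffer: list[bytes] = []
        self._pending: list[bytes] = []
        self._pending_seq = 0

    def publish(self, data: bytes) -> None:
        self._buffer.append(data)

    def flush(self) -> None:
        if not self._pending:
            if not self._buffer:
                return               # nothing to flush
            # Stage a new batch under a fresh sequence number.
            self._pending, self._buffer = self._buffer, []
            self._pending_seq = self._sequence + 1
        # Retry path reuses _pending_seq so the workflow can dedup.
        self._send(self._pending_seq, self._pending)
        self._sequence = self._pending_seq   # advance only on success
        self._pending = []
```

Items published while a batch is pending accumulate in `_buffer` rather than merging into the retry batch, which is what preserves NoDuplicates.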
New analysis document evaluates whether publishing should use signals
or updates, examining Temporal's native dedup (Update ID per-run,
request_id for RPCs) vs the application-level (publisher_id, sequence)
protocol. Conclusion: app-level dedup is permanent for signals but
could be dropped for updates once temporal/temporal#6375 is fixed.
Non-blocking flush keeps signals as the right choice for streaming.

Updates DESIGN-v2.md section 6 to be precise about the two Temporal
guarantees that signal ordering relies on: sequential send order and
history-order handler invocation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Analyzes deduplication through the end-to-end principle lens. Three
types of duplicates exist in the pipeline, each handled at the layer
that introduces them:

- Type A (duplicate LLM work): belongs at application layer — data
  escapes to consumers before the duplicate exists, so only the
  application can resolve it
- Type B (duplicate signal batches): belongs in pub/sub workflow —
  encapsulates transport details and is the only layer that can
  detect them correctly
- Type C (duplicate SSE delivery): belongs at BFF/browser layer

Concludes the (publisher_id, sequence) protocol is correctly placed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… design

Fill gaps identified during design review:
- Document why per-topic offsets were rejected (trust model, cursor
  portability, unjustified complexity) inline rather than only in historical
  addendum
- Expand BFF section with the four reconnection options considered and
  the decision to use SSE Last-Event-ID with BFF-assigned gapless IDs
- Add poll efficiency characteristics (O(new items) common case)
- Document BFF restart fallback (replay from turn start)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode
data as base64 strings for cross-language compatibility across all
Temporal SDKs. User-facing types (PubSubItem) use native bytes.

Conversion happens inside handlers:
- Signal handler decodes base64 → bytes on ingest
- Poll handler encodes bytes → base64 on response
- Client publish() accepts bytes, encodes for signal
- Client subscribe() decodes poll response, yields bytes

This means Go/Java/.NET ports get cross-language compat for free since
their JSON serializers encode byte[] as base64 by default.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Comment thread temporalio/contrib/pubsub/_types.py Outdated
jssmith and others added 14 commits April 7, 2026 20:10
Remove the bounded poll wait from PubSubMixin and trim trailing
whitespace from types. Update DESIGN-v2.md with streaming plugin
rationale (no fencing needed, UI handles repeat delivery).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add opt-in streaming code path to both agent framework plugins.
When enabled, the model activity calls the streaming LLM endpoint,
publishes TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via
PubSubClient as a side channel, and returns the complete response
for the workflow to process (unchanged interface).

OpenAI Agents SDK:
- ModelActivityParameters.enable_streaming flag
- New invoke_model_activity_streaming method on ModelActivity
- ModelResponse reconstructed from ResponseCompletedEvent
- Uses @_auto_heartbeater for periodic heartbeats
- Routing in _temporal_model_stub (rejects local activities)

Google ADK:
- TemporalModel(streaming=True) constructor parameter
- New invoke_model_streaming activity using stream=True
- Registered in GoogleAdkPlugin

Both use batch_interval=0.1s for near-real-time token delivery.
No pubsub module changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Pydantic BaseModel was introduced as a workaround for Any-typed fields
losing type information during continue-as-new serialization. The actual fix
is using concrete type annotations (PubSubState | None), which the default
data converter handles correctly for dataclasses — no Pydantic dependency
needed.

This removes the pydantic import from the pubsub contrib module entirely,
making it work out of the box with the default data converter. All 18 tests
pass, including both continue-as-new tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Implements DESIGN-ADDENDUM-ITEM-OFFSET.md. The poll handler now annotates
each item with its global offset (base_offset + position in log), enabling
subscribers to track fine-grained consumption progress for truncation.
This is needed for the voice-terminal agent where audio chunks must not be
truncated until actually played, not merely received.

- Add offset field to PubSubItem and _WireItem (default 0)
- Poll handler computes offset from base_offset + log_offset + enumerate index
- subscribe() passes wire_item.offset through to yielded PubSubItem
- Tests: per-item offsets, offsets with topic filtering, offsets after truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
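The offset arithmetic above (global offset = base_offset + position in the log) can be sketched with a hypothetical helper; the function name and signature are illustrative only.

```python
# Sketch of per-item offset annotation: the log holds only untruncated
# items, base_offset counts items truncated so far, and each returned
# item carries its stable global offset.

def annotate_offsets(
    log: list[bytes], base_offset: int, from_offset: int
) -> list[tuple[int, bytes]]:
    """Return (global_offset, data) pairs for items at or past from_offset."""
    start = max(from_offset - base_offset, 0)  # translate global -> log index
    return [
        (base_offset + start + i, data)
        for i, data in enumerate(log[start:])
    ]
```

Note how `from_offset=0` naturally resolves to "everything that still exists", matching the recovery semantics adopted in a later commit.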
Documents the motivation and design for adding offset fields to
PubSubItem and _WireItem, enabling subscribers to track consumption
at item granularity rather than batch boundaries. Driven by the
voice-terminal agent's need to truncate only after audio playback,
not just after receipt.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Three changes:

1. Poll handler: replace ValueError with ApplicationError(non_retryable=True)
   when requested offset has been truncated. This fails the UPDATE (client
   gets the error) without crashing the WORKFLOW TASK — avoids the poison
   pill during replay that caused permanent workflow failures.

2. Poll handler: treat from_offset=0 as "from the beginning of whatever
   exists" (i.e., from base_offset). This lets subscribers recover from
   truncation by resubscribing from 0 without knowing the current base.

3. PubSubClient.subscribe(): catch WorkflowUpdateFailedError with type
   TruncatedOffset and retry from offset 0, auto-recovering.

New tests:
- test_poll_truncated_offset_returns_application_error
- test_poll_offset_zero_after_truncation
- test_subscribe_recovers_from_truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Verify that PubSubClient can subscribe to events from a different
workflow (same namespace) and that Nexus operations can start pub/sub
broker workflows in a separate namespace with cross-namespace
subscription working end-to-end. No library changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Poll responses now estimate wire size (base64 data + topic) and stop
adding items once the response exceeds 1MB. The new `more_ready` flag
on PollResult tells the subscriber that more data is available, so it
skips the poll_cooldown sleep and immediately re-polls. This avoids
unnecessary latency during big reloads or catch-up scenarios while
keeping individual update payloads within Temporal's recommended limits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
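The size cap above can be sketched as a simple paging loop; the 1 MB limit and the size-estimate formula are assumptions for illustration.

```python
# Sketch of the capped poll response: stop adding items once the
# estimated wire size exceeds the limit, and report more_ready so the
# subscriber skips its cooldown and re-polls immediately.
MAX_RESPONSE_BYTES = 1_000_000

def build_poll_response(
    items: list[tuple[str, str]],
) -> tuple[list[tuple[str, str]], bool]:
    """items are (topic, base64_data) pairs; returns (page, more_ready)."""
    page: list[tuple[str, str]] = []
    size = 0
    for topic, data in items:
        size += len(topic) + len(data)   # rough wire-size estimate
        if page and size > MAX_RESPONSE_BYTES:
            return page, True            # more data ready: caller re-polls now
        page.append((topic, data))
    return page, False
```

The `if page and ...` guard ensures a single oversized item is still delivered rather than looping forever.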
Codify the four wire evolution rules that have been followed implicitly
through four addenda: additive-only fields with defaults, immutable
handler names, forward-compatible PubSubState, and no application-level
version negotiation. Includes a precedent table showing all past changes
and reasoning for why version fields in payloads would cause silent data
loss on signals.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
After max_retry_duration expires, the client dropped the pending batch
without advancing _sequence. The next batch reused the same sequence
number, which could be silently deduplicated by the workflow if the
timed-out signal was actually delivered — causing permanent data loss
for those items.

The fix advances _sequence to _pending_seq before clearing _pending,
ensuring subsequent batches always get a fresh sequence number.

TLA+ verification:
- Added DropPendingBuggy/DropPendingFixed actions to PubSubDedup.tla
- Added SequenceFreshness invariant: (pending=<<>>) => (confirmed_seq >= wf_last_seq)
- BuggyDropSpec FAILS SequenceFreshness (confirmed_seq=0 < wf_last_seq=1)
- FixedDropSpec PASSES all invariants (489 distinct states)
- NoDuplicates passes for both — the bug causes data loss, not duplicates

Python test:
- test_retry_timeout_sequence_reuse_causes_data_loss demonstrates the
  end-to-end consequence: reused seq=1 is rejected, fresh seq=2 accepted

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
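The fix above is small but subtle, so here is a sketch under the same hypothetical client state used earlier: when the retry budget expires and the pending batch is dropped, the confirmed sequence must jump to the pending batch's sequence so the next batch can never be silently deduplicated against a timed-out signal that actually landed.

```python
# Sketch of DropPendingFixed: burn the pending sequence number when
# dropping the batch, so subsequent batches always get a fresh sequence.

def drop_pending(state: dict) -> None:
    """state holds 'sequence', 'pending', 'pending_seq' as in the client sketch."""
    state["sequence"] = state["pending_seq"]  # the fix: advance before clearing
    state["pending"] = []

def next_batch_seq(state: dict) -> int:
    return state["sequence"] + 1
```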
# Conflicts:
#	temporalio/contrib/google_adk_agents/_model.py
This is a new release with no legacy to support. Changes:

- _mixin.py: Remove ts-is-None fallback that retained publishers without
  timestamps. All publishers always have timestamps, so this was dead code.
- _types.py: Clean up docstrings referencing addendum docs
- DESIGN-v2.md: Remove backward-compat framing, addendum references, and
  historical file listing. Keep the actual evolution rules.
- PROOF.md: "Legacy publisher_id" → "Empty publisher_id"
- README.md: Reference DESIGN-v2.md instead of deleted addendum
- Delete DESIGN.md and 4 DESIGN-ADDENDUM-*.md files (preserved in
  the top-level streaming-comparisons repo)
- Delete stale TLA+ trace .bin files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Simplify the README to focus on essential API patterns. Rename
for_workflow() to create() throughout, condense the topics section,
remove the exactly-once and type-warning sections (these details
belong in DESIGN-v2.md), and update the API reference table with
current parameter signatures. Also fix whitespace alignment in
DESIGN-v2.md diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…de pubsub state

The CAN example only showed pubsub_state being passed through, which could
mislead readers into thinking that's all that's needed. Updated to include
a representative application field (items_processed) to make it clear that
your own workflow state must also be carried across the CAN boundary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
jssmith and others added 7 commits April 23, 2026 22:06
- ruff format: apply formatter to auto-generated style changes.
- pyright: replace dict literals for Response.text/usage with the
  pydantic model types (ResponseTextConfig, ResponseUsage,
  InputTokensDetails, OutputTokensDetails).
- basedpyright: suppress reportUnusedFunction on the private
  _encode_payload/_decode_payload helpers in pubsub._types (they are
  used from sibling modules, which basedpyright does not credit) and
  reportUnusedParameter on the CAN workflow run() input arg.
- pydocstyle: add docstrings to PubSubClient.__aenter__/__aexit__.
- typing.Self requires 3.11; import from typing_extensions like the
  rest of the SDK does.
- asyncio.timeout requires 3.11; fall back to async_timeout.timeout
  on 3.10 (async_timeout is an aiohttp transitive dep there).
On Python 3.10 CI, the `if sys.version_info >= (3, 11):` branch is
what basedpyright flags as unreachable. The ignore needs to be on
both branches so it is silent under every Python version in the
matrix.
The previous attempt placed the pragma on the indented `timeout as
_async_timeout` line, but basedpyright reports reportUnreachable
against the outer `from ... import (` line (the block-opening
statement), so the pragma had no effect. Move the ignore up to the
import line and combine with reportMissingImports there.

Locally verified clean on Python 3.10, 3.11, and 3.14 via
`uv run --python <ver> poe lint`.
Under parallel test load we saw test_poll_truncated_offset_returns_
application_error fail with "Cannot truncate to offset 3: only 0 items
exist" — traced to an activation-ordering race.

When a workflow receives an activation containing
[InitializeWorkflow, Signal(__pubsub_publish), Update(truncate)] in
one batch, _WorkflowInstanceImpl.activate groups signals and updates
into job_sets[1] and init into job_sets[2]. During _apply of
job_sets[1], __pubsub_publish (a dynamic signal registered inside
PubSub.__init__) has no handler yet, so it is buffered; truncate is
class-level @workflow.update, found in self._updates at activation
time, and its task is created immediately and queued in self._ready.
_run_once then lazy-instantiates the workflow, __init__ runs
set_signal_handler which dispatches the buffered signal via a new
task appended to self._ready after the update task. FIFO event-loop
dispatch runs truncate against an empty log first; the handler
raised ValueError which poisoned the whole workflow task.

Fixes:

1. temporalio/contrib/pubsub/_broker.py — PubSub.truncate now raises
   ApplicationError(type="TruncateOutOfRange", non_retryable=True)
   instead of ValueError when the offset is past the end of the log.
   Matches what _on_poll already does for TruncatedOffset and lets
   update handlers surface the error cleanly without failing the task.

2. tests/contrib/pubsub/test_pubsub.py — TruncateWorkflow seeds the
   log from @workflow.init with a prepub_count arg. Three tests
   (test_poll_truncated_offset_returns_application_error,
   test_subscribe_recovers_from_truncation, test_truncate_pubsub) now
   pass prepub_count=5 to start_workflow rather than sending a
   client-side __pubsub_publish signal, sidestepping the dynamic-
   signal-before-init race entirely.

3. Tighten the poll-after-truncation assertion to check
   cause.type == "TruncatedOffset", and add
   test_truncate_past_end_raises_application_error to cover the new
   TruncateOutOfRange branch of PubSub.truncate.

4. temporalio/contrib/pubsub/_client.py — pydoctor couldn't resolve
   :class:\`~temporalio.api.common.v1.Payload\` against the generated
   proto module and was failing the docs build; switched that one
   cross-ref to plain backticks.

Verified locally on Python 3.10 and 3.14: full lint clean, docs
build clean, and pubsub tests pass 27/27 across three parallel runs.
Add a visible "Gotcha" section to the contrib/pubsub README covering
the case where a custom synchronous update or signal handler reads
PubSub state and races a same-activation __pubsub_publish signal.

The race is inherent to registering __pubsub_publish dynamically
from @workflow.init: on the first activation the signal is buffered
until __init__ runs, and any class-level sync handler scheduled in
the same activation observes pre-publish state.

Framing in the README distinguishes the two cases where users do or
don't need to care:

- Independent producer/consumer shape (the common PubSub use): the
  handler already has to tolerate out-of-order arrival for reasons
  unrelated to this race, so no recipe is required.
- Sequential same-client publish->update ordering: use the recipe.

Recipe is a one-line "await asyncio.sleep(0)" at the top of the
handler, which is a pure asyncio yield with no Temporal timer, no
history events, and no server round trip. Explicit call-out that
workflow.sleep(0) is not a substitute.

Also extend SIGNAL-UPDATE-RACE.md with a "Zooming out" section that
explains why the application layer typically subsumes this race,
and update the Recommendation to treat the SDK-level dispatch fix
(option 4) as optional follow-up rather than a must-fix. The PubSub
class docstring gets a short note pointing at the README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
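The recipe works because `await asyncio.sleep(0)` is a bare event-loop yield: any task queued earlier in the same activation runs before the handler resumes. A standalone asyncio demo (no Temporal involved; task names are illustrative):

```python
# Demo of the one-line recipe: the update handler is scheduled before
# the buffered-signal dispatch (mirroring the FIFO dispatch order in
# the race), but its asyncio.sleep(0) yields so the signal runs first.
import asyncio

async def main() -> list[str]:
    order: list[str] = []

    async def buffered_signal_dispatch() -> None:
        order.append("signal")

    async def update_handler() -> None:
        await asyncio.sleep(0)   # pure yield: no timer, no history event
        order.append("update")

    t_update = asyncio.create_task(update_handler())    # queued first
    t_signal = asyncio.create_task(buffered_signal_dispatch())
    await asyncio.gather(t_update, t_signal)
    return order
```

Without the `sleep(0)` the handler would observe pre-publish state; with it, the queued signal dispatch runs first.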
The existing TruncateWorkflow sidestepped the dynamic-signal-vs-update
race by seeding the log from @workflow.init via prepub_count. That
kept CI green but meant the test workflow did not exercise the
pattern the README now asks users to follow (await asyncio.sleep(0)
at the top of sync-shaped handlers reading PubSub state).

Make truncate async with the recipe so the test workflow is a living
example of the documented pattern, and simplify the docstring now
that the race is closed in the handler rather than avoided via
init-time seeding. prepub_count is kept as a convenience for the
error-path tests that just need deterministic log content.

All four truncate tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@workflow.init
def __init__(self, input: WorkflowInput) -> None:
    self.items_processed = input.items_processed
    self.pubsub = PubSub(prior_state=input.pubsub_state)
Contributor

@JasonSteving99 JasonSteving99 Apr 25, 2026


one suggestion I can think of to simplify the continue as new based on the conversation in today's code walkthrough meeting:
from typing import Callable
from temporalio import workflow

class PubSub[T]:
    def __init__(
        self, 
        *, 
        prior_state: PubSubState | None = None, 
        build_continue_as_new_args: Callable[[], list[T]]
    ):
        self._build_continue_as_new_args = build_continue_as_new_args
        # ... 

    async def continue_as_new(self):
        self.drain()
        await workflow.wait_condition(workflow.all_handlers_finished)
        workflow.continue_as_new(args=self._build_continue_as_new_args())


@workflow.defn
class MyWorkflow:
    @workflow.init
    def __init__(self, input: WorkflowInput) -> None:
        self.items_processed = input.items_processed
        self.pubsub: PubSub[WorkflowInput] = PubSub(
            prior_state=input.pubsub_state,
            build_continue_as_new_args=lambda: [WorkflowInput(
                items_processed=self.items_processed,
                pubsub_state=self.pubsub.get_state(),
            )],
        )

    @workflow.run
    async def run(self, input: WorkflowInput) -> None:
        # ... do work, updating self.items_processed ...

        if workflow.info().is_continue_as_new_suggested():
            await self.pubsub.continue_as_new()

One way I think this would be useful is that it pushes the user to think about the continue-as-new scenario right out of the gate. And by placing build_continue_as_new_args in the constructor, while simultaneously requiring the user to instantiate PubSub(...) in the workflow's init, you force them to realize they'd need to pass the pubsub state in their workflow input if they want to make use of CAN like this.

Contributor Author


I could go either way on this. I like encouraging CAN upfront. However, I also like keeping the state-saving logic where it sits in existing patterns. My inclination is to defer further changes to a bigger CAN rethink.

jssmith and others added 5 commits April 24, 2026 21:43
flush() is an explicit synchronization point: it returns once items
buffered at call time have been signaled to the workflow and
acknowledged by the server, and returns immediately when the buffer
is empty. It complements the two existing flush mechanisms
(force_flush=True on publish, context-manager exit) for the case
where the caller needs proof that prior publications landed but the
moment doesn't naturally correspond to a specific event.

Implementation reuses _flush() under the existing flush_lock, looped
while either _pending or _buffer is non-empty so the pending-vs-buffer
staging in _flush() can drain in one call.

DESIGN-v2 updates the API table and replaces the "no public flush()"
paragraph with a section framing the three complementary flush
mechanisms and when each is appropriate.

Test test_explicit_flush_barrier exercises the documented contract:
empty-buffer no-op, flush as a barrier with batch_interval=60s so a
regression hangs rather than passing on the timer, and idempotent
second flush.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
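The barrier semantics above can be sketched by reusing the staged-batch model: loop under the flush lock until both the retry batch and the live buffer are drained. Class and attribute names are hypothetical stand-ins for the real client.

```python
# Sketch of flush() as a synchronization barrier: returns once items
# buffered at call time have been sent and acknowledged, and is an
# immediate no-op when the buffer is empty.
import asyncio

class FlushBarrier:
    def __init__(self, send) -> None:
        self._send = send                # async callable(batch); raises on failure
        self._buffer: list[bytes] = []
        self._pending: list[bytes] = []
        self._lock = asyncio.Lock()

    def publish(self, data: bytes) -> None:
        self._buffer.append(data)

    async def _flush_once(self) -> None:
        if not self._pending:
            self._pending, self._buffer = self._buffer, []
        await self._send(self._pending)  # on failure, _pending is retained
        self._pending = []

    async def flush(self) -> None:
        """Barrier: drain both the pending batch and the live buffer."""
        async with self._lock:
            while self._pending or self._buffer:
                await self._flush_once()
```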
Workflow-side (publisher_id, sequence) dedup is a polyfill for two
gaps in Temporal's built-in signal request_id dedup:

  1. The Python SDK does not expose request_id on
     WorkflowHandle.signal(), so cross-_flush() retries always allocate
     a fresh request_id and bypass server-side dedup even within a
     single run.
  2. pendingSignalRequestedIDs is per-run mutable state and is not
     copied across continue-as-new, so retries that straddle CAN are
     accepted as fresh signals (verified empirically on dev server and
     Temporal Cloud — see experiments/can-signal-dup/README.md).

When (1) and (2) are both fixed, the workflow-side check becomes
redundant. The dedup keys at both layers already align on
(publisher_id, sequence), so the migration is mechanical — pin
request_id=f"{publisher_id}:{seq}" in _flush(), drop the dedup
branch in _on_publish, retire publisher_sequences /
publisher_last_seen / publisher_ttl from PubSubState in a follow-up
wire-format pass.

Adds a "Future Work" subsection in DESIGN-v2 capturing the
prerequisites, the diff (what changes / stays / goes), and the
rollout sequencing. Adds short pointer comments at the two code
sites that would change so a future maintainer encounters the design
note at the right place.

No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Convenience for single-topic subscribers — the common case. The
previous signature required wrapping a single topic in a list, which
is noisy at every call site. Internally we normalize to a list before
issuing the poll update; behavior for None / empty list / multi-topic
list is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rename the wire-level handler identifiers to follow the existing
__temporal_ convention (__temporal_workflow_metadata,
__temporal_activity_definition, etc.) so they are clearly recognizable
as Temporal-internal and won't collide with user-defined handlers:

  __pubsub_publish -> __temporal_pubsub_publish
  __pubsub_poll    -> __temporal_pubsub_poll
  __pubsub_offset  -> __temporal_pubsub_offset

Updates the broker/client implementation, tests, and design docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- _broker.py:_validate_poll — rename `payload` to `_payload`, drop
  `del payload` and `# noqa: ARG002`. The noqa was dead code: CI runs
  only `ruff check --select I` (import sort), so ARG rules never fire.
  Underscore prefix silences basedpyright's reportUnusedParameter
  cleanly.

- test_pubsub.py:ContinueAsNewTypedWorkflow.run — rename `input` to
  `_input` with `del _input`, drop the `type:ignore`. Now matches the
  existing `_prepub_count` pattern at TruncateWorkflow.run for the same
  @workflow.init/@workflow.run signature constraint.

- test_pubsub.py async_timeout import — declare `async-timeout` as an
  explicit dev dep gated on `python_version < '3.11'`, drop the
  `reportMissingImports` half of the test pragma. Closes the audit
  gap of relying on aiohttp's transitive dependency on 3.10. Kept the
  `reportUnreachable` ignores — still needed because basedpyright
  resolves `sys.version_info` against its own runtime, not the matrix
  Python.

Verified `poe lint` clean on Python 3.10, 3.11, 3.14.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
model_activity = ModelActivity(model_provider)
new_activities = [
model_activity.invoke_model_activity,
model_activity.invoke_model_activity_streaming,
Contributor:

Do we need both activities? They can't both be used in the same worker, can they? I think we could check model_params.enable_streaming to decide which of these activities is actually necessary.

Contributor:

That's true currently, but it isn't really necessary. If we ever expose TemporalModel like we do in others, it would be possible.

Contributor:

@tconley1428 I think you're right. Especially based on a late realization I had in #1423 (comment) that presumably we should allow the user to opt into streaming by picking Runner::run_streamed rather than Runner::run, instead of just implicitly changing the behavior of Runner::run when the plugin has a different flag passed.

model = self._model_provider.get_model(input.get("model_name"))
tools, handoffs = _build_tools_and_handoffs(input)

pubsub = PubSubClient.from_activity(batch_interval=0.1)
Contributor:

should batch_interval be user-configurable? (similar q for adk activity above)

Contributor (author):

yes, good callout.

from temporalio.contrib.pubsub import PubSubClient
from temporalio.workflow import ActivityConfig

logger = logging.getLogger(__name__)
Contributor:

I see these logger = ... lines added in several files, but I don't see them used. Were these loggers for your local debugging?

Contributor (author):

Yes, probably at some point. Will remove.

_raise_for_openai_status(e)

@activity.defn
@_auto_heartbeater
Contributor:

do you need this @_auto_heartbeater decorator? I see the explicit heartbeats within the streaming loop. Seems more accurate to keep the explicit heartbeat instead of the implicit decorator heartbeats. Probably not both though, unless there was a specific idea you had in mind for this? (Note I don't think the adk activity has such an auto heartbeater)

prompt=input.get("prompt"),
):
activity.heartbeat()
pubsub.publish(EVENTS_TOPIC, event.model_dump_json().encode())
Contributor:

I think it makes sense to allow a user to configure the topic name. We could provide a default "events" or "model_stream_events" though.

Contributor (author):

Agree, thanks.

pubsub.publish(
EVENTS_TOPIC,
_make_event("TEXT_COMPLETE", text=text_buffer),
force_flush=True,
Contributor:

do we need force_flush=True on this line, if the very next line also does force_flush=True?

On a related note, I don't see any analogous force_flush usages in the corresponding openai agents streaming activity. Should probably use the same flushing strategy in both activities.

Comment on lines +20 to +27
def _make_event(event_type: str, **data: object) -> bytes:
return json.dumps(
{
"type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data": data,
}
).encode()
Contributor:

I think this is a bit of a tricky interface to publish to users that want to consume these events. Here, you have to read both this function def and all the call sites to see what shape the "data" : ... field would have. Could you instead define a Pydantic model (or even a TypedDict) that explicitly defines a union type for the possible shapes the data field can take?
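One possible shape for that suggestion, using a discriminated TypedDict union (the event and field names here are illustrative only, not the module's actual schema):

```python
from typing import Literal, TypedDict, Union


class TextDeltaData(TypedDict):
    delta: str


class ToolCallStartData(TypedDict):
    tool_name: str


class TextDeltaEvent(TypedDict):
    type: Literal["TEXT_DELTA"]
    timestamp: str
    data: TextDeltaData


class ToolCallStartEvent(TypedDict):
    type: Literal["TOOL_CALL_START"]
    timestamp: str
    data: ToolCallStartData


# Consumers narrow on the "type" discriminator; type checkers then know
# the exact shape of "data" in each branch.
StreamEvent = Union[TextDeltaEvent, ToolCallStartEvent]
```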

pubsub.publish(
EVENTS_TOPIC,
_make_event(
"TOOL_CALL_START",
Contributor:

I absolutely agree that we should be emitting lifecycle events like this TOOL_CALL_START. But maybe we should save that for a followup PR after we think through all of the hook points and what they should mean semantically. I worry that we're tying our hands here to something we didn't necessarily have a design discussion on.

E.g. for this one in particular (TOOL_CALL_START), I'd think that we should use this to semantically signal that a tool has started executing, not just that the LLM has started emitting tokens related to a tool call.

Of course, this is moot if you accept the suggestion in my comment immediately above this one.

activity.heartbeat()
responses.append(response)

if response.content and response.content.parts:
Contributor:

In the openai agents activity below you publish the full stream event, you don't unpack and inspect each event and publish specially named events. I think that's actually the better approach. I'd suggest just pubsub.publish(..., response)

Contributor (author):

Yes, need to switch ADK to that approach as well.

Comment on lines +169 to +170
stream: Whether to stream the response (currently ignored; use the
``streaming`` constructor parameter instead).
Contributor:

Seems like this could be safely honored since it's so seamless to swap between the underlying activities

Contributor:

It doesn't actually do the thing you would expect though, as it doesn't stream back.

Comment on lines 225 to 239
Contributor:

I think the above

return await workflow.execute_activity_method(
    ModelActivity.invoke_model_activity_streaming,

in async def get_response(...) above needs to be moved down into this method instead of being shoehorned above into the non-streaming method. Thinking from the user perspective it's odd that you'd call Runner.run() and it would do something different depending on how you configure the plugin somewhere else entirely.

This stream_response method gets called when the user calls Runner::run_streamed instead of Runner::run.

By implementing streaming here the streaming activity impl wouldn't need to hack in the logic for collapsing the streamed outputs into a single synthetic ModelResponse, but would instead more naturally return a list of these TResponseStreamEvent that you get from the underlying llm call.

@_auto_heartbeater
async def invoke_model_activity_streaming(
self, input: ActivityModelInput
) -> ModelResponse:
Contributor:

I just realized that this is presumably the wrong interface for this streaming activity in the OpenAI agents sdk. I realized it's a bit odd that the streaming activity still ends up only returning a single output ModelResponse that it constructs synthetically and drops the intermediate outputs that the LLM responded with. I think the correct hook point is Model::stream_response instead.

Would look like:

def stream_response(self, ...) -> AsyncIterator[TResponseStreamEvent]:

you'd collect all of the TResponseStreamEvents into a list and return the whole list after publishing all of them rather than dropping the intermediate stream events as it's currently doing

name="Assistant",
instructions="You are a test agent.",
)
result = await Runner.run(starting_agent=agent, input=prompt)
Contributor:

I think the user should have to explicitly opt into streaming at the callsite via Runner::run_streamed instead of implicitly changing the behavior of Runner::run when the plugin is configured with enable_streaming=True.

Since the plugin in defined somewhere else entirely, we're tip-toeing into spooky action at a distance. I also wonder if we'd lead users into some non-determinism errors by simply changing enable_streaming=False -> enable_streaming=True. Would probably be the only place where a code change completely outside of any workflow code could cause nondeterminism.

@brianstrauch brianstrauch requested a review from Copilot April 27, 2026 22:24
if not llm:
raise ValueError(f"Failed to create LLM for model: {llm_request.model}")

pubsub = PubSubClient.from_activity(batch_interval=0.1)
Contributor:

Include units? I suspect the typical use case will be on the order of milliseconds, not seconds.

Suggested change
pubsub = PubSubClient.from_activity(batch_interval=0.1)
pubsub = PubSubClient.from_activity(batch_interval_ms=100)

Contributor (author):

I personally like the style of units in the argument name. It hasn't been a common convention in our codebase.

@jssmith (author), Apr 28, 2026:

Actually, it looks like the convention is to take a timedelta

"""
super().__init__(model=model_name)
self._model_name = model_name
self._streaming = streaming
Contributor:

It's not always clear that this variable is a bool; renaming could help.

Suggested change
self._streaming = streaming
self._is_streaming = is_streaming

Contributor (author):

Good suggestion.

next_retry_delay=retry_after,
) from e

if e.response.status_code in [408, 409, 429] or e.response.status_code >= 500:
Contributor:

Worth it to document these status codes? I'm not familiar with the first two.

Contributor (author):

Good suggestion.

Comment on lines +286 to +297
if e.response.status_code in [408, 409, 429] or e.response.status_code >= 500:
raise ApplicationError(
f"Retryable OpenAI status code: {e.response.status_code}",
non_retryable=False,
next_retry_delay=retry_after,
) from e

raise ApplicationError(
f"Non retryable OpenAI status code: {e.response.status_code}",
non_retryable=True,
next_retry_delay=retry_after,
) from e
Contributor:

Suggested change
if e.response.status_code in [408, 409, 429] or e.response.status_code >= 500:
raise ApplicationError(
f"Retryable OpenAI status code: {e.response.status_code}",
non_retryable=False,
next_retry_delay=retry_after,
) from e
raise ApplicationError(
f"Non retryable OpenAI status code: {e.response.status_code}",
non_retryable=True,
next_retry_delay=retry_after,
) from e
retryable = e.response.status_code in [408, 409, 429] or e.response.status_code >= 500
raise ApplicationError(
f"Non retryable OpenAI status code: {e.response.status_code}",
non_retryable=not retryable,
next_retry_delay=retry_after,
) from e
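The consolidated branch the suggestion is driving at, in miniature (a sketch: the real code raises `ApplicationError`, which is elided here so the classification is testable on its own; the helper name is hypothetical):

```python
def classify_openai_status(status_code: int) -> tuple[bool, str]:
    """Decide retryability once, then raise from a single site.
    408 Request Timeout, 409 Conflict, and 429 Too Many Requests are
    treated as transient, as is any 5xx server error."""
    retryable = status_code in (408, 409, 429) or status_code >= 500
    label = "Retryable" if retryable else "Non retryable"
    return retryable, f"{label} OpenAI status code: {status_code}"
```

The single raise site then passes `non_retryable=not retryable` and the computed message, so the retryable and non-retryable cases cannot drift apart.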

Copilot AI left a comment:

Pull request overview

Adds a new temporalio.contrib.pubsub module that implements a workflow-hosted pub/sub log with signal-based publishing and update-based long-poll subscription, intended for streaming incremental results out of long-running workflows.

Changes:

  • Introduces temporalio.contrib.pubsub (workflow-side broker, external client, shared wire/types) plus bundled design/usage docs.
  • Adds extensive end-to-end/integration tests for pubsub, plus new streaming integration tests for OpenAI Agents and Google ADK that publish stream events via pubsub.
  • Updates dev dependencies/lockfile to support the new test matrix (e.g., async-timeout for Python < 3.11).

Reviewed changes

Copilot reviewed 18 out of 20 changed files in this pull request and generated 4 comments.

Summary per file:

- `temporalio/contrib/pubsub/_broker.py`: workflow-side broker (signal/update/query handlers, log management, truncation, size-capped polling)
- `temporalio/contrib/pubsub/_client.py`: external publisher/subscriber client with batching, flush semantics, dedup retry handling
- `temporalio/contrib/pubsub/_types.py`: wire types and helpers (Payload base64 proto serialization)
- `temporalio/contrib/pubsub/__init__.py`: public exports and module-level docs
- `temporalio/contrib/pubsub/README.md`: user-facing usage and API reference
- `temporalio/contrib/pubsub/DESIGN-v2.md`: design document for protocol/semantics
- `temporalio/contrib/pubsub/SIGNAL-UPDATE-RACE.md`: design note documenting an activation-ordering gotcha and its mitigation
- `tests/contrib/pubsub/test_pubsub.py`: comprehensive E2E integration tests for pubsub semantics
- `tests/contrib/pubsub/test_payload_roundtrip_prototype.py`: regression coverage for the Payload-in-dataclass serialization constraints and chosen wire format
- `tests/contrib/openai_agents/test_openai_streaming.py`: validates that OpenAI Agents streaming publishes raw events via pubsub and preserves the final result
- `tests/contrib/google_adk_agents/test_adk_streaming.py`: validates that ADK streaming publishes events via pubsub and preserves non-streaming behavior
- `temporalio/contrib/openai_agents/_invoke_model_activity.py`: adds a streaming-aware activity that publishes stream events via pubsub
- `temporalio/contrib/openai_agents/_temporal_openai_agents.py`: registers both the standard and streaming model activities
- `temporalio/contrib/openai_agents/_temporal_model_stub.py`: executes the streaming activity path when enabled; disallows local activities for streaming
- `temporalio/contrib/openai_agents/_model_parameters.py`: adds the enable_streaming parameter
- `temporalio/contrib/google_adk_agents/_model.py`: adds a streaming activity plus a TemporalModel toggle to use it and publish events via pubsub
- `temporalio/contrib/google_adk_agents/_plugin.py`: registers the new ADK streaming activity
- `pyproject.toml`: adds async-timeout to dev deps for Python < 3.11
- `uv.lock`: lockfile updates reflecting the new dev dependency and resolution options


Comment on lines +325 to +333
elif self._buffer:
# New batch path
raw = self._buffer
self._buffer = []
batch = self._encode_buffer(raw)
seq = self._sequence + 1
self._pending = batch
self._pending_seq = seq
self._pending_since = time.monotonic()
Copilot AI, Apr 27, 2026:

In _flush(), the code clears self._buffer before calling _encode_buffer(raw). If payload conversion/encoding raises (e.g., a value isn’t serializable by the payload converter), the buffered items are dropped and cannot be retried or inspected. Consider only clearing the buffer after _encode_buffer succeeds (or restoring it on exception) so conversion failures don’t cause silent data loss.

Comment thread temporalio/contrib/pubsub/_client.py Outdated
Comment on lines +179 to +181
assert (
workflow_id is not None
), "from_activity requires an activity with a parent workflow"
Copilot AI, Apr 27, 2026:

from_activity() uses an assert to validate workflow_id presence. Asserts can be stripped with python -O, turning this into a potential None propagation instead of a clear error. Prefer raising a RuntimeError/ValueError with the same message rather than relying on assert for runtime validation.

Suggested change
assert (
workflow_id is not None
), "from_activity requires an activity with a parent workflow"
if workflow_id is None:
raise RuntimeError(
"from_activity requires an activity with a parent workflow"
)

Comment thread temporalio/contrib/pubsub/_types.py Outdated
Comment on lines +4 to +6
:class:`temporalio.api.common.v1.Payload` so that user codec chains
(encryption, PII-redaction, compression) apply per item. See
``DESIGN-v2.md`` §5 and ``docs/pubsub-payload-migration.md``.
Copilot AI, Apr 27, 2026:

The module docstring says user codec chains “apply per item”, but the implementation uses the payload converter to build per-item Payloads and relies on the SDK’s codec chain only at the signal/update envelope level (as described in _client.py / _broker.py). This wording is likely to mislead users about the effective codec boundaries; please update the docstring to match the actual behavior (envelope-level codec, per-item payload metadata preserved).

Suggested change
:class:`temporalio.api.common.v1.Payload` so that user codec chains
(encryption, PII-redaction, compression) apply per item. See
``DESIGN-v2.md`` §5 and ``docs/pubsub-payload-migration.md``.
:class:`temporalio.api.common.v1.Payload`. Per-item values are converted
to ``Payload`` objects by the payload converter, and the resulting
payload bytes/metadata are preserved per item. The SDK codec chain
(encryption, PII-redaction, compression) applies at the outer
signal/update envelope level, not separately to each embedded item
payload. See ``DESIGN-v2.md`` §5 and
``docs/pubsub-payload-migration.md``.

Comment thread temporalio/contrib/pubsub/__init__.py Outdated
Comment on lines +7 to +9
Payloads are Temporal ``Payload`` values. Publishing values go through
the client's data converter (including any configured codec chain);
subscribers can yield raw ``Payload`` or request a concrete type via
Copilot AI, Apr 27, 2026:

This docstring says published values “go through the client's data converter (including any configured codec chain)”. In practice, PubSubClient.publish() converts values to Payload via the sync payload converter and does not run the codec chain per item (the codec runs once on the signal/update envelope). Consider rewording to avoid implying per-item codec behavior.

Suggested change
Payloads are Temporal ``Payload`` values. Publishing values go through
the client's data converter (including any configured codec chain);
subscribers can yield raw ``Payload`` or request a concrete type via
Payloads are Temporal ``Payload`` values. Published values are converted to
``Payload`` using the client's payload converter; any configured codec chain
applies when the surrounding signal/update envelope is encoded. Subscribers
can yield raw ``Payload`` or request a concrete type via

Packages the drain + wait-for-handlers + workflow.continue_as_new recipe
behind `await self.pubsub.continue_as_new(build_args)`. The builder is
typed `Callable[[PubSubState], Sequence[Any]]` and is invoked after
drain stabilizes with the post-drain state as its single argument, so
the snapshot ordering is structural rather than documented-by-prose.

The helper deliberately does not mirror workflow.continue_as_new's
12-param signature; workflows that need to override task_queue,
retry_policy, etc. fall back to the explicit drain/wait/CAN recipe.

Reverses the 2026-04-24 rejection in DESIGN-v2 Future Work: the
state-bound-builder shape resolves the "second footgun" objection to
the zero-arg-lambda form (caller could still write
self.pubsub.get_state() inside the lambda; with a state parameter the
helper controls the read).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jssmith jssmith requested a review from a team as a code owner April 28, 2026 04:19
jssmith and others added 6 commits April 27, 2026 22:49
Brian noted on PR #1423 that timedelta is the convention in this
codebase for duration parameters. Migrating the public API surface:

- ``PubSubClient`` constructor / ``create`` / ``from_activity``:
  ``batch_interval`` and ``max_retry_duration`` now take ``timedelta``
  (previously ``float`` seconds).
- ``PubSubClient.subscribe``: ``poll_cooldown`` now takes ``timedelta``
  (previously ``float`` seconds).
- ``PubSub.get_state`` and ``PubSub.continue_as_new``: ``publisher_ttl``
  now takes ``timedelta`` (previously ``float`` seconds).

Internals continue to use ``.total_seconds()`` where needed (asyncio
timeouts, comparisons against ``workflow.time()``).

The TTL test workflow query keeps its arg as ``float`` seconds and
constructs the ``timedelta`` inside the handler — query payloads use
the default JSON converter, which does not serialize ``timedelta``.

Docs and examples in DESIGN-v2.md and README.md updated to use
``timedelta(...)`` literals.

This is contrib/preview, so no float-compat shim — callers that
previously passed numeric seconds need to migrate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
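Call-site shape after this migration (the client class is stubbed here so the sketch stands alone; only the parameter names and types follow the commit message):

```python
from datetime import timedelta


class PubSubClientStub:
    """Stand-in showing the timedelta-taking signature; the real client
    is temporalio.contrib.pubsub.PubSubClient."""

    def __init__(self, batch_interval: timedelta, max_retry_duration: timedelta) -> None:
        # Convert once at the boundary; internals keep float seconds for
        # asyncio timeouts and workflow.time() comparisons.
        self.batch_interval_s = batch_interval.total_seconds()
        self.max_retry_s = max_retry_duration.total_seconds()


client = PubSubClientStub(
    batch_interval=timedelta(milliseconds=100),
    max_retry_duration=timedelta(minutes=5),
)
```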
Previously, streaming was a plugin-level flag (``enable_streaming``)
that silently rerouted ``Runner.run`` to a streaming activity which
synthesized a ``ModelResponse`` from the terminal
``ResponseCompletedEvent`` and dropped intermediate stream events.
Reviewers flagged two problems with that shape:

1. ``Runner.run`` callers did not opt into streaming behavior — flipping
   the plugin flag elsewhere changed what a workflow saw at runtime.
   That is the kind of spooky-action-at-a-distance that produces
   non-determinism if the flag is changed mid-history.

2. ``Model.stream_response`` is the natural hookpoint for streaming
   in the agents SDK. ``Runner.run_streamed`` already exposes the
   correct user-facing API — we just had not implemented it.

This commit reworks both:

- ``_TemporalModelStub.stream_response`` now executes the streaming
  activity and yields each event from its return list (an async
  generator). ``get_response`` keeps the non-streaming path; the
  ``enable_streaming`` branch is gone.
- ``invoke_model_activity_streaming`` returns
  ``list[TResponseStreamEvent]`` rather than a synthesized
  ``ModelResponse``, and publishes the raw events to the configured
  pub/sub topic via ``pubsub.publish(topic, event)`` (relying on the
  payload converter rather than manual JSON encoding).
- ``TemporalOpenAIRunner.run_streamed`` performs the same agent
  conversion + sandbox checks as ``run`` and forwards to the
  underlying ``AgentRunner.run_streamed``. Its ``run_loop_task`` is
  wrapped to mirror the ``AgentsException -> AgentsWorkflowError``
  rewrap done in ``run`` (the plugin registers
  ``AgentsWorkflowError`` in ``workflow_failure_exception_types``;
  without the wrap, durable failures would surface as retrying
  workflow-task errors instead of terminal workflow failures).
- The shared ``ActivityModelInput``-building logic is factored into
  ``_TemporalModelStub._build_activity_input`` so the two methods do
  not duplicate it.

New plugin config on ``ModelActivityParameters``:

- ``streaming_event_topic: str | None = "events"`` — set to ``None``
  to skip pub/sub entirely (no ``PubSubClient`` constructed; workflows
  that consume only via ``stream_events()`` then need no broker).
- ``streaming_event_batch_interval: timedelta = timedelta(milliseconds=100)``
  — interval for the pub/sub publisher's flusher.

The streaming activity keeps the ``@_auto_heartbeater`` decorator so
long initial-token latency or pauses between chunks do not trip
``heartbeat_timeout``. Explicit per-event ``activity.heartbeat()`` is
removed as redundant.

Status-code retry block in ``_raise_for_openai_status`` now carries a
short comment explaining 408/409/429 (Brian's review note).

Tests (``test_openai_streaming.py``) switch to ``Runner.run_streamed``
and verify that both the workflow-side iteration (via
``stream_events()`` exposed through a query) and the pub/sub side
channel observe the same native OpenAI events. A separate test
covers ``streaming_event_topic=None``.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the OpenAI-side rework: streaming opt-in moves from a
constructor flag to the SDK-native API, and the streaming activity
publishes raw response objects rather than synthesizing custom event
types.

- ``TemporalModel.generate_content_async(stream=True)`` is now
  honored. Users opt into streaming via the ADK-native API path —
  e.g. ``RunConfig(streaming_mode=StreamingMode.SSE)`` on
  ``runner.run_async`` — rather than a plugin-level ``streaming``
  flag (which is removed).
- ``invoke_model_streaming`` publishes each ``LlmResponse`` directly
  via ``pubsub.publish(topic, response)``. The previously-synthesized
  ``LLM_CALL_START`` / ``TEXT_DELTA`` / ``TOOL_CALL_START`` /
  ``TEXT_COMPLETE`` / ``LLM_CALL_COMPLETE`` events are gone — those
  semantic distinctions are speculative until the lifecycle hook
  design is settled (deferred to a follow-up). Raw publishes also
  remove the redundant double ``force_flush`` and the unused
  ``logger`` import that the review flagged.

New constructor config on ``TemporalModel``:

- ``streaming_event_topic: str | None = "events"`` — set to ``None``
  to skip pub/sub entirely.
- ``streaming_event_batch_interval: timedelta = timedelta(milliseconds=100)``
  — interval for the publisher's flusher.

``_plugin.py`` annotates the activities list as
``list[Callable[..., Any]]`` because ``invoke_model`` and
``invoke_model_streaming`` now have different signatures
(streaming takes the topic and batch interval), so type inference
on the bare list literal would not satisfy
``SimplePlugin``'s parameter type.

Tests (``test_adk_streaming.py``) opt into streaming via
``RunConfig(streaming_mode=StreamingMode.SSE)`` and subscribe to the
pub/sub topic with ``result_type=LlmResponse``, asserting the raw
chunks round-trip intact.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the ad-hoc items_processed counter with a nested AppState
dataclass so the snapshot pattern reads symmetrically: app_state
beside pubsub_state, each round-tripped the same way. Also rename
the build_args lambda parameter to pubsub_state to disambiguate
which snapshot it carries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four small fixes from the PR review thread:

- ``_raise_for_openai_status``: consolidate the two ``raise
  ApplicationError`` branches into one with ``non_retryable=not
  retryable``. The retryable / non-retryable case now picks a label
  for the message string instead of duplicating the raise. Suggested
  by Brian (3150542100).

- ``PubSubClient._flush``: encode the buffer before clearing it. The
  prior order (``self._buffer = []`` then ``_encode_buffer(raw)``)
  silently dropped items if the payload converter raised — items
  were already detached from ``self._buffer`` and unrecoverable.
  Now encoding is attempted first; on exception the buffer is
  preserved for inspection or retry. Caught by Copilot
  (3150579181).

- ``PubSubClient.from_activity``: replace the ``assert workflow_id
  is not None`` check with an explicit ``raise RuntimeError``.
  ``-O`` strips asserts, which would turn the validation into a
  ``None`` propagation rather than a clear error. Caught by
  Copilot (3150579197).

- ``_types.py`` and ``__init__.py`` module docstrings: rewrite the
  codec-scope wording. The previous phrasing implied the codec
  chain runs per item, but the implementation runs codec once on
  the signal/update envelope (per-item ``Payload`` is built by the
  payload converter, not the codec chain). Caught by Copilot
  (3150579211, 3150579227).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
basedpyright on Python 3.10+ flags `typing.Sequence` as deprecated
(reportDeprecated). Switch to `collections.abc.Sequence`, which is the
canonical source post-PEP 585.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
6 participants