Conversation
Implements store-level primitives for parallel background processing of the CCC message log. Partitions are discovered via AND semantics (messages must have all partition attributes) and fetched via conditional AND (each message matches all partition attributes it has). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
claim_next now builds guard conditions from each handled_type × partition key_pair combination, enabling deciders to detect concurrent writes at append time via store.append(events, guard: result[:guard]). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Guard conditions are now derived from each message class's declared payload attributes via Message.to_conditions(**partition_attrs). This avoids nonsensical conditions (e.g. CourseCreated × user_id) while still covering all handled_types for conflict detection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Documents conditional AND fetch, ConsistencyGuard from claim_next, Message.to_conditions, cached payload_attribute_names, and the SQLite DISTINCT requirement. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Enable causal chain tracing across CCC messages, matching the pattern from Sourced::Message. Both IDs default to the message's own id via Plumb's prepare_attributes hook. Store schema and serialization updated to persist and round-trip the new fields. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
highest_position on the consumer group tracks the furthest position ever successfully acked (advanced in ack, never decreased). claim_next returns replaying: true when all returned messages are at or below this watermark, meaning they have been processed before (e.g. after an offset reset). First-time processing is never replaying. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the plain Hash with an immutable ClaimResult value object (Data.define) for type safety and a cleaner API. Defined in store.rb alongside the Store class. Tests updated to use method access. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement the Decide/Evolve/React pattern for CCC's stream-less model. Deciders request history via context_for() conditions, Projectors evolve from claimed messages directly. Router orchestrates claim→handle→execute→ack with transactional action execution, partial batch ACK, and error recovery. New modules: Actions (Append/Sync), Consumer, Evolve, React, Sync. Store#read now returns ReadResult data struct. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Decider now pre-correlates events with the command, then passes correlated events as source: to reaction Appends. This gives the correct causation chain: cmd → event → reaction message, with correlation_id tracing back to the command throughout. Append gains source: (override correlation source) and correlated: (skip re-correlation) options. Router integration tests verify exact causation_id and correlation_id at each link in the chain. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add default empty handled_messages_for_evolve to CCC::Consumer so context_for works for reactors that just extend Consumer, define handled_messages, and implement handle_batch with manual action pairs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…partition reads Introduces Sourced::CCC.load(reactor_class, store, **partition_attrs) to load a reactor's evolved state from the store. Uses Store#read_partition for SQL-level AND filtering — a message is included only when every partition attribute it declares matches, avoiding loading irrelevant messages into memory. Guard's last_position covers the broader OR-context to prevent false concurrency conflicts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ier) Signal-driven dispatch for CCC reactors: Store notifies on append/resume, NotificationQueuer routes types to reactors via WorkQueue, Workers drain partitions in bounded loops. Reuses generic primitives (WorkQueue, CatchUpPoller, InlineNotifier) with CCC-specific Worker and Dispatcher. Also adds batch_size: to Store#claim_next and Router#handle_next_for. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Crashed workers leave partitions permanently claimed. Add a ccc_workers table with heartbeat upserts and a StaleClaimReaper that periodically releases claims from workers that stopped heartbeating. Wire the reaper into the Dispatcher alongside the existing notifier and catchup poller. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Mirrors Sourced::Supervisor but simpler: no separate HouseKeepers since StaleClaimReaper is already embedded in the CCC Dispatcher. Takes router + config kwargs, sets signal handlers, and spawns Dispatcher into an executor. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ter, store, router, reset!) Wire CCC components (Supervisor, Dispatcher, Worker, Consumer, StaleClaimReaper) to pull defaults from CCC.config instead of Sourced.config, keeping executor on Sourced. Change CCC.load signature from positional store to keyword store: defaulting to global. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Uses Types::Interface to validate custom store objects implement the 12 required methods. Store instances and raw Sequel SQLite connections are still accepted directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace NullGroup with GroupUpdater that accumulates stop/retry mutations for atomic persistence. Add Store#updating_consumer_group to load, yield, and persist group state. Gate claim_next on retry_at so retries are honoured at the database level. Clear retry_at and error_context in start_consumer_group. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Encapsulates the validate → load history → decide → append → ACK flow into a single call. Returns a HandleResult supporting destructuring: cmd, reactor, events = CCC.handle!(cmd, MyDecider) Adds Store#advance_offset to move consumer group offsets without a prior claim, so background workers skip already-handled commands. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
stop_consumer_group, start_consumer_group, reset_consumer_group, and consumer_group_active? now accept either a String or any object responding to #group_id (e.g. a reactor class). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1. Use Sequel's insert return value (last_insert_rowid) instead of a separate SELECT to get the message position. 2. Resolve key_pair_id via subquery in the message_key_pairs INSERT instead of a separate SELECT round-trip. Cuts per-message append cost by ~3x: 1 key: 0.10ms → 0.04ms 2 keys: 0.15ms → 0.05ms 3 keys: 0.20ms → 0.07ms Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace highest_position-based short-circuit with min(last_position) across all offsets to avoid skipping unprocessed partitions during catch-up. Add eager offset creation path: register_consumer_group accepts partition_by, and append creates offsets inline for registered groups. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ached watermark The min(last_position) vs max(position) comparison was apples-to-oranges: a per-partition offset vs a cross-partition message max. After all partitions catch up, the short-circuit never fired, causing full offset scans + discovery on every idle poll. Replace with last_nil_types_max_pos: when claim_next finds no work, cache the current types_max_pos. Next poll compares in O(1) — if no new messages were appended, return nil instantly. Also batch key_pair lookups in ensure_offsets_for_registered_groups to eliminate N+1 queries, and move has_offsets query into legacy path only. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Router now provides stop/reset/start_consumer_group methods that resolve a reactor class or group_id string, call the corresponding Store method, and invoke optional on_stop/on_reset/on_start callbacks on the reactor. Convenience delegators added to CCC module. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Changed from_position to use >= (asc) and <= (desc) instead of strict > / <. Updated the internal fetcher to offset by 1 to prevent duplicates during auto-pagination via to_enum. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds observability for consumer group partitions — lists offsets with group filtering, cursor-based pagination, and claim status fields. Includes OffsetsResult type with Enumerable, destructuring, and to_enum auto-pagination. Documents in CCC README. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The parameter uses exclusive (>) semantics, while read_all's from_position: uses inclusive (>=). Rename throughout the call stack to eliminate ambiguity: read, read_partition, query_messages, max_position_for, condition_position_subqueries, and check_conflicts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Returns an Enumerator that yields all registered message classes, walking this registry's own lookup then recursing into subclass registries (Command, Event, etc.). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tcher.spawn_into(task)
Replace the old stream+seq implementation with the stream-less, partition-based design from Sourced::CCC. All former CCC code now lives under the top-level Sourced namespace; the old Actor / Unit / ActiveRecord backend / Rails / Falcon / stream-based DurableWorkflow are deleted outright. - Rename internal handler prefixes ccc_decide / ccc_reaction / ccc_evolution to sourced_decide / sourced_reaction / sourced_evolution - Merge Sourced::Topology so the new stream-less build lives alongside the Prism source analyzer - Keep Injector, Types, ErrorStrategy, WorkQueue, CatchUpPoller, InlineNotifier, Async/Thread executors — all used by the promoted code - Replace root README with the promoted reactor docs, stripped of CCC terminology - Move examples/ccc_app to examples/app; delete examples that depended on removed APIs - Drop Sourced::InvalidMessageError, Types::TrailingModuleName, Types::ModuleToMethodName (no remaining callers) - All 472 previously-CCC specs pass against the new top-level API Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Keep the files locally for experimentation but exclude them from the repo until they're ready. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Keep benchmarks locally for experimentation but exclude them from the repo until they're ready. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reactions no longer run inline with the command that produced the triggering event. The Decider's own subscription (via handled_messages_for_react) re-claims the event on the next cycle and runs react() in a separate handle_batch invocation. - The originating command's after_sync fires as soon as events commit instead of waiting for every reaction to finish. - Command and reactions are now in separate transactions; a failing reaction does not roll back the command. - handle_claim forwards claim.replaying to handle_batch; the reaction branch skips on replay, matching Projector. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The Decider's command branch no longer pre-correlates events with msg.correlate + correlated: true. Both command and reaction branches now delegate correlation to Actions::Append#execute via source:, matching Projector and DurableWorkflow. Drops the correlated: parameter from Actions.build_for, Append, and Schedule — Decider was the only caller that set it. Correlation now always runs at execute time against source: || source_message. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
position number is relative to queried partition, not global position
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This branch replaces the original actor/stream-based architecture with the CCC (Command-Context Consistency) design, promoting it to the top-level
Sourcednamespace.Decider,Projector,DurableWorkflow, and plainConsumerreactors, all supporting#handle_batchSourced::Store(with Sequel migration support) replacingSequelBackend/PGBackend/SQLiteBackend/TestBackendDispatcher,Worker,StaleClaimReaper,ScheduledMessagePoller, consumer-group lifecycle hooksCommandContextwith per-message /anyhooks,Sourced.handle!for synchronous dispatchActor, old backends, pubsub modules, Rails generators,Unit, old handler/consumer testsexamples/app/demonstrating the new APIstore_spec.rbalone has 2400+ lines)