runtime-v2 — E2E Go runtime wiring, runtime-sidecar, and observability#2952
runtime-v2 — E2E Go runtime wiring, runtime-sidecar, and observability#2952jgraettinger wants to merge 21 commits into
Conversation
RCLOCK_BEGIN_MIN aliased KEY_BEGIN rather than KEY_BEGIN_MIN.
`bytesBehind` is typed as u64 to tally large values, but like `bytesTotal` and `docsTotal` it should serialize as a native JSON integer rather than a quoted string. Extend the build.rs codegen rewrite to cover `bytesBehind`.
Relaxed schemas strip validation keywords; `redact` belongs in that set. Pass it through to `RelaxedSchemaObj` with `skip_serializing` so it is dropped, and cover the behavior in both the models unit test and a validation scenario exercising a redacted key with a connector and relaxed write schema.
When a journal read skips the direct-fragment path and the broker returns a `file://` fragment URL, the fragment lives on the broker's local filesystem and the client has no transport to read it. With `do_not_proxy=true` and no open spool file, the broker's `serveRead` short-circuits after sending only fragment metadata, EOFs the stream, and the client loop spins. Clear `do_not_proxy` for `file://` fragments so the broker proxies the content instead.
The "nonce" name is unrelated to cryptography but trips GitHub's secret scanner. Renaming to "seq_no" sidesteps the false positive without changing protocol semantics.
…TH_TOKEN Replace the direct storage_mappings/grants inserts in `local:test-tenant` with a betaOnboard directive, mint a multi-use refresh token, and emit `~/flow-local/test-tenant.env`. Raise the new tenant's task/collection quotas so concurrent integration suites don't trip the default ceiling. flowctl: collapse FLOW_ACCESS_TOKEN into FLOW_AUTH_TOKEN, which now accepts either a JWT access token or a base64 refresh-token JSON; drop the now-unused base64 dependency. ci:dekaf-e2e and the dekaf e2e harness take FLOW_AUTH_TOKEN / FLOW_TEST_TENANT from that env file instead of a hard-coded system-user token. Also symlink CLAUDE.md -> AGENTS.md and add local/README.md documenting the local-stack systemd topology.
Use "runtime sidecar" consistently across the runtime-next README and the runtime-v2 plan, replacing the mix of "shuffle sidecar" and "runtime-sidecar process" phrasings.
Add `crates/runtime-sidecar/`, the per-machine Rust process that hosts the Shuffle and Shuffle Leader gRPC services for all V2 tasks on a reactor machine. It listens on a fixed fleet-wide sidecar port, optionally terminates TLS, and is supervised with the same lifetime as the reactor process(es) it serves.
Implement the "controller" portion of the V2 runtime, which initiates the shard RPC lifecycle and drives the Join/Joined => Opened sequence. The new runtime is selected only for tasks having the `enable-runtime-v2` feature flag. Also add a new --shuffle-port flag, used to generate accessible endpoints for the sidecar of a given reactor.
Add a Fixed binding that targets a single pre-existing journal by name, distinct from Mapped bindings which dynamically resolve documents to a collection's partitions (creating them on demand). Use Fixed for the ops stats journal, which activate pre-creates and which never needs partition mapping. This lets the runtime drop ops_stats_spec from the Task proto and removes catalog.LoadCollectionForJournal along with its Go caller, both of which existed only to recover the ops stats CollectionSpec so a Mapped binding could be narrowed to its single partition.
The V2 publisher creates destination partitions on demand, so it needs APPLY in addition to APPEND, plus LIST to watch journals. Have the runtime-next task service and the runtime-sidecar publisher factory request `APPEND | APPLY | LIST` jointly, and teach `authorize_task` to accept that combined capability as `models::Capability::Write`. Update the `TaskCollectionAuth` doc comment to reflect the broadened set.
`bindings` is linked into Go binaries through CGO, so there is no Rust binary entrypoint to install rustls' process-wide CryptoProvider. Install the `aws-lc-rs` provider lazily (once) when a task service is created, and enable rustls' `aws_lc_rs` feature.
… Materialize Drop `ops_stats_journal` from the `Task` proto: both the leader and the shard already receive it via shard labeling at Join time, so passing it through `Task` was redundant. Add `log_level` to the top-level `Materialize` message so the controller can supply it on unary `spec` / `validate` requests, which never see the Join-time labeling that carries log level for session-bound work. Session paths continue to read log level from labeling.
…scan Long-lived tasks accumulate FC: entries for producers that stopped writing (including ones that wrote CONTINUE_TXN docs but never committed them), inflating startup cost, RocksDB size, and abandoned-transaction replay distance. Add `recovery::prune_committed_frontier`, a pure pass over the decoded per-(journal, binding) FC: chunks that drops a producer only when, within its group, it is not FH:-protected, trails the newest last_commit by at least FRONTIER_PRUNE_CLOCK_HORIZON, and trails the furthest read offset by at least FRONTIER_PRUNE_BYTE_HORIZON. The scan path then issues a small (non-synced; this is GC, not a commit) delete batch for the pruned FC: keys before returning Recover, so the leader never observes them.
The close-policy comparisons used a strict `>`, so a threshold of zero could never be satisfied; use `>=` so zero-valued thresholds fire. Widen the `last_close_age` placeholder ceiling from 300s to `Duration::MAX`. The materialize stats doc also reported the `sourced` and `loaded` document/byte tallies under swapped `left`/`right` keys; correct the orientation.
The V2 leader stamps a synthetic "committed-close" source into the consumer.Checkpoint on each commit, recording the V2 RocksDB epoch. If a task is rolled back to the V1 runtime, V1 would otherwise carry that marker verbatim across its own commits; a later roll-forward to V2 would then mistake the stale marker for an in-sync RocksDB state, ignore the legacy_checkpoint, and resume from V2's stale frontier — reprocessing whatever V1 had advanced past. Strip the "committed-close" source on each V1 start-commit so a subsequent V2 startup treats V1's advanced sources as authoritative.
`NewStore` is invoked only on the initial PRIMARY transition, so a publish that flips the `enable-runtime-v2` flag on a running shard cannot otherwise reroute it between the V1 and V2 materialize runtimes. Have each app's `RestoreCheckpoint` surface a functional error when its shard's flag no longer matches the running runtime, forcing the controller to restart the shard so `NewStore` re-evaluates the flag and selects the correct runtime.
Each local data plane now runs a dedicated runtime-v2 sidecar on base_port+60, advertising the same per-plane FQDN and HMAC key as its reactors. Cap brokers and reactors at 10 instances each so the +0..+9 and +90..+99 ranges stay clear of the +50/+51/+52 Dekaf and +60 sidecar reservations. Also set CONSUMER_ZONE on reactors so sidecar peering resolves, prefer the musl target dir ahead of glibc on PATH so an over-broad `cargo build` doesn't shadow the musl flow-connector-init, disable color in sidecar logs, and document the preview-harness scope and the Supabase Docker-network connector wiring.
… and metrics
Add `crates/service-kit/`, a service-agnostic leaf crate that provides
the observability foundation for the runtime-v2 sidecar:
- `Registry` / `HandlerGuard`: a coarse lifecycle view of in-flight
units of work (label / phase / fields), each running inside its own
`tracing` handler span.
- `admin`: a loopback-only `axum` surface — an auto-refreshing HTML
dashboard, `/debug/handlers.json`, a per-handler drill-down page, and
a `POST /debug/handlers/{id}/level/{level}` runtime trace-level
control.
- `trace`: a `tracing_subscriber` layer-filter, composed with the base
`EnvFilter`, that admits events at or above an enclosing handler
span's override level.
- `event!`: a structured-event macro with lazy field capture, feeding
both `tracing` and per-handler breadcrumb rings shown on the
drill-down page.
- `metrics`: a Prometheus registry and `/metrics` route.
The crate is added inert; the following commit wires it into the
sidecar's Shuffle and Leader services.
…nt! instrumentation Wire the runtime-v2 sidecar's Shuffle and Shuffle Leader services into `service-kit`. Both gRPC services register their spawned handlers in a shared `Registry`, each running inside its handler span, and replace ad-hoc `tracing` calls in their actor loops with `service_kit::event!`. `runtime-sidecar` gains an `--admin-port` and rebuilds its tracing stack on a layered subscriber that hosts the loopback admin surface; local data planes bind it at base_port + 61. The shuffle `Shard` message gains an `id` field used to label handlers and metrics, and gazette journal append/read gain the instrumentation the event stream draws on.
The legacy V1 `consumer.Checkpoint` holds a complete committed frontier, whereas V2 writes `FC:` keys as per-transaction deltas. At a cutover the recovered `FC:` keys are not yet a sound recovery baseline. `leader::materialize::startup` now reconciles synchronously: after the connector Open/Opened exchange, when the final status of the recovered V1 checkpoint and any remote-authoritative connector checkpoint is known, it issues one cleanup `Persist` to shard zero. An authoritative checkpoint clears all `FC:` keys and rewrites the complete baseline. The per-task `drop-runtime-v1-rollback` shard-label flag tells the leader to stop maintaining the legacy `consumer.Checkpoint`, deleting the persisted key during startup in exchange for forfeiting V1 rollback. Adds `delete_committed_frontier` and `delete_legacy_checkpoint` to the `Persist` proto, renumbering subsequent fields.
ef13f33 to
ba98cb9
Compare
williamhbaker
left a comment
There was a problem hiding this comment.
LGTM! Just a few comments.
|
|
||
| let Self { | ||
| nonce: _, | ||
| seq_no: _, |
There was a problem hiding this comment.
Can't comment on the line specifically, but down on line 689 - if task.triggers.is_some() the triggers fire, even on empty transactions. It was a comment from the previous runtime-next PR.
| // and re-processing whatever V1 had advanced past. Strip the marker | ||
| // so V2 startup treats V1's advanced sources as authoritative. | ||
| let mut runtime_checkpoint = runtime_checkpoint.clone(); | ||
| runtime_checkpoint.sources.remove("committed-close"); |
There was a problem hiding this comment.
Should this strip be applied to the returned request as well?
I'm thinking about a scenario where a V2-flagged task reverts to a V1 and hits this stripping, but then the unstripped checkpoint from request is persisted in a remote-authoritative materialization. Then if the task crashes at an inopportune time when it is flipped back to V2, I think we could end up re-reading data based on the unstripped field from the connector recovered checkpoint?
This seems like a pretty unlikely case but the fix at least on the surface seems like it would be simple.
| ..Default::default() | ||
| })), | ||
| Ok(content) => { | ||
| metrics.append.increment(content.len() as u64); |
There was a problem hiding this comment.
I think this will over-count the metric on retries. Unless that is intentional...maybe we could increment the counter after the append has succeeded, a little further down?
| waitForRevision = rev + 1 | ||
| continue | ||
| } | ||
| if err := m.client.Send(&pr.Materialize{Join: join}); err != nil { |
There was a problem hiding this comment.
In the V1 runtime, we have this doSend helper for checking io.EOF and then reading the actual error.
Do we need to do that here too, to avoid returning the less useful io.EOF error instead of the real causal one? Same applies to the Task send below at L223.
Description
This branch completes the end-to-end wiring of the V2 materialization
runtime: the Go runtime controller
that drives the V2 shard RPC lifecycle, the
runtime-sidecarprocess thathosts the Shuffle and Shuffle Leader gRPC services, and a
service-kitobservability crate backing both.
The new code ships deployed inert: no task uses it unless the per-task
enable-runtime-v2feature flag is set on its shard labels.Status: functional on local stacks with initial testing workloads.
Not yet production-ready — needs further QA before production tasks
are migrated.
What's included
controller, which drive the
Join/Joined=>Openedshard RPClifecycle; the CGO
TaskServicev2 entry point; aRestoreCheckpointguard that restarts a shard when its
enable-runtime-v2flag is toggledmid-run; and lazy
aws-lc-rsrustls install for the CGO path.crates/runtime-sidecar/, the per-machine Rustprocess hosting the Shuffle and Shuffle Leader gRPC services, plus its
systemd unit and local data-plane wiring (
base_port+60).crates/service-kit/, a service-agnostic leaf crate(handler
Registry, loopback admin surface, runtime trace-levelcontrol,
event!macro, Prometheus metrics), wired into both sidecargRPC services.
BindingintoMappedandFixedvariants; grant V2 journal clients a jointAPPEND|APPLY|LISTcapability for on-demand partition creation.
stale committed-frontier entries during recovery scan; a cutover cleanup
Persistthat reconciles V1's complete checkpoint against V2's deltaFC:keys; rollback-marker stripping; and thedrop-runtime-v1-rollbackflag.
runtime.protocleanups, leader close-policy andstats-orientation fixes, a gazette
file://fragment proxy fix thatwas hot-looping local stacks, and assorted serde/label fixes.
dekaf-e2etest-tenant onboarding,FLOW_AUTH_TOKENunification, and new local-stack / preview-harnessdocs.
See individual commit messages for per-change rationale.