diff --git a/.github/workflows/platform-test.yaml b/.github/workflows/platform-test.yaml index 87b322bb7f3..bbf4430182b 100644 --- a/.github/workflows/platform-test.yaml +++ b/.github/workflows/platform-test.yaml @@ -124,6 +124,7 @@ jobs: - run: mise run build:gazette - run: mise run build:flowctl-go - run: mise run ci:nextest-build + - run: mise run local:bigtable - run: mise run ci:nextest-run - name: Stripe integration test diff --git a/Cargo.lock b/Cargo.lock index 703697f0bbe..703bf2692b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,7 +1374,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "strum 0.26.3", + "strum 0.27.2", ] [[package]] @@ -1643,6 +1643,27 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "catalog-stats" +version = "0.0.0" +dependencies = [ + "anyhow", + "catalog-stats", + "chrono", + "coroutines", + "futures", + "futures-core", + "gcp_auth", + "googleapis-tonic-google-bigtable-v2", + "ops", + "serde", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tonic", + "tuple", +] + [[package]] name = "cbindgen" version = "0.29.0" @@ -2102,6 +2123,7 @@ dependencies = [ "tables", "tempfile", "thiserror 2.0.17", + "time", "tokens", "tokio", "tokio-util", @@ -3652,6 +3674,32 @@ dependencies = [ "uuid", ] +[[package]] +name = "gcp_auth" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2b3d0b409a042a380111af38136310839af8ac1a0917fb6e84515ed1e4bf3ee" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "http 1.3.1", + "http-body-util", + "hyper 1.9.0", + "hyper-rustls 0.27.7", + "hyper-util", + "ring", + "rustls-pki-types", + "serde", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tracing", + "tracing-futures", + "url", +] + [[package]] name = "generator" version = "0.8.8" @@ -3727,6 +3775,57 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "googleapis-tonic-google-api" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c4265caca40796c59dc0f21023fac0275a4206dfb369f34e0fa84a93183c11f" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tonic-prost", +] + +[[package]] +name = "googleapis-tonic-google-bigtable-v2" +version = "0.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4416eb033bc8daa9e41b69d43b9b5d6db0fc160ac9a010b2e08f0b9ef89ac284" +dependencies = [ + "googleapis-tonic-google-api", + "googleapis-tonic-google-rpc", + "googleapis-tonic-google-type", + "prost", + "prost-types", + "tonic", + "tonic-prost", +] + +[[package]] +name = "googleapis-tonic-google-rpc" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7cde0280d228535f2dc25186cb661314d96d337a6cb6bb644a2525bc99a056" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tonic-prost", +] + +[[package]] +name = "googleapis-tonic-google-type" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "691c4dca3f36ca693f2644556d0be8303c2f92a9675b1a0cbc17951687e979db" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tonic-prost", +] + [[package]] name = "governor" version = "0.10.2" @@ -5653,6 +5752,7 @@ name = "ops" version = "0.0.0" dependencies = [ "anyhow", + "chrono", "colored_json", "insta", "lazy_static", @@ -6229,9 +6329,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", "prost-derive", @@ -6261,9 +6361,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", "itertools 0.14.0", @@ -6274,9 +6374,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" dependencies = [ "prost", ] @@ -9051,6 +9151,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 395880fc5d3..e28fbec84d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,8 @@ futures = "0" futures-core = "0" futures-util = "0" fxhash = "0" +gcp_auth = "0.12" +googleapis-tonic-google-bigtable-v2 = "0.36" graphql_client = "*" handlebars = "6" hex = "0" diff --git a/crates/catalog-stats/Cargo.toml b/crates/catalog-stats/Cargo.toml new file mode 100644 index 00000000000..4af501f4671 --- /dev/null +++ b/crates/catalog-stats/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "catalog-stats" +version.workspace = true +rust-version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true + +[lib] +doctest = false + +[features] +# Exposes `catalog_stats::test_util`, the emulator-seeding helpers used by +# integration tests. +test_util = [] + +[dependencies] +coroutines = { path = "../coroutines" } +ops = { path = "../ops" } +tuple = { path = "../tuple" } + +anyhow = { workspace = true } +chrono = { workspace = true } +futures = { workspace = true } +futures-core = { workspace = true } +gcp_auth = { workspace = true } +googleapis-tonic-google-bigtable-v2 = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } + +[dev-dependencies] +catalog-stats = { path = ".", features = ["test_util"] } +ops = { path = "../ops" } +futures = { workspace = true } diff --git a/crates/catalog-stats/README.md b/crates/catalog-stats/README.md new file mode 100644 index 00000000000..d86e582dcbe --- /dev/null +++ b/crates/catalog-stats/README.md @@ -0,0 +1,98 @@ +# catalog-stats + +Read-side client for the BigTable tables that hold rolled-up catalog stats. + +## Inspecting a local stack + +The local `mise run local:stack` workflow materializes the same docs to +both BigTable (via this crate's tables) and Postgres (`catalog_stats` +table populated by `ops.us-central1.v1/stats-view`). The two are useful +to cross-check during development; the snippets below assume the default +local config. + +### BigTable + +`cbt` runs out of the `google/cloud-sdk` image and talks to the emulator +exposed by `mise run local:bigtable` on `localhost:8086`: + +```sh +cbt() { + docker run --rm --network host \ + -e BIGTABLE_EMULATOR_HOST=localhost:8086 \ + -e CLOUDSDK_CORE_PROJECT=estuary-local \ + google/cloud-sdk:latest \ + cbt -instance estuary-local "$@" +} + +# List the three grain tables. +cbt ls + +# Read the first few rows of a grain, restricted to the flow_document column. +cbt read catalog_stats_hourly columns=f:flow_document count=5 +``` + +cbt's `prefix=` / `start=` / `end=` arguments match against the raw row +key, which is FDB tuple-packed and begins with a `0x02` type byte — so a +literal `prefix=bobCo/` will NOT match. For ad-hoc filtering by catalog +name, it's easier to scan the table and post-filter the decoded JSON +(see the `jq` example below). + +The `flow_document` cell is printed Go-`%q`-quoted on a 4-space-indented +line; for the ASCII JSON we store, that quoting is also a valid JSON +string, so `jq fromjson` recovers the document: + +```sh +cbt read catalog_stats_hourly columns=f:flow_document count=1 \ + | grep -aE '^ "' | sed 's/^ //' | jq 'fromjson' +``` + +### Postgres + +The same docs land in the `catalog_stats` table under the local +`stats_loader` role: + +```sh +psql() { + PGPASSWORD=stats_loader_password command psql \ + -h localhost -U stats_loader -d postgres "$@" +} + +# A few recent rows, projected columns. +psql -c " + SELECT catalog_name, grain, ts, docs_written_by_me, bytes_written_by_me + FROM catalog_stats + ORDER BY ts DESC + LIMIT 10" + +# The full flow_document for a specific row. +psql -At -c " + SELECT jsonb_pretty(flow_document::jsonb) + FROM catalog_stats + WHERE catalog_name = 'bobCo/hw1/' + AND grain = 'hourly' + AND ts = '2026-05-14T18:00:00Z'" +``` + +### Comparing + +The two stores will usually agree (sans small propagation delay) on every +field except `_meta.uuid`, which each materialization assigns +independently. + +**Full dump.** Normalize both sides to one JSON-per-line, sort, and +diff. Identical rows produce byte-identical lines after `del(._meta)` +and key-sorting (`-S`), so `sort` lines them up without an explicit +join: + +```sh +for grain in hourly daily monthly; do + cbt read "catalog_stats_${grain}" columns=f:flow_document +done \ + | grep -aE '^ "' | sed 's/^ //' \ + | jq -cS 'fromjson | del(._meta)' | sort > /tmp/bt.jsonl + +psql -At -c "SELECT flow_document::text FROM catalog_stats" \ + | jq -cS 'del(._meta)' | sort > /tmp/pg.jsonl + +diff /tmp/bt.jsonl /tmp/pg.jsonl +``` diff --git a/crates/catalog-stats/src/lib.rs b/crates/catalog-stats/src/lib.rs new file mode 100644 index 00000000000..6c468876ffe --- /dev/null +++ b/crates/catalog-stats/src/lib.rs @@ -0,0 +1,1076 @@ +use anyhow::Context; +use futures::StreamExt; +use googleapis_tonic_google_bigtable_v2::google::bigtable::v2::{ + self as bt, bigtable_client, + read_rows_response::{self, cell_chunk}, + row_filter, row_range, value_range, +}; +use tonic::body; +use tonic::codegen; +use tonic::codegen::http; +use tonic::transport; +use tuple::TuplePack; + +#[cfg(feature = "test_util")] +pub mod test_util; + +pub use ops::catalog_stats::{CatalogStats, Grain, StatsSummary, TaskStats}; + +// Fixed column family written by `materialize-bigtable`. +const COLUMN_FAMILY: &str = "f"; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("gRPC code: {:?}, message: {}", .0.code(), .0.message())] + Grpc(#[from] tonic::Status), + #[error(transparent)] + Internal(#[from] anyhow::Error), +} + +/// RetryError is an Error encountered during a retry-able operation. +#[derive(Debug)] +pub struct RetryError { + /// Number of operation attempts since the last success. + pub attempt: usize, + /// Error encountered with this attempt. + pub inner: Error, +} + +impl Error { + pub fn with_attempt(self, attempt: usize) -> RetryError { + RetryError { + attempt, + inner: self, + } + } + + pub fn is_transient(&self) -> bool { + match self { + // These retryable codes are consistent with retry handling in the + // official Go Bigtable client library. + Error::Grpc(status) => matches!( + status.code(), + tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Aborted, + ), + Error::Internal(_) => false, + } + } +} + +pub type Result = std::result::Result; + +/// RetryResult is a single Result of a retry-able operation. +pub type RetryResult = std::result::Result; + +/// BigTable-specific configuration. When `emulator_host` is `Some`, the +/// transport connects to `http://{host}` and skips ADC auth; otherwise it +/// connects to `https://bigtable.googleapis.com` with an ADC bearer token. +#[derive(Debug, Clone)] +pub struct BigtableConfig { + pub project: String, + pub instance: String, + pub emulator_host: Option, +} + +#[derive(Clone)] +pub struct Client { + client: bigtable_client::BigtableClient, + project: String, + instance: String, +} + +impl Client { + pub async fn connect(cfg: &BigtableConfig) -> anyhow::Result { + let endpoint = match &cfg.emulator_host { + Some(h) => format!("http://{h}"), + None => "https://bigtable.googleapis.com".to_string(), + }; + + let channel = transport::Channel::from_shared(endpoint.clone()) + .with_context(|| format!("invalid BigTable endpoint {endpoint:?}"))? + .connect() + .await + .with_context(|| format!("connecting to BigTable {endpoint:?}"))?; + let use_auth = cfg.emulator_host.is_none(); + let client = + bigtable_client::BigtableClient::new(AuthChannel::new(channel, use_auth).await?); + + Ok(Self { + client, + project: cfg.project.clone(), + instance: cfg.instance.clone(), + }) + } + + /// Streams every `catalog_stats_` row whose `catalog_name` is + /// one of `names`, and whose `ts` falls in the half-open interval + /// `[range.start, range.end)`. + /// + /// Timestamps should be aligned to the grain boundaries. Sub-ms precision + /// is truncated. + /// + /// Results are in lexicographic order by `names`, then ascending by `ts` + /// within each name. + pub fn fetch_range_for_names( + &self, + names: &[&str], + grain: Grain, + range: std::ops::Range>, + ) -> impl futures_core::Stream> + '_ { + let row_set = bt::RowSet { + row_keys: vec![], + row_ranges: names + .iter() + .map(|name| pack_row_range(name, &range)) + .collect(), + }; + + self.read_rows(grain, row_set, vec![]) + } + + /// Streams the row at exactly `(name, ts)` from `catalog_stats_` for + /// each name. + /// + /// Timestamps should be aligned to the grain boundaries. Sub-ms precision + /// is truncated. + /// + /// Results are in lexicographic order by `names`. + pub fn fetch_at_for_names( + &self, + names: &[&str], + grain: Grain, + ts: chrono::DateTime, + ) -> impl futures_core::Stream> + '_ { + let row_set = bt::RowSet { + row_keys: names.iter().map(|name| pack_row_key(name, ts)).collect(), + row_ranges: vec![], + }; + + self.read_rows(grain, row_set, vec![]) + } + + /// Streams every `catalog_stats_` row whose `catalog_name` starts + /// with `prefix`, and whose `ts` falls in the half-open interval + /// `[range.start, range.end)`. + /// + /// Includes rollup rows — callers that want only individual tasks must + /// filter names ending in `/` themselves. + /// + /// Timestamps should be aligned to the grain boundaries. Sub-ms precision + /// is truncated. + /// + /// Performance: `ts` is enforced server-side as a value filter, so Bigtable + /// scans every row in the prefix range regardless of `range`. The cost is + /// least significant on coarser grains like `Monthly` but grows on finer + /// grains. + /// + /// Results are in lexicographic order by prefix, then ascending by `ts` + /// within each prefix. + pub fn fetch_range_for_prefix( + &self, + prefix: &str, + grain: Grain, + range: std::ops::Range>, + ) -> impl futures_core::Stream> + '_ { + let row_set = bt::RowSet { + row_keys: vec![], + row_ranges: pack_name_prefix_range(prefix).into_iter().collect(), + }; + + let ts_match = rf(row_filter::Filter::Condition(Box::new( + row_filter::Condition { + predicate_filter: Some(Box::new(chain_filter(vec![ + rf(row_filter::Filter::FamilyNameRegexFilter( + COLUMN_FAMILY.to_string(), + )), + rf(row_filter::Filter::ColumnQualifierRegexFilter( + b"ts".to_vec(), + )), + rf(row_filter::Filter::ValueRangeFilter(bt::ValueRange { + start_value: Some(value_range::StartValue::StartValueClosed( + format_ts(range.start).into_bytes(), + )), + end_value: Some(value_range::EndValue::EndValueOpen( + format_ts(range.end).into_bytes(), + )), + })), + ]))), + true_filter: Some(Box::new(rf(row_filter::Filter::PassAllFilter(true)))), + false_filter: None, + }, + ))); + + self.read_rows(grain, row_set, vec![ts_match]) + } + + fn table_name(&self, grain: Grain) -> String { + format!( + "projects/{}/instances/{}/tables/catalog_stats_{grain}", + self.project, self.instance, + ) + } + + fn read_rows( + &self, + grain: Grain, + row_set: bt::RowSet, + additional_filters: Vec, + ) -> impl futures_core::Stream> + '_ { + let mut client = self.client.clone(); + let mut read = ReadRows::new(self.table_name(grain), row_set, additional_filters); + let mut attempt: usize = 0; + + coroutines::coroutine(move |mut co| async move { + // Each iteration issues one ReadRows RPC; on failure the inner + // state has already trimmed `row_set` past the watermark so we + // only re-read rows still owed. + loop { + let Some(request) = read.next_request() else { + return; + }; + + let stream = client + .read_rows(request) + .await + .map(|response| response.into_inner()); + let mut stream = std::pin::pin!(read.handle_stream(stream)); + + while let Some(action) = stream.next().await { + match action { + ReadResult::Yield(stats_doc) => { + () = co.yield_(Ok(stats_doc)).await; + attempt = 0; + } + ReadResult::Done => return, + ReadResult::Failed(err) => { + // Surface error to the caller, who can either drop + // to cancel or poll to retry. Non-transient errors + // (decode failures, protocol violations) end the + // stream immediately — retrying won't help. + let transient = err.is_transient(); + () = co.yield_(Err(err.with_attempt(attempt))).await; + if !transient { + return; + } + () = tokio::time::sleep(backoff(attempt)).await; + attempt += 1; + break; + } + } + } + } + }) + } +} + +fn backoff(attempt: usize) -> std::time::Duration { + match attempt { + 0 => std::time::Duration::ZERO, + 1 => std::time::Duration::from_millis(100), + 2 | 3 => std::time::Duration::from_millis(500), + _ => std::time::Duration::from_secs(1), + } +} + +// `%.3f` matches the JS `toISOString()` format that the L1 derivation uses. +fn format_ts(ts: chrono::DateTime) -> String { + ts.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string() +} + +fn pack_row_key(name: &str, ts: chrono::DateTime) -> Vec { + let key = (name, &format_ts(ts)); + key.pack_to_vec() +} + +fn pack_row_range( + name: &str, + range: &std::ops::Range>, +) -> bt::RowRange { + bt::RowRange { + start_key: Some(row_range::StartKey::StartKeyClosed(pack_row_key( + name, + range.start, + ))), + end_key: Some(row_range::EndKey::EndKeyOpen(pack_row_key(name, range.end))), + } +} + +fn pack_name_prefix_range(prefix: &str) -> Option { + if prefix.is_empty() { + // Return `None` for an empty prefix: the FDB encoding of `""` would + // produce a range covering every string-typed row in the table — i.e. + // "scan everything". + return None; + } + + let mut start_key = (prefix,).pack_to_vec(); + start_key.pop().expect("non-empty packed prefix"); // strip 0x00 terminator + + // FDB escapes any NUL byte in the string as `0x00 0xff`, so a `prefix` + // ending in NUL produces an encoded form that ends in `0xff` once the + // FDB terminator is stripped. Walk past trailing `0xff` bytes before + // incrementing to compute the lex successor; the leading `0x02` + // string-type tag keeps the strip from emptying the key. + let mut end_key = start_key.clone(); + while let Some(&0xff) = end_key.last() { + end_key.pop(); + } + *end_key.last_mut().expect("type tag prevents empty key") += 1; + + Some(bt::RowRange { + start_key: Some(row_range::StartKey::StartKeyClosed(start_key)), + end_key: Some(row_range::EndKey::EndKeyOpen(end_key)), + }) +} + +fn rf(filter: row_filter::Filter) -> bt::RowFilter { + bt::RowFilter { + filter: Some(filter), + } +} + +fn chain_filter(mut filters: Vec) -> bt::RowFilter { + if filters.len() == 1 { + filters.pop().unwrap() + } else { + rf(row_filter::Filter::Chain(row_filter::Chain { filters })) + } +} + +struct ReadRows { + table_name: String, + filter: Option, + row_set: bt::RowSet, + doc: Vec, + current_key: Vec, + watermark: Option>, +} + +#[derive(Debug)] +enum ReadResult { + Yield(CatalogStats), + Failed(Error), + Done, +} + +impl ReadRows { + fn new( + table_name: String, + row_set: bt::RowSet, + additional_filters: Vec, + ) -> Self { + // `CellsPerColumnLimitFilter(1)`: materialize-bigtable writes + // under MaxVersions(2); narrow to the latest cell. + let mut filters = vec![rf(row_filter::Filter::CellsPerColumnLimitFilter(1))]; + filters.extend(additional_filters); + filters.push(rf(row_filter::Filter::ColumnQualifierRegexFilter( + b"flow_document".to_vec(), + ))); + + Self { + table_name, + filter: Some(chain_filter(filters)), + row_set, + doc: Vec::new(), + current_key: Vec::new(), + watermark: None, + } + } + + fn next_request(&self) -> Option { + if self.row_set.row_keys.is_empty() && self.row_set.row_ranges.is_empty() { + // Request is complete. + return None; + } + + Some(bt::ReadRowsRequest { + table_name: self.table_name.clone(), + rows: Some(self.row_set.clone()), + filter: self.filter.clone(), + ..Default::default() + }) + } + + fn handle_stream<'a, S>( + &'a mut self, + stream: std::result::Result, + ) -> impl futures_core::Stream + 'a + where + S: futures_core::Stream> + + Unpin + + 'a, + { + coroutines::coroutine(move |mut co| async move { + let mut stream = match stream { + Ok(s) => s, + Err(status) => { + // Initial RPC failed before any data arrived. + () = co.yield_(ReadResult::Failed(Error::Grpc(status))).await; + return; + } + }; + + // Drain the response stream. Each response carries N chunks; + // chunks may begin, continue, reset, or commit a row. + while let Some(res) = stream.next().await { + let message = match res { + Ok(m) => m, + Err(status) => { + // Mid-stream failure may be retried. Drop any partial + // row buffer and trim `row_set` past the last yielded + // key. + self.doc.clear(); + if let Some(w) = &self.watermark { + self.row_set = trim_row_set(std::mem::take(&mut self.row_set), w); + } + () = co.yield_(ReadResult::Failed(Error::Grpc(status))).await; + return; + } + }; + for chunk in message.chunks { + // `on_chunk` returns `Some` only on `CommitRow`. + match self.on_chunk(chunk) { + Ok(Some(stats)) => () = co.yield_(ReadResult::Yield(stats)).await, + Ok(None) => {} + Err(err) => { + () = co.yield_(ReadResult::Failed(err)).await; + return; + } + } + } + } + + // Clean end-of-stream: no row should be mid-assembly. + if !self.doc.is_empty() { + let buffered = self.doc.len(); + () = co + .yield_(ReadResult::Failed(Error::Internal(anyhow::anyhow!( + "ReadRows stream ended with {buffered} bytes buffered for an uncommitted row", + )))) + .await; + return; + } + + () = co.yield_(ReadResult::Done).await; + }) + } + + /// Fold `chunk` into the in-progress row buffer. Returns the + /// decoded row when `chunk` carries `CommitRow`. + fn on_chunk(&mut self, chunk: read_rows_response::CellChunk) -> Result> { + // The first chunk of each row carries `row_key`; subsequent + // chunks for the same row leave it empty. + if !chunk.row_key.is_empty() { + self.current_key = chunk.row_key; + } + // The server-side filter strips all columns except + // `flow_document`, so every chunk's `value` belongs to that + // single cell. + self.doc.extend_from_slice(&chunk.value); + + match chunk.row_status { + Some(cell_chunk::RowStatus::ResetRow(true)) => { + // `ResetRow` discards in-progress state — the server is + // retrying this row from scratch. + self.doc.clear(); + Ok(None) + } + Some(cell_chunk::RowStatus::CommitRow(true)) => { + let result = serde_json::from_slice(&self.doc) + .context("decoding flow_document") + .map_err(Error::Internal)?; + self.doc.clear(); + + // This assumes Bigtable returns results in sorted order, which + // it does. + self.watermark = Some(std::mem::take(&mut self.current_key)); + + Ok(Some(result)) + } + _ => Ok(None), // Continue accumulation + } + } +} + +/// Narrows `row_set` to exclude every row at or before `after`. +/// +/// Called from the retry path: `after` is the highest row key already +/// yielded, so the returned `RowSet` covers only the rows still owed. +/// +/// Discrete `row_keys` are filtered point-wise. Each `row_range` is then +/// dropped, clamped, or kept as-is depending on where it sits relative +/// to `after`: +/// +/// ```text +/// after +/// │ +/// range A: [─────) │ drop (end ≤ after) +/// range B: [──────────────┼──────) clamp start to Open(after) +/// range C: │ [─────) keep as-is +/// ``` +fn trim_row_set(row_set: bt::RowSet, after: &[u8]) -> bt::RowSet { + let row_keys = row_set + .row_keys + .into_iter() + .filter(|k| k.as_slice() > after) + .collect(); + + let row_ranges = row_set + .row_ranges + .into_iter() + .filter_map(|mut range| { + match &range.end_key { + Some(row_range::EndKey::EndKeyClosed(k)) + | Some(row_range::EndKey::EndKeyOpen(k)) + if k.as_slice() <= after => + { + // Range ends at or below `after`; no rows remain. + return None; + } + _ => {} + } + + // Narrow the start when it still includes anything `<= after`. + let narrow = match &range.start_key { + None => true, + Some(row_range::StartKey::StartKeyClosed(k)) => k.as_slice() <= after, + Some(row_range::StartKey::StartKeyOpen(k)) => k.as_slice() < after, + }; + if narrow { + range.start_key = Some(row_range::StartKey::StartKeyOpen(after.to_vec())); + } + + Some(range) + }) + .collect(); + + bt::RowSet { + row_keys, + row_ranges, + } +} + +const BIGTABLE_DATA_SCOPE: &str = "https://www.googleapis.com/auth/bigtable.data"; + +#[derive(Clone)] +struct AuthChannel { + channel: transport::Channel, + provider: Option>, +} + +impl AuthChannel { + async fn new(channel: transport::Channel, use_auth: bool) -> anyhow::Result { + let provider = if use_auth { + Some( + gcp_auth::provider() + .await + .context("acquiring GCP credentials provider")?, + ) + } else { + None + }; + Ok(Self { channel, provider }) + } +} + +impl codegen::Service> for AuthChannel { + type Response = http::Response; + type Error = codegen::StdError; + type Future = std::pin::Pin< + Box< + dyn std::future::Future< + Output = std::result::Result, codegen::StdError>, + > + Send, + >, + >; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.channel.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + let next = self.channel.clone(); + let mut channel = std::mem::replace(&mut self.channel, next); + let provider = self.provider.clone(); + Box::pin(async move { + if let Some(provider) = provider { + let token = provider.token(&[BIGTABLE_DATA_SCOPE]).await?; + let mut header = + http::HeaderValue::from_str(&format!("Bearer {}", token.as_str()))?; + header.set_sensitive(true); + req.headers_mut() + .insert(http::header::AUTHORIZATION, header); + } + channel.call(req).await.map_err(Into::into) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn format_ts_matches_l2_iso_string() { + let ts = chrono::Utc.with_ymd_and_hms(2026, 5, 5, 18, 0, 0).unwrap(); + assert_eq!(format_ts(ts), "2026-05-05T18:00:00.000Z"); + + // Whole-millisecond precision round-trips through the formatter. + let ts_ms = ts + chrono::Duration::milliseconds(123); + assert_eq!(format_ts(ts_ms), "2026-05-05T18:00:00.123Z"); + + // Sub-ms precision is silently truncated, not rounded. + let ts_us = ts + chrono::Duration::microseconds(123_999); + assert_eq!(format_ts(ts_us), "2026-05-05T18:00:00.123Z"); + } + + #[test] + fn name_prefix_range_brackets_matching_row_keys() { + let ts = chrono::Utc.with_ymd_and_hms(2026, 5, 5, 18, 0, 0).unwrap(); + let cases: &[(&str, &[(&str, bool)])] = &[ + ( + "acmeCo/", + &[ + ("acmeCo/", true), // tenant rollup row + ("acmeCo/foo", true), // per-task row under the tenant + ("acmeCo0foo", false), // sibling exactly at the exclusive end-key boundary (`/`+1 == `0`) + ("acmeCo:foo", false), // sibling just past the boundary (`:` > `0`) + ], + ), + ( + "acm", + &[ + ("acmeCo/foo", true), // partial-name match + ("acn", false), // `m`+1 == `n`, just past the range + ], + ), + ( + "acmé", + &[ + ("acmé/foo", true), // under the prefix + ("acmé", true), // exact match + ("acmê", false), // next Unicode codepoint, sibling + ], + ), + ( + "foo\0", + &[ + ("foo\0bar", true), // longer match through the NUL escape + ("foo\0", true), // exact match + ("foo", false), // shorter, falls before the prefix + ("foo\x01", false), // sibling — 0x01 sorts after the escape + ], + ), + ]; + + for (prefix, candidates) in cases { + let bt::RowRange { + start_key: Some(row_range::StartKey::StartKeyClosed(start)), + end_key: Some(row_range::EndKey::EndKeyOpen(end)), + } = pack_name_prefix_range(prefix).expect("non-empty prefix") + else { + panic!("expected [closed, open) range for prefix {prefix:?}"); + }; + + for (name, want_in) in *candidates { + let key = pack_row_key(name, ts); + assert_eq!( + key >= start && key < end, + *want_in, + "prefix {prefix:?}, name {name:?}" + ); + } + } + } + + #[test] + fn name_prefix_range_empty_prefix_yields_none() { + assert!(pack_name_prefix_range("").is_none()); + } + + #[test] + fn trim_row_set_drops_ranges_ending_at_or_before_after() { + let mk_range = + |start: Option, end: Option| bt::RowRange { + start_key: start, + end_key: end, + }; + let closed_start = |k: &[u8]| Some(row_range::StartKey::StartKeyClosed(k.to_vec())); + let closed_end = |k: &[u8]| Some(row_range::EndKey::EndKeyClosed(k.to_vec())); + let open_end = |k: &[u8]| Some(row_range::EndKey::EndKeyOpen(k.to_vec())); + + let after = b"m"; + let cases: &[(&str, bt::RowRange)] = &[ + ( + "open end strictly before after", + mk_range(closed_start(b"a"), open_end(b"c")), + ), + ( + "open end at after (range ends below after)", + mk_range(closed_start(b"a"), open_end(after)), + ), + ( + "closed end at after (only row is `after` itself)", + mk_range(closed_start(b"a"), closed_end(after)), + ), + ]; + + for (name, range) in cases { + let got = trim_row_set( + bt::RowSet { + row_keys: vec![], + row_ranges: vec![range.clone()], + }, + after, + ); + assert!( + got.row_ranges.is_empty(), + "{name}: expected dropped range, got {got:?}", + ); + } + } + + #[test] + fn trim_row_set_narrows_starts_at_or_before_after() { + let mk_range = + |start: Option, end: Option| bt::RowRange { + start_key: start, + end_key: end, + }; + let closed_start = |k: &[u8]| Some(row_range::StartKey::StartKeyClosed(k.to_vec())); + let open_start = |k: &[u8]| Some(row_range::StartKey::StartKeyOpen(k.to_vec())); + let closed_end = |k: &[u8]| Some(row_range::EndKey::EndKeyClosed(k.to_vec())); + let open_end = |k: &[u8]| Some(row_range::EndKey::EndKeyOpen(k.to_vec())); + + let after = b"m"; + let cases: &[(&str, bt::RowRange, Option)] = &[ + ( + "unbounded start narrows to open(after)", + mk_range(None, open_end(b"z")), + open_start(after), + ), + ( + "closed start exactly at after narrows (would otherwise re-emit it)", + mk_range(closed_start(after), open_end(b"z")), + open_start(after), + ), + ( + "closed start past after is left alone", + mk_range(closed_start(b"p"), open_end(b"z")), + closed_start(b"p"), + ), + ( + "open start strictly before after narrows", + mk_range(open_start(b"a"), open_end(b"z")), + open_start(after), + ), + ( + "open start exactly at after is already correct", + mk_range(open_start(after), open_end(b"z")), + open_start(after), + ), + ( + "closed end past after keeps the range and narrows start", + mk_range(closed_start(b"a"), closed_end(b"z")), + open_start(after), + ), + ]; + + for (name, range, expected_start) in cases { + let got = trim_row_set( + bt::RowSet { + row_keys: vec![], + row_ranges: vec![range.clone()], + }, + after, + ); + assert_eq!(got.row_ranges.len(), 1, "{name}"); + assert_eq!(&got.row_ranges[0].start_key, expected_start, "{name}"); + assert_eq!( + got.row_ranges[0].end_key, range.end_key, + "{name}: end_key must not be mutated", + ); + } + } + + #[test] + fn trim_row_set_filters_discrete_keys() { + let trimmed = trim_row_set( + bt::RowSet { + row_keys: vec![b"a".to_vec(), b"m".to_vec(), b"n".to_vec(), b"z".to_vec()], + row_ranges: vec![], + }, + b"m", + ); + assert_eq!(trimmed.row_keys, vec![b"n".to_vec(), b"z".to_vec()]); + } + + // ──────────────────────────────────────────────────────────────────── + // `ReadRows` state-machine tests + // + // Drive `handle_stream` with a scripted RPC outcome and observe the + // resulting `ReadResult` actions and post-attempt `next_request` + // shape. + // ──────────────────────────────────────────────────────────────────── + + fn base_ts() -> chrono::DateTime { + chrono::Utc.with_ymd_and_hms(2026, 5, 5, 18, 0, 0).unwrap() + } + + fn mk_stats(name: &str, ts: chrono::DateTime) -> CatalogStats { + CatalogStats { + meta: ops::Meta { + uuid: "00000000-0000-0000-0000-000000000000".to_string(), + }, + catalog_name: name.to_string(), + ts, + stats_summary: StatsSummary::default(), + task_stats: TaskStats::default(), + } + } + + fn mk_chunk( + row_key: &[u8], + value: &[u8], + status: Option, + ) -> read_rows_response::CellChunk { + read_rows_response::CellChunk { + row_key: row_key.to_vec(), + value: value.to_vec(), + row_status: status, + ..Default::default() + } + } + + fn commit(key: &[u8], stats: &CatalogStats) -> read_rows_response::CellChunk { + let doc = serde_json::to_vec(stats).unwrap(); + mk_chunk(key, &doc, Some(cell_chunk::RowStatus::CommitRow(true))) + } + + fn reset_chunk() -> read_rows_response::CellChunk { + mk_chunk(b"", b"", Some(cell_chunk::RowStatus::ResetRow(true))) + } + + fn mk_response(chunks: Vec) -> bt::ReadRowsResponse { + bt::ReadRowsResponse { + chunks, + ..Default::default() + } + } + + // Serialize `stats` and split it across `n` chunks; only the first carries + // `key`, only the last carries `CommitRow(true)`. + fn split_commit( + key: &[u8], + stats: &CatalogStats, + n: usize, + ) -> Vec { + let doc = serde_json::to_vec(stats).unwrap(); + (0..n) + .map(|i| { + let start = i * doc.len() / n; + let end = (i + 1) * doc.len() / n; + let row_key: &[u8] = if i == 0 { key } else { b"" }; + let status = (i + 1 == n).then_some(cell_chunk::RowStatus::CommitRow(true)); + mk_chunk(row_key, &doc[start..end], status) + }) + .collect() + } + + fn read_over_name_range(name: &str, hours: i64) -> ReadRows { + let base = base_ts(); + let row_set = bt::RowSet { + row_keys: vec![], + row_ranges: vec![pack_row_range( + name, + &(base..base + chrono::Duration::hours(hours)), + )], + }; + ReadRows::new("test-table".to_string(), row_set, vec![]) + } + + fn next_request_range(state: &ReadRows) -> bt::RowRange { + let req = state.next_request().expect("state should not be done"); + let mut row_set = req.rows.expect("request must have rows"); + assert_eq!(row_set.row_ranges.len(), 1, "single-range fixture"); + row_set.row_ranges.pop().unwrap() + } + + async fn drive( + state: &mut ReadRows, + stream: std::result::Result< + Vec>, + tonic::Status, + >, + ) -> Vec { + let stream = stream.map(futures::stream::iter); + let res = state.handle_stream(stream); + let mut res = std::pin::pin!(res); + let mut out = Vec::new(); + while let Some(action) = res.next().await { + out.push(action); + } + out + } + + fn assert_yields_then_done(label: &str, actions: &[ReadResult], expected: &[CatalogStats]) { + assert_eq!( + actions.len(), + expected.len() + 1, + "{label}: action count mismatch: {actions:?}", + ); + for (i, (got, want)) in actions.iter().zip(expected).enumerate() { + let ReadResult::Yield(s) = got else { + panic!("{label}: action {i} is not Yield: {got:?}"); + }; + assert_eq!(s, want, "{label}: yielded row {i}"); + } + assert!( + matches!(actions.last(), Some(ReadResult::Done)), + "{label}: trailing action must be Done, got {actions:?}", + ); + } + + #[tokio::test] + async fn happy_path_streams() { + let ts = base_ts(); + let stats = mk_stats("foo", ts); + let key = pack_row_key("foo", ts); + let stats_a = mk_stats("acmeCo/a", ts); + let stats_b = mk_stats("acmeCo/b", ts); + let key_a = pack_row_key("acmeCo/a", ts); + let key_b = pack_row_key("acmeCo/b", ts); + let split2 = split_commit(&key, &stats, 2); + + // (label, name range, responses, expected rows) + let cases: Vec<(&str, &str, Vec, Vec)> = vec![ + ("empty stream", "foo", vec![], vec![]), + ( + "single committed row", + "foo", + vec![mk_response(vec![commit(&key, &stats)])], + vec![stats.clone()], + ), + ( + "doc split across chunks in one response", + "foo", + vec![mk_response(split_commit(&key, &stats, 3))], + vec![stats.clone()], + ), + ( + "doc split across two responses", + "foo", + vec![ + mk_response(vec![split2[0].clone()]), + mk_response(vec![split2[1].clone()]), + ], + vec![stats.clone()], + ), + ( + "ResetRow discards partial buffer; retry commits cleanly", + "foo", + vec![mk_response(vec![ + mk_chunk(&key, b"{garbled", None), + reset_chunk(), + commit(&key, &stats), + ])], + vec![stats.clone()], + ), + ( + "consecutive rows yield independently", + "acmeCo", + vec![mk_response(vec![ + commit(&key_a, &stats_a), + commit(&key_b, &stats_b), + ])], + vec![stats_a.clone(), stats_b.clone()], + ), + ]; + + for (label, name, responses, expected) in cases { + let mut state = read_over_name_range(name, 1); + let rpc: Vec> = + responses.into_iter().map(Ok).collect(); + let actions = drive(&mut state, Ok(rpc)).await; + assert_yields_then_done(label, &actions, &expected); + } + } + + #[tokio::test] + async fn initial_status_yields_failed() { + let mut read = read_over_name_range("foo", 1); + let actions = drive(&mut read, Err(tonic::Status::unavailable("nope"))).await; + let [ReadResult::Failed(Error::Grpc(status))] = &actions[..] else { + panic!("expected [Failed(Grpc)], got {actions:?}"); + }; + assert_eq!(status.code(), tonic::Code::Unavailable); + } + + #[tokio::test] + async fn resume_after_mid_range_yield() { + let stats = mk_stats("foo", base_ts()); + let key = pack_row_key("foo", base_ts()); + + let mut read = read_over_name_range("foo", 3); + let stream = drive( + &mut read, + Ok(vec![ + Ok(mk_response(vec![commit(&key, &stats)])), + Err(tonic::Status::unavailable("")), + ]), + ) + .await; + + let [ + ReadResult::Yield(s), + ReadResult::Failed(Error::Grpc(status)), + ] = &stream[..] + else { + panic!("expected [Yield, Failed(Grpc)], got {stream:?}"); + }; + assert_eq!(s, &stats); + assert_eq!(status.code(), tonic::Code::Unavailable); + + let range = next_request_range(&read); + assert_eq!( + range.start_key, + Some(row_range::StartKey::StartKeyOpen(key)), + "resume must start strictly past yielded key", + ); + } + + #[tokio::test] + async fn resume_after_yield_at_range_end_exhausts_row_set() { + let stats = mk_stats("foo", base_ts() + chrono::Duration::hours(1)); + let key = pack_row_key("foo", base_ts() + chrono::Duration::hours(1)); + + let mut read = read_over_name_range("foo", 1); + let stream = drive( + &mut read, + Ok(vec![ + Ok(mk_response(vec![commit(&key, &stats)])), + Err(tonic::Status::unavailable("")), + ]), + ) + .await; + + let [ + ReadResult::Yield(s), + ReadResult::Failed(Error::Grpc(status)), + ] = &stream[..] + else { + panic!("expected [Yield, Failed(Grpc)], got {stream:?}"); + }; + assert_eq!(s, &stats); + assert_eq!(status.code(), tonic::Code::Unavailable); + + assert!( + read.next_request().is_none(), + "trim past range end must leave row_set empty", + ); + } +} diff --git a/crates/catalog-stats/src/test_util.rs b/crates/catalog-stats/src/test_util.rs new file mode 100644 index 00000000000..5c226221640 --- /dev/null +++ b/crates/catalog-stats/src/test_util.rs @@ -0,0 +1,110 @@ +use crate::{CatalogStats, Client, Grain, pack_name_prefix_range}; +use anyhow::Context; +use googleapis_tonic_google_bigtable_v2::google::bigtable::v2::{self as bt, mutation, row_filter}; + +/// Seeds one row per `(grain, stats)` pair, writing into +/// `catalog_stats_`. +pub async fn seed_rows(client: &Client, rows: &[(Grain, CatalogStats)]) -> anyhow::Result<()> { + let mut bt_client = client.client.clone(); + let set_cell = |qualifier: &str, value: &[u8]| bt::Mutation { + mutation: Some(mutation::Mutation::SetCell(mutation::SetCell { + family_name: crate::COLUMN_FAMILY.to_string(), + column_qualifier: qualifier.as_bytes().to_vec(), + timestamp_micros: -1, + value: value.to_vec(), + })), + }; + + for (grain, stats) in rows { + let row_key = crate::pack_row_key(&stats.catalog_name, stats.ts); + let ts_str = crate::format_ts(stats.ts); + let flow_document = serde_json::to_vec(stats).context("encoding flow_document")?; + + let mutations = vec![ + set_cell("catalog_name", stats.catalog_name.as_bytes()), + set_cell("ts", ts_str.as_bytes()), + set_cell("flow_document", &flow_document), + ]; + + bt_client + .mutate_row(bt::MutateRowRequest { + table_name: client.table_name(*grain), + row_key, + mutations, + ..Default::default() + }) + .await + .with_context(|| format!("MutateRow for {} {} {grain}", stats.catalog_name, ts_str))?; + } + + Ok(()) +} + +/// Deletes every row in each `catalog_stats_` table. +pub async fn delete_all_rows(client: &Client) -> anyhow::Result<()> { + delete_matching_rows(client, bt::RowSet::default()).await +} + +/// Deletes every row in each `catalog_stats_` table whose +/// `catalog_name` starts with `prefix`. Empty `prefix` is a no-op (mirrors +/// the public API's empty-prefix guard) — callers that genuinely want a +/// full wipe should use `delete_all_rows`. +pub async fn delete_rows_with_prefix(client: &Client, prefix: &str) -> anyhow::Result<()> { + let Some(row_range) = pack_name_prefix_range(prefix) else { + return Ok(()); + }; + delete_matching_rows( + client, + bt::RowSet { + row_keys: vec![], + row_ranges: vec![row_range], + }, + ) + .await +} + +async fn delete_matching_rows(client: &Client, row_set: bt::RowSet) -> anyhow::Result<()> { + let mut bt_client = client.client.clone(); + + for grain in [Grain::Hourly, Grain::Daily, Grain::Monthly] { + let table_name = client.table_name(grain); + + let mut stream = bt_client + .read_rows(bt::ReadRowsRequest { + table_name: table_name.clone(), + rows: Some(row_set.clone()), + filter: Some(bt::RowFilter { + filter: Some(row_filter::Filter::StripValueTransformer(true)), + }), + ..Default::default() + }) + .await + .with_context(|| format!("ReadRows for delete_matching_rows ({grain})"))? + .into_inner(); + + // Only the first chunk of each row carries `row_key`; later chunks + // for the same row leave it empty. + while let Some(message) = stream.message().await.context("ReadRows stream error")? { + for chunk in message.chunks { + if chunk.row_key.is_empty() { + continue; + } + bt_client + .mutate_row(bt::MutateRowRequest { + table_name: table_name.clone(), + row_key: chunk.row_key, + mutations: vec![bt::Mutation { + mutation: Some(mutation::Mutation::DeleteFromRow( + mutation::DeleteFromRow {}, + )), + }], + ..Default::default() + }) + .await + .context("MutateRow DeleteFromRow")?; + } + } + } + + Ok(()) +} diff --git a/crates/catalog-stats/tests/client.rs b/crates/catalog-stats/tests/client.rs new file mode 100644 index 00000000000..7e0861b0c1f --- /dev/null +++ b/crates/catalog-stats/tests/client.rs @@ -0,0 +1,323 @@ +use catalog_stats::test_util; +use chrono::TimeZone; +use futures::TryStreamExt; + +async fn connect() -> catalog_stats::Client { + catalog_stats::Client::connect(&catalog_stats::BigtableConfig { + project: "estuary-local".to_string(), + instance: "estuary-local".to_string(), + emulator_host: Some("localhost:8086".to_string()), + }) + .await + .expect("BigTable emulator must be running: `mise run local:bigtable`") +} + +fn base_ts() -> chrono::DateTime { + chrono::Utc.with_ymd_and_hms(2026, 5, 5, 18, 0, 0).unwrap() +} + +async fn fresh_prefix(client: &catalog_stats::Client, test: &str) -> String { + let prefix = format!("ct/{test}/"); + test_util::delete_rows_with_prefix(client, &prefix) + .await + .unwrap(); + prefix +} + +fn stats( + name: impl Into, + ts: chrono::DateTime, +) -> catalog_stats::CatalogStats { + catalog_stats::CatalogStats { + meta: ops::Meta { + uuid: "00000000-0000-0000-0000-000000000000".to_string(), + }, + catalog_name: name.into(), + ts, + stats_summary: catalog_stats::StatsSummary::default(), + task_stats: catalog_stats::TaskStats::default(), + } +} + +async fn names_of( + stream: impl futures_core::Stream>, +) -> Vec { + let rows: Vec = Box::pin(stream).try_collect().await.unwrap(); + rows.into_iter().map(|s| s.catalog_name).collect() +} + +async fn pairs_of( + stream: impl futures_core::Stream>, +) -> Vec<(String, chrono::DateTime)> { + let rows: Vec = Box::pin(stream).try_collect().await.unwrap(); + rows.into_iter().map(|s| (s.catalog_name, s.ts)).collect() +} + +#[tokio::test] +async fn fetch_at_for_names() { + struct Case { + name: &'static str, + seed: &'static [&'static str], + query: &'static [&'static str], + query_ts_us: i64, + expected: &'static [&'static str], + } + + let client = connect().await; + let base = base_ts(); + + for case in [ + Case { + name: "returns_seeded_rows_in_lex_order", + seed: &["c", "a", "b"], + query: &["c", "a", "b"], + query_ts_us: 0, + expected: &["a", "b", "c"], + }, + Case { + name: "empty_input_yields_empty_stream", + seed: &[], + query: &[], + query_ts_us: 0, + expected: &[], + }, + Case { + name: "omits_unknown_names", + seed: &["present"], + query: &["missing", "present"], + query_ts_us: 0, + expected: &["present"], + }, + Case { + name: "truncates_sub_ms_query_timestamp", + seed: &["foo"], + query: &["foo"], + query_ts_us: 999, + expected: &["foo"], + }, + ] { + let prefix = fresh_prefix(&client, case.name).await; + let rows: Vec<_> = case + .seed + .iter() + .map(|n| { + ( + catalog_stats::Grain::Hourly, + stats(format!("{prefix}{n}"), base), + ) + }) + .collect(); + test_util::seed_rows(&client, &rows).await.unwrap(); + + let full_query: Vec = case.query.iter().map(|n| format!("{prefix}{n}")).collect(); + let query_ref: Vec<&str> = full_query.iter().map(String::as_str).collect(); + let got = names_of(client.fetch_at_for_names( + &query_ref, + catalog_stats::Grain::Hourly, + base + chrono::Duration::microseconds(case.query_ts_us), + )) + .await; + + let expected: Vec = case + .expected + .iter() + .map(|n| format!("{prefix}{n}")) + .collect(); + assert_eq!(got, expected, "case: {}", case.name); + } +} + +#[tokio::test] +async fn fetch_range_for_names() { + struct Case { + name: &'static str, + seed: &'static [(&'static str, i64)], + query: &'static [&'static str], + range_hours: std::ops::Range, + expected: &'static [(&'static str, i64)], + } + + let client = connect().await; + let base = base_ts(); + let at = |h: i64| base + chrono::Duration::hours(h); + + for case in [ + Case { + name: "inclusive_start_exclusive_end", + seed: &[("foo", 0), ("foo", 1), ("foo", 2), ("foo", 3)], + query: &["foo"], + range_hours: 0..3, + expected: &[("foo", 0), ("foo", 1), ("foo", 2)], + }, + Case { + name: "orders_by_name_then_ts", + seed: &[("b", 1), ("a", 1), ("b", 0), ("a", 0)], + query: &["b", "a"], + range_hours: 0..2, + expected: &[("a", 0), ("a", 1), ("b", 0), ("b", 1)], + }, + Case { + name: "empty_names_yields_empty_stream", + seed: &[], + query: &[], + range_hours: 0..1, + expected: &[], + }, + Case { + name: "degenerate_range_yields_empty_stream", + seed: &[("any", 0)], + query: &["any"], + range_hours: 0..0, + expected: &[], + }, + ] { + let prefix = fresh_prefix(&client, case.name).await; + let rows: Vec<_> = case + .seed + .iter() + .map(|(n, h)| { + ( + catalog_stats::Grain::Hourly, + stats(format!("{prefix}{n}"), at(*h)), + ) + }) + .collect(); + test_util::seed_rows(&client, &rows).await.unwrap(); + + let full_query: Vec = case.query.iter().map(|n| format!("{prefix}{n}")).collect(); + let query_ref: Vec<&str> = full_query.iter().map(String::as_str).collect(); + + let got = pairs_of(client.fetch_range_for_names( + &query_ref, + catalog_stats::Grain::Hourly, + at(case.range_hours.start)..at(case.range_hours.end), + )) + .await; + + let expected: Vec<_> = case + .expected + .iter() + .map(|(n, h)| (format!("{prefix}{n}"), at(*h))) + .collect(); + assert_eq!(got, expected, "case: {}", case.name); + } +} + +#[tokio::test] +async fn fetch_range_for_prefix() { + struct Case { + name: &'static str, + seed: &'static [(&'static str, i64)], + sub_prefix: &'static str, + range_hours: std::ops::Range, + expected: &'static [(&'static str, i64)], + } + + let client = connect().await; + let base = base_ts(); + let at = |h: i64| base + chrono::Duration::hours(h); + + for case in [ + Case { + name: "includes_rollups", + seed: &[("tenant/", 0), ("tenant/a", 0), ("tenant/b", 0)], + sub_prefix: "tenant/", + range_hours: 0..1, + expected: &[("tenant/", 0), ("tenant/a", 0), ("tenant/b", 0)], + }, + Case { + name: "excludes_sibling_names", + seed: &[("tenant/foo", 0), ("tenant:foo", 0)], + sub_prefix: "tenant/", + range_hours: 0..1, + expected: &[("tenant/foo", 0)], + }, + Case { + name: "filters_rows_outside_ts_range", + seed: &[("tenant/foo", 1), ("tenant/foo", 3)], + sub_prefix: "tenant/", + range_hours: 0..2, + expected: &[("tenant/foo", 1)], + }, + ] { + let prefix = fresh_prefix(&client, case.name).await; + let rows: Vec<_> = case + .seed + .iter() + .map(|(n, h)| { + ( + catalog_stats::Grain::Hourly, + stats(format!("{prefix}{n}"), at(*h)), + ) + }) + .collect(); + test_util::seed_rows(&client, &rows).await.unwrap(); + + let query_prefix = format!("{prefix}{}", case.sub_prefix); + let got = pairs_of(client.fetch_range_for_prefix( + &query_prefix, + catalog_stats::Grain::Hourly, + at(case.range_hours.start)..at(case.range_hours.end), + )) + .await; + + let expected: Vec<_> = case + .expected + .iter() + .map(|(n, h)| (format!("{prefix}{n}"), at(*h))) + .collect(); + assert_eq!(got, expected, "case: {}", case.name); + } +} + +#[tokio::test] +async fn fetch_range_for_prefix_empty_prefix_yields_empty_stream() { + let client = connect().await; + let base = base_ts(); + let got = names_of(client.fetch_range_for_prefix( + "", + catalog_stats::Grain::Hourly, + base..base + chrono::Duration::hours(1), + )) + .await; + assert!(got.is_empty()); +} + +#[tokio::test] +async fn fetches_target_only_the_requested_grain() { + let client = connect().await; + let prefix = fresh_prefix(&client, "grain_isolation").await; + let ts = base_ts(); + let hourly_name = format!("{prefix}hourly"); + let daily_name = format!("{prefix}daily"); + let monthly_name = format!("{prefix}monthly"); + + test_util::seed_rows( + &client, + &[ + (catalog_stats::Grain::Hourly, stats(&hourly_name, ts)), + (catalog_stats::Grain::Daily, stats(&daily_name, ts)), + (catalog_stats::Grain::Monthly, stats(&monthly_name, ts)), + ], + ) + .await + .unwrap(); + + for (grain, expected) in [ + (catalog_stats::Grain::Hourly, vec![hourly_name.clone()]), + (catalog_stats::Grain::Daily, vec![daily_name.clone()]), + (catalog_stats::Grain::Monthly, vec![monthly_name.clone()]), + ] { + let got = names_of(client.fetch_at_for_names( + &[ + hourly_name.as_str(), + daily_name.as_str(), + monthly_name.as_str(), + ], + grain, + ts, + )) + .await; + assert_eq!(got, expected, "grain: {grain:?}"); + } +} diff --git a/crates/control-plane-api/Cargo.toml b/crates/control-plane-api/Cargo.toml index 7c62f1cca4c..75ad0c1b40f 100644 --- a/crates/control-plane-api/Cargo.toml +++ b/crates/control-plane-api/Cargo.toml @@ -62,6 +62,7 @@ serde_json = { workspace = true } sqlx = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } +time = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tonic = { workspace = true } diff --git a/crates/control-plane-api/src/server/update_l2_reporting.rs b/crates/control-plane-api/src/server/update_l2_reporting.rs index 507a5c057cf..755db86f61b 100644 --- a/crates/control-plane-api/src/server/update_l2_reporting.rs +++ b/crates/control-plane-api/src/server/update_l2_reporting.rs @@ -56,9 +56,11 @@ pub async fn update_l2_reporting( // Extract draft collection templates from the bundle. const L2_INFERRED_NAME: &str = "ops.us-central1.v1/inferred-schemas/L2"; const L2_STATS_NAME: &str = "ops.us-central1.v1/catalog-stats-L2"; + const L2_STATS_NEW_NAME: &str = "ops/rollups/L2/catalog-stats"; const L2_EVENTS_NAME: &str = "ops.us-central1.v1/events/L2"; let mut l2_inferred: Option = None; let mut l2_stats: Option = None; + let mut l2_stats_new: Option = None; let mut l2_events: Option = None; for row in collections { @@ -69,6 +71,9 @@ pub async fn update_l2_reporting( L2_STATS_NAME => { l2_stats = Some(row); } + L2_STATS_NEW_NAME => { + l2_stats_new = Some(row); + } L2_EVENTS_NAME => { l2_events = Some(row); } @@ -81,8 +86,8 @@ pub async fn update_l2_reporting( } } } - let (Some(mut l2_stats), Some(mut l2_inferred), Some(mut l2_events)) = - (l2_stats, l2_inferred, l2_events) + let (Some(mut l2_stats), Some(mut l2_stats_new), Some(mut l2_inferred), Some(mut l2_events)) = + (l2_stats, l2_stats_new, l2_inferred, l2_events) else { return Err(tonic::Status::internal( "expected template to include L2 status, inferred schemas, and catalog stats", @@ -101,6 +106,18 @@ pub async fn update_l2_reporting( .. } = &mut l2_stats.model.as_mut().unwrap().derive.as_mut().unwrap(); + let models::Derivation { + transforms: l2_stats_new_transforms, + using: l2_stats_new_using, + .. + } = &mut l2_stats_new + .model + .as_mut() + .unwrap() + .derive + .as_mut() + .unwrap(); + let models::Derivation { transforms: l2_events_transforms, .. @@ -115,15 +132,31 @@ pub async fn update_l2_reporting( ); }; + let models::DeriveUsing::Typescript(models::DeriveUsingTypescript { + module: l2_stats_new_module_raw, + }) = l2_stats_new_using + else { + return Err(tonic::Status::internal( + "ops/rollups/L2/catalog-stats derivation must be a TypeScript module", + ) + .into()); + }; + let mut l2_stats_module = r#"import * as Types from 'flow/ops.us-central1.v1/catalog-stats-L2.ts'; export class Derivation extends Types.IDerivation {"# .to_string(); + let mut l2_stats_new_module = r#"import * as Types from 'flow/ops/rollups/L2/catalog-stats.ts'; + +export class Derivation extends Types.IDerivation {"# + .to_string(); + // Remove template placeholders (they're used only for tests of reporting tasks). l2_inferred_transforms.clear(); l2_stats_transforms.clear(); + l2_stats_new_transforms.clear(); l2_events_transforms.clear(); // Add transforms for L1 derivations across all active data-planes. @@ -174,6 +207,26 @@ export class Derivation extends Types.IDerivation {"# shuffle: models::Shuffle::Any, }); + l2_stats_new_transforms.push(models::TransformDef { + name: models::Transform::new(&data_plane.ops_l2_stats_transform), + source: models::Source::Source(models::FullSource { + name: data_plane.ops_l1_stats_name.clone(), + partitions: None, + // Initial cutover: avoid back-filling history into the new rollup. + not_before: Some(time::macros::datetime!(2026-05-14 00:00:00 UTC)), + not_after: None, + }), + disable: !data_plane.enable_l2, + + backfill: 0, + lambda: models::RawValue::default(), + priority: 0, + read_delay: None, + shuffle: models::Shuffle::Key(models::CompositeKey::new([models::JsonPointer::new( + "/catalogName", + )])), + }); + l2_events_transforms.push(models::TransformDef { name: models::Transform::new(&data_plane.ops_l2_events_transform), source: models::Source::Collection(data_plane.ops_l1_events_name.clone()), @@ -204,13 +257,40 @@ export class Derivation extends Types.IDerivation {"# if !data_plane.enable_l2 { l2_stats_module.push_str("\n*/"); } + + if !data_plane.enable_l2 { + l2_stats_new_module.push_str("\n/*"); + } + l2_stats_new_module.push_str(&format!( + r#" + {method_name}(read: {{ doc: Types.{type_name}}}): Types.Document[] {{ + return [read.doc] + }}"#, + method_name = camel_case(&data_plane.ops_l2_stats_transform, false), + type_name = format!( + "Source{}", + camel_case(&data_plane.ops_l2_stats_transform, true) + ) + )); + if !data_plane.enable_l2 { + l2_stats_new_module.push_str("\n*/"); + } } l2_stats_module.push_str("\n}\n"); *l2_stats_module_raw = models::RawValue::from_value(&serde_json::json!(l2_stats_module)); + l2_stats_new_module.push_str("\n}\n"); + *l2_stats_new_module_raw = + models::RawValue::from_value(&serde_json::json!(l2_stats_new_module)); + let draft = tables::DraftCatalog { - collections: tables::DraftCollections::from_iter([l2_inferred, l2_stats, l2_events]), + collections: tables::DraftCollections::from_iter([ + l2_inferred, + l2_stats, + l2_stats_new, + l2_events, + ]), ..Default::default() }; @@ -251,11 +331,13 @@ export class Derivation extends Types.IDerivation {"# let previous = serde_json::json!({ "l2_inferred": live.get_by_key(&models::Collection::new(L2_INFERRED_NAME)).map(|r| &r.model), "l2_stats": live.get_by_key(&models::Collection::new(L2_STATS_NAME)).map(|r| &r.model), + "l2_stats_new": live.get_by_key(&models::Collection::new(L2_STATS_NEW_NAME)).map(|r| &r.model), "l2_events": live.get_by_key(&models::Collection::new(L2_EVENTS_NAME)).map(|r| &r.model), }); let next = serde_json::json!({ "l2_inferred": draft.get_by_key(&models::Collection::new(L2_INFERRED_NAME)).map(|r| &r.model), "l2_stats": draft.get_by_key(&models::Collection::new(L2_STATS_NAME)).map(|r| &r.model), + "l2_stats_new": draft.get_by_key(&models::Collection::new(L2_STATS_NEW_NAME)).map(|r| &r.model), "l2_events": draft.get_by_key(&models::Collection::new(L2_EVENTS_NAME)).map(|r| &r.model), }); diff --git a/crates/ops/Cargo.toml b/crates/ops/Cargo.toml index fb8a7494c21..8696619b64c 100644 --- a/crates/ops/Cargo.toml +++ b/crates/ops/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true proto-flow = { path = "../proto-flow" } anyhow = { workspace = true } +chrono = { workspace = true } colored_json = { workspace = true } lazy_static = { workspace = true } regex = { workspace = true } diff --git a/crates/ops/src/catalog_stats.rs b/crates/ops/src/catalog_stats.rs new file mode 100644 index 00000000000..96628cac379 --- /dev/null +++ b/crates/ops/src/catalog_stats.rs @@ -0,0 +1,74 @@ +use crate::{Meta, stats}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +/// Time grain at which a `CatalogStats` row aggregates data. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Grain { + Hourly, + Daily, + Monthly, +} + +impl std::fmt::Display for Grain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Grain::Hourly => "hourly", + Grain::Daily => "daily", + Grain::Monthly => "monthly", + }) + } +} + +/// Aggregated catalog stats for one `(catalog_name, ts)` pair at a specific +/// `Grain`. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CatalogStats { + #[serde(rename = "_meta")] + pub meta: Meta, + pub catalog_name: String, + pub ts: DateTime, + pub stats_summary: StatsSummary, + #[serde(default)] + pub task_stats: TaskStats, +} + +/// Combined totals across every task and collection contributing to the row. +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StatsSummary { + #[serde(default)] + pub read_by_me: stats::DocsAndBytes, + #[serde(default)] + pub read_from_me: stats::DocsAndBytes, + #[serde(default)] + pub written_by_me: stats::DocsAndBytes, + #[serde(default)] + pub written_to_me: stats::DocsAndBytes, + #[serde(default)] + pub warnings: u64, + #[serde(default)] + pub errors: u64, + #[serde(default)] + pub failures: u64, + #[serde(default)] + pub usage_seconds: u64, + #[serde(default)] + pub txn_count: u64, +} + +/// Per-task-kind breakouts: maps keyed by collection name for +/// captures (target) and materializations (source), and the single +/// derivation block (a derivation has one output collection). +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskStats { + #[serde(default)] + pub capture: BTreeMap, + #[serde(default)] + pub derive: Option, + #[serde(default)] + pub materialize: BTreeMap, +} diff --git a/crates/ops/src/lib.rs b/crates/ops/src/lib.rs index 55381320f41..0d7a10127e6 100644 --- a/crates/ops/src/lib.rs +++ b/crates/ops/src/lib.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize, de::Error}; use std::collections::BTreeMap; use std::io::Write; +pub mod catalog_stats; pub mod decode; pub mod tracing; diff --git a/local/systemd/flow-bigtable.service b/local/systemd/flow-bigtable.service new file mode 100644 index 00000000000..3f14a945fa3 --- /dev/null +++ b/local/systemd/flow-bigtable.service @@ -0,0 +1,16 @@ +[Unit] +Description=Flow BigTable Emulator (local development) +Documentation=https://github.com/estuary/flow + +[Service] +Type=simple + +ExecStart=docker run --rm --name flow-bigtable \ + -p 8086:8086 \ + google/cloud-sdk:latest \ + gcloud beta emulators bigtable start --host-port=0.0.0.0:8086 + +ExecStop=-/bin/sh -c 'docker stop flow-bigtable >/dev/null 2>&1 || true' + +Restart=on-failure +RestartSec=5s diff --git a/local/systemd/flow-control-agent.service b/local/systemd/flow-control-agent.service index d412ebf654e..7bc97530a39 100644 --- a/local/systemd/flow-control-agent.service +++ b/local/systemd/flow-control-agent.service @@ -3,6 +3,8 @@ Description=Flow Control Plane Agent Documentation=https://github.com/estuary/flow After=flow-supabase.service Requires=flow-supabase.service +After=flow-bigtable.service +Requires=flow-bigtable.service PartOf=flow-control-plane.target [Service] diff --git a/mise/README.md b/mise/README.md index fadeb39c23e..53ddf63f9e1 100644 --- a/mise/README.md +++ b/mise/README.md @@ -294,6 +294,7 @@ mise tasks | `local:runtime-sidecar` | Start runtime-v2 sidecar for a local data plane | | `local:seed-controller-job` | Seed a controller job to trigger data plane converge | | `local:supabase` | Start Supabase only | +| `local:bigtable` | Start BigTable emulator only | | `local:stop` | Stop all services and clean up | | `local:dekaf` | Start Dekaf against local stack | | `local:dekaf-kafka` | Start local Kafka for Dekaf testing | diff --git a/mise/tasks/ci/platform-test b/mise/tasks/ci/platform-test index 276a3c18cee..bdd54964836 100755 --- a/mise/tasks/ci/platform-test +++ b/mise/tasks/ci/platform-test @@ -17,6 +17,7 @@ mise run ci:gnu-dev mise run build:gazette mise run build:flowctl-go mise run ci:nextest-build +mise run local:bigtable mise run ci:nextest-run mise run ci:doctest mise run ci:gotest diff --git a/mise/tasks/local/bigtable b/mise/tasks/local/bigtable new file mode 100755 index 00000000000..f5d458b6222 --- /dev/null +++ b/mise/tasks/local/bigtable @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +set -euo pipefail + +#MISE description="Start the BigTable emulator via systemd" +#MISE depends=["local:common"] + +UNIT_FILE="${HOME}/.config/systemd/user/flow-bigtable.service" +if [ ! -f "$UNIT_FILE" ]; then + ln -sf "$(pwd)/local/systemd/flow-bigtable.service" "$UNIT_FILE" + systemctl --user daemon-reload +fi + +systemctl --user start flow-bigtable + +host="localhost:8086" + +# Wait for the emulator's TCP port. The gRPC server may still need a moment +# beyond this — the ensure_table retry loop below covers that gap. +for _ in $(seq 1 30); do + # Subshell so bash's own "connect: Connection refused" diagnostic for a + # failed /dev/tcp redirection is captured by 2>/dev/null. + if (exec 3<>"/dev/tcp/${host/:/\/}") 2>/dev/null; then + break + fi + sleep 1 +done + +# Create expected tables per stats grain. This is needed for tests that seed +# rows directly. `cbt` has no createtable-if-not-exists, so we tolerate the +# "AlreadyExists" error from re-runs and retry transient failures (e.g. gRPC +# not yet accepting requests right after the TCP port opens). +ensure_table() { + local table="$1" + local out + for _ in $(seq 1 10); do + if out=$(docker run --rm --network host \ + -e BIGTABLE_EMULATOR_HOST="${host}" \ + google/cloud-sdk:latest \ + cbt -project estuary-local -instance estuary-local \ + createtable "${table}" families=f 2>&1); then + return 0 + fi + if grep -q 'AlreadyExists' <<<"${out}"; then + return 0 + fi + sleep 1 + done + echo "Failed to create BigTable table ${table}:" >&2 + echo "${out}" >&2 + return 1 +} + +for grain in hourly daily monthly; do + ensure_table "catalog_stats_${grain}" +done + +echo "BigTable emulator ready at ${host}." diff --git a/mise/tasks/local/control-plane b/mise/tasks/local/control-plane index e5f9eb7cb29..865a7e6cd3b 100755 --- a/mise/tasks/local/control-plane +++ b/mise/tasks/local/control-plane @@ -2,7 +2,7 @@ set -euo pipefail #MISE description="Start control plane agent via systemd" -#MISE depends=["local:common", "local:tls-cert", "local:supabase"] +#MISE depends=["local:common", "local:tls-cert", "local:supabase", "local:bigtable"] FLOW_LOCAL="${HOME}/flow-local" mkdir -p "${FLOW_LOCAL}/env" @@ -26,6 +26,10 @@ SERVE_HANDLERS=true SKIP_CONNECTOR_TABLE_CHECK=true SSL_CERT_FILE=${FLOW_LOCAL}/ca.crt +BIGTABLE_PROJECT=estuary-local +BIGTABLE_INSTANCE=estuary-local +BIGTABLE_EMULATOR_HOST=localhost:8086 + EOF # Forward STRIPE_API_KEY from the invoking shell when set diff --git a/ops-catalog/catalog-stats-L2-rollup.ts b/ops-catalog/catalog-stats-L2-rollup.ts new file mode 100644 index 00000000000..bfab45d411b --- /dev/null +++ b/ops-catalog/catalog-stats-L2-rollup.ts @@ -0,0 +1,11 @@ +import { IDerivation, Document, SourceFromBaseName } from 'flow/ops/rollups/L2/catalog-stats.ts'; + +// Placeholder for the ops/rollups/L2/catalog-stats derivation. Production +// overwrites this module in `update_l2_reporting` with one method per +// data-plane transform; this file's content is only exercised by catalog +// tests. +export class Derivation extends IDerivation { + fromBaseName(read: { doc: SourceFromBaseName }): Document[] { + return [read.doc]; + } +} diff --git a/ops-catalog/data-plane-template.bundle.json b/ops-catalog/data-plane-template.bundle.json index 1f69ec62c35..81de04e433c 100644 --- a/ops-catalog/data-plane-template.bundle.json +++ b/ops-catalog/data-plane-template.bundle.json @@ -1,7 +1,7 @@ { "collections": { "ops/rollups/L1/BASE_NAME/catalog-stats": { - "schema": {"$defs":{"__flowInline1":{"$defs":{"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"},"__flowInline2":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"logCount":{"default":0,"description":"Total number of matching log events","reduce":{"strategy":"sum"},"type":"integer"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/catalog-stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow catalog.","properties":{"catalogName":{"description":"Name of the Flow catalog","type":"string"},"grain":{"description":"Time grain that the stats are aggregated over","enum":["monthly","daily","hourly"]},"statsSummary":{"properties":{"errors":{"$ref":"#/$defs/logCount","description":"Total number of logged errors"},"failures":{"$ref":"#/$defs/logCount","description":"Total number of shard failures"},"lastPublishedAt":{"description":"The most recent publish timestamp of documents in this collection.","format":"date-time","reduce":{"strategy":"maximize"},"type":"string"},"readByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"readFromMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"txnCount":{"$ref":"#/$defs/logCount","description":"Total number of transactions that have been successfully processed"},"usageSeconds":{"default":0,"description":"Cumulative number of metered seconds of task usage","reduce":{"strategy":"sum"},"type":"integer"},"warnings":{"$ref":"#/$defs/logCount","description":"Total number of logged warnings"},"writtenByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"writtenToMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"taskStats":{"properties":{"capture":{"$ref":"stats.schema.yaml#/properties/capture"},"derive":{"$ref":"stats.schema.yaml#/properties/derive"},"interval":{"$ref":"stats.schema.yaml#/properties/interval"},"materialize":{"$ref":"stats.schema.yaml#/properties/materialize"}},"reduce":{"strategy":"merge"},"type":"object"},"ts":{"description":"Timestamp of the catalog stat aggregate","format":"date-time","type":"string"}},"reduce":{"strategy":"merge"},"required":["catalogName","grain","ts","statsSummary"],"title":"Flow catalog task stats","type":"object"}, + "schema": {"$defs":{"__flowInline1":{"$defs":{"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"},"__flowInline2":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"logCount":{"default":0,"description":"Total number of matching log events","reduce":{"strategy":"sum"},"type":"integer"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/catalog-stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow catalog.","properties":{"catalogName":{"description":"Name of the Flow catalog","type":"string"},"grain":{"description":"Time grain that the stats are aggregated over","enum":["monthly","daily","hourly"]},"statsSummary":{"properties":{"errors":{"$ref":"#/$defs/logCount","description":"Total number of logged errors"},"failures":{"$ref":"#/$defs/logCount","description":"Total number of shard failures"},"lastPublishedAt":{"description":"The most recent publish timestamp of documents in this collection.","format":"date-time","reduce":{"strategy":"maximize"},"type":"string"},"readByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"readFromMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"txnCount":{"$ref":"#/$defs/logCount","description":"Total number of transactions that have been successfully processed"},"usageSeconds":{"default":0,"description":"Cumulative number of metered seconds of task usage","reduce":{"strategy":"sum"},"type":"integer"},"warnings":{"$ref":"#/$defs/logCount","description":"Total number of logged warnings"},"writtenByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"writtenToMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"taskStats":{"properties":{"capture":{"$ref":"stats.schema.yaml#/properties/capture"},"derive":{"$ref":"stats.schema.yaml#/properties/derive"},"interval":{"$ref":"stats.schema.yaml#/properties/interval"},"materialize":{"$ref":"stats.schema.yaml#/properties/materialize"}},"reduce":{"strategy":"merge"},"type":"object"},"ts":{"description":"Timestamp of the catalog stat aggregate","format":"date-time","type":"string"}},"reduce":{"strategy":"merge"},"required":["catalogName","grain","ts","statsSummary"],"title":"Flow catalog task stats","type":"object"}, "key": [ "/catalogName", "/grain", @@ -43,7 +43,7 @@ } }, "ops/rollups/L1/BASE_NAME/events": { - "schema": {"$defs":{"__flowInline1":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"},"__flowInline2":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/events.schema.yaml","$ref":"logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Events are special logs that are intended to be consumed by the control plane","properties":{"fields":{"additionalProperties":true,"properties":{"error":{"description":"If the event represents an error, this field contains the error message.\n","type":"string"},"eventTarget":{"description":"The target of the event is a catalog name that the event pertains to.\n","type":"string"},"eventType":{"description":"Identifies this log message as an event of the given type. Events\nare special logs that are meant to be observed by the Flow control plane.\n","type":"string"}},"required":["eventType","eventTarget"]},"shard":{"description":"The source of the event, which may differ from the eventTarget"}},"required":["fields"],"title":"Flow events"}, + "schema": {"$defs":{"__flowInline1":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"},"__flowInline2":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/events.schema.yaml","$ref":"logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Events are special logs that are intended to be consumed by the control plane","properties":{"fields":{"additionalProperties":true,"properties":{"error":{"description":"If the event represents an error, this field contains the error message.\n","type":"string"},"eventTarget":{"description":"The target of the event is a catalog name that the event pertains to.\n","type":"string"},"eventType":{"description":"Identifies this log message as an event of the given type. Events\nare special logs that are meant to be observed by the Flow control plane.\n","type":"string"}},"required":["eventType","eventTarget"]},"shard":{"description":"The source of the event, which may differ from the eventTarget"}},"required":["fields"],"title":"Flow events"}, "key": [ "/fields/eventTarget", "/fields/eventType" @@ -72,7 +72,7 @@ } }, "ops/rollups/L1/BASE_NAME/inferred-schemas": { - "schema": {"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/inferred-schemas.schema.yaml","properties":{"collection_name":{"description":"The name of the collection that this schema was inferred for","type":"string"},"schema":{"description":"The inferred schema","reduce":{"strategy":"jsonSchemaMerge"},"type":"object"}},"reduce":{"strategy":"merge"},"required":["collection_name","schema"],"type":"object"}, + "schema": {"$id":"file:///home/wbaker/estuary/flow/ops-catalog/inferred-schemas.schema.yaml","properties":{"collection_name":{"description":"The name of the collection that this schema was inferred for","type":"string"},"schema":{"description":"The inferred schema","reduce":{"strategy":"jsonSchemaMerge"},"type":"object"}},"reduce":{"strategy":"merge"},"required":["collection_name","schema"],"type":"object"}, "key": [ "/collection_name" ], @@ -114,7 +114,7 @@ } }, "ops/tasks/BASE_NAME/logs": { - "schema": {"$defs":{"__flowInline1":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"}, + "schema": {"$defs":{"__flowInline1":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"}, "key": [ "/shard/name", "/shard/keyBegin", @@ -133,7 +133,7 @@ } }, "ops/tasks/BASE_NAME/stats": { - "schema": {"$defs":{"__flowInline1":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"}, + "schema": {"$defs":{"__flowInline1":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"}, "key": [ "/shard/name", "/shard/keyBegin", diff --git a/ops-catalog/flow_generated/typescript/ops/rollups/L2/catalog-stats.ts b/ops-catalog/flow_generated/typescript/ops/rollups/L2/catalog-stats.ts new file mode 100644 index 00000000000..718a6fbd936 --- /dev/null +++ b/ops-catalog/flow_generated/typescript/ops/rollups/L2/catalog-stats.ts @@ -0,0 +1,208 @@ + +// Generated for published documents of derived collection ops/rollups/L2/catalog-stats. +export type Document = /* Flow catalog task stats Statistics related to the processing of a Flow catalog. */ { + catalogName: /* Name of the Flow catalog */ string; + grain: /* Time grain that the stats are aggregated over */ "daily" | "hourly" | "monthly"; + statsSummary: { + errors?: /* Total number of logged errors */ number; + failures?: /* Total number of shard failures */ number; + lastPublishedAt?: /* The most recent publish timestamp of documents in this collection. */ string; + readByMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + readFromMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + txnCount?: /* Total number of transactions that have been successfully processed */ number; + usageSeconds?: /* Cumulative number of metered seconds of task usage */ number; + warnings?: /* Total number of logged warnings */ number; + writtenByMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + writtenToMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + taskStats?: { + capture?: /* Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection. */ { + [k: string]: { + lastPublishedAt?: /* The publication timestamp of the most recently captured document. */ string; + out?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + right?: /* Documents fed into the combiner from the source */ { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + }; + derive?: { + lastPublishedAt?: /* The publication timestamp of the most recently derived document. */ string; + out?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + published?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + transforms?: /* A map of each transform (transform name, not collection name) to stats for that transform */ { + [k: string]: /* Stats for a specific transform of a derivation, which will have an update, publish, or both. */ { + bytesBehind?: /* Bytes behind of the source journals for this transform */ number; + input: /* The input documents that were fed into this transform. */ { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + lastSourcePublishedAt?: /* The publication timestamp of the most recently processed source document. + */ string; + source?: /* The name of the collection that this transform sources from */ string; + }; + }; + }; + interval?: { + uptimeSeconds: /* Number of seconds that the task shard is metered as having been running */ number; + usageRate?: /* Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage */ number; + }; + materialize?: /* A map of each binding source (collection name) to combiner stats for that binding */ { + [k: string]: { + bytesBehind?: /* Bytes behind of the source journals for this binding */ number; + lastSourcePublishedAt?: /* The publication timestamp of the most recently processed source document. + */ string; + left?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + out?: { + bytesTotal?: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + right?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + }; + }; + ts: /* Timestamp of the catalog stat aggregate */ string; +}; + + +// Generated for read documents of sourced collection ops/rollups/L1/BASE_NAME/catalog-stats. +export type SourceFromBaseName = /* Flow catalog task stats Statistics related to the processing of a Flow catalog. */ { + catalogName: /* Name of the Flow catalog */ string; + grain: /* Time grain that the stats are aggregated over */ "daily" | "hourly" | "monthly"; + statsSummary: { + errors?: /* Total number of logged errors */ number; + failures?: /* Total number of shard failures */ number; + lastPublishedAt?: /* The most recent publish timestamp of documents in this collection. */ string; + readByMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + readFromMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + txnCount?: /* Total number of transactions that have been successfully processed */ number; + usageSeconds?: /* Cumulative number of metered seconds of task usage */ number; + warnings?: /* Total number of logged warnings */ number; + writtenByMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + writtenToMe?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + taskStats?: { + capture?: /* Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection. */ { + [k: string]: { + lastPublishedAt?: /* The publication timestamp of the most recently captured document. */ string; + out?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + right?: /* Documents fed into the combiner from the source */ { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + }; + derive?: { + lastPublishedAt?: /* The publication timestamp of the most recently derived document. */ string; + out?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + published?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + transforms?: /* A map of each transform (transform name, not collection name) to stats for that transform */ { + [k: string]: /* Stats for a specific transform of a derivation, which will have an update, publish, or both. */ { + bytesBehind?: /* Bytes behind of the source journals for this transform */ number; + input: /* The input documents that were fed into this transform. */ { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + lastSourcePublishedAt?: /* The publication timestamp of the most recently processed source document. + */ string; + source?: /* The name of the collection that this transform sources from */ string; + }; + }; + }; + interval?: { + uptimeSeconds: /* Number of seconds that the task shard is metered as having been running */ number; + usageRate?: /* Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage */ number; + }; + materialize?: /* A map of each binding source (collection name) to combiner stats for that binding */ { + [k: string]: { + bytesBehind?: /* Bytes behind of the source journals for this binding */ number; + lastSourcePublishedAt?: /* The publication timestamp of the most recently processed source document. + */ string; + left?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + out?: { + bytesTotal?: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + right?: { + bytesTotal: /* Total number of bytes representing the JSON encoded documents */ number; + docsTotal: /* Total number of documents */ number; + }; + }; + }; + }; + ts: /* Timestamp of the catalog stat aggregate */ string; +}; + + +export abstract class IDerivation { + // Construct a new Derivation instance from a Request.Open message. + constructor(_open: { state: unknown }) { } + + // flush awaits any remaining documents to be published and returns them. + // deno-lint-ignore require-await + async flush(): Promise { + return []; + } + + // reset is called only when running catalog tests, and must reset any internal state. + async reset() { } + + // startCommit is notified of a runtime commit in progress, and returns an optional + // connector state update to be committed. + startCommit(_startCommit: { runtimeCheckpoint: unknown }): { state?: { updated: unknown, mergePatch: boolean } } { + return {}; + } + + abstract fromBaseName(read: { doc: SourceFromBaseName }): Document[]; +} diff --git a/ops-catalog/local-bigtable.config.yaml b/ops-catalog/local-bigtable.config.yaml new file mode 100644 index 00000000000..58be440bd70 --- /dev/null +++ b/ops-catalog/local-bigtable.config.yaml @@ -0,0 +1,8 @@ +project_id: estuary-local +instance_id: estuary-local +credentials: + auth_type: CredentialsJSON + credentials_json: "{}" +advanced: + endpoint: 172.17.0.1:8086 + feature_flags: allow_existing_tables_for_new_bindings diff --git a/ops-catalog/local-view.bundle.json b/ops-catalog/local-view.bundle.json index 4b366666661..d3b8a7914e2 100644 --- a/ops-catalog/local-view.bundle.json +++ b/ops-catalog/local-view.bundle.json @@ -145,6 +145,94 @@ } } ] + }, + "ops/views/stats": { + "endpoint": { + "connector": { + "image": "ghcr.io/estuary/materialize-bigtable:dev", + "config": {"project_id":"estuary-local","instance_id":"estuary-local","credentials":{"auth_type":"CredentialsJSON","credentials_json":"{}"},"advanced":{"endpoint":"172.17.0.1:8086","feature_flags":"allow_existing_tables_for_new_bindings"}} + } + }, + "bindings": [ + { + "resource": {"table":"catalog_stats_hourly"}, + "source": { + "name": "ops/rollups/L2/catalog-stats", + "partitions": { + "include": { + "grain": [ + "hourly" + ] + }, + "exclude": {} + } + }, + "fields": { + "groupBy": [ + "catalog_name", + "ts" + ], + "require": { + "catalog_name": {}, + "flow_document": {}, + "ts": {} + }, + "recommended": false + } + }, + { + "resource": {"table":"catalog_stats_daily"}, + "source": { + "name": "ops/rollups/L2/catalog-stats", + "partitions": { + "include": { + "grain": [ + "daily" + ] + }, + "exclude": {} + } + }, + "fields": { + "groupBy": [ + "catalog_name", + "ts" + ], + "require": { + "catalog_name": {}, + "flow_document": {}, + "ts": {} + }, + "recommended": false + } + }, + { + "resource": {"table":"catalog_stats_monthly"}, + "source": { + "name": "ops/rollups/L2/catalog-stats", + "partitions": { + "include": { + "grain": [ + "monthly" + ] + }, + "exclude": {} + } + }, + "fields": { + "groupBy": [ + "catalog_name", + "ts" + ], + "require": { + "catalog_name": {}, + "flow_document": {}, + "ts": {} + }, + "recommended": false + } + } + ] } } } \ No newline at end of file diff --git a/ops-catalog/local-view.flow.yaml b/ops-catalog/local-view.flow.yaml index 256b39f222b..5ebce629979 100644 --- a/ops-catalog/local-view.flow.yaml +++ b/ops-catalog/local-view.flow.yaml @@ -99,3 +99,52 @@ materializations: exclude: - event_type - error + + ops/views/stats: + endpoint: + connector: + image: ghcr.io/estuary/materialize-bigtable:dev + config: local-bigtable.config.yaml + bindings: + - source: + name: ops/rollups/L2/catalog-stats + partitions: + include: + grain: ["hourly"] + resource: + table: catalog_stats_hourly + fields: + recommended: false + groupBy: [catalog_name, ts] + require: + catalog_name: {} + ts: {} + flow_document: {} + - source: + name: ops/rollups/L2/catalog-stats + partitions: + include: + grain: ["daily"] + resource: + table: catalog_stats_daily + fields: + recommended: false + groupBy: [catalog_name, ts] + require: + catalog_name: {} + ts: {} + flow_document: {} + - source: + name: ops/rollups/L2/catalog-stats + partitions: + include: + grain: ["monthly"] + resource: + table: catalog_stats_monthly + fields: + recommended: false + groupBy: [catalog_name, ts] + require: + catalog_name: {} + ts: {} + flow_document: {} diff --git a/ops-catalog/reporting-L2-template.bundle.json b/ops-catalog/reporting-L2-template.bundle.json index 07a388a8a68..12094367511 100644 --- a/ops-catalog/reporting-L2-template.bundle.json +++ b/ops-catalog/reporting-L2-template.bundle.json @@ -1,7 +1,7 @@ { "collections": { "ops.us-central1.v1/catalog-stats-L2": { - "schema": {"$defs":{"__flowInline1":{"$defs":{"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"},"__flowInline2":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"logCount":{"default":0,"description":"Total number of matching log events","reduce":{"strategy":"sum"},"type":"integer"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/catalog-stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow catalog.","properties":{"catalogName":{"description":"Name of the Flow catalog","type":"string"},"grain":{"description":"Time grain that the stats are aggregated over","enum":["monthly","daily","hourly"]},"statsSummary":{"properties":{"errors":{"$ref":"#/$defs/logCount","description":"Total number of logged errors"},"failures":{"$ref":"#/$defs/logCount","description":"Total number of shard failures"},"lastPublishedAt":{"description":"The most recent publish timestamp of documents in this collection.","format":"date-time","reduce":{"strategy":"maximize"},"type":"string"},"readByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"readFromMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"txnCount":{"$ref":"#/$defs/logCount","description":"Total number of transactions that have been successfully processed"},"usageSeconds":{"default":0,"description":"Cumulative number of metered seconds of task usage","reduce":{"strategy":"sum"},"type":"integer"},"warnings":{"$ref":"#/$defs/logCount","description":"Total number of logged warnings"},"writtenByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"writtenToMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"taskStats":{"properties":{"capture":{"$ref":"stats.schema.yaml#/properties/capture"},"derive":{"$ref":"stats.schema.yaml#/properties/derive"},"interval":{"$ref":"stats.schema.yaml#/properties/interval"},"materialize":{"$ref":"stats.schema.yaml#/properties/materialize"}},"reduce":{"strategy":"merge"},"type":"object"},"ts":{"description":"Timestamp of the catalog stat aggregate","format":"date-time","type":"string"}},"reduce":{"strategy":"merge"},"required":["catalogName","grain","ts","statsSummary"],"title":"Flow catalog task stats","type":"object"}, + "schema": {"$defs":{"__flowInline1":{"$defs":{"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"},"__flowInline2":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"logCount":{"default":0,"description":"Total number of matching log events","reduce":{"strategy":"sum"},"type":"integer"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/catalog-stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow catalog.","properties":{"catalogName":{"description":"Name of the Flow catalog","type":"string"},"grain":{"description":"Time grain that the stats are aggregated over","enum":["monthly","daily","hourly"]},"statsSummary":{"properties":{"errors":{"$ref":"#/$defs/logCount","description":"Total number of logged errors"},"failures":{"$ref":"#/$defs/logCount","description":"Total number of shard failures"},"lastPublishedAt":{"description":"The most recent publish timestamp of documents in this collection.","format":"date-time","reduce":{"strategy":"maximize"},"type":"string"},"readByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"readFromMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"txnCount":{"$ref":"#/$defs/logCount","description":"Total number of transactions that have been successfully processed"},"usageSeconds":{"default":0,"description":"Cumulative number of metered seconds of task usage","reduce":{"strategy":"sum"},"type":"integer"},"warnings":{"$ref":"#/$defs/logCount","description":"Total number of logged warnings"},"writtenByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"writtenToMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"taskStats":{"properties":{"capture":{"$ref":"stats.schema.yaml#/properties/capture"},"derive":{"$ref":"stats.schema.yaml#/properties/derive"},"interval":{"$ref":"stats.schema.yaml#/properties/interval"},"materialize":{"$ref":"stats.schema.yaml#/properties/materialize"}},"reduce":{"strategy":"merge"},"type":"object"},"ts":{"description":"Timestamp of the catalog stat aggregate","format":"date-time","type":"string"}},"reduce":{"strategy":"merge"},"required":["catalogName","grain","ts","statsSummary"],"title":"Flow catalog task stats","type":"object"}, "key": [ "/catalogName", "/grain", @@ -39,7 +39,7 @@ } }, "ops.us-central1.v1/events/L2": { - "schema": {"$defs":{"__flowInline1":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"},"__flowInline2":{"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/events.schema.yaml","$ref":"logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Events are special logs that are intended to be consumed by the control plane","properties":{"fields":{"additionalProperties":true,"properties":{"error":{"description":"If the event represents an error, this field contains the error message.\n","type":"string"},"eventTarget":{"description":"The target of the event is a catalog name that the event pertains to.\n","type":"string"},"eventType":{"description":"Identifies this log message as an event of the given type. Events\nare special logs that are meant to be observed by the Flow control plane.\n","type":"string"}},"required":["eventType","eventTarget"]},"shard":{"description":"The source of the event, which may differ from the eventTarget"}},"required":["fields"],"title":"Flow events"}, + "schema": {"$defs":{"__flowInline1":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"},"__flowInline2":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/events.schema.yaml","$ref":"logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Events are special logs that are intended to be consumed by the control plane","properties":{"fields":{"additionalProperties":true,"properties":{"error":{"description":"If the event represents an error, this field contains the error message.\n","type":"string"},"eventTarget":{"description":"The target of the event is a catalog name that the event pertains to.\n","type":"string"},"eventType":{"description":"Identifies this log message as an event of the given type. Events\nare special logs that are meant to be observed by the Flow control plane.\n","type":"string"}},"required":["eventType","eventTarget"]},"shard":{"description":"The source of the event, which may differ from the eventTarget"}},"required":["fields"],"title":"Flow events"}, "key": [ "/fields/eventTarget", "/fields/eventType" @@ -69,7 +69,7 @@ } }, "ops.us-central1.v1/inferred-schemas/L2": { - "schema": {"$id":"file:///Users/wbaker/estuary/flow/ops-catalog/inferred-schemas.schema.yaml","properties":{"collection_name":{"description":"The name of the collection that this schema was inferred for","type":"string"},"schema":{"description":"The inferred schema","reduce":{"strategy":"jsonSchemaMerge"},"type":"object"}},"reduce":{"strategy":"merge"},"required":["collection_name","schema"],"type":"object"}, + "schema": {"$id":"file:///home/wbaker/estuary/flow/ops-catalog/inferred-schemas.schema.yaml","properties":{"collection_name":{"description":"The name of the collection that this schema was inferred for","type":"string"},"schema":{"description":"The inferred schema","reduce":{"strategy":"jsonSchemaMerge"},"type":"object"}},"reduce":{"strategy":"merge"},"required":["collection_name","schema"],"type":"object"}, "key": [ "/collection_name" ], @@ -90,6 +90,39 @@ } ] } + }, + "ops/rollups/L2/catalog-stats": { + "schema": {"$defs":{"__flowInline1":{"$defs":{"docsAndBytes":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal","bytesTotal"],"type":"object"},"transformStats":{"description":"Stats for a specific transform of a derivation, which will have an update, publish, or both.","properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this transform","type":"integer"},"input":{"$ref":"#/$defs/docsAndBytes","description":"The input documents that were fed into this transform."},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"source":{"description":"The name of the collection that this transform sources from","type":"string"}},"reduce":{"strategy":"merge"},"required":["input"],"type":"object"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow capture, derivation, or materialization","properties":{"capture":{"additionalProperties":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently captured document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"right":{"$ref":"#/$defs/docsAndBytes","description":"Documents fed into the combiner from the source"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"Capture stats, organized by collection. The keys of this object are the collection names, and the values are the stats for that collection.","reduce":{"strategy":"merge"},"type":"object"},"derive":{"properties":{"lastPublishedAt":{"description":"The publication timestamp of the most recently derived document.","format":"date-time","type":"string"},"out":{"$ref":"#/$defs/docsAndBytes"},"published":{"$ref":"#/$defs/docsAndBytes"},"transforms":{"additionalProperties":{"$ref":"#/$defs/transformStats"},"description":"A map of each transform (transform name, not collection name) to stats for that transform","reduce":{"strategy":"merge"},"type":"object"}},"reduce":{"strategy":"merge"},"type":"object"},"interval":{"properties":{"uptimeSeconds":{"description":"Number of seconds that the task shard is metered as having been running","minimum":1,"reduce":{"strategy":"sum"},"type":"integer"},"usageRate":{"default":0,"description":"Usage rate which adjusts `uptimeSeconds` to determine the task's effective usage","minimum":0,"type":"number"}},"reduce":{"strategy":"merge"},"required":["uptimeSeconds"],"type":"object"},"materialize":{"additionalProperties":{"properties":{"bytesBehind":{"description":"Bytes behind of the source journals for this binding","type":"integer"},"lastSourcePublishedAt":{"description":"The publication timestamp of the most recently processed source document.\n","format":"date-time","type":"string"},"left":{"$ref":"#/$defs/docsAndBytes"},"out":{"properties":{"bytesTotal":{"default":0,"description":"Total number of bytes representing the JSON encoded documents","reduce":{"strategy":"sum"},"type":"integer"},"docsTotal":{"default":0,"description":"Total number of documents","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["docsTotal"],"type":"object"},"right":{"$ref":"#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"description":"A map of each binding source (collection name) to combiner stats for that binding","reduce":{"strategy":"merge"},"type":"object"},"openSecondsTotal":{"description":"Total time that the transaction was open before starting to commit","reduce":{"strategy":"sum"},"type":"number"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"},"txnCount":{"description":"Total number of transactions represented by this stats document","reduce":{"strategy":"sum"},"type":"integer"}},"reduce":{"strategy":"merge"},"required":["shard","ts"],"title":"Flow task stats","type":"object"},"__flowInline2":{"$id":"file:///home/wbaker/estuary/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"},"logCount":{"default":0,"description":"Total number of matching log events","reduce":{"strategy":"sum"},"type":"integer"}},"$id":"file:///home/wbaker/estuary/flow/ops-catalog/catalog-stats.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Statistics related to the processing of a Flow catalog.","properties":{"catalogName":{"description":"Name of the Flow catalog","type":"string"},"grain":{"description":"Time grain that the stats are aggregated over","enum":["monthly","daily","hourly"]},"statsSummary":{"properties":{"errors":{"$ref":"#/$defs/logCount","description":"Total number of logged errors"},"failures":{"$ref":"#/$defs/logCount","description":"Total number of shard failures"},"lastPublishedAt":{"description":"The most recent publish timestamp of documents in this collection.","format":"date-time","reduce":{"strategy":"maximize"},"type":"string"},"readByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"readFromMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"txnCount":{"$ref":"#/$defs/logCount","description":"Total number of transactions that have been successfully processed"},"usageSeconds":{"default":0,"description":"Cumulative number of metered seconds of task usage","reduce":{"strategy":"sum"},"type":"integer"},"warnings":{"$ref":"#/$defs/logCount","description":"Total number of logged warnings"},"writtenByMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"},"writtenToMe":{"$ref":"stats.schema.yaml#/$defs/docsAndBytes"}},"reduce":{"strategy":"merge"},"type":"object"},"taskStats":{"properties":{"capture":{"$ref":"stats.schema.yaml#/properties/capture"},"derive":{"$ref":"stats.schema.yaml#/properties/derive"},"interval":{"$ref":"stats.schema.yaml#/properties/interval"},"materialize":{"$ref":"stats.schema.yaml#/properties/materialize"}},"reduce":{"strategy":"merge"},"type":"object"},"ts":{"description":"Timestamp of the catalog stat aggregate","format":"date-time","type":"string"}},"reduce":{"strategy":"merge"},"required":["catalogName","grain","ts","statsSummary"],"title":"Flow catalog task stats","type":"object"}, + "key": [ + "/catalogName", + "/grain", + "/ts" + ], + "projections": { + "catalog_name": "/catalogName", + "grain": { + "location": "/grain", + "partition": true + } + }, + "derive": { + "using": { + "typescript": { + "module": "import { IDerivation, Document, SourceFromBaseName } from 'flow/ops/rollups/L2/catalog-stats.ts';\n\n// Placeholder for the ops/rollups/L2/catalog-stats derivation. Production\n// overwrites this module in `update_l2_reporting` with one method per\n// data-plane transform; this file's content is only exercised by catalog\n// tests.\nexport class Derivation extends IDerivation {\n fromBaseName(read: { doc: SourceFromBaseName }): Document[] {\n return [read.doc];\n }\n}\n" + } + }, + "transforms": [ + { + "name": "from-base-name", + "source": "ops/rollups/L1/BASE_NAME/catalog-stats", + "shuffle": { + "key": [ + "/catalogName" + ] + } + } + ] + } } } } \ No newline at end of file diff --git a/ops-catalog/reporting-L2-template.flow.yaml b/ops-catalog/reporting-L2-template.flow.yaml index 5b76b0ec7cb..52f0f86dfa7 100644 --- a/ops-catalog/reporting-L2-template.flow.yaml +++ b/ops-catalog/reporting-L2-template.flow.yaml @@ -71,3 +71,24 @@ collections: # TODO(johnny): Ideally, this would be shuffle: { key: [/catalogName] } # However, while rolling out federated data-planes I'm minimizing churn. shuffle: any + + ops/rollups/L2/catalog-stats: + schema: catalog-stats.schema.yaml + key: [/catalogName, /grain, /ts] + projections: + catalog_name: /catalogName + grain: + location: /grain + partition: true + + derive: + using: + typescript: + module: catalog-stats-L2-rollup.ts + transforms: + # Bindings of this derivation are dynamically generated over all current data-planes. + # This binding is ignored in production, and is used only for tests. + - name: from-base-name + source: ops/rollups/L1/BASE_NAME/catalog-stats + shuffle: + key: [/catalogName] diff --git a/supabase/seed.sql b/supabase/seed.sql index e7a4bd74e15..e676a475d14 100644 --- a/supabase/seed.sql +++ b/supabase/seed.sql @@ -36,8 +36,12 @@ insert into public.role_grants (subject_role, object_role, capability) values ('ops/rollups/L1/', 'ops/rollups/L1/', 'write'), -- L2 roll-ups can read L1 roll-ups. ('ops.us-central1.v1/', 'ops/rollups/L1/', 'read'), + ('ops/rollups/L2/', 'ops/rollups/L1/', 'read'), -- L2 roll-ups can write to themselves. - ('ops.us-central1.v1/', 'ops.us-central1.v1/', 'write') + ('ops.us-central1.v1/', 'ops.us-central1.v1/', 'write'), + ('ops/rollups/L2/', 'ops/rollups/L2/', 'write'), + -- The local stats-view materialization reads L2 catalog-stats. + ('ops/views/', 'ops/rollups/L2/', 'read') ; -- Ops collections are directed to estuary-flow-poc and not estuary-trial for $reasons.