ENG-2894: kafka observability #4047
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| # Operator metrics — connection-churn observability (Phase 0) | ||
|
|
||
| These metrics are exposed by the management server on `GET /metrics` in | ||
| OpenMetrics text format. They were introduced by the Phase 0 observability | ||
| work of the Redpanda connection-management plan | ||
| (`.cursor/plans/redpanda-connection-management/connection-churn/plan_connection-churn.md`) | ||
| and are the measurement baseline for every subsequent fix phase. | ||
|
|
||
| ## `moose_kafka_client_gauge{purpose}` | ||
|
|
||
| - **Type**: Gauge | ||
| - **Description**: Number of live Kafka client handles held by this | ||
| process, broken down by the role of the client. Incremented when a | ||
| handle is constructed, decremented when the last reference is dropped. | ||
| - **Labels**: | ||
| - `purpose` — one of the role constants defined in | ||
| `apps/framework-cli/src/infrastructure/stream/kafka/client.rs`: | ||
| `ingest_producer`, `idempotent_producer`, `sync_producer`, | ||
| `sync_consumer`, `peek_consumer`, `mcp_sample_consumer`, | ||
| `health_probe`, `fetch_topics_consumer`, `fetch_topics_admin`, | ||
| `check_topic_size_consumer`, `admin_add_partitions`, | ||
| `admin_update_topic_config`, `admin_create_topics`, | ||
| `admin_delete_topics`, `admin_describe_topic_config`, | ||
| `function_worker_estimated` (see note below). | ||
| - **Expected range (per pod)**: in the ~20–30 region in steady state on | ||
| a main-branch deployment with 6 function processes; the exact | ||
| per-purpose breakdown is tracked on the connection-churn dashboard. | ||
| A sustained value of zero for any `purpose` usually indicates the | ||
| code path is inactive rather than a broken metric. | ||
| - **Notes**: `function_worker_estimated` is a coarse proxy emitted from | ||
| the Rust supervisor (`functions_registry.rs`) — one tick per parallel | ||
| worker — because TypeScript and Python workers don't use `rdkafka` and | ||
| therefore can't be instrumented the same way. | ||
|
|
||
| ## `moose_function_worker_restarts_total{reason}` | ||
|
|
||
| - **Type**: Counter (exposed as `..._total` by the OpenMetrics encoder). | ||
| - **Description**: Cumulative count of streaming-function worker | ||
| restarts since process start, bucketed by exit classification. | ||
| - **Labels**: | ||
| - `reason` — Rust-supervised children emit one of | ||
| `rust_child_exit_ok`, `rust_child_exit_err_code`, | ||
| `rust_child_exit_signal`, `rust_child_wait_err`. TypeScript clusters | ||
| emit `ts_worker_exit_code_0`, `ts_worker_exit_code_nonzero`, | ||
| `ts_worker_killed_by_signal_<SIGNAL>`, or `ts_worker_killed_other`. | ||
| Python runners emit `py_worker_exit_code_0`, | ||
| `py_worker_exit_code_nonzero`, or `py_worker_killed_by_<SIGNAL>`. | ||
| - **Expected range**: `rate(...[5m])` should be 0 in steady state. Any | ||
| sustained non-zero rate indicates a misbehaving function and is worth | ||
| paging on. | ||
|
|
||
| ## `moose_function_process_diff_updated_total{reason}` | ||
|
|
||
| - **Type**: Counter (exposed as `..._total`). | ||
| - **Description**: Cumulative count of iterations through | ||
| `InfrastructureMap::diff_function_processes` that decided to emit a | ||
| `ProcessChange::Updated`, labelled by the root cause of the decision. | ||
| - **Labels**: | ||
| - `reason`: | ||
| - `forced_always` — pre-Phase-1 behaviour: every existing function | ||
| process is treated as changed even when its fields are identical. | ||
| This is the counter Phase 1 will drive to zero. | ||
| - `no_change` — fields match exactly; still emits an `Updated` in | ||
| Phase 0 (behaviour is unchanged; Phase 1 will make this a no-op). | ||
| - Phase 1 introduces finer-grained reasons | ||
| (`executable_changed`, `version_changed`, etc.); they'll replace | ||
| `forced_always` in the dominant series. | ||
| - **Expected range**: Phase 0 baseline is ~0.83/s/pod across the fleet | ||
| (see research §3.1). Post-Phase-1, the `forced_always` series must | ||
| drop to zero; a non-zero rate after rollout is the regression signal. | ||
|
|
||
| ## Kill-switch — `MOOSE_KAFKA_CLIENT_METRICS_DISABLED` | ||
|
|
||
| Setting this env var to a non-empty value disables `kafka_client_gauge` | ||
| instrumentation for the process. Used to rule out metric-tracking as a | ||
| CPU/memory regression source during rollout. The | ||
| `function_worker_restarts_total` and `function_process_diff_updated_total` | ||
| counters are **not** gated by this switch — they're cheap and do not | ||
| touch the hot Kafka client path. | ||
|
|
||
| ## Endpoints used by language workers | ||
|
|
||
| Worker processes outside the Rust supervisor (TypeScript `cluster` | ||
| workers, Python streaming runners) don't have direct access to the Rust | ||
| metrics registry. They POST JSON event bodies to | ||
| `http://127.0.0.1:${MOOSE_MANAGEMENT_PORT}/metrics-logs`, where the | ||
| `metrics_log_route` handler forwards them to the shared registry. Only | ||
| `StreamingFunctionEvent` and `FunctionWorkerRestart` payloads are | ||
| currently accepted; see `apps/framework-cli/src/cli/local_webserver.rs` | ||
| for the exact allow-list. | ||
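
The gauge semantics described above for `moose_kafka_client_gauge` (increment when a handle is constructed, decrement when the last reference is dropped) can be sketched with an RAII guard. This is a minimal illustration under stated assumptions, not the PR's implementation: `ClientGauge` and `TrackedClient` are hypothetical names, and a plain `HashMap` stands in for the real metrics registry.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the metrics registry: live handle count per purpose.
#[derive(Default)]
struct ClientGauge {
    live: Mutex<HashMap<&'static str, i64>>,
}

impl ClientGauge {
    /// Adjust the gauge for `purpose` by `delta` (+1 on create, -1 on drop).
    fn add(&self, purpose: &'static str, delta: i64) {
        let mut live = self.live.lock().unwrap();
        *live.entry(purpose).or_insert(0) += delta;
    }

    /// Read the current value; an unseen purpose reads as zero.
    fn get(&self, purpose: &str) -> i64 {
        let live = self.live.lock().unwrap();
        live.get(purpose).copied().unwrap_or(0)
    }
}

/// Hypothetical wrapper that would sit next to a real client handle.
struct TrackedClient {
    gauge: Arc<ClientGauge>,
    purpose: &'static str,
}

impl TrackedClient {
    /// Increment once at construction and hand out a shared reference.
    fn new(gauge: Arc<ClientGauge>, purpose: &'static str) -> Arc<Self> {
        gauge.add(purpose, 1);
        Arc::new(Self { gauge, purpose })
    }
}

impl Drop for TrackedClient {
    /// Runs only when the last `Arc` reference goes away.
    fn drop(&mut self) {
        self.gauge.add(self.purpose, -1);
    }
}

fn main() {
    let gauge = Arc::new(ClientGauge::default());
    let first = TrackedClient::new(gauge.clone(), "ingest_producer");
    let first_clone = Arc::clone(&first); // same handle, no extra increment
    let second = TrackedClient::new(gauge.clone(), "ingest_producer");
    println!("after two creates: {}", gauge.get("ingest_producer"));

    drop(first); // a clone is still alive, so the gauge does not move
    println!("after dropping one reference: {}", gauge.get("ingest_producer"));

    drop(first_clone);
    drop(second);
    println!("after dropping everything: {}", gauge.get("ingest_producer"));
}
```

Tying the decrement to `Drop` rather than to an explicit call means cloned references to the same handle are counted once, which matches the "last reference is dropped" wording in the description.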
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1362,23 +1362,12 @@ async fn metrics_log_route( | |
| let parsed: Result<MetricEvent, serde_json::Error> = serde_json::from_reader(body); | ||
| trace!("Parsed metrics log route: {:?}", parsed); | ||
|
|
||
| if let Ok(MetricEvent::StreamingFunctionEvent { | ||
| count_in, | ||
| count_out, | ||
| bytes, | ||
| function_name, | ||
| timestamp, | ||
| }) = parsed | ||
| { | ||
| metrics | ||
| .send_metric_event(MetricEvent::StreamingFunctionEvent { | ||
| timestamp, | ||
| count_in, | ||
| count_out, | ||
| bytes, | ||
| function_name: function_name.clone(), | ||
| }) | ||
| .await; | ||
| match parsed { | ||
| Ok(event @ MetricEvent::StreamingFunctionEvent { .. }) | ||
| | Ok(event @ MetricEvent::FunctionWorkerRestart { .. }) => { | ||
| metrics.send_metric_event(event).await; | ||
| } | ||
| _ => {} | ||
| } | ||
|
|
||
| Response::builder() | ||
|
|
@@ -2798,6 +2787,7 @@ impl Webserver { | |
| let producer = if project.features.streaming_engine { | ||
| Some(kafka::client::create_producer( | ||
| project.redpanda_config.clone(), | ||
| kafka::client::PURPOSE_INGEST_PRODUCER, | ||
| )) | ||
| } else { | ||
| None | ||
|
|
@@ -4280,4 +4270,100 @@ mod tests { | |
| // Leading slash edge case | ||
| assert_eq!(find_api_name("/api/1", &apis), "/api/1"); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn metrics_endpoint_e2e_exposes_new_churn_observability_series() { | ||
| use crate::metrics::{Metrics, TelemetryMetadata}; | ||
| use hyper::service::service_fn; | ||
| use std::convert::Infallible; | ||
| use std::time::Duration; | ||
| use tokio::net::TcpListener; | ||
|
|
||
| let (metrics, rx) = Metrics::new( | ||
| TelemetryMetadata { | ||
| machine_id: "smoke".to_string(), | ||
| is_moose_developer: false, | ||
| metric_labels: None, | ||
| metric_endpoints: None, | ||
| is_production: false, | ||
| project_name: "smoke".to_string(), | ||
| export_metrics: false, | ||
| }, | ||
| None, | ||
| ); | ||
| let metrics = Arc::new(metrics); | ||
| metrics.start_listening_to_metrics(rx).await; | ||
|
|
||
| metrics.kafka_client_created("smoke_producer"); | ||
| metrics.kafka_client_created("smoke_producer"); | ||
| metrics.kafka_client_dropped("smoke_producer"); | ||
| metrics.function_worker_restart("smoke_reason".to_string()); | ||
| metrics.function_process_diff_updated("forced_always"); | ||
| metrics.function_process_diff_updated("forced_always"); | ||
|
|
||
| let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
| let addr = listener.local_addr().unwrap(); | ||
|
|
||
| let metrics_for_server = metrics.clone(); | ||
| let server_task = tokio::spawn(async move { | ||
| loop { | ||
| let (stream, _) = match listener.accept().await { | ||
| Ok(pair) => pair, | ||
| Err(_) => return, | ||
| }; | ||
| let io = TokioIo::new(stream); | ||
| let metrics_clone = metrics_for_server.clone(); | ||
| tokio::spawn(async move { | ||
| let service = service_fn(move |_req: Request<Incoming>| { | ||
| let m = metrics_clone.clone(); | ||
| async move { | ||
| let resp = metrics_route(m).await.unwrap(); | ||
| Ok::<_, Infallible>(resp) | ||
| } | ||
| }); | ||
| let _ = auto::Builder::new(TokioExecutor::new()) | ||
| .serve_connection(io, service) | ||
| .await; | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(100)).await; | ||
|
|
||
| let url = format!("http://{addr}/metrics"); | ||
| let client = reqwest::Client::builder() | ||
| .timeout(Duration::from_secs(5)) | ||
| .build() | ||
| .unwrap(); | ||
| let resp = client.get(&url).send().await.expect("GET /metrics failed"); | ||
| assert_eq!(resp.status().as_u16(), 200); | ||
| let body = resp.text().await.expect("read body"); | ||
|
|
||
| assert!( | ||
| body.contains("# TYPE moose_kafka_client_gauge gauge"), | ||
| "missing kafka gauge TYPE header in /metrics body:\n{body}" | ||
| ); | ||
| assert!( | ||
| body.contains(r#"moose_kafka_client_gauge{purpose="smoke_producer"} 1"#), | ||
| "expected gauge value of 1 after 2 creates + 1 drop:\n{body}" | ||
| ); | ||
| assert!( | ||
| body.contains("# TYPE moose_function_worker_restarts counter"), | ||
| "missing worker restarts TYPE header:\n{body}" | ||
| ); | ||
| assert!( | ||
| body.contains(r#"moose_function_worker_restarts_total{reason="smoke_reason"} 1"#), | ||
| "expected worker restart counter == 1:\n{body}" | ||
| ); | ||
| assert!( | ||
| body.contains("# TYPE moose_function_process_diff_updated counter"), | ||
| "missing diff updated TYPE header:\n{body}" | ||
| ); | ||
| assert!( | ||
| body.contains(r#"moose_function_process_diff_updated_total{reason="forced_always"} 2"#), | ||
| "expected diff counter == 2:\n{body}" | ||
| ); | ||
|
|
||
| server_task.abort(); | ||
| } | ||
|
Comment on lines +4274 to +4368
Contributor
Document the new
This adds user-visible metrics surface, but the PR context does not include a matching
As per coding guidelines, |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1504,9 +1504,18 @@ impl InfrastructureMap { | |
|
|
||
| for (id, process) in self_processes { | ||
| if let Some(target_process) = target_processes.get(id) { | ||
| // Always treat function processes as updated if they exist in both maps | ||
| // This ensures function code changes are always redeployed | ||
| tracing::debug!("FunctionProcess updated (forced): {}", id); | ||
| let reason = if process == target_process { | ||
| "no_change" | ||
| } else { | ||
| "forced_always" | ||
| }; | ||
| crate::metrics::record_function_process_diff_updated(reason); | ||
|
|
||
| tracing::debug!( | ||
| "FunctionProcess updated (forced, diff_reason={}): {}", | ||
| reason, | ||
| id | ||
| ); | ||
|
Comment on lines +1507 to +1518
Contributor
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify current reason taxonomy and metric wiring for FunctionProcess diff updates.
fd -i 'metrics.rs' apps/framework-cli/src --exec rg -n "function_process_diff_updated|record_function_process_diff_updated|reason|no_change|forced_always|changed" {}
fd -i 'metrics_inserter.rs' apps/framework-cli/src --exec rg -n "FunctionProcessDiffUpdated|reason|no_change|forced_always|changed" {}
rg -n --type=rust "record_function_process_diff_updated|FunctionProcessDiffUpdated|no_change|forced_always|changed" apps/framework-cli/src
Repository: 514-labs/moosestack
Length of output: 40534
🏁 Script executed:
# View the diff_function_processes function around lines 1507-1518
sed -n '1480,1530p' apps/framework-cli/src/framework/core/infrastructure_map.rs
Repository: 514-labs/moosestack
Length of output: 2036
🏁 Script executed:
# Search for function signature and understand what this function does
rg -B 20 "let reason = if process == target_process" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -60
Repository: 514-labs/moosestack
Length of output: 869
🏁 Script executed:
# Check if there are any comments or documentation about FunctionProcess diff semantics
rg -B 5 -A 5 "diff_function_processes|FunctionProcess.*diff" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -80
Repository: 514-labs/moosestack
Length of output: 1243
🏁 Script executed:
# Check documentation/comments about what "forced_always" means and FunctionProcess restart semantics
rg -B 10 -A 5 "forced.*always|forced.*update.*process|FunctionProcess.*restart|restart.*process" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -120
Repository: 514-labs/moosestack
Length of output: 722
🏁 Script executed:
# Check if there are similar patterns in orchestration/workflow diffs to understand the naming convention
rg -B 3 -A 3 "record_function_process_diff_updated|OrchestrationWorker.*diff|Workflow.*diff.*reason" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -100
Repository: 514-labs/moosestack
Length of output: 330
🏁 Script executed:
# Look at the test to understand the metric intent
sed -n '8249,8300p' apps/framework-cli/src/framework/core/infrastructure_map.rs
Repository: 514-labs/moosestack
Length of output: 2377
🏁 Script executed:
# Look at the full test that uses "forced_always" to understand test expectations
sed -n '658,690p' apps/framework-cli/src/metrics.rs
Repository: 514-labs/moosestack
Length of output: 1337
🏁 Script executed:
# Check the full test_diff_function_process_no_changes_triggers_update to understand intent
sed -n '8249,8290p' apps/framework-cli/src/framework/core/infrastructure_map.rs
Repository: 514-labs/moosestack
Length of output: 2020
🏁 Script executed:
# Check if there are comments about why FunctionProcess is always forced to update
rg -B 20 "diff_function_processes" apps/framework-cli/src/framework/core/infrastructure_map.rs | grep -A 20 "fn diff_function_processes"
Repository: 514-labs/moosestack
Length of output: 94
🏁 Script executed:
# Search for all usages of the reason labels to see if they're hardcoded in tests/monitoring
rg "\"no_change\"|\"forced_always\"" apps/framework-cli/src --type=rust
Repository: 514-labs/moosestack
Length of output: 948
🏁 Script executed:
# Check if there's any documentation about what these metric reasons should mean
rg -B 5 -A 10 "function_process_diff_updated|FunctionProcessDiffUpdated" apps/framework-cli/src/metrics.rs | grep -A 15 "pub fn function_process_diff_updated"
Repository: 514-labs/moosestack
Length of output: 717
🏁 Script executed:
# Check if there are any monitoring dashboards or prometheus config that hardcode "forced_always"
rg "forced_always" apps/framework-cli --type=rust --type=yaml --type=json
Repository: 514-labs/moosestack
Length of output: 856
🏁 Script executed:
# Look at the git history or comments to understand why "forced_always" was chosen
git log --oneline --all -S "forced_always" -- apps/framework-cli/src/framework/core/infrastructure_map.rs | head -5
Repository: 514-labs/moosestack
Length of output: 101
🏁 Script executed:
# Read more context around the test to understand what the intent is
sed -n '8249,8330p' apps/framework-cli/src/framework/core/infrastructure_map.rs
Repository: 514-labs/moosestack
Length of output: 3600
🏁 Script executed:
# Check what the commit f4786f50 was about
git show f4786f50 --stat | head -30
Repository: 514-labs/moosestack
Length of output: 1635
🏁 Script executed:
# Check if there are any other test files that might reference these reason labels
find apps/framework-cli -name "*.rs" -type f -exec grep -l "forced_always\|no_change" {} \;
Repository: 514-labs/moosestack
Length of output: 319
🏁 Script executed:
# Check the broader context of how FunctionProcess updates are actually used/applied
rg -B 5 -A 10 "ProcessChange::FunctionProcess.*Updated" apps/framework-cli/src --type=rust | head -80
Repository: 514-labs/moosestack
Length of output: 5267
🏁 Script executed:
# Verify if the function worker actually restarts based on this diff (is it truly "forced_always"?)
rg -B 3 -A 3 "function_processes|FunctionProcess.*change" apps/framework-cli/src/framework/core/plan.rs | head -100
Repository: 514-labs/moosestack
Length of output: 45
Clarify reason labels to reflect actual vs identical diffs.
Line 1510 labels
Proposed fix
- let reason = if process == target_process {
-     "no_change"
- } else {
-     "forced_always"
- };
+ let reason = if process == target_process {
+     "forced_no_change"
+ } else {
+     "changed"
+ };
Update the metric tests in |
||
| process_updates += 1; | ||
| process_changes.push(ProcessChange::FunctionProcess( | ||
| Change::<FunctionProcess>::Updated { | ||
|
|
||
Doc overstates the kill-switch condition.
`kafka_client_tracking_enabled()` in `metrics.rs` is `std::env::var(KAFKA_METRICS_DISABLED_ENV).is_err()`, so the switch trips on any set value, including the empty string. Either tighten the check to reject empty (`is_ok_and(|v| !v.is_empty())`) or drop "non-empty" from the doc.
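
The difference between the two options in this comment can be made concrete with a small sketch. The function names below are hypothetical, and the environment lookup is passed in as a value so the example does not mutate the process environment; only the `is_err()` and `is_ok_and(|v| !v.is_empty())` expressions come from the comment itself.

```rust
use std::env::VarError;

/// Semantics quoted in the review: tracking stays enabled only while the
/// variable lookup fails, so any set value (even "") disables it.
fn tracking_enabled_any_value(var: Result<String, VarError>) -> bool {
    var.is_err()
}

/// Tightened semantics suggested in the review: only a non-empty value
/// disables tracking, which matches the "non-empty" wording in the doc.
fn tracking_enabled_non_empty_only(var: Result<String, VarError>) -> bool {
    let disabled = var.is_ok_and(|v| !v.is_empty());
    !disabled
}

fn main() {
    // Three cases for the kill-switch variable: unset, empty, and "1".
    let cases: [(&str, fn() -> Result<String, VarError>); 3] = [
        ("unset", || Err(VarError::NotPresent)),
        ("empty", || Ok(String::new())),
        ("set to 1", || Ok("1".to_string())),
    ];

    for (label, make) in cases {
        println!(
            "{label}: any-value check enabled={}, non-empty check enabled={}",
            tracking_enabled_any_value(make()),
            tracking_enabled_non_empty_only(make()),
        );
    }
}
```

The two checks agree when the variable is unset or set to a non-empty value and differ only for the empty string, which is the case the comment calls out.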