90 changes: 90 additions & 0 deletions apps/framework-cli/docs/operator/metrics.md
@@ -0,0 +1,90 @@
# Operator metrics — connection-churn observability (Phase 0)

These metrics are exposed by the management server on `GET /metrics` in
OpenMetrics text format. They were introduced by the Phase 0 observability
work of the Redpanda connection-management plan
(`.cursor/plans/redpanda-connection-management/connection-churn/plan_connection-churn.md`)
and are the measurement baseline for every subsequent fix phase.

## `moose_kafka_client_gauge{purpose}`

- **Type**: Gauge
- **Description**: Number of live Kafka client handles held by this
  process, broken down by the role of the client. Incremented when a
  handle is constructed, decremented when the last reference is dropped.
- **Labels**:
  - `purpose` is one of the role constants defined in
    `apps/framework-cli/src/infrastructure/stream/kafka/client.rs`:
    `ingest_producer`, `idempotent_producer`, `sync_producer`,
    `sync_consumer`, `peek_consumer`, `mcp_sample_consumer`,
    `health_probe`, `fetch_topics_consumer`, `fetch_topics_admin`,
    `check_topic_size_consumer`, `admin_add_partitions`,
    `admin_update_topic_config`, `admin_create_topics`,
    `admin_delete_topics`, `admin_describe_topic_config`,
    `function_worker_estimated` (see note below).
- **Expected range (per pod)**: roughly 20–30 in steady state on
  a main-branch deployment with 6 function processes; the exact
  per-purpose breakdown is tracked on the connection-churn dashboard.
  A sustained value of zero for any `purpose` usually indicates an
  inactive code path rather than a broken metric.
- **Notes**: `function_worker_estimated` is a coarse proxy emitted from
  the Rust supervisor (`functions_registry.rs`), one tick per parallel
  worker, because TypeScript and Python workers don't use `rdkafka` and
  therefore can't be instrumented the same way.
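
The increment-on-construct, decrement-on-last-drop behaviour described above can be sketched with an `Arc` and a `Drop` impl. This is a minimal illustration only: `GaugeRegistry`, `Inner`, and `CountedHandle` are hypothetical names, and the PR's actual `KafkaClientHandle` may be built differently.

```rust
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the metrics registry: one signed count per purpose.
#[derive(Default)]
struct GaugeRegistry {
    counts: Mutex<HashMap<&'static str, i64>>,
}

impl GaugeRegistry {
    fn add(&self, purpose: &'static str, delta: i64) {
        *self.counts.lock().unwrap().entry(purpose).or_insert(0) += delta;
    }

    fn get(&self, purpose: &'static str) -> i64 {
        self.counts.lock().unwrap().get(purpose).copied().unwrap_or(0)
    }
}

/// State shared by every clone of a handle; dropped once, when the last clone goes away.
struct Inner<C> {
    client: C,
    purpose: &'static str,
    registry: Arc<GaugeRegistry>,
}

impl<C> Drop for Inner<C> {
    fn drop(&mut self) {
        // Runs only when the final reference is released.
        self.registry.add(self.purpose, -1);
    }
}

/// Hypothetical counted handle (an assumption, not the PR's implementation).
struct CountedHandle<C>(Arc<Inner<C>>);

impl<C> CountedHandle<C> {
    fn new(client: C, purpose: &'static str, registry: Arc<GaugeRegistry>) -> Self {
        registry.add(purpose, 1); // one increment per constructed client
        CountedHandle(Arc::new(Inner { client, purpose, registry }))
    }
}

impl<C> Clone for CountedHandle<C> {
    fn clone(&self) -> Self {
        CountedHandle(Arc::clone(&self.0)) // clones share the single increment
    }
}

impl<C> Deref for CountedHandle<C> {
    type Target = C;
    fn deref(&self) -> &C {
        &self.0.client
    }
}

fn main() {
    let registry = Arc::new(GaugeRegistry::default());
    let a = CountedHandle::new("client-a", "ingest_producer", Arc::clone(&registry));
    let b = a.clone();
    println!("after create + clone: {}", registry.get("ingest_producer"));
    drop(a);
    println!("after first drop:     {}", registry.get("ingest_producer"));
    drop(b);
    println!("after last drop:      {}", registry.get("ingest_producer"));
}
```

Wrapping the client in an `Arc` keeps the gauge tied to the number of underlying clients rather than the number of clones, which matches the "last reference" wording.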

## `moose_function_worker_restarts_total{reason}`

- **Type**: Counter (exposed as `..._total` by the OpenMetrics encoder).
- **Description**: Cumulative count of streaming-function worker
  restarts since process start, bucketed by exit classification.
- **Labels**:
  - `reason`: Rust-supervised children emit one of
    `rust_child_exit_ok`, `rust_child_exit_err_code`,
    `rust_child_exit_signal`, `rust_child_wait_err`. TypeScript clusters
    emit `ts_worker_exit_code_0`, `ts_worker_exit_code_nonzero`,
    `ts_worker_killed_by_signal_<SIGNAL>`, or `ts_worker_killed_other`.
    Python runners emit `py_worker_exit_code_0`,
    `py_worker_exit_code_nonzero`, or `py_worker_killed_by_<SIGNAL>`.
- **Expected range**: `rate(...[5m])` should be 0 in steady state. Any
  sustained non-zero rate indicates a misbehaving function and is worth
  paging on.
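
One plausible way to derive the four Rust-side `reason` values from the result of waiting on a child is sketched below. The mapping is an assumption inferred from the label names, the function name `restart_reason` is hypothetical, and the example is Unix-only because it builds `ExitStatus` values from raw wait statuses.

```rust
use std::io;
use std::os::unix::process::ExitStatusExt; // Unix-only: lets us build statuses by hand
use std::process::ExitStatus;

/// Assumed mapping from a wait result to a `reason` label value.
fn restart_reason(wait_result: &io::Result<ExitStatus>) -> &'static str {
    match wait_result {
        // Waiting on the child itself failed.
        Err(_) => "rust_child_wait_err",
        // Exit code 0.
        Ok(status) if status.success() => "rust_child_exit_ok",
        // Exited normally with a non-zero code.
        Ok(status) if status.code().is_some() => "rust_child_exit_err_code",
        // No exit code on Unix means the child was terminated by a signal.
        Ok(_) => "rust_child_exit_signal",
    }
}

fn main() {
    let clean: io::Result<ExitStatus> = Ok(ExitStatus::from_raw(0));
    let failed: io::Result<ExitStatus> = Ok(ExitStatus::from_raw(1 << 8)); // exit code 1
    let killed: io::Result<ExitStatus> = Ok(ExitStatus::from_raw(9)); // signal 9
    let wait_failed: io::Result<ExitStatus> =
        Err(io::Error::new(io::ErrorKind::Other, "wait failed"));

    for result in [&clean, &failed, &killed, &wait_failed] {
        println!("{}", restart_reason(result));
    }
}
```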

## `moose_function_process_diff_updated_total{reason}`

- **Type**: Counter (exposed as `..._total`).
- **Description**: Cumulative count of iterations through
  `InfrastructureMap::diff_function_processes` that decided to emit a
  `ProcessChange::Updated`, labelled by the root cause of the decision.
- **Labels**:
  - `reason`:
    - `forced_always`: pre-Phase-1 behaviour, in which every existing
      function process is treated as changed even when its fields are
      identical. This is the counter Phase 1 will drive to zero.
    - `no_change`: fields match exactly; still emits an `Updated` in
      Phase 0 (behaviour is unchanged; Phase 1 will make this a no-op).
    - Phase 1 introduces finer-grained reasons
      (`executable_changed`, `version_changed`, etc.); they'll replace
      `forced_always` in the dominant series.
- **Expected range**: Phase 0 baseline is ~0.83/s/pod across the fleet
  (see research §3.1). Post-Phase-1, the `forced_always` series must
  drop to zero; a non-zero rate after rollout is the regression signal.

## Kill-switch — `MOOSE_KAFKA_CLIENT_METRICS_DISABLED`

Setting this env var to a non-empty value disables `kafka_client_gauge`
instrumentation for the process. Used to rule out metric-tracking as a
CPU/memory regression source during rollout. The
`function_worker_restarts_total` and `function_process_diff_updated_total`
counters are **not** gated by this switch — they're cheap and do not
touch the hot Kafka client path.
Comment on lines +72 to +79
Contributor


⚠️ Potential issue | 🟡 Minor

Doc overstates the kill-switch condition.

`kafka_client_tracking_enabled()` in `metrics.rs` is `std::env::var(KAFKA_METRICS_DISABLED_ENV).is_err()`, so the switch trips on any set value, including the empty string. Either tighten the check to reject empty (`is_ok_and(|v| !v.is_empty())`) or drop "non-empty" from the doc.

📝 Proposed doc tweak
-Setting this env var to a non-empty value disables `kafka_client_gauge`
-instrumentation for the process. Used to rule out metric-tracking as a
+Setting this env var to any value (including empty) disables `kafka_client_gauge`
+instrumentation for the process. Used to rule out metric-tracking as a
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-## Kill-switch — `MOOSE_KAFKA_CLIENT_METRICS_DISABLED`
-Setting this env var to a non-empty value disables `kafka_client_gauge`
-instrumentation for the process. Used to rule out metric-tracking as a
-CPU/memory regression source during rollout. The
-`function_worker_restarts_total` and `function_process_diff_updated_total`
-counters are **not** gated by this switch — they're cheap and do not
-touch the hot Kafka client path.
+## Kill-switch — `MOOSE_KAFKA_CLIENT_METRICS_DISABLED`
+Setting this env var to any value (including empty) disables `kafka_client_gauge`
+instrumentation for the process. Used to rule out metric-tracking as a
+CPU/memory regression source during rollout. The
+`function_worker_restarts_total` and `function_process_diff_updated_total`
+counters are **not** gated by this switch — they're cheap and do not
+touch the hot Kafka client path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/framework-cli/docs/operator/metrics.md` around lines 72 - 79, The doc is
incorrect: kafka_client_tracking_enabled() in metrics.rs currently treats any
set value (including empty) as disabling metrics; change the implementation in
metrics.rs (function kafka_client_tracking_enabled and reference
KAFKA_METRICS_DISABLED_ENV) to reject empty strings by using a check like
std::env::var(KAFKA_METRICS_DISABLED_ENV).is_ok_and(|v| !v.is_empty()), so the
kill-switch only trips for non-empty values (or alternatively update the doc to
remove "non-empty" if you prefer doc-only alignment).
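
The difference between the two checks discussed in this comment can be shown without touching the real environment. The sketch below takes the result of `std::env::var` as an argument; both function names are hypothetical, and the "strict" variant is just one reading of the suggested `is_ok_and` check (tracking stays enabled unless the variable is set to a non-empty value).

```rust
use std::env::VarError;

/// Behaviour described in the review: tracking is enabled only when the
/// variable is unset (or unreadable), so an empty value still disables it.
fn tracking_enabled_current(var: &Result<String, VarError>) -> bool {
    var.is_err()
}

/// Stricter variant: only a non-empty value trips the kill-switch.
fn tracking_enabled_strict(var: &Result<String, VarError>) -> bool {
    !var.as_ref().is_ok_and(|v| !v.is_empty())
}

fn main() {
    let cases: [(&str, Result<String, VarError>); 3] = [
        ("unset", Err(VarError::NotPresent)),
        ("empty", Ok(String::new())),
        ("set to 1", Ok("1".to_string())),
    ];
    for (label, var) in &cases {
        println!(
            "{label}: current={} strict={}",
            tracking_enabled_current(var),
            tracking_enabled_strict(var)
        );
    }
}
```

The two functions disagree only for the empty-string case, which is exactly the gap between the doc wording and the code that the comment points out.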


## Endpoints used by language workers

Worker processes outside the Rust supervisor (TypeScript `cluster`
workers, Python streaming runners) don't have direct access to the Rust
metrics registry. They POST JSON event bodies to
`http://127.0.0.1:${MOOSE_MANAGEMENT_PORT}/metrics-logs`, where the
`metrics_log_route` handler forwards them to the shared registry. Only
`StreamingFunctionEvent` and `FunctionWorkerRestart` payloads are
currently accepted; see `apps/framework-cli/src/cli/local_webserver.rs`
for the exact allow-list.
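
The allow-list described above amounts to a `match` that forwards two variants and silently drops everything else. A self-contained sketch follows; the field lists, the `Other` variant, and the `forwarded` helper are assumptions for illustration and do not mirror the real `MetricEvent` definition.

```rust
/// Simplified stand-in for the real `MetricEvent` enum (fields are assumed).
#[derive(Debug, Clone, PartialEq)]
enum MetricEvent {
    StreamingFunctionEvent { function_name: String, count_in: u64, count_out: u64 },
    FunctionWorkerRestart { reason: String },
    /// Hypothetical variant standing in for any payload not on the allow-list.
    Other,
}

/// Returns the event to forward to the registry, or `None` if the payload
/// failed to parse or is not on the allow-list.
fn forwarded(parsed: Result<MetricEvent, String>) -> Option<MetricEvent> {
    match parsed {
        Ok(event @ MetricEvent::StreamingFunctionEvent { .. })
        | Ok(event @ MetricEvent::FunctionWorkerRestart { .. }) => Some(event),
        _ => None,
    }
}

fn main() {
    let restart = MetricEvent::FunctionWorkerRestart {
        reason: "ts_worker_exit_code_nonzero".to_string(),
    };
    println!("{:?}", forwarded(Ok(restart)));
    println!("{:?}", forwarded(Ok(MetricEvent::Other)));
    println!("{:?}", forwarded(Err("invalid JSON".to_string())));
}
```

Binding the whole value with `event @ ...` avoids destructuring and rebuilding the payload, so adding a field to an allowed variant does not require touching the handler.
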
4 changes: 3 additions & 1 deletion apps/framework-cli/src/cli.rs
@@ -14,7 +14,7 @@ pub mod settings;
 /// `spawn_and_await_initial_compile`.
 pub mod ts_compilation_watcher;
 pub mod watcher;
-use super::metrics::Metrics;
+use super::metrics::{set_global_metrics_handle, Metrics};
 use crate::utilities::constants;
 use crate::utilities::docker::DockerClient;
 use crate::utilities::docker_provider::DockerInfraProvider;
@@ -773,6 +773,7 @@ pub async fn top_command_handler(

 let arc_metrics = Arc::new(metrics);
 arc_metrics.start_listening_to_metrics(rx_events).await;
+set_global_metrics_handle(&arc_metrics);

 routines::start_development_mode(
     project_arc,
@@ -1018,6 +1019,7 @@ pub async fn top_command_handler(

 let arc_metrics = Arc::new(metrics);
 arc_metrics.start_listening_to_metrics(rx_events).await;
+set_global_metrics_handle(&arc_metrics);

 let capture_handle = crate::utilities::capture::capture_usage(
     ActivityType::ProdCommand,
120 changes: 103 additions & 17 deletions apps/framework-cli/src/cli/local_webserver.rs
@@ -1362,23 +1362,12 @@ async fn metrics_log_route(
     let parsed: Result<MetricEvent, serde_json::Error> = serde_json::from_reader(body);
     trace!("Parsed metrics log route: {:?}", parsed);

-    if let Ok(MetricEvent::StreamingFunctionEvent {
-        count_in,
-        count_out,
-        bytes,
-        function_name,
-        timestamp,
-    }) = parsed
-    {
-        metrics
-            .send_metric_event(MetricEvent::StreamingFunctionEvent {
-                timestamp,
-                count_in,
-                count_out,
-                bytes,
-                function_name: function_name.clone(),
-            })
-            .await;
+    match parsed {
+        Ok(event @ MetricEvent::StreamingFunctionEvent { .. })
+        | Ok(event @ MetricEvent::FunctionWorkerRestart { .. }) => {
+            metrics.send_metric_event(event).await;
+        }
+        _ => {}
     }

     Response::builder()
@@ -2798,6 +2787,7 @@ impl Webserver {
         let producer = if project.features.streaming_engine {
             Some(kafka::client::create_producer(
                 project.redpanda_config.clone(),
+                kafka::client::PURPOSE_INGEST_PRODUCER,
             ))
         } else {
             None
@@ -4280,4 +4270,100 @@ mod tests {
         // Leading slash edge case
         assert_eq!(find_api_name("/api/1", &apis), "/api/1");
     }
+
+    #[tokio::test]
+    async fn metrics_endpoint_e2e_exposes_new_churn_observability_series() {
+        use crate::metrics::{Metrics, TelemetryMetadata};
+        use hyper::service::service_fn;
+        use std::convert::Infallible;
+        use std::time::Duration;
+        use tokio::net::TcpListener;
+
+        let (metrics, rx) = Metrics::new(
+            TelemetryMetadata {
+                machine_id: "smoke".to_string(),
+                is_moose_developer: false,
+                metric_labels: None,
+                metric_endpoints: None,
+                is_production: false,
+                project_name: "smoke".to_string(),
+                export_metrics: false,
+            },
+            None,
+        );
+        let metrics = Arc::new(metrics);
+        metrics.start_listening_to_metrics(rx).await;
+
+        metrics.kafka_client_created("smoke_producer");
+        metrics.kafka_client_created("smoke_producer");
+        metrics.kafka_client_dropped("smoke_producer");
+        metrics.function_worker_restart("smoke_reason".to_string());
+        metrics.function_process_diff_updated("forced_always");
+        metrics.function_process_diff_updated("forced_always");
+
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+
+        let metrics_for_server = metrics.clone();
+        let server_task = tokio::spawn(async move {
+            loop {
+                let (stream, _) = match listener.accept().await {
+                    Ok(pair) => pair,
+                    Err(_) => return,
+                };
+                let io = TokioIo::new(stream);
+                let metrics_clone = metrics_for_server.clone();
+                tokio::spawn(async move {
+                    let service = service_fn(move |_req: Request<Incoming>| {
+                        let m = metrics_clone.clone();
+                        async move {
+                            let resp = metrics_route(m).await.unwrap();
+                            Ok::<_, Infallible>(resp)
+                        }
+                    });
+                    let _ = auto::Builder::new(TokioExecutor::new())
+                        .serve_connection(io, service)
+                        .await;
+                });
+            }
+        });
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        let url = format!("http://{addr}/metrics");
+        let client = reqwest::Client::builder()
+            .timeout(Duration::from_secs(5))
+            .build()
+            .unwrap();
+        let resp = client.get(&url).send().await.expect("GET /metrics failed");
+        assert_eq!(resp.status().as_u16(), 200);
+        let body = resp.text().await.expect("read body");
+
+        assert!(
+            body.contains("# TYPE moose_kafka_client_gauge gauge"),
+            "missing kafka gauge TYPE header in /metrics body:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_kafka_client_gauge{purpose="smoke_producer"} 1"#),
+            "expected gauge value of 1 after 2 creates + 1 drop:\n{body}"
+        );
+        assert!(
+            body.contains("# TYPE moose_function_worker_restarts counter"),
+            "missing worker restarts TYPE header:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_function_worker_restarts_total{reason="smoke_reason"} 1"#),
+            "expected worker restart counter == 1:\n{body}"
+        );
+        assert!(
+            body.contains("# TYPE moose_function_process_diff_updated counter"),
+            "missing diff updated TYPE header:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_function_process_diff_updated_total{reason="forced_always"} 2"#),
+            "expected diff counter == 2:\n{body}"
+        );
+
+        server_task.abort();
+    }
Comment on lines +4274 to +4368
Contributor


⚠️ Potential issue | 🟡 Minor

Document the new /metrics series.

This adds user-visible metrics surface, but the PR context does not include a matching apps/framework-docs-v2/content update for these new series/labels.

As per coding guidelines, **/framework-cli/src/**: "If any user-facing changes were made ... verify the documentation for THAT SPECIFIC feature is updated in apps/framework-docs-v2/content".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/framework-cli/src/cli/local_webserver.rs` around lines 4274 - 4368, The
test introduces three new public Prometheus series (moose_kafka_client_gauge
with label purpose, moose_function_worker_restarts_total with label reason, and
moose_function_process_diff_updated_total with label reason) surfaced by
Metrics/Metrics::new and exposed by metrics_route; update the user docs in
apps/framework-docs-v2/content to document each series name, type (gauge or
counter), label meanings (purpose, reason), example metric lines (e.g.
moose_kafka_client_gauge{purpose="smoke_producer"} 1), and any related
configuration or opt-out notes so the new observability surface is discoverable;
also ensure the docs index/TOC includes this metrics page.
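
For the example metric lines requested above, the shape of the exposition text can be sketched with two small formatters. They follow the convention the test in this PR asserts on: the `# TYPE` line carries the base name, and counter samples carry a `_total` suffix. The helper names are hypothetical and this is not the encoder the project uses.

```rust
/// Renders a single-label gauge in OpenMetrics-style text.
fn render_gauge(name: &str, label: (&str, &str), value: i64) -> String {
    format!(
        "# TYPE {name} gauge\n{name}{{{}=\"{}\"}} {value}\n",
        label.0, label.1
    )
}

/// Renders a single-label counter; the sample name gets the `_total` suffix
/// while the TYPE line keeps the base name.
fn render_counter(name: &str, label: (&str, &str), value: u64) -> String {
    format!(
        "# TYPE {name} counter\n{name}_total{{{}=\"{}\"}} {value}\n",
        label.0, label.1
    )
}

fn main() {
    print!("{}", render_gauge("moose_kafka_client_gauge", ("purpose", "smoke_producer"), 1));
    print!("{}", render_counter("moose_function_worker_restarts", ("reason", "smoke_reason"), 1));
    print!("{}", render_counter("moose_function_process_diff_updated", ("reason", "forced_always"), 2));
}
```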

}
12 changes: 9 additions & 3 deletions apps/framework-cli/src/cli/routines/peek.rs
@@ -14,7 +14,9 @@ use crate::project::Project;
 use super::{setup_redis_client, RoutineFailure, RoutineSuccess};

 use crate::infrastructure::olap::clickhouse::model::ClickHouseTable;
-use crate::infrastructure::stream::kafka::client::create_consumer;
+use crate::infrastructure::stream::kafka::client::{
+    create_consumer, KafkaClientHandle, PURPOSE_PEEK_CONSUMER,
+};
 use futures::stream::BoxStream;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::{Message as KafkaMessage, Offset, TopicPartitionList};
@@ -76,13 +78,17 @@ pub async fn peek(
         ))
     })?;

-    let consumer_ref: StreamConsumer;
+    let consumer_ref: KafkaClientHandle<StreamConsumer>;
     let table_ref: ClickHouseTable;

     let mut stream: BoxStream<anyhow::Result<Value>> = if is_stream {
         let group_id = project.redpanda_config.prefix_with_namespace("peek");

-        consumer_ref = create_consumer(&project.redpanda_config, &[("group.id", &group_id)]);
+        consumer_ref = create_consumer(
+            &project.redpanda_config,
+            &[("group.id", &group_id)],
+            PURPOSE_PEEK_CONSUMER,
+        );
         let consumer = &consumer_ref;

         let topic = find_topic_by_name(&infra, name).ok_or_else(|| {
15 changes: 12 additions & 3 deletions apps/framework-cli/src/framework/core/infrastructure_map.rs
@@ -1504,9 +1504,18 @@ impl InfrastructureMap {

         for (id, process) in self_processes {
             if let Some(target_process) = target_processes.get(id) {
-                // Always treat function processes as updated if they exist in both maps
-                // This ensures function code changes are always redeployed
-                tracing::debug!("FunctionProcess updated (forced): {}", id);
+                let reason = if process == target_process {
+                    "no_change"
+                } else {
+                    "forced_always"
+                };
+                crate::metrics::record_function_process_diff_updated(reason);
+
+                tracing::debug!(
+                    "FunctionProcess updated (forced, diff_reason={}): {}",
+                    reason,
+                    id
+                );
Comment on lines +1507 to +1518
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify current reason taxonomy and metric wiring for FunctionProcess diff updates.
fd -i 'metrics.rs' apps/framework-cli/src --exec rg -n "function_process_diff_updated|record_function_process_diff_updated|reason|no_change|forced_always|changed" {}
fd -i 'metrics_inserter.rs' apps/framework-cli/src --exec rg -n "FunctionProcessDiffUpdated|reason|no_change|forced_always|changed" {}
rg -n --type=rust "record_function_process_diff_updated|FunctionProcessDiffUpdated|no_change|forced_always|changed" apps/framework-cli/src

Repository: 514-labs/moosestack

Length of output: 40534


🏁 Script executed:

# View the diff_function_processes function around lines 1507-1518
sed -n '1480,1530p' apps/framework-cli/src/framework/core/infrastructure_map.rs

Repository: 514-labs/moosestack

Length of output: 2036


🏁 Script executed:

# Search for function signature and understand what this function does
rg -B 20 "let reason = if process == target_process" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -60

Repository: 514-labs/moosestack

Length of output: 869


🏁 Script executed:

# Check if there are any comments or documentation about FunctionProcess diff semantics
rg -B 5 -A 5 "diff_function_processes|FunctionProcess.*diff" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -80

Repository: 514-labs/moosestack

Length of output: 1243


🏁 Script executed:

# Check documentation/comments about what "forced_always" means and FunctionProcess restart semantics
rg -B 10 -A 5 "forced.*always|forced.*update.*process|FunctionProcess.*restart|restart.*process" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -120

Repository: 514-labs/moosestack

Length of output: 722


🏁 Script executed:

# Check if there are similar patterns in orchestration/workflow diffs to understand the naming convention
rg -B 3 -A 3 "record_function_process_diff_updated|OrchestrationWorker.*diff|Workflow.*diff.*reason" apps/framework-cli/src/framework/core/infrastructure_map.rs | head -100

Repository: 514-labs/moosestack

Length of output: 330


🏁 Script executed:

# Look at the test to understand the metric intent
sed -n '8249,8300p' apps/framework-cli/src/framework/core/infrastructure_map.rs

Repository: 514-labs/moosestack

Length of output: 2377


🏁 Script executed:

# Look at the full test that uses "forced_always" to understand test expectations
sed -n '658,690p' apps/framework-cli/src/metrics.rs

Repository: 514-labs/moosestack

Length of output: 1337


🏁 Script executed:

# Check the full test_diff_function_process_no_changes_triggers_update to understand intent
sed -n '8249,8290p' apps/framework-cli/src/framework/core/infrastructure_map.rs

Repository: 514-labs/moosestack

Length of output: 2020


🏁 Script executed:

# Check if there are comments about why FunctionProcess is always forced to update
rg -B 20 "diff_function_processes" apps/framework-cli/src/framework/core/infrastructure_map.rs | grep -A 20 "fn diff_function_processes"

Repository: 514-labs/moosestack

Length of output: 94


🏁 Script executed:

# Search for all usages of the reason labels to see if they're hardcoded in tests/monitoring
rg "\"no_change\"|\"forced_always\"" apps/framework-cli/src --type=rust

Repository: 514-labs/moosestack

Length of output: 948


🏁 Script executed:

# Check if there's any documentation about what these metric reasons should mean
rg -B 5 -A 10 "function_process_diff_updated|FunctionProcessDiffUpdated" apps/framework-cli/src/metrics.rs | grep -A 15 "pub fn function_process_diff_updated"

Repository: 514-labs/moosestack

Length of output: 717


🏁 Script executed:

# Check if there are any monitoring dashboards or prometheus config that hardcode "forced_always"
rg "forced_always" apps/framework-cli --type=rust --type=yaml --type=json

Repository: 514-labs/moosestack

Length of output: 856


🏁 Script executed:

# Look at the git history or comments to understand why "forced_always" was chosen
git log --oneline --all -S "forced_always" -- apps/framework-cli/src/framework/core/infrastructure_map.rs | head -5

Repository: 514-labs/moosestack

Length of output: 101


🏁 Script executed:

# Read more context around the test to understand what the intent is
sed -n '8249,8330p' apps/framework-cli/src/framework/core/infrastructure_map.rs

Repository: 514-labs/moosestack

Length of output: 3600


🏁 Script executed:

# Check what the commit f4786f50 was about
git show f4786f50 --stat | head -30

Repository: 514-labs/moosestack

Length of output: 1635


🏁 Script executed:

# Check if there are any other test files that might reference these reason labels
find apps/framework-cli -name "*.rs" -type f -exec grep -l "forced_always\|no_change" {} \;

Repository: 514-labs/moosestack

Length of output: 319


🏁 Script executed:

# Check the broader context of how FunctionProcess updates are actually used/applied
rg -B 5 -A 10 "ProcessChange::FunctionProcess.*Updated" apps/framework-cli/src --type=rust | head -80

Repository: 514-labs/moosestack

Length of output: 5267


🏁 Script executed:

# Verify if the function worker actually restarts based on this diff (is it truly "forced_always"?)
rg -B 3 -A 3 "function_processes|FunctionProcess.*change" apps/framework-cli/src/framework/core/plan.rs | head -100

Repository: 514-labs/moosestack

Length of output: 45


Clarify reason labels to reflect actual vs identical diffs.

Line 1510 labels process != target_process as "forced_always", which obscures that the process config actually differs. For observability, the metric should distinguish whether a restart is due to an actual change or forced by the diff logic despite no change.

Proposed fix
-                let reason = if process == target_process {
-                    "no_change"
-                } else {
-                    "forced_always"
-                };
+                let reason = if process == target_process {
+                    "forced_no_change"
+                } else {
+                    "changed"
+                };

Update the metric tests in apps/framework-cli/src/metrics.rs (line 673–679) and apps/framework-cli/src/cli/local_webserver.rs (line 4301–4363) to use "changed" instead of "forced_always".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/framework-cli/src/framework/core/infrastructure_map.rs` around lines
1507 - 1518, The metric label for the restart reason is misleading: change the
branch that sets reason when process != target_process from "forced_always" to
"changed" (the variables involved are process, target_process, and reason) so
crate::metrics::record_function_process_diff_updated(reason) emits "changed" for
actual config differences; keep the tracing::debug call but update any tests
that assert the old label to expect "changed" instead of "forced_always" in the
metrics assertions.
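
To make the disagreement concrete, the sketch below puts the labelling in this PR next to the labelling proposed in the comment. `FunctionProcess` here is a two-field stand-in with assumed fields, and both function names are hypothetical.

```rust
/// Minimal stand-in for the real `FunctionProcess`; the fields are assumed.
#[derive(Debug, Clone, PartialEq)]
struct FunctionProcess {
    executable: String,
    version: String,
}

/// Labelling as written in this PR's diff.
fn reason_in_pr(current: &FunctionProcess, target: &FunctionProcess) -> &'static str {
    if current == target {
        "no_change"
    } else {
        "forced_always"
    }
}

/// Labelling proposed in the review comment.
fn reason_proposed(current: &FunctionProcess, target: &FunctionProcess) -> &'static str {
    if current == target {
        "forced_no_change"
    } else {
        "changed"
    }
}

fn main() {
    let v1 = FunctionProcess { executable: "fn.ts".into(), version: "1".into() };
    let v2 = FunctionProcess { executable: "fn.ts".into(), version: "2".into() };

    // Identical processes: the update is emitted even though nothing differs.
    println!("{} / {}", reason_in_pr(&v1, &v1), reason_proposed(&v1, &v1));
    // Differing processes: the update reflects a real configuration change.
    println!("{} / {}", reason_in_pr(&v1, &v2), reason_proposed(&v1, &v2));
}
```

Under either scheme the branch condition is the same equality check; only the label names differ, so switching is a rename plus the test updates the comment lists.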

                 process_updates += 1;
                 process_changes.push(ProcessChange::FunctionProcess(
                     Change::<FunctionProcess>::Updated {