diff --git a/apps/framework-cli/docs/operator/metrics.md b/apps/framework-cli/docs/operator/metrics.md
new file mode 100644
index 0000000000..daf0a05a48
--- /dev/null
+++ b/apps/framework-cli/docs/operator/metrics.md
@@ -0,0 +1,90 @@
+# Operator metrics — connection-churn observability (Phase 0)
+
+These metrics are exposed by the management server on `GET /metrics` in
+OpenMetrics text format. They were introduced by the Phase 0 observability
+work of the Redpanda connection-management plan
+(`.cursor/plans/redpanda-connection-management/connection-churn/plan_connection-churn.md`)
+and are the measurement baseline for every subsequent fix phase.
+
+## `moose_kafka_client_gauge{purpose}`
+
+- **Type**: Gauge
+- **Description**: Number of live Kafka client handles held by this
+  process, broken down by the role of the client. Incremented when a
+  handle is constructed, decremented when the last reference is dropped.
+- **Labels**:
+  - `purpose` — one of the role constants defined in
+    `apps/framework-cli/src/infrastructure/stream/kafka/client.rs`:
+    `ingest_producer`, `idempotent_producer`, `sync_producer`,
+    `sync_consumer`, `peek_consumer`, `mcp_sample_consumer`,
+    `health_probe`, `fetch_topics_consumer`, `fetch_topics_admin`,
+    `check_topic_size_consumer`, `admin_add_partitions`,
+    `admin_update_topic_config`, `admin_create_topics`,
+    `admin_delete_topics`, `admin_describe_topic_config`,
+    `function_worker_estimated` (see note below).
+- **Expected range (per pod)**: in the ~20–30 region in steady state on
+  a main-branch deployment with 6 function processes; the exact
+  per-purpose breakdown is tracked on the connection-churn dashboard.
+  A sustained value of zero for any `purpose` usually indicates the
+  code path is inactive rather than a broken metric.
+- **Notes**: `function_worker_estimated` is a coarse proxy emitted from
+  the Rust supervisor (`functions_registry.rs`) — one tick per parallel
+  worker — because TypeScript and Python workers don't use `rdkafka` and
+  therefore can't be instrumented the same way.
+
+## `moose_function_worker_restarts_total{reason}`
+
+- **Type**: Counter (exposed as `..._total` by the OpenMetrics encoder).
+- **Description**: Cumulative count of streaming-function worker
+  restarts since process start, bucketed by exit classification.
+- **Labels**:
+  - `reason` — Rust-supervised children emit one of
+    `rust_child_exit_ok`, `rust_child_exit_err_code`,
+    `rust_child_exit_signal`, `rust_child_wait_err`. TypeScript clusters
+    emit `ts_worker_exit_code_0`, `ts_worker_exit_code_nonzero`,
+    `ts_worker_killed_by_signal_<SIGNAL>`, `ts_worker_killed_other`, or
+    `ts_worker_exit_unknown` (neither exit code nor signal reported).
+    Python runners emit `py_worker_exit_code_0`,
+    `py_worker_exit_code_nonzero`, or `py_worker_killed_by_<SIGNAL>`,
+    where `<SIGNAL>` is the terminating signal's name.
+- **Expected range**: `rate(...[5m])` should be 0 in steady state. Any
+  sustained non-zero rate indicates a misbehaving function and is worth
+  paging on.
+
+## `moose_function_process_diff_updated_total{reason}`
+
+- **Type**: Counter (exposed as `..._total`).
+- **Description**: Cumulative count of iterations through
+  `InfrastructureMap::diff_function_processes` that decided to emit a
+  `ProcessChange::Updated`, labelled by the root cause of the decision.
+- **Labels**:
+  - `reason`:
+    - `forced_always` — pre-Phase-1 behaviour: every existing function
+      process is treated as changed even when its fields are identical.
+      This is the counter Phase 1 will drive to zero.
+    - `no_change` — fields match exactly; still emits an `Updated` in
+      Phase 0 (behaviour is unchanged; Phase 1 will make this a no-op).
+    - Phase 1 introduces finer-grained reasons
+      (`executable_changed`, `version_changed`, etc.); they'll replace
+      `forced_always` as the dominant series.
+- **Expected range**: Phase 0 baseline is ~0.83/s/pod across the fleet
+  (see research §3.1). Post-Phase-1, the `forced_always` series must
+  drop to zero; a non-zero rate after rollout is the regression signal.
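+
+Taken together, a scrape of `GET /metrics` contains series shaped like the
+excerpt below. The values and label mix are illustrative only (they mirror
+the shapes asserted by the Phase 0 smoke tests), not steady-state targets:
+
+```text
+# TYPE moose_kafka_client_gauge gauge
+moose_kafka_client_gauge{purpose="ingest_producer"} 1
+moose_kafka_client_gauge{purpose="function_worker_estimated"} 6
+# TYPE moose_function_worker_restarts counter
+moose_function_worker_restarts_total{reason="rust_child_exit_err_code"} 1
+# TYPE moose_function_process_diff_updated counter
+moose_function_process_diff_updated_total{reason="forced_always"} 2
+```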
+
+## Kill-switch — `MOOSE_KAFKA_CLIENT_METRICS_DISABLED`
+
+Setting this env var to any value (including the empty string) disables
+`moose_kafka_client_gauge` instrumentation for the process. Used to rule out
+metric-tracking as a CPU/memory regression source during rollout. The
+`function_worker_restarts_total` and `function_process_diff_updated_total`
+counters are **not** gated by this switch — they're cheap and do not
+touch the hot Kafka client path.
+
+## Endpoints used by language workers
+
+Worker processes outside the Rust supervisor (TypeScript `cluster`
+workers, Python streaming runners) don't have direct access to the Rust
+metrics registry. They POST JSON event bodies to
+`http://127.0.0.1:${MOOSE_MANAGEMENT_PORT}/metrics-logs`, where the
+`metrics_log_route` handler forwards them to the shared registry. Only
+`StreamingFunctionEvent` and `FunctionWorkerRestart` payloads are
+currently accepted; see `apps/framework-cli/src/cli/local_webserver.rs`
+for the exact allow-list.
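+
+For reference, the exit-report body the TypeScript cluster primary posts to
+`/metrics-logs` (the Python runner's `atexit` hook sends the same shape, as
+both implementations later in this change show) is a single-field JSON object:
+
+```json
+{ "reason": "ts_worker_exit_code_nonzero" }
+```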
diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs
index e766334076..1d70a94762 100644
--- a/apps/framework-cli/src/cli.rs
+++ b/apps/framework-cli/src/cli.rs
@@ -14,7 +14,7 @@ pub mod settings;
 /// `spawn_and_await_initial_compile`.
 pub mod ts_compilation_watcher;
 pub mod watcher;
-use super::metrics::Metrics;
+use super::metrics::{set_global_metrics_handle, Metrics};
 use crate::utilities::constants;
 use crate::utilities::docker::DockerClient;
 use crate::utilities::docker_provider::DockerInfraProvider;
@@ -773,6 +773,7 @@ pub async fn top_command_handler(
 
                 let arc_metrics = Arc::new(metrics);
                 arc_metrics.start_listening_to_metrics(rx_events).await;
+                set_global_metrics_handle(&arc_metrics);
 
                 routines::start_development_mode(
                     project_arc,
@@ -1018,6 +1019,7 @@ pub async fn top_command_handler(
 
             let arc_metrics = Arc::new(metrics);
             arc_metrics.start_listening_to_metrics(rx_events).await;
+            set_global_metrics_handle(&arc_metrics);
 
             let capture_handle = crate::utilities::capture::capture_usage(
                 ActivityType::ProdCommand,
diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs
index 63fbe8b438..e7c79c4814 100644
--- a/apps/framework-cli/src/cli/local_webserver.rs
+++ b/apps/framework-cli/src/cli/local_webserver.rs
@@ -1362,23 +1362,12 @@ async fn metrics_log_route(
     let parsed: Result<MetricEvent, serde_json::Error> = serde_json::from_reader(body);
     trace!("Parsed metrics log route: {:?}", parsed);
 
-    if let Ok(MetricEvent::StreamingFunctionEvent {
-        count_in,
-        count_out,
-        bytes,
-        function_name,
-        timestamp,
-    }) = parsed
-    {
-        metrics
-            .send_metric_event(MetricEvent::StreamingFunctionEvent {
-                timestamp,
-                count_in,
-                count_out,
-                bytes,
-                function_name: function_name.clone(),
-            })
-            .await;
+    match parsed {
+        Ok(event @ MetricEvent::StreamingFunctionEvent { .. })
+        | Ok(event @ MetricEvent::FunctionWorkerRestart { .. }) => {
+            metrics.send_metric_event(event).await;
+        }
+        _ => {}
     }
 
     Response::builder()
@@ -2798,6 +2787,7 @@ impl Webserver {
         let producer = if project.features.streaming_engine {
             Some(kafka::client::create_producer(
                 project.redpanda_config.clone(),
+                kafka::client::PURPOSE_INGEST_PRODUCER,
             ))
         } else {
             None
@@ -4280,4 +4270,100 @@ mod tests {
         // Leading slash edge case
         assert_eq!(find_api_name("/api/1", &apis), "/api/1");
     }
+
+    #[tokio::test]
+    async fn metrics_endpoint_e2e_exposes_new_churn_observability_series() {
+        use crate::metrics::{Metrics, TelemetryMetadata};
+        use hyper::service::service_fn;
+        use std::convert::Infallible;
+        use std::time::Duration;
+        use tokio::net::TcpListener;
+
+        let (metrics, rx) = Metrics::new(
+            TelemetryMetadata {
+                machine_id: "smoke".to_string(),
+                is_moose_developer: false,
+                metric_labels: None,
+                metric_endpoints: None,
+                is_production: false,
+                project_name: "smoke".to_string(),
+                export_metrics: false,
+            },
+            None,
+        );
+        let metrics = Arc::new(metrics);
+        metrics.start_listening_to_metrics(rx).await;
+
+        metrics.kafka_client_created("smoke_producer");
+        metrics.kafka_client_created("smoke_producer");
+        metrics.kafka_client_dropped("smoke_producer");
+        metrics.function_worker_restart("smoke_reason".to_string());
+        metrics.function_process_diff_updated("forced_always");
+        metrics.function_process_diff_updated("forced_always");
+
+        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+        let addr = listener.local_addr().unwrap();
+
+        let metrics_for_server = metrics.clone();
+        let server_task = tokio::spawn(async move {
+            loop {
+                let (stream, _) = match listener.accept().await {
+                    Ok(pair) => pair,
+                    Err(_) => return,
+                };
+                let io = TokioIo::new(stream);
+                let metrics_clone = metrics_for_server.clone();
+                tokio::spawn(async move {
+                    let service = service_fn(move |_req: Request<hyper::body::Incoming>| {
+                        let m = metrics_clone.clone();
+                        async move {
+                            let resp = metrics_route(m).await.unwrap();
+                            Ok::<_, Infallible>(resp)
+                        }
+                    });
+                    let _ = auto::Builder::new(TokioExecutor::new())
+                        .serve_connection(io, service)
+                        .await;
+                });
+            }
+        });
+
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        let url = format!("http://{addr}/metrics");
+        let client = reqwest::Client::builder()
+            .timeout(Duration::from_secs(5))
+            .build()
+            .unwrap();
+        let resp = client.get(&url).send().await.expect("GET /metrics failed");
+        assert_eq!(resp.status().as_u16(), 200);
+        let body = resp.text().await.expect("read body");
+
+        assert!(
+            body.contains("# TYPE moose_kafka_client_gauge gauge"),
+            "missing kafka gauge TYPE header in /metrics body:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_kafka_client_gauge{purpose="smoke_producer"} 1"#),
+            "expected gauge value of 1 after 2 creates + 1 drop:\n{body}"
+        );
+        assert!(
+            body.contains("# TYPE moose_function_worker_restarts counter"),
+            "missing worker restarts TYPE header:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_function_worker_restarts_total{reason="smoke_reason"} 1"#),
+            "expected worker restart counter == 1:\n{body}"
+        );
+        assert!(
+            body.contains("# TYPE moose_function_process_diff_updated counter"),
+            "missing diff updated TYPE header:\n{body}"
+        );
+        assert!(
+            body.contains(r#"moose_function_process_diff_updated_total{reason="forced_always"} 2"#),
+            "expected diff counter == 2:\n{body}"
+        );
+
+        server_task.abort();
+    }
 }
diff --git a/apps/framework-cli/src/cli/routines/peek.rs b/apps/framework-cli/src/cli/routines/peek.rs
index ab358acb92..0774f452ae 100644
--- a/apps/framework-cli/src/cli/routines/peek.rs
+++ b/apps/framework-cli/src/cli/routines/peek.rs
@@ -14,7 +14,9 @@ use crate::project::Project;
 use super::{setup_redis_client, RoutineFailure, RoutineSuccess};
 use crate::infrastructure::olap::clickhouse::model::ClickHouseTable;
-use crate::infrastructure::stream::kafka::client::create_consumer;
+use crate::infrastructure::stream::kafka::client::{
+    create_consumer, KafkaClientHandle, PURPOSE_PEEK_CONSUMER,
+};
 use futures::stream::BoxStream;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::{Message as KafkaMessage, Offset, TopicPartitionList};
@@ -76,13 +78,17 @@ pub async fn peek(
         ))
     })?;
 
-    let consumer_ref: StreamConsumer;
+    let consumer_ref: KafkaClientHandle<StreamConsumer>;
     let table_ref: ClickHouseTable;
 
     let mut stream: BoxStream> = if is_stream {
         let group_id = project.redpanda_config.prefix_with_namespace("peek");
 
-        consumer_ref = create_consumer(&project.redpanda_config, &[("group.id", &group_id)]);
+        consumer_ref = create_consumer(
+            &project.redpanda_config,
+            &[("group.id", &group_id)],
+            PURPOSE_PEEK_CONSUMER,
+        );
         let consumer = &consumer_ref;
 
         let topic = find_topic_by_name(&infra, name).ok_or_else(|| {
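Because `KafkaClientHandle<T>` derefs to the wrapped client, call sites like
`peek` keep using the consumer exactly as before; only construction and the
drop point change. A minimal usage sketch (types, functions, and constants
from this diff; the call itself is illustrative, not a new API):

```rust
use std::time::Duration;

use rdkafka::consumer::Consumer; // brings fetch_metadata into scope

// Illustrative only: shows the handle's lifecycle, not a real call site.
fn sketch(config: &KafkaConfig) -> Result<(), rdkafka::error::KafkaError> {
    let consumer = create_consumer(config, &[("group.id", "peek_example")], PURPOSE_PEEK_CONSUMER);
    // moose_kafka_client_gauge{purpose="peek_consumer"} is +1 here.
    let _md = consumer.fetch_metadata(None, Duration::from_secs(2))?; // resolved via Deref
    drop(consumer); // last handle dropped → gauge decremented back
    Ok(())
}
```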
diff --git a/apps/framework-cli/src/framework/core/infrastructure_map.rs b/apps/framework-cli/src/framework/core/infrastructure_map.rs
index b379e493a6..886e72aa06 100644
--- a/apps/framework-cli/src/framework/core/infrastructure_map.rs
+++ b/apps/framework-cli/src/framework/core/infrastructure_map.rs
@@ -1504,9 +1504,18 @@ impl InfrastructureMap {
 
         for (id, process) in self_processes {
             if let Some(target_process) = target_processes.get(id) {
-                // Always treat function processes as updated if they exist in both maps
-                // This ensures function code changes are always redeployed
-                tracing::debug!("FunctionProcess updated (forced): {}", id);
+                let reason = if process == target_process {
+                    "no_change"
+                } else {
+                    "forced_always"
+                };
+                crate::metrics::record_function_process_diff_updated(reason);
+
+                tracing::debug!(
+                    "FunctionProcess updated (forced, diff_reason={}): {}",
+                    reason,
+                    id
+                );
                 process_updates += 1;
                 process_changes.push(ProcessChange::FunctionProcess(
                     Change::<FunctionProcess>::Updated {
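For orientation, Phase 1 is expected to split `forced_always` into the
finer-grained reasons named in the operator doc above. A purely illustrative
sketch; the compared field names (`executable`, `version`) are hypothetical
placeholders, not the real `FunctionProcess` schema:

```rust
// Hypothetical Phase-1 classification; only "no_change" and "forced_always"
// exist in Phase 0, and the fields compared here are assumed for illustration.
let reason = if process == target_process {
    "no_change"
} else if process.executable != target_process.executable {
    "executable_changed"
} else if process.version != target_process.version {
    "version_changed"
} else {
    "forced_always"
};
crate::metrics::record_function_process_diff_updated(reason);
```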
diff --git a/apps/framework-cli/src/infrastructure/processes/functions_registry.rs b/apps/framework-cli/src/infrastructure/processes/functions_registry.rs
index d66b489402..ef6745fd47 100644
--- a/apps/framework-cli/src/infrastructure/processes/functions_registry.rs
+++ b/apps/framework-cli/src/infrastructure/processes/functions_registry.rs
@@ -4,7 +4,11 @@ use crate::{
         infrastructure::function_process::FunctionProcess, infrastructure_map::InfrastructureMap,
     },
     framework::{python, typescript},
-    infrastructure::stream::{kafka::models::KafkaStreamConfig, StreamConfig},
+    infrastructure::stream::{
+        kafka::client::PURPOSE_FUNCTION_WORKER_ESTIMATED, kafka::models::KafkaStreamConfig,
+        StreamConfig,
+    },
+    metrics::{record_kafka_client_created, record_kafka_client_dropped},
     project::Project,
     utilities::system::KillProcessError,
 };
@@ -29,8 +33,15 @@ pub enum FunctionRegistryError {
     TopicNotFound { topic_id: String },
 }
 
+struct RegistryEntry {
+    process: RestartingProcess,
+    // Number of `PURPOSE_FUNCTION_WORKER_ESTIMATED` gauge increments we made
+    // on start; `stop` must decrement the same count so the gauge balances.
+    estimated_clients: usize,
+}
+
 pub struct FunctionProcessRegistry {
-    registry: HashMap<String, RestartingProcess>,
+    registry: HashMap<String, RegistryEntry>,
     project: Arc<Project>,
 }
@@ -140,8 +151,17 @@ impl FunctionProcessRegistry {
             start_fn,
             RestartPolicy::Always,
         )?;
-        self.registry
-            .insert(function_process.id(), restarting_process);
+        let estimated_clients = parallel_process_count.max(1);
+        for _ in 0..estimated_clients {
+            record_kafka_client_created(PURPOSE_FUNCTION_WORKER_ESTIMATED);
+        }
+        self.registry.insert(
+            function_process.id(),
+            RegistryEntry {
+                process: restarting_process,
+                estimated_clients,
+            },
+        );
 
         Ok(())
     }
@@ -197,8 +217,17 @@ impl FunctionProcessRegistry {
             start_fn,
             RestartPolicy::Always,
         )?;
-        self.registry
-            .insert(function_process.id(), restarting_process);
+        let estimated_clients = parallel_process_count.max(1);
+        for _ in 0..estimated_clients {
+            record_kafka_client_created(PURPOSE_FUNCTION_WORKER_ESTIMATED);
+        }
+        self.registry.insert(
+            function_process.id(),
+            RegistryEntry {
+                process: restarting_process,
+                estimated_clients,
+            },
+        );
 
         Ok(())
     }
@@ -212,15 +241,21 @@ impl FunctionProcessRegistry {
         info!("Stopping function process {:?}...", function_process.id());
         let id = &function_process.id();
 
-        if let Some(restarting_process) = self.registry.remove(id) {
-            restarting_process.stop().await;
+        if let Some(entry) = self.registry.remove(id) {
+            for _ in 0..entry.estimated_clients {
+                record_kafka_client_dropped(PURPOSE_FUNCTION_WORKER_ESTIMATED);
+            }
+            entry.process.stop().await;
         }
     }
 
     pub async fn stop_all(&mut self) {
-        for (id, restarting_process) in self.registry.drain() {
+        for (id, entry) in self.registry.drain() {
             info!("Stopping function_process {:?}...", id);
-            restarting_process.stop().await;
+            for _ in 0..entry.estimated_clients {
+                record_kafka_client_dropped(PURPOSE_FUNCTION_WORKER_ESTIMATED);
+            }
+            entry.process.stop().await;
         }
     }
 }
diff --git a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs
index 3c7a9f6c99..ed50c3b0e5 100644
--- a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs
+++ b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs
@@ -9,7 +9,7 @@
 //! batching, back pressure, and error handling mechanisms.
 
 use futures::TryFutureExt;
-use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::consumer::Consumer;
 use rdkafka::producer::{DeliveryFuture, Producer};
 use rdkafka::Message;
 use serde_json::Value;
@@ -32,7 +32,9 @@ use crate::infrastructure::olap::clickhouse::model::{
     ClickHouseColumn, ClickHouseRecord, ClickHouseRuntimeEnum, ClickHouseValue,
 };
 use crate::infrastructure::stream::kafka::client::create_subscriber;
-use crate::infrastructure::stream::kafka::client::{create_producer, send_with_back_pressure};
+use crate::infrastructure::stream::kafka::client::{
+    create_producer, send_with_back_pressure, PURPOSE_SYNC_CONSUMER, PURPOSE_SYNC_PRODUCER,
+};
 use crate::infrastructure::stream::kafka::models::KafkaConfig;
 use crate::metrics::{MetricEvent, Metrics};
 use crate::utilities::validate_passthrough::DECIMAL_REGEX;
@@ -479,12 +481,13 @@ async fn sync_kafka_to_kafka(
     mut cancel_rx: tokio::sync::oneshot::Receiver<()>,
 ) {
     let group_id = format!("{VERSION_SYNC_GROUP_ID}_{source_topic_name}");
-    let subscriber: Arc<StreamConsumer> = Arc::new(create_subscriber(
+    let subscriber = Arc::new(create_subscriber(
         &kafka_config,
         &group_id,
         &source_topic_name,
+        PURPOSE_SYNC_CONSUMER,
     ));
-    let producer = create_producer(kafka_config.clone());
+    let producer = create_producer(kafka_config.clone(), PURPOSE_SYNC_PRODUCER);
 
     let mut queue: VecDeque<DeliveryFuture> = VecDeque::new();
     let target_topic_name = &target_topic_name;
@@ -590,10 +593,11 @@ async fn sync_kafka_to_clickhouse(
     );
 
     let group_id = TABLE_SYNC_GROUP_ID.to_string();
-    let subscriber: Arc<StreamConsumer> = Arc::new(create_subscriber(
+    let subscriber = Arc::new(create_subscriber(
         &kafka_config,
         &group_id,
         &source_topic_name,
+        PURPOSE_SYNC_CONSUMER,
     ));
 
     let clickhouse_columns: Vec<ClickHouseColumn> = target_table_columns
diff --git a/apps/framework-cli/src/infrastructure/stream/kafka/client.rs b/apps/framework-cli/src/infrastructure/stream/kafka/client.rs
index fb1cec69b5..e0a56fc3fb 100644
--- a/apps/framework-cli/src/infrastructure/stream/kafka/client.rs
+++ b/apps/framework-cli/src/infrastructure/stream/kafka/client.rs
@@ -12,6 +12,7 @@ use crate::infrastructure::stream::kafka::constants::{
     DEFAULT_MAX_MESSAGE_BYTES, KAFKA_MAX_MESSAGE_BYTES_CONFIG_KEY, KAFKA_RETENTION_CONFIG_KEY,
 };
+use crate::metrics::{record_kafka_client_created, record_kafka_client_dropped};
 use crate::project::Project;
 use rdkafka::admin::{AlterConfig, NewPartitions, ResourceSpecifier};
 use rdkafka::config::RDKafkaLogLevel;
@@ -26,9 +27,82 @@ use rdkafka::{
     producer::FutureProducer,
 };
 use std::collections::{HashMap, VecDeque};
+use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
 use std::time::Duration;
 use tracing::{error, info, warn};
 
+pub const PURPOSE_INGEST_PRODUCER: &str = "ingest_producer";
+pub const PURPOSE_IDEMPOTENT_PRODUCER: &str = "idempotent_producer";
+pub const PURPOSE_SYNC_PRODUCER: &str = "sync_producer";
+pub const PURPOSE_SYNC_CONSUMER: &str = "sync_consumer";
+pub const PURPOSE_PEEK_CONSUMER: &str = "peek_consumer";
+pub const PURPOSE_MCP_SAMPLE_CONSUMER: &str = "mcp_sample_consumer";
+pub const PURPOSE_HEALTH_PROBE: &str = "health_probe";
+pub const PURPOSE_FETCH_TOPICS_CONSUMER: &str = "fetch_topics_consumer";
+pub const PURPOSE_FETCH_TOPICS_ADMIN: &str = "fetch_topics_admin";
+pub const PURPOSE_CHECK_TOPIC_SIZE_CONSUMER: &str = "check_topic_size_consumer";
+pub const PURPOSE_ADMIN_ADD_PARTITIONS: &str = "admin_add_partitions";
+pub const PURPOSE_ADMIN_UPDATE_TOPIC_CONFIG: &str = "admin_update_topic_config";
+pub const PURPOSE_ADMIN_CREATE_TOPICS: &str = "admin_create_topics";
+pub const PURPOSE_ADMIN_DELETE_TOPICS: &str = "admin_delete_topics";
+pub const PURPOSE_ADMIN_DESCRIBE_TOPIC_CONFIG: &str = "admin_describe_topic_config";
+// Option A (plan §0.6.7): function worker Kafka handles (TS/Python) are not
+// rdkafka, so we approximate their presence in the gauge with one tick per
+// parallel worker instance. Treat this as a proxy for "an external worker is
+// alive and holding at least one Kafka socket"; actual socket count per
+// worker is tracked separately on the language runtime side if/when needed.
+pub const PURPOSE_FUNCTION_WORKER_ESTIMATED: &str = "function_worker_estimated";
+
+struct KafkaClientTracker {
+    purpose: &'static str,
+}
+
+impl KafkaClientTracker {
+    fn new(purpose: &'static str) -> Arc<Self> {
+        record_kafka_client_created(purpose);
+        Arc::new(KafkaClientTracker { purpose })
+    }
+}
+
+impl Drop for KafkaClientTracker {
+    fn drop(&mut self) {
+        record_kafka_client_dropped(self.purpose);
+    }
+}
+
+#[derive(Clone)]
+pub struct KafkaClientHandle<T> {
+    inner: T,
+    _tracker: Arc<KafkaClientTracker>,
+}
+
+impl<T> KafkaClientHandle<T> {
+    pub fn wrap(inner: T, purpose: &'static str) -> Self {
+        KafkaClientHandle {
+            inner,
+            _tracker: KafkaClientTracker::new(purpose),
+        }
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+}
+
+impl<T> Deref for KafkaClientHandle<T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        &self.inner
+    }
+}
+
+impl<T> DerefMut for KafkaClientHandle<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
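A subtlety worth calling out: because the tracker rides in an `Arc`, cloning a
handle shares one tracker rather than double-counting, and only the last
clone's drop decrements the gauge. The same RAII pattern in a self-contained
toy, with an atomic counter standing in for the real metrics registry:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

static LIVE: AtomicI64 = AtomicI64::new(0);

struct Tracker;

impl Tracker {
    fn new() -> Arc<Self> {
        LIVE.fetch_add(1, Ordering::Relaxed); // "client created"
        Arc::new(Tracker)
    }
}

impl Drop for Tracker {
    fn drop(&mut self) {
        LIVE.fetch_sub(1, Ordering::Relaxed); // "client dropped"
    }
}

fn main() {
    let first = Tracker::new();
    let second = Arc::clone(&first); // a clone shares the tracker: no extra +1
    drop(first);
    assert_eq!(LIVE.load(Ordering::Relaxed), 1); // still alive via `second`
    drop(second);
    assert_eq!(LIVE.load(Ordering::Relaxed), 0); // last drop balances the count
}
```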
 use super::constants::{
     DEFAULT_RETENTION_MS, KAFKA_ACKS_CONFIG_KEY, KAFKA_AUTO_COMMIT_INTERVAL_MS_CONFIG_KEY,
     KAFKA_AUTO_OFFSET_RESET_CONFIG_KEY, KAFKA_BOOSTRAP_SERVERS_CONFIG_KEY,
@@ -201,6 +275,7 @@ async fn add_partitions(
     let admin_client: AdminClient<_> = build_rdkafka_client_config(kafka_config)
         .create()
         .expect("Redpanda Admin Client creation failed");
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_ADMIN_ADD_PARTITIONS);
 
     let options = AdminOptions::new().operation_timeout(Some(Duration::from_secs(5)));
     let new_partitions = NewPartitions::new(id, partition_count);
@@ -249,6 +324,7 @@ async fn update_topic_config(
     let admin_client: AdminClient<_> = build_rdkafka_client_config(kafka_config)
         .create()
         .expect("Redpanda Admin Client creation failed");
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_ADMIN_UPDATE_TOPIC_CONFIG);
 
     let options = AdminOptions::new().operation_timeout(Some(Duration::from_secs(5)));
 
@@ -302,6 +378,7 @@ pub async fn create_topics(
     let admin_client: AdminClient<_> = build_rdkafka_client_config(config)
         .create()
         .expect("Redpanda Admin Client creation failed");
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_ADMIN_CREATE_TOPICS);
 
     // Prepare the AdminOptions
     let options = AdminOptions::new().operation_timeout(Some(std::time::Duration::from_secs(5)));
@@ -369,6 +446,7 @@ pub async fn delete_topics(
     let admin_client: AdminClient<_> = build_rdkafka_client_config(config)
         .create()
         .expect("Redpanda Admin Client creation failed");
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_ADMIN_DELETE_TOPICS);
 
     // Prepare the AdminOptions
     let options = AdminOptions::new().operation_timeout(Some(std::time::Duration::from_secs(5)));
@@ -418,6 +496,7 @@ pub async fn describe_topic_config(
     let admin_client: AdminClient<_> = build_rdkafka_client_config(config)
         .create()
         .expect("Redpanda Admin Client creation failed");
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_ADMIN_DESCRIBE_TOPIC_CONFIG);
 
     let options = AdminOptions::new().operation_timeout(Some(std::time::Duration::from_secs(5)));
@@ -464,7 +543,7 @@ pub async fn describe_topic_config(
 ///
 /// # Panics
 /// * Panics if the producer creation fails
-pub fn create_idempotent_producer(config: &KafkaConfig) -> FutureProducer {
+pub fn create_idempotent_producer(config: &KafkaConfig) -> KafkaClientHandle<FutureProducer> {
     let mut client_config = build_rdkafka_client_config(config);
 
     client_config
@@ -475,7 +554,8 @@ pub fn create_idempotent_producer(config: &KafkaConfig) -> FutureProducer {
         .set(KAFKA_ENABLE_IDEMPOTENCE_CONFIG_KEY, true.to_string())
         .set(KAFKA_ACKS_CONFIG_KEY, "all")
         .set(KAFKA_ENABLE_GAPLESS_GUARANTEE_CONFIG_KEY, true.to_string());
-    client_config.create().expect("Failed to create producer")
+    let producer: FutureProducer = client_config.create().expect("Failed to create producer");
+    KafkaClientHandle::wrap(producer, PURPOSE_IDEMPOTENT_PRODUCER)
 }
 
 /// Creates a standard producer with custom message timeout and at-least-once delivery guarantees.
@@ -488,7 +568,7 @@ pub fn create_idempotent_producer(config: &KafkaConfig) -> FutureProducer {
 ///
 /// # Panics
 /// * Panics if the producer creation fails
-pub fn create_producer(config: KafkaConfig) -> ConfiguredProducer {
+pub fn create_producer(config: KafkaConfig, purpose: &'static str) -> ConfiguredProducer {
     let mut client_config = build_rdkafka_client_config(&config);
 
     client_config.set(
@@ -503,7 +583,8 @@ pub fn create_producer(config: KafkaConfig) -> ConfiguredProducer {
     // This is the maximum number of retries that will be made before the timeout.
     client_config.set(KAFKA_RETRIES_CONFIG_KEY, "2147483647");
 
-    let producer = client_config.create().expect("Failed to create producer");
+    let producer: FutureProducer = client_config.create().expect("Failed to create producer");
+    let producer = KafkaClientHandle::wrap(producer, purpose);
     ConfiguredProducer { producer, config }
 }
@@ -525,7 +606,8 @@ pub fn create_producer(config: KafkaConfig) -> ConfiguredProducer {
 /// * Returns error if metadata fetch fails
 /// * Returns error if watermark fetch fails for any partition
 pub async fn check_topic_size(topic: &str, config: &KafkaConfig) -> Result {
-    let client: StreamConsumer<_> = build_rdkafka_client_config(config).create()?;
+    let client: StreamConsumer = build_rdkafka_client_config(config).create()?;
+    let client = KafkaClientHandle::wrap(client, PURPOSE_CHECK_TOPIC_SIZE_CONSUMER);
     let timeout = Duration::from_secs(1);
     let md = client.fetch_metadata(Some(topic), timeout)?;
     let partitions = match md.topics().iter().find(|t| t.name() == topic) {
@@ -566,7 +648,9 @@ pub async fn fetch_topics(
 ) -> Result, rdkafka::error::KafkaError> {
     let rdkafka_config = build_rdkafka_client_config(config);
     let client: BaseConsumer = rdkafka_config.create()?;
+    let client = KafkaClientHandle::wrap(client, PURPOSE_FETCH_TOPICS_CONSUMER);
     let admin_client: AdminClient<_> = rdkafka_config.create()?;
+    let admin_client = KafkaClientHandle::wrap(admin_client, PURPOSE_FETCH_TOPICS_ADMIN);
 
     let mut topics: Vec = Vec::new();
     let options = AdminOptions::new().operation_timeout(Some(std::time::Duration::from_secs(5)));
@@ -617,13 +701,18 @@ pub async fn fetch_topics(
 ///
 /// # Panics
 /// * Panics if the consumer creation fails
-pub fn create_consumer(config: &KafkaConfig, extra_config: &[(&str, &str)]) -> StreamConsumer {
+pub fn create_consumer(
+    config: &KafkaConfig,
+    extra_config: &[(&str, &str)],
+    purpose: &'static str,
+) -> KafkaClientHandle<StreamConsumer> {
     let mut client_config = build_rdkafka_client_config(config);
 
     extra_config.iter().for_each(|(k, v)| {
         client_config.set(*k, *v);
     });
-    client_config.create().expect("Failed to create consumer")
+    let consumer: StreamConsumer = client_config.create().expect("Failed to create consumer");
+    KafkaClientHandle::wrap(consumer, purpose)
 }
 
 /// Creates a subscriber (consumer) for a specific topic with standard configuration.
@@ -642,7 +731,12 @@ pub fn create_consumer(
 /// # Panics
 /// * Panics if the consumer creation fails
 /// * Panics if the subscription fails
-pub fn create_subscriber(config: &KafkaConfig, group_id: &str, topic: &str) -> StreamConsumer {
+pub fn create_subscriber(
+    config: &KafkaConfig,
+    group_id: &str,
+    topic: &str,
+    purpose: &'static str,
+) -> KafkaClientHandle<StreamConsumer> {
     let group_id = config.prefix_with_namespace(group_id);
     let consumer = create_consumer(
         config,
@@ -658,6 +752,7 @@ pub fn create_subscriber(
             (KAFKA_GROUP_ID_CONFIG_KEY, &group_id),
             (KAFKA_ISOLATION_LEVEL_CONFIG_KEY, "read_committed"),
         ],
+        purpose,
     );
 
     let topics = [topic];
@@ -785,6 +880,7 @@ pub async fn send_with_back_pressure(
 pub async fn health_check(config: &KafkaConfig) -> Result {
     let client_config = build_rdkafka_client_config(config);
     let client: BaseConsumer = client_config.create()?;
+    let client = KafkaClientHandle::wrap(client, PURPOSE_HEALTH_PROBE);
 
     // Simple client.metadata() call without iterating over topics
     match client.fetch_metadata(None, Duration::from_secs(2)) {
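One sharp edge with the wrapper API: `into_inner` moves the raw client out and
drops the tracker, so the gauge decrements while the client lives on. A sketch
of the hazard (functions and constants from this diff; the scenario is
illustrative):

```rust
// Illustrative fragment; `config` is any KafkaConfig.
fn leak_example(config: &KafkaConfig) -> StreamConsumer {
    let handle = create_consumer(config, &[], PURPOSE_PEEK_CONSUMER);
    // moose_kafka_client_gauge{purpose="peek_consumer"} is +1 here.
    let raw = handle.into_inner();
    // The tracker just dropped with the handle: the gauge is back to 0,
    // yet `raw` is still a live rdkafka client. Keep the handle (or a
    // clone of it) alive for as long as the client is in use.
    raw
}
```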
diff --git a/apps/framework-cli/src/infrastructure/stream/kafka/models.rs b/apps/framework-cli/src/infrastructure/stream/kafka/models.rs
index 1b3a9f5b09..1835ecced8 100644
--- a/apps/framework-cli/src/infrastructure/stream/kafka/models.rs
+++ b/apps/framework-cli/src/infrastructure/stream/kafka/models.rs
@@ -1,4 +1,6 @@
 use rdkafka::{metadata::MetadataTopic, producer::FutureProducer};
+
+use super::client::KafkaClientHandle;
 use serde::{Deserialize, Serialize};
 
 use crate::framework::{core::infrastructure::topic::Topic, versions::Version};
@@ -321,8 +323,9 @@ impl Default for KafkaConfig {
 /// for convenience in passing both together.
 #[derive(Clone)]
 pub struct ConfiguredProducer {
-    /// The underlying Redpanda/Kafka producer
-    pub producer: FutureProducer,
+    /// The underlying Redpanda/Kafka producer, wrapped in a tracking handle so
+    /// live client counts are visible via the `moose_kafka_client_gauge` metric.
+    pub producer: KafkaClientHandle<FutureProducer>,
     /// The configuration used to create this producer
     pub config: KafkaConfig,
 }
diff --git a/apps/framework-cli/src/mcp/tools/sample_stream.rs b/apps/framework-cli/src/mcp/tools/sample_stream.rs
index d78002d6b2..50eea80f30 100644
--- a/apps/framework-cli/src/mcp/tools/sample_stream.rs
+++ b/apps/framework-cli/src/mcp/tools/sample_stream.rs
@@ -16,7 +16,7 @@ use tracing::info;
 use super::{create_error_result, create_success_result};
 use crate::framework::core::infrastructure_map::InfrastructureMap;
 use crate::infrastructure::redis::redis_client::RedisClient;
-use crate::infrastructure::stream::kafka::client::create_consumer;
+use crate::infrastructure::stream::kafka::client::{create_consumer, PURPOSE_MCP_SAMPLE_CONSUMER};
 use crate::infrastructure::stream::kafka::models::KafkaConfig;
 use toon_format::{encode, types::KeyFoldingMode, EncodeOptions, ToonError};
@@ -320,7 +320,11 @@ async fn execute_get_stream_sample(
     // Create consumer with unique group ID for sampling
     let group_id =
         kafka_config.prefix_with_namespace(&format!("mcp_sample_{}", uuid::Uuid::new_v4()));
-    let consumer = create_consumer(&kafka_config, &[("group.id", &group_id)]);
+    let consumer = create_consumer(
+        &kafka_config,
+        &[("group.id", &group_id)],
+        PURPOSE_MCP_SAMPLE_CONSUMER,
+    );
 
     // Build topic partition list with tail offset for getting last N messages
     let topic_partition_map = build_partition_map(&topic.id(), topic.partition_count, params.limit);
diff --git a/apps/framework-cli/src/metrics.rs b/apps/framework-cli/src/metrics.rs
index 3c7beb26a1..92511a383b 100644
--- a/apps/framework-cli/src/metrics.rs
+++ b/apps/framework-cli/src/metrics.rs
@@ -8,7 +8,7 @@ use prometheus_client::{
 };
 use serde::Deserialize;
 use serde_json::Value;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock, Weak};
 use std::time::Duration;
 use tokio::sync::Mutex;
@@ -31,6 +31,12 @@ pub const STREAMING_FUNCTION_EVENT_OUPUT_COUNT: &str =
     "moose_streaming_functions_events_output_count";
 pub const STREAMING_FUNCTION_PROCESSED_BYTE_COUNT: &str =
     "moose_streaming_functions_processed_byte_count";
+pub const KAFKA_CLIENTS_GAUGE: &str = "moose_kafka_client_gauge";
+// Counter name constants deliberately exclude the `_total` suffix; the
+// OpenMetrics encoder in `prometheus_client` appends it automatically when
+// serialising. Scraped series end up as `..._total`, matching dashboards.
+pub const FUNCTION_WORKER_RESTARTS_TOTAL: &str = "moose_function_worker_restarts";
+pub const FUNCTION_PROCESS_DIFF_UPDATED_TOTAL: &str = "moose_function_process_diff_updated";
 
 #[derive(Debug, thiserror::Error)]
 #[non_exhaustive]
@@ -74,6 +80,18 @@ pub enum MetricEvent {
         consumer_group: String,
         topic_name: String,
     },
+    FunctionWorkerRestart {
+        reason: String,
+    },
+    FunctionProcessDiffUpdated {
+        diff_reason: String,
+    },
+    KafkaClientCreated {
+        purpose: String,
+    },
+    KafkaClientDropped {
+        purpose: String,
+    },
 }
 
 #[derive(Clone)]
@@ -120,6 +138,9 @@ pub struct Statistics {
     pub streaming_functions_in_event_total_count: Counter,
     pub streaming_functions_out_event_total_count: Counter,
     pub streaming_functions_processed_bytes_total_count: Counter,
+    pub kafka_clients: Family<KafkaClientLabels, Gauge>,
+    pub function_worker_restarts_total: Family<FunctionWorkerRestartLabels, Counter>,
+    pub function_process_diff_updated_total: Family<FunctionProcessDiffLabels, Counter>,
 }
 
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -146,6 +167,21 @@ pub struct MessagesOutCounterLabels {
     topic_name: String,
 }
 
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct KafkaClientLabels {
+    pub purpose: String,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct FunctionWorkerRestartLabels {
+    pub reason: String,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct FunctionProcessDiffLabels {
+    pub reason: String,
+}
+
 impl Metrics {
     pub fn new(
         telemetry_metadata: TelemetryMetadata,
@@ -181,6 +217,32 @@ impl Metrics {
         let _ = self.tx_events.send(data).await;
     }
 
+    pub fn try_send_metric_event(&self, data: MetricEvent) {
+        let _ = self.tx_events.try_send(data);
+    }
+
+    pub fn kafka_client_created(&self, purpose: &'static str) {
+        self.try_send_metric_event(MetricEvent::KafkaClientCreated {
+            purpose: purpose.to_string(),
+        });
+    }
+
+    pub fn kafka_client_dropped(&self, purpose: &'static str) {
+        self.try_send_metric_event(MetricEvent::KafkaClientDropped {
+            purpose: purpose.to_string(),
+        });
+    }
+
+    pub fn function_worker_restart(&self, reason: String) {
+        self.try_send_metric_event(MetricEvent::FunctionWorkerRestart { reason });
+    }
+
+    pub fn function_process_diff_updated(&self, reason: &'static str) {
+        self.try_send_metric_event(MetricEvent::FunctionProcessDiffUpdated {
+            diff_reason: reason.to_string(),
+        });
+    }
+
     pub async fn get_metrics_registry_as_string(&self) -> String {
         let registry = self.registry.lock().await;
         formatted_registry(&registry)
     }
@@ -241,6 +303,13 @@ impl Metrics {
             >::new_with_constructor(
                 Counter::default
             ),
+            kafka_clients: Family::<KafkaClientLabels, Gauge>::new_with_constructor(Gauge::default),
+            function_worker_restarts_total:
+                Family::<FunctionWorkerRestartLabels, Counter>::new_with_constructor(
+                    Counter::default,
+                ),
+            function_process_diff_updated_total:
+                Family::<FunctionProcessDiffLabels, Counter>::new_with_constructor(Counter::default),
         });
 
         let mut registry = self.registry.lock().await;
@@ -299,6 +368,22 @@ impl Metrics {
             data.streaming_functions_processed_bytes_count.clone(),
         );
 
+        registry.register(
+            KAFKA_CLIENTS_GAUGE,
+            "Number of live rdkafka client handles held by this process, by purpose",
+            data.kafka_clients.clone(),
+        );
+        registry.register(
+            FUNCTION_WORKER_RESTARTS_TOTAL,
+            "Count of streaming-function worker restarts, bucketed by reason",
+            data.function_worker_restarts_total.clone(),
+        );
+        registry.register(
+            FUNCTION_PROCESS_DIFF_UPDATED_TOTAL,
+            "Count of FunctionProcess diff entries that emit a Change::Updated, bucketed by reason",
+            data.function_process_diff_updated_total.clone(),
+        );
+
         let metrics_inserter = self.metrics_inserter.clone();
         let export_metrics = self.telemetry_metadata.export_metrics;
@@ -433,6 +518,28 @@ impl Metrics {
                     data.streaming_functions_processed_bytes_total_count
                         .inc_by(bytes);
                 }
+                MetricEvent::KafkaClientCreated { purpose } => {
+                    data.kafka_clients
+                        .get_or_create(&KafkaClientLabels { purpose })
+                        .inc();
+                }
+                MetricEvent::KafkaClientDropped { purpose } => {
+                    data.kafka_clients
+                        .get_or_create(&KafkaClientLabels { purpose })
+                        .dec();
+                }
+                MetricEvent::FunctionWorkerRestart { reason } => {
+                    data.function_worker_restarts_total
+                        .get_or_create(&FunctionWorkerRestartLabels { reason })
+                        .inc();
+                }
+                MetricEvent::FunctionProcessDiffUpdated { diff_reason } => {
+                    data.function_process_diff_updated_total
+                        .get_or_create(&FunctionProcessDiffLabels {
+                            reason: diff_reason,
+                        })
+                        .inc();
+                }
             };
 
             trace!("Updated metrics: {:?}", data);
@@ -449,3 +556,141 @@ fn formatted_registry(data: &Registry) -> String {
     let _ = encode(&mut buffer, data);
     buffer
 }
+
+static GLOBAL_METRICS: OnceLock<Weak<Metrics>> = OnceLock::new();
+
+pub const KAFKA_METRICS_DISABLED_ENV: &str = "MOOSE_KAFKA_CLIENT_METRICS_DISABLED";
+
+pub fn set_global_metrics_handle(metrics: &Arc<Metrics>) {
+    let weak = Arc::downgrade(metrics);
+    if GLOBAL_METRICS.set(weak).is_err() {
+        tracing::warn!("set_global_metrics_handle called more than once; ignoring subsequent call");
+    }
+}
+
+fn with_global_metrics<F>(f: F)
+where
+    F: FnOnce(&Metrics),
+{
+    if let Some(weak) = GLOBAL_METRICS.get() {
+        if let Some(metrics) = weak.upgrade() {
+            f(&metrics);
+        }
+    }
+}
+
+pub fn kafka_client_tracking_enabled() -> bool {
+    std::env::var(KAFKA_METRICS_DISABLED_ENV).is_err()
+}
+
+pub fn record_kafka_client_created(purpose: &'static str) {
+    if !kafka_client_tracking_enabled() {
+        return;
+    }
+    with_global_metrics(|m| m.kafka_client_created(purpose));
+}
+
+pub fn record_kafka_client_dropped(purpose: &'static str) {
+    if !kafka_client_tracking_enabled() {
+        return;
+    }
+    with_global_metrics(|m| m.kafka_client_dropped(purpose));
+}
+
+pub fn record_function_worker_restart(reason: String) {
+    with_global_metrics(|m| m.function_worker_restart(reason));
+}
+
+pub fn record_function_process_diff_updated(reason: &'static str) {
+    with_global_metrics(|m| m.function_process_diff_updated(reason));
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_metrics() -> (Arc<Metrics>, tokio::sync::mpsc::Receiver<MetricEvent>) {
+        let (metrics, rx) = Metrics::new(
+            TelemetryMetadata {
+                machine_id: "test".to_string(),
+                is_moose_developer: false,
+                metric_labels: None,
+                metric_endpoints: None,
+                is_production: false,
+                project_name: "test".to_string(),
+                export_metrics: false,
+            },
+            None,
+        );
+        (Arc::new(metrics), rx)
+    }
+
+    async fn drain_and_scrape(metrics: &Arc<Metrics>) -> String {
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        metrics.get_metrics_registry_as_string().await
+    }
+
+    #[tokio::test]
+    async fn new_kafka_client_event_updates_gauge() {
+        let (metrics, rx) = test_metrics();
+        metrics.start_listening_to_metrics(rx).await;
+
+        metrics.kafka_client_created("unit_test_purpose");
+        metrics.kafka_client_created("unit_test_purpose");
+        metrics.kafka_client_dropped("unit_test_purpose");
+
+        let scraped = drain_and_scrape(&metrics).await;
+        assert!(
+            scraped.contains("moose_kafka_client_gauge"),
+            "expected gauge metric in registry output, got:\n{scraped}"
+        );
+        assert!(
+            scraped.contains(r#"moose_kafka_client_gauge{purpose="unit_test_purpose"} 1"#),
+            "expected gauge value of 1 after 2 creates and 1 drop, got:\n{scraped}"
+        );
+    }
+
+    #[tokio::test]
+    async fn new_worker_restart_event_updates_counter() {
+        let (metrics, rx) = test_metrics();
+        metrics.start_listening_to_metrics(rx).await;
+
+        metrics.function_worker_restart("unit_test_reason".to_string());
+
+        let scraped = drain_and_scrape(&metrics).await;
+        assert!(
+            scraped
+                .contains(r#"moose_function_worker_restarts_total{reason="unit_test_reason"} 1"#),
+            "expected worker restart counter, got:\n{scraped}"
+        );
+    }
+
+    #[tokio::test]
+    async fn new_diff_updated_event_updates_counter() {
+        let (metrics, rx) = test_metrics();
+        metrics.start_listening_to_metrics(rx).await;
+
+        metrics.function_process_diff_updated("forced_always");
+        metrics.function_process_diff_updated("forced_always");
+
+        let scraped = drain_and_scrape(&metrics).await;
+        assert!(
+            scraped
+                .contains(r#"moose_function_process_diff_updated_total{reason="forced_always"} 2"#),
+            "expected diff counter of 2, got:\n{scraped}"
+        );
+    }
+
+    #[test]
+    fn tracking_disabled_env_suppresses_events() {
+        let prev = std::env::var(KAFKA_METRICS_DISABLED_ENV).ok();
+        std::env::set_var(KAFKA_METRICS_DISABLED_ENV, "1");
+        assert!(!kafka_client_tracking_enabled());
+        record_kafka_client_created("should_not_panic");
+        record_kafka_client_dropped("should_not_panic");
+        match prev {
+            Some(v) => std::env::set_var(KAFKA_METRICS_DISABLED_ENV, v),
+            None => std::env::remove_var(KAFKA_METRICS_DISABLED_ENV),
+        }
+    }
+}
diff --git a/apps/framework-cli/src/metrics_inserter.rs b/apps/framework-cli/src/metrics_inserter.rs
index f2b6b7d55a..45b3b21702 100644
--- a/apps/framework-cli/src/metrics_inserter.rs
+++ b/apps/framework-cli/src/metrics_inserter.rs
@@ -137,6 +137,12 @@ async fn flush(
                 "topic_name": topic_name,
             }),
         ),
+        MetricEvent::FunctionWorkerRestart { .. }
+        | MetricEvent::FunctionProcessDiffUpdated { .. }
+        | MetricEvent::KafkaClientCreated { .. }
+        | MetricEvent::KafkaClientDropped { .. } => {
+            continue;
+        }
     };
 
     let mut payload = payload.clone();
diff --git a/apps/framework-cli/src/utilities/system.rs b/apps/framework-cli/src/utilities/system.rs
index 07d6fb6d56..c72433cf01 100644
--- a/apps/framework-cli/src/utilities/system.rs
+++ b/apps/framework-cli/src/utilities/system.rs
@@ -11,6 +11,25 @@ use tokio::task::JoinHandle;
 use tokio::time::{sleep, Instant};
 use tracing::{debug, error, info, warn};
 
+use crate::metrics::record_function_worker_restart;
+
+fn classify_rust_child_exit(
+    exit_status: &std::io::Result<std::process::ExitStatus>,
+) -> &'static str {
+    match exit_status {
+        Ok(status) => {
+            if status.success() {
+                "rust_child_exit_ok"
+            } else if status.code().is_some() {
+                "rust_child_exit_err_code"
+            } else {
+                "rust_child_exit_signal"
+            }
+        }
+        Err(_) => "rust_child_wait_err",
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 #[non_exhaustive]
 pub enum KillProcessError {
@@ -153,6 +172,7 @@ impl RestartingProcess {
                 }
                 exit_status_result = child.wait() => {
                     let process_runtime = process_start_time.elapsed();
+                    let exit_reason = classify_rust_child_exit(&exit_status_result);
                     let should_restart = match exit_status_result {
                         Ok(exit_status) => {
                             if exit_status.success() {
@@ -182,6 +202,8 @@ impl RestartingProcess {
                         break 'monitor;
                     }
 
+                    record_function_worker_restart(exit_reason.to_string());
+
                     // Set initial delay based on whether the previous process ran long enough
                     if process_runtime >= MIN_RUNTIME_FOR_RESET {
                         debug!("Previous process ran for {:?}, resetting backoff delay", process_runtime);
@@ -317,4 +339,25 @@ mod tests {
             "expected exactly 5 spawn attempts before the breaker trips, got {total}",
         );
     }
+
+    #[test]
+    fn classify_rust_child_exit_reasons() {
+        use std::os::unix::process::ExitStatusExt;
+        use std::process::ExitStatus;
+
+        let ok: std::io::Result<ExitStatus> = Ok(ExitStatus::from_raw(0));
+        assert_eq!(classify_rust_child_exit(&ok), "rust_child_exit_ok");
+
+        let err_code: std::io::Result<ExitStatus> = Ok(ExitStatus::from_raw(256));
+        assert_eq!(
+            classify_rust_child_exit(&err_code),
+            "rust_child_exit_err_code"
+        );
+
+        let signal: std::io::Result<ExitStatus> = Ok(ExitStatus::from_raw(9));
+        assert_eq!(classify_rust_child_exit(&signal), "rust_child_exit_signal");
+
+        let wait_err: std::io::Result<ExitStatus> = Err(std::io::Error::other("wait failure"));
+        assert_eq!(classify_rust_child_exit(&wait_err), "rust_child_wait_err");
+    }
 }
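The `from_raw` values in this test follow the Unix `wait(2)` status encoding:
a normal exit code lives in the high byte, while a terminating signal occupies
the low bits. A standalone check of that assumption (Unix-only):

```rust
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;

fn main() {
    // 1 << 8 == 256: the process exited normally with code 1.
    let exited = ExitStatus::from_raw(256);
    assert_eq!(exited.code(), Some(1));

    // Low byte 9: the process was terminated by signal 9 (SIGKILL), so there
    // is no exit code, matching the "rust_child_exit_signal" branch.
    let signalled = ExitStatus::from_raw(9);
    assert_eq!(signalled.code(), None);
    assert_eq!(signalled.signal(), Some(9));
}
```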
diff --git a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py
index 0bd1f66562..48b1a94096 100644
--- a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py
+++ b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py
@@ -14,6 +14,7 @@
 """
 
 import argparse
+import atexit
 import dataclasses
 import traceback
 from datetime import datetime, timezone
@@ -55,6 +56,27 @@
 
 PARTITION_ASSIGNMENT_POLL_INTERVAL_SECONDS = 0.1
 
+_worker_exit_reason: dict[str, str] = {"reason": "py_worker_exit_code_0"}
+
+
+def _set_worker_exit_reason(reason: str) -> None:
+    _worker_exit_reason["reason"] = reason
+
+
+def _report_worker_exit() -> None:
+    try:
+        requests.post(
+            f"http://localhost:{moose_management_port}/metrics-logs",
+            json={"reason": _worker_exit_reason["reason"]},
+            timeout=0.5,
+        )
+    except Exception:
+        pass
+
+
+atexit.register(_report_worker_exit)
+
+
 @dataclasses.dataclass
 class KafkaTopicConfig:
     """
@@ -673,6 +695,11 @@ def process_messages():
     def shutdown(signum, frame):
         """Handle shutdown signals gracefully"""
         log("Received shutdown signal, cleaning up...")
+        try:
+            name = signal.Signals(signum).name
+        except Exception:
+            name = f"signal_{signum}"
+        _set_worker_exit_reason(f"py_worker_killed_by_{name}")
         running.clear()  # This will trigger the main loop to exit
 
     # Set up signal handlers
@@ -739,6 +766,9 @@ def shutdown(signum, frame):
             log(f"Error closing producer: {e}")
 
     exit_code = 1 if fatal_error.is_set() else 0
+    _set_worker_exit_reason(
+        "py_worker_exit_code_nonzero" if exit_code != 0 else "py_worker_exit_code_0"
+    )
     log(f"Shutdown complete with exit code {exit_code}")
     sys.exit(exit_code)
diff --git a/packages/ts-moose-lib/src/cluster-utils.ts b/packages/ts-moose-lib/src/cluster-utils.ts
index 5d5d0f185f..885bce3321 100644
--- a/packages/ts-moose-lib/src/cluster-utils.ts
+++ b/packages/ts-moose-lib/src/cluster-utils.ts
@@ -1,4 +1,5 @@
 import cluster from "node:cluster";
+import http from "node:http";
 import { availableParallelism } from "node:os";
 import { exit } from "node:process";
 import { Worker } from "node:cluster";
@@ -12,6 +13,59 @@ const SIGTERM = "SIGTERM";
 const SIGINT = "SIGINT";
 const SHUTDOWN_WORKERS_INTERVAL = 500;
 
+const METRICS_LOGS_PATH = "/metrics-logs";
+const KNOWN_TERMINATION_SIGNALS: ReadonlyArray<string> = [
+  "SIGTERM",
+  "SIGKILL",
+  "SIGABRT",
+  "SIGINT",
+];
+
+function classifyWorkerExit(
+  code: number | null,
+  signal: NodeJS.Signals | string | null,
+): string {
+  if (signal) {
+    const normalized = String(signal);
+    if (KNOWN_TERMINATION_SIGNALS.includes(normalized)) {
+      return `ts_worker_killed_by_signal_${normalized}`;
+    }
+    return "ts_worker_killed_other";
+  }
+  if (code === null) {
+    return "ts_worker_exit_unknown";
+  }
+  if (code === 0) {
+    return "ts_worker_exit_code_0";
+  }
+  return "ts_worker_exit_code_nonzero";
+}
+
+function reportWorkerExit(
+  code: number | null,
+  signal: NodeJS.Signals | string | null,
+): void {
+  const reason = classifyWorkerExit(code, signal);
+  const managementPort = parseInt(
+    process.env.MOOSE_MANAGEMENT_PORT ?? "5001",
+    10,
+  );
+  const payload = JSON.stringify({ reason });
+  const req = http.request({
+    host: "127.0.0.1",
+    port: managementPort,
+    method: "POST",
+    path: METRICS_LOGS_PATH,
+    headers: {
+      "content-type": "application/json",
+      "content-length": Buffer.byteLength(payload),
+    },
+  });
+  req.on("error", () => {});
+  req.write(payload);
+  req.end();
+}
+
 /**
  * Manages a cluster of worker processes, handling their lifecycle including startup,
  * shutdown, and error handling.
@@ -145,6 +199,10 @@ export class Cluster {
         `worker ${worker.process.pid} exited with code ${code} and signal ${signal}`,
       );
 
+      try {
+        reportWorkerExit(code, signal);
+      } catch {}
+
      if (!this.shutdownInProgress) {
         setTimeout(() => cluster.fork(), RESTART_TIME_MS);
       }