diff --git a/Cargo.lock b/Cargo.lock index 8ec9632..48bbb93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2910,6 +2910,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.4.0", + "indexmap 2.12.1", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3105,7 +3124,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3129,6 +3148,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2 0.4.13", "http 1.4.0", "http-body 1.0.1", "httparse", @@ -3187,6 +3207,19 @@ dependencies = [ "webpki-roots 1.0.4", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.19" @@ -3873,6 +3906,11 @@ dependencies = [ "mockall", "nilauth-client", "nillion-nucs", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-resource-detectors", + "opentelemetry_sdk", "procfs", "rand 0.8.5", "reqwest 0.12.25", @@ -3890,6 +3928,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "utoipa", "utoipa-axum", @@ -4088,6 +4127,95 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.4.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror 2.0.17", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-resource-detectors" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e82845106cf72d47c141cee7f0d95e0650d8f28c6222a1f1ae727a8883899c19" +dependencies = [ + "opentelemetry", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4551,6 +4679,29 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -4881,7 +5032,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.27", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", @@ -6653,6 +6804,43 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper 1.0.2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + [[package]] name = "tower" version = "0.5.2" @@ -6661,9 +6849,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.12.1", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -6729,6 +6920,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -6741,6 +6933,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" +dependencies = [ + "js-sys", + "opentelemetry", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" diff --git a/Cargo.toml b/Cargo.toml index cb3ad2e..7603cba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,12 @@ sqlx = { version = "0.8", features = ["postgres", "runtime-tokio", "chrono"] } thiserror = "2" tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt"] } +opentelemetry = { version = "0.31", default-features = false, features = ["logs", "trace", "metrics"] } +opentelemetry_sdk = { version = "0.31", default-features = false, features = ["rt-tokio", "logs", "trace", "metrics"] } +opentelemetry-otlp 
= { version = "0.31", default-features = false, features = ["grpc-tonic", "logs", "trace", "metrics"] } +opentelemetry-appender-tracing = "0.31" +opentelemetry-resource-detectors = "0.10" +tracing-opentelemetry = "0.32" tokio = { version = "1.44", features = ["rt-multi-thread", "macros", "time", "signal"] } tower-http = { version = "0.6", features = ["cors"] } utoipa = { version = "5.3.1", features = ["chrono", "axum_extras"] } diff --git a/config.sample.yaml b/config.sample.yaml index 7107ec6..f081562 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -4,9 +4,39 @@ server: private_key: hex: 7c5c8d3114ac2410249ca2baae7dec86ac2950a389ac44a5fdca8941b92b6c86 +# Prometheus metrics endpoint configuration. metrics: bind_endpoint: 127.0.0.1:30922 +# OpenTelemetry configuration. +# +# Standard OTEL environment variables are supported: +# OTEL_SDK_DISABLED=true - Disable OTEL SDK, use only fmt logging +# OTEL_EXPORTER_OTLP_ENDPOINT - Global OTLP gRPC endpoint URL (default: http://localhost:4317) +# OTEL_SERVICE_NAME - Service name for telemetry (default: nilauth) +# OTEL_RESOURCE_ATTRIBUTES - Resource attributes as key=value,key=value +# Example: team.name=myteam,deployment.environment.name=prod +# +# NOTE: Only gRPC transport (port 4317) is supported. 
+# +# otel: +# enabled: true # default: false +# endpoint: "http://localhost:4317" # OTLP gRPC endpoint +# service_name: nilauth +# resource_attributes: +# service.instance.id: nilauth-001 +# export_timeout: 30 +# logs: +# enabled: true # default: true (when otel.enabled is true) +# endpoint: "http://localhost:4317" # optional, overrides global endpoint +# traces: +# enabled: true # default: true (when otel.enabled is true) +# endpoint: "http://localhost:4317" # optional, overrides global endpoint +# metrics: +# enabled: true # default: true - set to false to keep using Prometheus metrics +# endpoint: "http://localhost:4317" # optional, overrides global endpoint +# export_interval: 15 # seconds between metric exports (default: 15) + payments: # Ethereum RPC endpoint (Anvil for local development) ethereum_rpc_url: http://127.0.0.1:8545 diff --git a/docker-compose.yml b/docker-compose.yml index d5cd512..7782de2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,3 +24,38 @@ services: - "30923:80" command: | caddy respond --listen :80 --body '{"nillion":{"usd":1.0}}' --header "Content-Type: application/json" + + otel-collector: + image: otel/opentelemetry-collector:latest + command: ["--config=env:OTEL_CONFIG"] + environment: + OTEL_CONFIG: | + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + processors: + batch: + exporters: + debug: + verbosity: detailed + service: + telemetry: + logs: + level: info + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] + ports: + - "4317:4317" diff --git a/src/cleanup.rs b/src/cleanup.rs index 896e8d1..f46d17b 100644 --- a/src/cleanup.rs +++ b/src/cleanup.rs @@ -1,5 +1,4 @@ -use crate::{db::revocations::RevocationDb, time::TimeService}; -use metrics::counter; +use crate::{db::revocations::RevocationDb, metrics, 
time::TimeService}; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; use tracing::{error, info}; @@ -38,7 +37,7 @@ impl RevokedTokenCleaner { let expired_count = self.db.delete_expired(cleanup_threshold).await?; info!("Deleted {expired_count} expired revoked tokens"); - counter!("expired_revoked_tokens_removed_total").increment(expired_count); + metrics::record_expired_tokens_removed(expired_count); Ok(()) } } diff --git a/src/config.rs b/src/config.rs index 655a9f7..bfd8a86 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use anyhow::Context; use rust_decimal::Decimal; use serde::Deserialize; use serde_with::serde_as; -use std::{fs, net::SocketAddr, path::PathBuf, time::Duration}; +use std::{collections::HashMap, fs, net::SocketAddr, path::PathBuf, time::Duration}; /// The configuration for the authority service. #[derive(Clone, Deserialize)] @@ -13,9 +13,13 @@ pub struct Config { /// The private key pub private_key: PrivateKeyConfig, - /// Configuration for metrics. + /// Configuration for Prometheus metrics endpoint. pub metrics: MetricsConfig, + /// OpenTelemetry configuration. + #[serde(default)] + pub otel: OtelConfig, + /// The payments configuration. pub payments: PaymentsConfig, @@ -77,13 +81,141 @@ impl PrivateKeyConfig { } } -/// The configuration for metrics. +/// The configuration for Prometheus metrics endpoint. #[derive(Clone, Deserialize)] pub struct MetricsConfig { - /// The address to bind to. + /// The address to bind the Prometheus metrics endpoint to. pub bind_endpoint: SocketAddr, } +/// OpenTelemetry configuration. +/// +/// Resource attributes like `team.name` and `deployment.environment.name` can be set +/// via the standard `OTEL_RESOURCE_ATTRIBUTES` environment variable: +/// ```bash +/// OTEL_RESOURCE_ATTRIBUTES=team.name=myteam,deployment.environment.name=production +/// ``` +#[serde_as] +#[derive(Clone, Deserialize)] +pub struct OtelConfig { + /// Whether OpenTelemetry is enabled. 
+ #[serde(default)] + pub enabled: bool, + + /// The global OTLP gRPC endpoint URL (e.g., "http://localhost:4317"). + /// Can be overridden per-signal (e.g., `logs.endpoint`) or via + /// `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. + #[serde(default = "default_otlp_endpoint")] + pub endpoint: String, + + /// The service name for OTEL resource attributes. + /// Can also be set via `OTEL_SERVICE_NAME` environment variable. + #[serde(default = "default_service_name")] + pub service_name: String, + + /// Additional resource attributes as key-value pairs. + /// These are merged with attributes from `OTEL_RESOURCE_ATTRIBUTES`. + #[serde(default)] + pub resource_attributes: HashMap, + + /// The batch export timeout. + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(default = "default_export_timeout")] + pub export_timeout: Duration, + + /// Log export configuration. + #[serde(default)] + pub logs: OtelLogsConfig, + + /// Trace export configuration. + #[serde(default)] + pub traces: OtelTracesConfig, + + /// Metrics export configuration. + #[serde(default)] + pub metrics: OtelMetricsConfig, +} + +impl Default for OtelConfig { + fn default() -> Self { + Self { + enabled: false, + endpoint: default_otlp_endpoint(), + service_name: default_service_name(), + resource_attributes: HashMap::new(), + export_timeout: default_export_timeout(), + logs: OtelLogsConfig::default(), + traces: OtelTracesConfig::default(), + metrics: OtelMetricsConfig::default(), + } + } +} + +/// OpenTelemetry logs export configuration. +#[derive(Clone, Deserialize)] +pub struct OtelLogsConfig { + /// Whether log export is enabled. + #[serde(default = "default_true")] + pub enabled: bool, + + /// Optional endpoint override for log export. + /// If not set, uses the global `otel.endpoint`. 
+ #[serde(default)] + pub endpoint: Option, +} + +impl Default for OtelLogsConfig { + fn default() -> Self { + Self { enabled: true, endpoint: None } + } +} + +/// OpenTelemetry traces export configuration. +#[derive(Clone, Deserialize)] +pub struct OtelTracesConfig { + /// Whether trace export is enabled. + #[serde(default = "default_true")] + pub enabled: bool, + + /// Optional endpoint override for trace export. + /// If not set, uses the global `otel.endpoint`. + #[serde(default)] + pub endpoint: Option, +} + +impl Default for OtelTracesConfig { + fn default() -> Self { + Self { enabled: true, endpoint: None } + } +} + +/// OpenTelemetry metrics export configuration. +#[serde_as] +#[derive(Clone, Deserialize)] +pub struct OtelMetricsConfig { + /// Whether metrics export is enabled. + /// When enabled, OTEL metrics are used instead of Prometheus metrics. + /// Set to `false` to keep using Prometheus metrics while still using OTEL for logs/traces. + #[serde(default = "default_true")] + pub enabled: bool, + + /// Optional endpoint override for metrics export. + /// If not set, uses the global `otel.endpoint`. + #[serde(default)] + pub endpoint: Option, + + /// The interval at which metrics are exported. + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(default = "default_metrics_export_interval")] + pub export_interval: Duration, +} + +impl Default for OtelMetricsConfig { + fn default() -> Self { + Self { enabled: true, endpoint: None, export_interval: default_metrics_export_interval() } + } +} + +/// The payments configuration.
#[derive(Clone, Deserialize)] pub struct PaymentsConfig { @@ -182,3 +314,71 @@ fn default_slippage() -> Decimal { // 3% Decimal::new(3, 2) } + +fn default_otlp_endpoint() -> String { + "http://localhost:4317".into() +} + +fn default_service_name() -> String { + "nilauth".into() +} + +fn default_true() -> bool { + true +} + +fn default_export_timeout() -> Duration { + Duration::from_secs(30) +} + +fn default_metrics_export_interval() -> Duration { + Duration::from_secs(15) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn otel_config_defaults() { + // Verify OtelConfig::default() matches serde defaults + let config = OtelConfig::default(); + + assert!(!config.enabled); + assert_eq!(config.endpoint, "http://localhost:4317"); + assert_eq!(config.service_name, "nilauth"); + assert!(config.resource_attributes.is_empty()); + assert_eq!(config.export_timeout, Duration::from_secs(30)); + + // Sub-configs default to enabled with no endpoint override + assert!(config.logs.enabled); + assert!(config.logs.endpoint.is_none()); + assert!(config.traces.enabled); + assert!(config.traces.endpoint.is_none()); + assert!(config.metrics.enabled); + assert!(config.metrics.endpoint.is_none()); + assert_eq!(config.metrics.export_interval, Duration::from_secs(15)); + } + + #[test] + fn otel_endpoint_resolution() { + let config = OtelConfig { + enabled: true, + endpoint: "http://global:4317".to_string(), + service_name: "test".to_string(), + resource_attributes: HashMap::new(), + export_timeout: Duration::from_secs(30), + logs: OtelLogsConfig { enabled: true, endpoint: None }, + traces: OtelTracesConfig { enabled: true, endpoint: Some("http://traces-override:4317".to_string()) }, + metrics: OtelMetricsConfig::default(), + }; + + // Falls back to global when no override + let logs_endpoint = config.logs.endpoint.as_ref().unwrap_or(&config.endpoint); + assert_eq!(logs_endpoint, "http://global:4317"); + + // Uses override when set + let traces_endpoint = 
config.traces.endpoint.as_ref().unwrap_or(&config.endpoint); + assert_eq!(traces_endpoint, "http://traces-override:4317"); + } +} diff --git a/src/lib.rs b/src/lib.rs index d52e5a3..925602c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,10 @@ pub mod args; /// Service configuration structure and loading logic. pub mod config; +/// Unified metrics module for Prometheus and OTEL. +pub mod metrics; +/// Observability initialization (logging, metrics, tracing). +pub mod observability; /// The main application entry point and server setup logic. pub mod run; @@ -13,7 +17,7 @@ mod auth; mod cleanup; mod db; mod docs; -mod metrics; +mod process_metrics; mod routes; mod services; mod signed; diff --git a/src/main.rs b/src/main.rs index 1eb934b..0b36570 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,13 +3,12 @@ use clap::Parser; use nilauth::args::Cli; use nilauth::config::Config; +use nilauth::observability; use nilauth::run::run; use std::process::exit; #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt().init(); - let cli = Cli::parse(); let config = match Config::load(cli.config_file.as_deref()) { Ok(config) => config, @@ -18,7 +17,24 @@ async fn main() -> anyhow::Result<()> { exit(1); } }; - if let Err(e) = run(config).await { + + // Initialize observability (metrics or otel). + // This must happen before any logging calls. + let observability_guard = match observability::init(&config) { + Ok(guard) => guard, + Err(e) => { + eprintln!("Failed to initialize observability: {e}"); + exit(1); + } + }; + + let result = run(config, &observability_guard).await; + + // Explicitly shutdown observability to flush pending traces/logs. + // Necessary because exit() doesn't run destructors. 
+ observability_guard.shutdown(); + + if let Err(e) = result { eprintln!("Failed to run server: {e:#}"); exit(1); } else { diff --git a/src/metrics.rs b/src/metrics.rs index db9576f..af6aa57 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,90 +1,360 @@ -pub use collector::ProcessMetricsCollector; +//! Unified metrics. +//! +//! This module defines metrics once and records to both backends: +//! - **Prometheus**: Underscore naming (`nucs_minted_total`) +//! - **OTEL**: Dot-notation semantic conventions (`nilauth.nuc.minted`) +//! +//! The active backend depends on configuration: +//! - OTEL metrics enabled → OTEL records to collector, Prometheus is no-op +//! - OTEL metrics disabled → Prometheus records via installed recorder, OTEL uses no-op +//! +//! ## Usage +//! +//! ```ignore +//! use crate::metrics; +//! metrics::record_nuc_minted("NilDb"); +//! metrics::record_invalid_payment("underpaid"); +//! ``` -#[cfg(not(target_os = "linux"))] -mod collector { - use tracing::warn; +// Use explicit `::metrics` to reference the external crate, avoiding ambiguity +// with this module's name. +use ::metrics::{counter, gauge, histogram}; +use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter}; +use opentelemetry::{KeyValue, global}; +use std::sync::LazyLock; - /// A stub collector for non-Linux systems. - pub struct ProcessMetricsCollector; +/// Returns the nilauth meter from the global meter provider. +fn meter() -> Meter { + global::meter("nilauth") +} - impl ProcessMetricsCollector { - /// On non-Linux systems, warns that process metrics collection is disabled. - pub fn spawn() { - warn!("Metrics collection is only supported on Linux."); - } - } +// ============================================================================= +// OTEL Instrument Definitions +// ============================================================================= + +/// Counter for invalid payment attempts.
+static OTEL_PAYMENT_INVALID: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.payment.invalid") + .with_description("Total number of invalid payment attempts") + .with_unit("{payment}") + .build() +}); + +/// Counter for valid payments processed. +static OTEL_PAYMENT_VALID: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.payment.valid") + .with_description("Total number of valid payments processed") + .with_unit("{payment}") + .build() +}); + +/// Counter for nucs (tokens) minted. +static OTEL_NUC_MINTED: LazyLock> = LazyLock::new(|| { + meter().u64_counter("nilauth.nuc.minted").with_description("Total number of nucs minted").with_unit("{nuc}").build() +}); + +/// Counter for revoked tokens. +static OTEL_TOKEN_REVOKED: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.token.revoked") + .with_description("Total number of tokens revoked") + .with_unit("{token}") + .build() +}); + +/// Counter for expired revoked tokens removed from the database. +static OTEL_TOKEN_EXPIRED_REMOVED: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.token.expired.removed") + .with_description("Total number of expired revoked tokens removed") + .with_unit("{token}") + .build() +}); + +/// Counter for token price fetch errors. +static OTEL_TOKEN_PRICE_FETCH_ERROR: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.token.price.fetch.error") + .with_description("Total number of token price fetch errors") + .with_unit("{error}") + .build() +}); + +/// Counter for token price cache hits. +static OTEL_TOKEN_PRICE_CACHE_HIT: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("nilauth.token.price.cache.hit") + .with_description("Total number of token price cache hits") + .with_unit("{hit}") + .build() +}); + +/// Gauge for current NIL token price in USD. 
+static OTEL_TOKEN_PRICE_USD: LazyLock> = LazyLock::new(|| { + meter() + .f64_gauge("nilauth.token.price.usd") + .with_description("Current NIL token price in USD") + .with_unit("{USD}") + .build() +}); + +/// Histogram for token price fetch duration. +static OTEL_TOKEN_PRICE_FETCH_DURATION: LazyLock> = LazyLock::new(|| { + meter() + .f64_histogram("nilauth.token.price.fetch.duration") + .with_description("Duration of token price fetch requests") + .with_unit("s") + .build() +}); + +// ============================================================================= +// Process Metrics OTEL Instruments +// ============================================================================= + +/// Counter for process CPU time in seconds. +static OTEL_PROCESS_CPU_TIME: LazyLock> = LazyLock::new(|| { + meter() + .f64_counter("process.cpu.time") + .with_description("Total CPU time consumed by the process") + .with_unit("s") + .build() +}); + +/// Gauge for process resident memory usage in bytes. +static OTEL_PROCESS_MEMORY_USAGE: LazyLock> = LazyLock::new(|| { + meter() + .f64_gauge("process.memory.usage") + .with_description("Current resident memory usage of the process") + .with_unit("By") + .build() +}); + +/// Gauge for open file descriptor count. +static OTEL_PROCESS_OPEN_FD_COUNT: LazyLock> = LazyLock::new(|| { + meter() + .i64_gauge("process.unix.file_descriptor.count") + .with_description("Number of open file descriptors") + .with_unit("{file_descriptor}") + .build() +}); + +/// Gauge for process thread count. +static OTEL_PROCESS_THREAD_COUNT: LazyLock> = LazyLock::new(|| { + meter() + .i64_gauge("process.thread.count") + .with_description("Number of threads in the process") + .with_unit("{thread}") + .build() +}); + +/// Counter for disk I/O bytes. +static OTEL_PROCESS_DISK_IO: LazyLock> = LazyLock::new(|| { + meter().u64_counter("process.disk.io").with_description("Total bytes read/written to disk").with_unit("By").build() +}); + +/// Counter for disk I/O syscalls. 
+static OTEL_PROCESS_DISK_SYSCALLS: LazyLock> = LazyLock::new(|| { + meter() + .u64_counter("process.disk.syscalls") + .with_description("Total number of disk read/write syscalls") + .with_unit("{syscall}") + .build() +}); + +/// Gauge for established TCP connections for this process. +static OTEL_PROCESS_NETWORK_CONNECTIONS: LazyLock> = LazyLock::new(|| { + meter() + .i64_gauge("process.network.connection.count") + .with_description("Number of established TCP connections for this process") + .with_unit("{connection}") + .build() +}); + +// ============================================================================= +// Unified Recording Functions - Application Metrics +// +// Each function records to BOTH backends. The active backend depends on config: +// - OTEL metrics enabled: OTEL records, Prometheus is no-op +// - OTEL metrics disabled: Prometheus records, OTEL uses no-op provider +// ============================================================================= + +/// Records an invalid payment attempt. +/// +/// - Prometheus: `invalid_payments_total{reason="..."}` +/// - OTEL: `nilauth.payment.invalid{reason="..."}` +pub fn record_invalid_payment(reason: &str) { + // Prometheus + counter!("invalid_payments_total", "reason" => reason.to_string()).increment(1); + // OTEL + OTEL_PAYMENT_INVALID.add(1, &[KeyValue::new("reason", reason.to_string())]); } -#[cfg(target_os = "linux")] -mod collector { - use metrics::{counter, gauge}; - use procfs::{WithCurrentSystemInfo, net::TcpState, process::Process}; - use std::{sync::LazyLock, time::Duration}; - use tokio::time::sleep; - use tracing::warn; - - static TICKS_PER_SECOND: LazyLock = LazyLock::new(|| procfs::ticks_per_second() as f64); - const COLLECT_INTERVAL: Duration = Duration::from_secs(30); - - /// Metrics about the node process. - pub struct ProcessMetricsCollector; - - impl ProcessMetricsCollector { - /// Spawns a background task to periodically collect and report process metrics. 
- pub fn spawn() { - tokio::spawn(async move { - loop { - Self::collect_metrics(); - sleep(COLLECT_INTERVAL).await; - } - }); - } - - fn collect_metrics() { - let metrics = match Process::myself() { - Ok(metrics) => metrics, - Err(e) => { - warn!("Failed to load procfs entry: {e}"); - return; - } - }; - let stat = match metrics.stat() { - Ok(stat) => stat, - Err(e) => { - warn!("Failed to load procfs stat: {e}"); - return; - } - }; - let tick_rate = *TICKS_PER_SECOND; - match stat.utime.checked_add(stat.stime) { - Some(total_ticks) => { - let total_milliseconds = (total_ticks as f64 / tick_rate) * 1000.0; - counter!("process_cpu_milliseconds_total").absolute(total_milliseconds as u64); - } - None => warn!("CPU time calculation overflowed"), - }; - let rss = stat.rss_bytes().get() as f64; - gauge!("process_resident_memory_bytes").set(rss); - - if let Some(count) = metrics.fd_count().ok().and_then(|c| i32::try_from(c).ok()) { - gauge!("open_file_descriptors").set(count); - } - gauge!("process_threads").set(stat.num_threads as f64); - - if let Ok(io) = metrics.io() { - let operation_values = [("read", io.read_bytes, io.syscr), ("write", io.write_bytes, io.syscw)]; - for (operation, bytes, syscalls) in operation_values { - // See notes on gauge vs counter semantics needed for CPU time. - counter!("storage_io_bytes_total", "operation" => operation).absolute(bytes); - counter!("storage_io_syscalls_total", "operation" => operation).absolute(syscalls); - } - } - - if let Ok(net) = metrics.tcp() { - let established_count = - net.iter().filter(|connection| connection.state == TcpState::Established).count() as f64; - gauge!("established_tcp_connections").set(established_count); - } - } +/// Records a valid payment. 
+/// +/// - Prometheus: `payments_valid_total{module="..."}` +/// - OTEL: `nilauth.payment.valid{module="..."}` +pub fn record_valid_payment(module: &str) { + // Prometheus + counter!("payments_valid_total", "module" => module.to_string()).increment(1); + // OTEL + OTEL_PAYMENT_VALID.add(1, &[KeyValue::new("module", module.to_string())]); +} + +/// Records a nuc minted. +/// +/// - Prometheus: `nucs_minted_total{module="..."}` +/// - OTEL: `nilauth.nuc.minted{module="..."}` +pub fn record_nuc_minted(module: &str) { + // Prometheus + counter!("nucs_minted_total", "module" => module.to_string()).increment(1); + // OTEL + OTEL_NUC_MINTED.add(1, &[KeyValue::new("module", module.to_string())]); +} + +/// Records a token revocation. +/// +/// - Prometheus: `revoked_tokens_total` +/// - OTEL: `nilauth.token.revoked` +pub fn record_token_revoked() { + // Prometheus + counter!("revoked_tokens_total").increment(1); + // OTEL + OTEL_TOKEN_REVOKED.add(1, &[]); +} + +/// Records expired revoked tokens removed. +/// +/// - Prometheus: `expired_revoked_tokens_removed_total` +/// - OTEL: `nilauth.token.expired.removed` +pub fn record_expired_tokens_removed(count: u64) { + // Prometheus + counter!("expired_revoked_tokens_removed_total").increment(count); + // OTEL + OTEL_TOKEN_EXPIRED_REMOVED.add(count, &[]); +} + +/// Records a token price fetch error. +/// +/// - Prometheus: `nil_token_price_fetch_errors_total` +/// - OTEL: `nilauth.token.price.fetch.error` +pub fn record_token_price_fetch_error() { + // Prometheus + counter!("nil_token_price_fetch_errors_total").increment(1); + // OTEL + OTEL_TOKEN_PRICE_FETCH_ERROR.add(1, &[]); +} + +/// Records a token price cache hit. +/// +/// - Prometheus: `nil_token_cache_hits_total` +/// - OTEL: `nilauth.token.price.cache.hit` +pub fn record_token_price_cache_hit() { + // Prometheus + counter!("nil_token_cache_hits_total").increment(1); + // OTEL + OTEL_TOKEN_PRICE_CACHE_HIT.add(1, &[]); +} + +/// Records the current token price in USD. 
+/// +/// - Prometheus: `nil_token_price` +/// - OTEL: `nilauth.token.price.usd` +pub fn record_token_price(price: f64) { + // Prometheus + gauge!("nil_token_price").set(price); + // OTEL + OTEL_TOKEN_PRICE_USD.record(price, &[]); +} + +/// Records a token price fetch duration in seconds. +/// +/// - Prometheus: `nil_token_price_fetch_seconds` +/// - OTEL: `nilauth.token.price.fetch.duration` +pub fn record_token_price_fetch_duration(seconds: f64) { + // Prometheus + histogram!("nil_token_price_fetch_seconds").record(seconds); + // OTEL + OTEL_TOKEN_PRICE_FETCH_DURATION.record(seconds, &[]); +} + +// ============================================================================= +// Unified Recording Functions - Process Metrics +// +// NOTE: Process metrics are OTEL-only. Unlike application metrics above which +// dual-write to both backends, process metrics use separate collectors: +// - Prometheus: Uses `ProcessMetricsCollector` with `counter!().absolute()` semantics +// - OTEL: Uses `OtelProcessMetricsCollector` with delta computation via these functions +// +// This separation exists because Prometheus counters support `.absolute()` for +// cumulative values, while OTEL counters only support `.add()` requiring delta +// computation. The collectors in `process_metrics.rs` handle this difference. +// ============================================================================= + +/// Records process CPU time in seconds (OTEL only). +pub fn record_process_cpu_time(seconds: f64) { + OTEL_PROCESS_CPU_TIME.add(seconds, &[]); +} + +/// Records process resident memory usage in bytes. +pub fn record_process_memory_usage(bytes: f64) { + OTEL_PROCESS_MEMORY_USAGE.record(bytes, &[]); +} + +/// Records open file descriptor count. +pub fn record_process_open_fd_count(count: i64) { + OTEL_PROCESS_OPEN_FD_COUNT.record(count, &[]); +} + +/// Records process thread count. 
+pub fn record_process_thread_count(count: i64) { + OTEL_PROCESS_THREAD_COUNT.record(count, &[]); +} + +/// Records disk I/O bytes with direction attribute. +pub fn record_process_disk_io(bytes: u64, direction: &str) { + OTEL_PROCESS_DISK_IO.add(bytes, &[KeyValue::new("direction", direction.to_string())]); +} + +/// Records disk I/O syscalls with direction attribute. +pub fn record_process_disk_syscalls(count: u64, direction: &str) { + OTEL_PROCESS_DISK_SYSCALLS.add(count, &[KeyValue::new("direction", direction.to_string())]); +} + +/// Records established TCP connection count for this process. +pub fn record_network_connections(count: i64) { + OTEL_PROCESS_NETWORK_CONNECTIONS.record(count, &[]); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn otel_metrics_recording() { + // Verify all recording functions execute without panic. + // Uses no-op provider in tests so we can't verify values, only execution. + + // Application metrics + record_invalid_payment("underpaid"); + record_valid_payment("NilDb"); + record_nuc_minted("NilAi"); + record_token_revoked(); + record_expired_tokens_removed(5); + record_token_price_fetch_error(); + record_token_price_cache_hit(); + record_token_price(1.5); + record_token_price_fetch_duration(0.5); + + // Process metrics + record_process_cpu_time(1.0); + record_process_memory_usage(1024.0); + record_process_open_fd_count(10); + record_process_thread_count(4); + record_process_disk_io(1024, "read"); + record_process_disk_io(2048, "write"); + record_process_disk_syscalls(100, "read"); + record_network_connections(5); } } diff --git a/src/observability.rs b/src/observability.rs new file mode 100644 index 0000000..9677375 --- /dev/null +++ b/src/observability.rs @@ -0,0 +1,363 @@ +//! Observability initialization for the nilauth service. +//! +//! This module provides OpenTelemetry integration for exporting logs, traces, and metrics via OTLP gRPC. +//! When OTEL is enabled, telemetry is exported to a configured endpoint.
+ +use std::env; + +use crate::config::{Config, OtelConfig}; +use opentelemetry::{KeyValue, global, trace::TracerProvider}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig}; +use opentelemetry_resource_detectors::ProcessResourceDetector; +use opentelemetry_sdk::{ + Resource, + logs::SdkLoggerProvider, + metrics::{PeriodicReader, SdkMeterProvider}, + trace::SdkTracerProvider, +}; +use tracing::{error, info}; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; + +/// A guard that manages the lifecycle of observability resources. +/// +/// When dropped, this guard ensures proper shutdown of any OTEL providers. +pub struct ObservabilityGuard { + logger_provider: Option, + tracer_provider: Option, + meter_provider: Option, +} + +impl ObservabilityGuard { + /// Returns whether OTEL metrics are enabled. + pub fn otel_metrics_enabled(&self) -> bool { + self.meter_provider.is_some() + } + + /// Shuts down the observability providers, flushing any pending data. + pub fn shutdown(mut self) { + self.shutdown_providers(); + } + + /// Internal helper to shut down all providers. + /// + /// Flushes pending data before shutdown to ensure nothing is lost. + /// Uses tracing::error! for consistency. The fmt layer remains active even + /// after OTEL providers are shut down, so errors will still be logged. 
+ fn shutdown_providers(&mut self) { + if let Some(provider) = self.meter_provider.take() { + if let Err(e) = provider.force_flush() { + error!("Error flushing meter provider: {e}"); + } + if let Err(e) = provider.shutdown() { + error!("Error shutting down meter provider: {e}"); + } + } + if let Some(provider) = self.tracer_provider.take() { + if let Err(e) = provider.force_flush() { + error!("Error flushing tracer provider: {e}"); + } + if let Err(e) = provider.shutdown() { + error!("Error shutting down tracer provider: {e}"); + } + } + if let Some(provider) = self.logger_provider.take() { + if let Err(e) = provider.force_flush() { + error!("Error flushing logger provider: {e}"); + } + if let Err(e) = provider.shutdown() { + error!("Error shutting down logger provider: {e}"); + } + } + } +} + +impl Drop for ObservabilityGuard { + fn drop(&mut self) { + self.shutdown_providers(); + } +} + +/// Initializes observability based on the provided configuration. +/// +/// When `otel.enabled` is true, OTLP export is configured for enabled signals (logs, metrics, traces). +/// Standard fmt logging is always enabled alongside OTEL. 
+/// +/// Standard OTEL environment variables are supported: +/// - `OTEL_SDK_DISABLED`: Set to "true" (case-insensitive) or "1" to disable OTEL and use only fmt logging +/// - `OTEL_EXPORTER_OTLP_ENDPOINT`: OTLP endpoint URL (overrides config) +/// - `OTEL_SERVICE_NAME`: Service name (overrides config) +/// - `OTEL_RESOURCE_ATTRIBUTES`: Additional resource attributes (e.g., "team.name=myteam,deployment.environment.name=prod") +pub fn init(config: &Config) -> anyhow::Result { + // Check if OTEL is disabled via environment variable + if config.otel.enabled && is_otel_sdk_disabled() { + let guard = init_fmt_logging()?; + info!("OpenTelemetry SDK disabled via OTEL_SDK_DISABLED environment variable"); + return Ok(guard); + } + + if config.otel.enabled { + let otel_config = apply_otel_env_overrides(&config.otel); + init_otel(&otel_config) + } else { + init_fmt_logging() + } +} + +/// Checks if the OTEL SDK is disabled via the `OTEL_SDK_DISABLED` environment variable. +fn is_otel_sdk_disabled() -> bool { + env::var("OTEL_SDK_DISABLED").map(|v| v.eq_ignore_ascii_case("true") || v == "1").unwrap_or(false) +} + +/// Applies standard OTEL environment variable overrides to the configuration. +/// +/// Environment variables take precedence over YAML configuration values. +fn apply_otel_env_overrides(config: &OtelConfig) -> OtelConfig { + let mut config = config.clone(); + + if let Ok(endpoint) = env::var("OTEL_EXPORTER_OTLP_ENDPOINT") { + config.endpoint = endpoint; + } + + if let Ok(service_name) = env::var("OTEL_SERVICE_NAME") { + config.service_name = service_name; + } + + config +} + +/// Initializes standard fmt logging. +/// +/// Uses `try_init()` to gracefully handle the case where a subscriber is already set +/// (e.g., in integration tests that initialize tracing before calling this function). +fn init_fmt_logging() -> anyhow::Result { + // Ignore SetGlobalDefaultError.
If a subscriber is already set, that's fine + let _ = tracing_subscriber::fmt().try_init(); + Ok(ObservabilityGuard { logger_provider: None, tracer_provider: None, meter_provider: None }) +} + +/// Initializes OpenTelemetry with OTLP export for logs, traces, and metrics. +/// +/// Gracefully degrades on exporter errors: if a provider fails to initialize, +/// logs a warning and continues with partial observability rather than failing completely. +fn init_otel(config: &OtelConfig) -> anyhow::Result { + let resource = create_resource(config); + + // Initialize tracer provider if traces are enabled (graceful degradation on error) + let tracer_provider = if config.traces.enabled { + match init_tracer_provider(config, resource.clone()) { + Ok(provider) => Some(provider), + Err(e) => { + eprintln!("Warning: Failed to initialize tracer provider: {e}"); + None + } + } + } else { + None + }; + + // Initialize logger provider if logs are enabled (graceful degradation on error) + let logger_provider = if config.logs.enabled { + match init_logger_provider(config, resource.clone()) { + Ok(provider) => Some(provider), + Err(e) => { + eprintln!("Warning: Failed to initialize logger provider: {e}"); + None + } + } + } else { + None + }; + + // Initialize meter provider if metrics are enabled (graceful degradation on error) + let meter_provider = if config.metrics.enabled { + match init_meter_provider(config, resource) { + Ok(provider) => { + // Set the global meter provider so metrics can be recorded from anywhere + global::set_meter_provider(provider.clone()); + Some(provider) + } + Err(e) => { + eprintln!("Warning: Failed to initialize meter provider: {e}"); + None + } + } + } else { + None + }; + + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + // Build optional layers + let tracing_layer = + tracer_provider.as_ref().map(|tp| tracing_opentelemetry::layer().with_tracer(tp.tracer("nilauth"))); + let logs_layer = 
logger_provider.as_ref().map(OpenTelemetryTracingBridge::new); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_layer) + .with(logs_layer) + .with(tracing_subscriber::fmt::layer()) + .try_init() + .map_err(|e| anyhow::anyhow!("failed to initialize tracing subscriber: {e}"))?; + + // Determine effective endpoints for logging + let logs_endpoint = config.logs.endpoint.as_ref().unwrap_or(&config.endpoint); + let traces_endpoint = config.traces.endpoint.as_ref().unwrap_or(&config.endpoint); + let metrics_endpoint = config.metrics.endpoint.as_ref().unwrap_or(&config.endpoint); + + info!( + logs_endpoint = %logs_endpoint, + traces_endpoint = %traces_endpoint, + metrics_endpoint = %metrics_endpoint, + service_name = %config.service_name, + logs_enabled = config.logs.enabled, + traces_enabled = config.traces.enabled, + metrics_enabled = config.metrics.enabled, + "OpenTelemetry initialized" + ); + + Ok(ObservabilityGuard { logger_provider, tracer_provider, meter_provider }) +} + +/// Creates an OTEL resource with service attributes. +/// +/// Additional attributes like `team.name` and `deployment.environment.name` +/// can be set via the `OTEL_RESOURCE_ATTRIBUTES` environment variable. +fn create_resource(config: &OtelConfig) -> Resource { + // Build the base attributes + let mut attributes = vec![KeyValue::new("service.version", env!("CARGO_PKG_VERSION"))]; + + // Add any custom resource attributes from config + for (key, value) in &config.resource_attributes { + attributes.push(KeyValue::new(key.clone(), value.clone())); + } + + Resource::builder() + .with_service_name(config.service_name.clone()) + .with_attributes(attributes) + .with_detector(Box::new(ProcessResourceDetector)) + .build() +} + +/// Initializes the OTEL tracer provider with OTLP export. 
+fn init_tracer_provider(config: &OtelConfig, resource: Resource) -> anyhow::Result { + let exporter = build_span_exporter(config)?; + let provider = SdkTracerProvider::builder().with_resource(resource).with_batch_exporter(exporter).build(); + + Ok(provider) +} + +/// Builds the OTLP span exporter using gRPC transport. +fn build_span_exporter(config: &OtelConfig) -> anyhow::Result { + // Use traces.endpoint if set, otherwise fall back to global endpoint + let endpoint = config.traces.endpoint.as_ref().unwrap_or(&config.endpoint); + + let exporter = SpanExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_timeout(config.export_timeout) + .build() + .map_err(|e| anyhow::anyhow!("failed to build gRPC span exporter: {e}"))?; + + Ok(exporter) +} + +/// Initializes the OTEL logger provider with OTLP export. +fn init_logger_provider(config: &OtelConfig, resource: Resource) -> anyhow::Result { + let exporter = build_log_exporter(config)?; + let provider = SdkLoggerProvider::builder().with_resource(resource).with_batch_exporter(exporter).build(); + + Ok(provider) +} + +/// Builds the OTLP log exporter using gRPC transport. +fn build_log_exporter(config: &OtelConfig) -> anyhow::Result { + // Use logs.endpoint if set, otherwise fall back to global endpoint + let endpoint = config.logs.endpoint.as_ref().unwrap_or(&config.endpoint); + + let exporter = LogExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_timeout(config.export_timeout) + .build() + .map_err(|e| anyhow::anyhow!("failed to build gRPC log exporter: {e}"))?; + + Ok(exporter) +} + +/// Initializes the OTEL meter provider with OTLP export. +/// +/// NOTE: The export timeout is configured on the `MetricExporter` itself (in `build_metric_exporter`), +/// not on the `PeriodicReader`. The reader's interval controls how often metrics are collected and +/// exported, while the exporter's timeout controls the network operation timeout. 
+fn init_meter_provider(config: &OtelConfig, resource: Resource) -> anyhow::Result { + let exporter = build_metric_exporter(config)?; + let reader = PeriodicReader::builder(exporter).with_interval(config.metrics.export_interval).build(); + let provider = SdkMeterProvider::builder().with_resource(resource).with_reader(reader).build(); + + Ok(provider) +} + +/// Builds the OTLP metric exporter using gRPC transport. +fn build_metric_exporter(config: &OtelConfig) -> anyhow::Result { + // Use metrics.endpoint if set, otherwise fall back to global endpoint + let endpoint = config.metrics.endpoint.as_ref().unwrap_or(&config.endpoint); + + let exporter = MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_timeout(config.export_timeout) + .build() + .map_err(|e| anyhow::anyhow!("failed to build gRPC metric exporter: {e}"))?; + + Ok(exporter) +} + +#[cfg(test)] +mod tests { + use super::*; + use serial_test::serial; + + #[test] + #[serial] + fn otel_sdk_disabled_env_var() { + // SAFETY: Test-only, running serially + unsafe { env::remove_var("OTEL_SDK_DISABLED") }; + assert!(!is_otel_sdk_disabled(), "should be enabled when unset"); + + unsafe { env::set_var("OTEL_SDK_DISABLED", "true") }; + assert!(is_otel_sdk_disabled(), "should be disabled for 'true'"); + + unsafe { env::set_var("OTEL_SDK_DISABLED", "1") }; + assert!(is_otel_sdk_disabled(), "should be disabled for '1'"); + + unsafe { env::set_var("OTEL_SDK_DISABLED", "false") }; + assert!(!is_otel_sdk_disabled(), "should be enabled for 'false'"); + + unsafe { env::remove_var("OTEL_SDK_DISABLED") }; + } + + #[test] + fn create_resource_with_attributes() { + let mut config = OtelConfig::default(); + config.service_name = "test-service".to_string(); + config.resource_attributes.insert("team.name".to_string(), "platform".to_string()); + + let resource = create_resource(&config); + + let get_attr = |name: &str| resource.iter().find(|(k, _)| k.as_str() == name).map(|(_, v)| v.to_string()); + + 
assert_eq!(get_attr("service.name"), Some("test-service".to_string())); + assert_eq!(get_attr("service.version"), Some(env!("CARGO_PKG_VERSION").to_string())); + assert_eq!(get_attr("team.name"), Some("platform".to_string())); + } + + #[test] + fn observability_guard_safe_with_no_providers() { + let guard = ObservabilityGuard { logger_provider: None, tracer_provider: None, meter_provider: None }; + + assert!(!guard.otel_metrics_enabled()); + guard.shutdown(); // Should not panic + } +} diff --git a/src/process_metrics.rs b/src/process_metrics.rs new file mode 100644 index 0000000..f5363b4 --- /dev/null +++ b/src/process_metrics.rs @@ -0,0 +1,211 @@ +pub use collector::ProcessMetricsCollector; +pub use otel_collector::OtelProcessMetricsCollector; + +#[cfg(not(target_os = "linux"))] +mod collector { + use tracing::warn; + + /// A stub collector for non-Linux systems. + pub struct ProcessMetricsCollector; + + impl ProcessMetricsCollector { + /// On non-Linux systems, warns that process metrics collection is disabled. + pub fn spawn() { + warn!("Metrics collection is only supported on Linux."); + } + } +} + +#[cfg(not(target_os = "linux"))] +mod otel_collector { + use tracing::warn; + + /// A stub OTEL collector for non-Linux systems. + pub struct OtelProcessMetricsCollector; + + impl OtelProcessMetricsCollector { + /// On non-Linux systems, warns that process metrics collection is disabled. + pub fn spawn() { + warn!("OTEL metrics collection is only supported on Linux."); + } + } +} + +#[cfg(target_os = "linux")] +pub(crate) mod collector { + use metrics::{counter, gauge}; + use procfs::{WithCurrentSystemInfo, net::TcpState, process::Process}; + use std::{sync::LazyLock, time::Duration}; + use tokio::time::sleep; + use tracing::warn; + + pub(crate) static TICKS_PER_SECOND: LazyLock = LazyLock::new(|| procfs::ticks_per_second() as f64); + pub(crate) const COLLECT_INTERVAL: Duration = Duration::from_secs(30); + + /// Metrics about the node process. 
+ pub struct ProcessMetricsCollector; + + impl ProcessMetricsCollector { + /// Spawns a background task to periodically collect and report process metrics. + pub fn spawn() { + tokio::spawn(async move { + loop { + Self::collect_metrics(); + sleep(COLLECT_INTERVAL).await; + } + }); + } + + fn collect_metrics() { + let metrics = match Process::myself() { + Ok(metrics) => metrics, + Err(e) => { + warn!("Failed to load procfs entry: {e}"); + return; + } + }; + let stat = match metrics.stat() { + Ok(stat) => stat, + Err(e) => { + warn!("Failed to load procfs stat: {e}"); + return; + } + }; + let tick_rate = *TICKS_PER_SECOND; + match stat.utime.checked_add(stat.stime) { + Some(total_ticks) => { + let total_milliseconds = (total_ticks as f64 / tick_rate) * 1000.0; + counter!("process_cpu_milliseconds_total").absolute(total_milliseconds as u64); + } + None => warn!("CPU time calculation overflowed"), + }; + let rss = stat.rss_bytes().get() as f64; + gauge!("process_resident_memory_bytes").set(rss); + + if let Some(count) = metrics.fd_count().ok().and_then(|c| i32::try_from(c).ok()) { + gauge!("open_file_descriptors").set(count); + } + gauge!("process_threads").set(stat.num_threads as f64); + + if let Ok(io) = metrics.io() { + let operation_values = [("read", io.read_bytes, io.syscr), ("write", io.write_bytes, io.syscw)]; + for (operation, bytes, syscalls) in operation_values { + // These are running totals, so set them with `absolute()` (as for CPU time above) rather than `increment()`.
+ counter!("storage_io_bytes_total", "operation" => operation).absolute(bytes); + counter!("storage_io_syscalls_total", "operation" => operation).absolute(syscalls); + } + } + + if let Ok(net) = metrics.tcp() { + let established_count = + net.iter().filter(|connection| connection.state == TcpState::Established).count() as f64; + gauge!("established_tcp_connections").set(established_count); + } + } + } +} + +#[cfg(target_os = "linux")] +pub(crate) mod otel_collector { + use crate::metrics; + use procfs::{WithCurrentSystemInfo, net::TcpState, process::Process}; + use std::{sync::LazyLock, time::Duration}; + use tokio::{sync::Mutex, time::sleep}; + use tracing::warn; + + static TICKS_PER_SECOND: LazyLock = LazyLock::new(|| procfs::ticks_per_second() as f64); + pub(crate) const COLLECT_INTERVAL: Duration = Duration::from_secs(30); + + /// Tracks previous values for cumulative metrics to compute deltas. + /// OTEL counters use `add()` which increments, so we must track deltas ourselves. + #[derive(Default)] + pub(crate) struct PreviousValues { + pub(crate) cpu_ticks: u64, + pub(crate) disk_read_bytes: u64, + pub(crate) disk_write_bytes: u64, + pub(crate) disk_read_syscalls: u64, + pub(crate) disk_write_syscalls: u64, + } + + static PREVIOUS: LazyLock> = LazyLock::new(|| Mutex::new(PreviousValues::default())); + + /// OTEL process metrics collector using OpenTelemetry semantic conventions. + pub struct OtelProcessMetricsCollector; + + impl OtelProcessMetricsCollector { + /// Spawns a background task to periodically collect and report process metrics via OTEL. 
+ pub fn spawn() { + tokio::spawn(async move { + loop { + Self::collect_metrics().await; + sleep(COLLECT_INTERVAL).await; + } + }); + } + + async fn collect_metrics() { + let process = match Process::myself() { + Ok(p) => p, + Err(e) => { + warn!("Failed to load procfs entry: {e}"); + return; + } + }; + let stat = match process.stat() { + Ok(stat) => stat, + Err(e) => { + warn!("Failed to load procfs stat: {e}"); + return; + } + }; + + let mut prev = PREVIOUS.lock().await; + let tick_rate = *TICKS_PER_SECOND; + + // CPU time - compute delta from previous reading + match stat.utime.checked_add(stat.stime) { + Some(total_ticks) => { + let delta_ticks = total_ticks.saturating_sub(prev.cpu_ticks); + prev.cpu_ticks = total_ticks; + let delta_seconds = delta_ticks as f64 / tick_rate; + metrics::record_process_cpu_time(delta_seconds); + } + None => warn!("CPU time calculation overflowed"), + }; + + // Gauges - these are point-in-time values, no delta needed + let rss = stat.rss_bytes().get() as f64; + metrics::record_process_memory_usage(rss); + + if let Some(count) = process.fd_count().ok().and_then(|c| i64::try_from(c).ok()) { + metrics::record_process_open_fd_count(count); + } + metrics::record_process_thread_count(stat.num_threads as i64); + + // Disk I/O - compute deltas from previous reading + if let Ok(io) = process.io() { + let delta_read_bytes = io.read_bytes.saturating_sub(prev.disk_read_bytes); + let delta_write_bytes = io.write_bytes.saturating_sub(prev.disk_write_bytes); + let delta_read_syscalls = io.syscr.saturating_sub(prev.disk_read_syscalls); + let delta_write_syscalls = io.syscw.saturating_sub(prev.disk_write_syscalls); + + prev.disk_read_bytes = io.read_bytes; + prev.disk_write_bytes = io.write_bytes; + prev.disk_read_syscalls = io.syscr; + prev.disk_write_syscalls = io.syscw; + + metrics::record_process_disk_io(delta_read_bytes, "read"); + metrics::record_process_disk_io(delta_write_bytes, "write"); + 
metrics::record_process_disk_syscalls(delta_read_syscalls, "read"); + metrics::record_process_disk_syscalls(delta_write_syscalls, "write"); + } + + // TCP connections - gauge, no delta needed + if let Ok(net) = process.tcp() { + let established_count = + net.iter().filter(|connection| connection.state == TcpState::Established).count() as i64; + metrics::record_network_connections(established_count); + } + } + } +} diff --git a/src/routes/nucs/create.rs b/src/routes/nucs/create.rs index 2f1a86a..d1efd07 100644 --- a/src/routes/nucs/create.rs +++ b/src/routes/nucs/create.rs @@ -9,13 +9,14 @@ use axum::{ response::{IntoResponse, Response}, }; use chrono::{DateTime, Utc}; -use metrics::counter; use nillion_nucs::NucSigner; use nillion_nucs::token::Command; use nillion_nucs::{builder::DelegationBuilder, did::Did}; use serde::{Deserialize, Serialize}; use strum::EnumDiscriminants; use tracing::{error, info}; + +use crate::metrics; use utoipa::ToSchema; /// The legacy payload format for creating a Nuc, used for `did:nil` authentication. 
@@ -160,7 +161,7 @@ async fn handle_nuc_creation( HandlerError::Internal })?; - counter!("nucs_minted_total", "module" => blind_module.to_string()).increment(1); + metrics::record_nuc_minted(&blind_module.to_string()); let response = CreateNucResponse { token }; Ok(Json(response)) } diff --git a/src/routes/payments/validate.rs b/src/routes/payments/validate.rs index cfceda3..7f818ae 100644 --- a/src/routes/payments/validate.rs +++ b/src/routes/payments/validate.rs @@ -1,5 +1,6 @@ use crate::auth::IdentityNuc; use crate::db::subscriptions::{BlindModule, PaymentRecord}; +use crate::metrics; use crate::routes::Json; use crate::services::ethereum_rpc::EthereumRpcError; use crate::{db::subscriptions::CreditPaymentError, routes::RequestHandlerError, state::SharedState}; @@ -7,7 +8,6 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response}, }; -use metrics::counter; use nillion_nucs::did::Did; use nillion_nucs::k256::sha2::{Digest, Sha256}; use nillion_nucs::token::Command; @@ -121,14 +121,14 @@ pub(crate) async fn handler( if actual_digest != expected_digest { store_invalid_payment(&state, make_payment_record()).await; - counter!("invalid_payments_total", "reason" => "digest_mismatch").increment(1); + metrics::record_invalid_payment("digest_mismatch"); return Err(HandlerError::DigestMismatch); } // Verify chain ID from event matches if event.chain_id != state.parameters.chain_id { warn!("Event chain ID {} does not match configured chain ID {}", event.chain_id, state.parameters.chain_id); - counter!("invalid_payments_total", "reason" => "chain_id").increment(1); + metrics::record_invalid_payment("chain_id"); return Err(HandlerError::ChainIdMismatch { expected: state.parameters.chain_id, actual: event.chain_id }); } @@ -146,10 +146,10 @@ pub(crate) async fn handler( if Decimal::from(amount_unils) < minimum_payment { warn!("Expected payment for {minimum_payment} but got {amount_unils} unils"); - counter!("invalid_payments_total", "reason" => 
"underpaid").increment(1); + metrics::record_invalid_payment("underpaid"); return Err(HandlerError::InsufficientPayment); } - counter!("payments_valid_total", "module" => blind_module.to_string()).increment(1); + metrics::record_valid_payment(&blind_module.to_string()); info!("Processed payment for {amount_unils} unils, minimum was {minimum_payment} unils"); } Err(_) => { diff --git a/src/routes/revocations/revoke.rs b/src/routes/revocations/revoke.rs index 0f547f2..8c9c8f3 100644 --- a/src/routes/revocations/revoke.rs +++ b/src/routes/revocations/revoke.rs @@ -6,7 +6,6 @@ use crate::{ }; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use metrics::counter; use nillion_nucs::{ envelope::{InvalidSignature, NucEnvelopeParseError, NucTokenEnvelope}, token::{Command, TokenBody}, @@ -15,6 +14,8 @@ use std::{iter, sync::LazyLock}; use strum::EnumDiscriminants; use tracing::info; +use crate::metrics; + const TOKEN_ARG: &str = "token"; static REVOCATION_CMD: LazyLock = LazyLock::new(|| ["nuc", "revoke"].into()); @@ -65,7 +66,7 @@ pub(crate) async fn handler(state: SharedState, auth: CapabilityNuc) -> Result { info!("Revoked token {hash}, expires at {expires_at}"); - counter!("revoked_tokens_total").increment(1); + metrics::record_token_revoked(); Ok(Json(())) } Err(StoreRevocationError::AlreadyRevoked) => { diff --git a/src/run.rs b/src/run.rs index 886e729..3ead07b 100644 --- a/src/run.rs +++ b/src/run.rs @@ -2,7 +2,8 @@ use crate::cleanup::RevokedTokenCleaner; use crate::config::Config; use crate::db::revocations::PostgresRevocationDb; use crate::db::{PostgresPool, subscriptions::PostgresSubscriptionDb}; -use crate::metrics::ProcessMetricsCollector; +use crate::observability::ObservabilityGuard; +use crate::process_metrics::{OtelProcessMetricsCollector, ProcessMetricsCollector}; use crate::services::ethereum_rpc::AlloyBurnWithDigestEventRetriever; use crate::services::subscription_cost::DefaultSubscriptionCostService; use 
crate::services::token_price::CoinGeckoTokenPriceService; @@ -11,7 +12,6 @@ use crate::time::DefaultTimeService; use anyhow::Context; use axum::http; use axum::{Router, routing::get}; -use axum_prometheus::{EndpointLabel, PrometheusMetricLayerBuilder, metrics_exporter_prometheus::PrometheusBuilder}; use chrono::Utc; use nillion_nucs::did::Did; use nillion_nucs::{DidMethod, Signer}; @@ -27,7 +27,7 @@ use tracing::info; /// This function sets up the database connection, services, application state, /// and starts the main application and metrics servers. It also handles /// graceful shutdown on receiving a termination signal. -pub async fn run(config: Config) -> anyhow::Result<()> { +pub async fn run(config: Config, observability_guard: &ObservabilityGuard) -> anyhow::Result<()> { info!("Starting nilauth service"); info!( server_endpoint = %config.server.bind_endpoint, @@ -94,29 +94,54 @@ pub async fn run(config: Config) -> anyhow::Result<()> { // Spawn a helper to clean up expired tokens RevokedTokenCleaner::spawn(state.databases.revocations.clone(), Box::new(DefaultTimeService)); - // Create a custom prometheus layer that ignores unknown paths and returns `/unknown` instead so - // crawlers/malicious actors can't create high cardinality metrics by hitting unknown routes. 
- let (prometheus_layer, metrics_handle) = PrometheusMetricLayerBuilder::new() - .with_prefix("app") - .with_endpoint_label_type(EndpointLabel::MatchedPathWithFallbackFn(|_| "/unknown".into())) - .with_metrics_from_fn(|| { - PrometheusBuilder::new().install_recorder().expect("failed to install metrics recorder") - }) - .build_pair(); let cors = CorsLayer::new() .allow_methods([http::Method::GET, http::Method::POST]) .allow_headers([http::header::CONTENT_TYPE, http::header::AUTHORIZATION]) .allow_origin(tower_http::cors::Any); - let router = crate::routes::build_router(state).layer(prometheus_layer).layer(cors); - let metrics_router = Router::new().route("/metrics", get(|| async move { metrics_handle.render() })); - ProcessMetricsCollector::spawn(); + // Check if OTEL metrics are enabled (mutually exclusive with Prometheus) + // Use the guard's method which accounts for runtime conditions like OTEL_SDK_DISABLED + if observability_guard.otel_metrics_enabled() { + info!("Exporting OTEL metrics."); + + let router = crate::routes::build_router(state).layer(cors); + + // Spawn OTEL process metrics collector + OtelProcessMetricsCollector::spawn(); + + let app = serve(config.server.bind_endpoint, router, "main"); + app.await.context("running main server")?; + } else { + // Prometheus metrics mode (default) + use axum_prometheus::{ + EndpointLabel, PrometheusMetricLayerBuilder, metrics_exporter_prometheus::PrometheusBuilder, + }; + + info!("Exporting Prometheus metrics"); + + // Create a custom prometheus layer that ignores unknown paths and returns `/unknown` instead so + // crawlers/malicious actors can't create high cardinality metrics by hitting unknown routes. 
+ let (prometheus_layer, metrics_handle) = PrometheusMetricLayerBuilder::new() + .with_prefix("app") + .with_endpoint_label_type(EndpointLabel::MatchedPathWithFallbackFn(|_| "/unknown".into())) + .with_metrics_from_fn(|| { + PrometheusBuilder::new().install_recorder().expect("failed to install metrics recorder") + }) + .build_pair(); + + let router = crate::routes::build_router(state).layer(prometheus_layer).layer(cors); + let metrics_router = Router::new().route("/metrics", get(|| async move { metrics_handle.render() })); + + // Spawn Prometheus process metrics collector + ProcessMetricsCollector::spawn(); + + let app = serve(config.server.bind_endpoint, router, "main"); + let metrics = serve(config.metrics.bind_endpoint, metrics_router, "metrics"); + let (app, metrics) = join!(app, metrics); + app.context("running main server")?; + metrics.context("running metrics server")?; + } - let app = serve(config.server.bind_endpoint, router, "main"); - let metrics = serve(config.metrics.bind_endpoint, metrics_router, "metrics"); - let (app, metrics) = join!(app, metrics); - app.context("running main server")?; - metrics.context("running metrics server")?; Ok(()) } diff --git a/src/services/subscription_cost.rs b/src/services/subscription_cost.rs index 06d8cd1..6c3e669 100644 --- a/src/services/subscription_cost.rs +++ b/src/services/subscription_cost.rs @@ -1,6 +1,7 @@ -use crate::{config::BlindModuleCosts, db::subscriptions::BlindModule, services::token_price::TokenPriceService}; +use crate::{ + config::BlindModuleCosts, db::subscriptions::BlindModule, metrics, services::token_price::TokenPriceService, +}; use async_trait::async_trait; -use metrics::counter; use rust_decimal::Decimal; use std::sync::Arc; use tracing::error; @@ -33,7 +34,7 @@ impl SubscriptionCostService for DefaultSubscriptionCostService { async fn blind_module_cost(&self, blind_module: BlindModule) -> Result { let token_price = self.token_price_service.nil_token_price().await.map_err(|e| { 
error!("Failed to get token price: {e}"); - counter!("nil_token_price_fetch_errors_total").increment(1); + metrics::record_token_price_fetch_error(); SubscriptionCostError })?; let dollar_cost = match blind_module { diff --git a/src/services/token_price.rs b/src/services/token_price.rs index efca2a4..c498c4d 100644 --- a/src/services/token_price.rs +++ b/src/services/token_price.rs @@ -1,7 +1,6 @@ -use crate::config::TokenPriceConfig; +use crate::{config::TokenPriceConfig, metrics}; use anyhow::{anyhow, bail}; use async_trait::async_trait; -use metrics::{counter, gauge, histogram}; use rust_decimal::Decimal; use serde::Deserialize; use std::{collections::HashMap, time::Duration}; @@ -50,7 +49,7 @@ impl TokenPriceService for CoinGeckoTokenPriceService { async fn nil_token_price(&self) -> anyhow::Result { let mut last_price = self.last_price.lock().await; if last_price.timestamp.elapsed() < PRICE_CACHE_DURATION { - counter!("nil_token_cache_hits_total").increment(1); + metrics::record_token_price_cache_hit(); return Ok(last_price.price); } @@ -66,8 +65,8 @@ impl TokenPriceService for CoinGeckoTokenPriceService { .send() .await .and_then(|r| r.error_for_status()); - let elapsed = now.elapsed(); - histogram!("nil_token_price_fetch_seconds",).record(elapsed.as_millis() as f64 / 1000.0); + let elapsed_seconds = now.elapsed().as_millis() as f64 / 1000.0; + metrics::record_token_price_fetch_duration(elapsed_seconds); let response = match response { Ok(response) => response, @@ -87,8 +86,8 @@ impl TokenPriceService for CoinGeckoTokenPriceService { if price <= Decimal::from(0) { bail!("token price is <= 0: {price}") } - if let Ok(price) = f64::try_from(price) { - gauge!("nil_token_price").set(price); + if let Ok(price_f64) = f64::try_from(price) { + metrics::record_token_price(price_f64); } info!("Token price from CoinGecko: {price}"); diff --git a/tests/setup.rs b/tests/setup.rs index fa0c6d5..ff19846 100644 --- a/tests/setup.rs +++ b/tests/setup.rs @@ -1,4 +1,4 @@ -use 
::nilauth::{config::Config, run::run}; +use ::nilauth::{config::Config, observability, run::run}; use axum::Router; use axum::http::StatusCode; use axum::routing::get; @@ -145,7 +145,9 @@ impl Services { config: config.clone(), }; let handle = RUNTIME.spawn(async move { - match run(config).await { + // Initialize observability for tests (OTEL disabled in sample config) + let guard = observability::init(&config).expect("failed to initialize observability"); + match run(config, &guard).await { Ok(_) => info!("nilauth finished successfully"), Err(e) => error!("nilauth finished with error: {e}"), };