diff --git a/crates/blockchain/metrics/api.rs b/crates/blockchain/metrics/api.rs index bda3b98f735..9c0237645b6 100644 --- a/crates/blockchain/metrics/api.rs +++ b/crates/blockchain/metrics/api.rs @@ -54,6 +54,9 @@ pub(crate) async fn get_metrics() -> String { Err(_) => tracing::error!("Failed to gather METRICS_P2P"), }; + // METRICS_SYNC registers into the default Prometheus registry at init, + // so its metrics are already included in gather_default_metrics() above. + ret_string.push('\n'); if let Some(node_metrics) = METRICS_NODE.get() { match node_metrics.gather_metrics() { diff --git a/crates/blockchain/metrics/mod.rs b/crates/blockchain/metrics/mod.rs index 47f188a09ca..82df7e57065 100644 --- a/crates/blockchain/metrics/mod.rs +++ b/crates/blockchain/metrics/mod.rs @@ -14,6 +14,8 @@ pub mod process; pub mod profiling; #[cfg(feature = "api")] pub mod rpc; +#[cfg(any(feature = "api", feature = "metrics"))] +pub mod sync; #[cfg(any(feature = "api", feature = "transactions"))] pub mod transactions; diff --git a/crates/blockchain/metrics/sync.rs b/crates/blockchain/metrics/sync.rs new file mode 100644 index 00000000000..4323d7bf6d7 --- /dev/null +++ b/crates/blockchain/metrics/sync.rs @@ -0,0 +1,205 @@ +use prometheus::{ + IntCounterVec, IntGauge, IntGaugeVec, register_int_counter_vec, register_int_gauge, + register_int_gauge_vec, +}; +use std::sync::LazyLock; + +// Metrics defined in this module register into the Prometheus default registry. +// The metrics API exposes them via `gather_default_metrics()`. 
+ +pub static METRICS_SYNC: LazyLock<MetricsSync> = LazyLock::new(MetricsSync::default); + +#[derive(Debug, Clone)] +pub struct MetricsSync { + // --- Current state (gauges) --- + pub stage: IntGauge, + pub pivot_block: IntGauge, + pub eligible_peers: IntGauge, + pub snap_peers: IntGauge, + pub inflight_requests: IntGauge, + pub pivot_age_seconds: IntGauge, + pub pivot_timestamp: IntGauge, + pub phase_start_timestamp: IntGaugeVec, + + // --- Progress counters (gauges set from METRICS atomics) --- + // Use rate() in Grafana to derive throughput. + pub headers_downloaded: IntGauge, + pub headers_total: IntGauge, + pub accounts_downloaded: IntGauge, + pub accounts_inserted: IntGauge, + pub storage_downloaded: IntGauge, + pub storage_inserted: IntGauge, + pub state_leaves_healed: IntGauge, + pub storage_leaves_healed: IntGauge, + pub bytecodes_downloaded: IntGauge, + pub bytecodes_total: IntGauge, + + // --- Outcome counters (counter vecs) --- + pub pivot_updates: IntCounterVec, + pub storage_requests: IntCounterVec, + pub header_resolution: IntCounterVec, +} + +impl Default for MetricsSync { + fn default() -> Self { + Self::new() + } +} + +impl MetricsSync { + pub fn new() -> Self { + MetricsSync { + // Current state + stage: register_int_gauge!( + "ethrex_sync_stage", + "Current snap sync stage (0=idle, 1=headers, 2=account_ranges, 3=account_insertion, 4=storage_ranges, 5=storage_insertion, 6=state_healing, 7=storage_healing, 8=bytecodes)" + ) + .expect("Failed to create ethrex_sync_stage"), + pivot_block: register_int_gauge!( + "ethrex_sync_pivot_block", + "Current pivot block number" + ) + .expect("Failed to create ethrex_sync_pivot_block"), + eligible_peers: register_int_gauge!( + "ethrex_sync_eligible_peers", + "Number of peers eligible for requests" + ) + .expect("Failed to create ethrex_sync_eligible_peers"), + snap_peers: register_int_gauge!( + "ethrex_sync_snap_peers", + "Number of connected peers supporting the snap protocol" + ) + .expect("Failed to create 
ethrex_sync_snap_peers"), + inflight_requests: register_int_gauge!( + "ethrex_sync_inflight_requests", + "Total inflight requests across all peers" + ) + .expect("Failed to create ethrex_sync_inflight_requests"), + pivot_age_seconds: register_int_gauge!( + "ethrex_sync_pivot_age_seconds", + "Age of the current pivot block in seconds" + ) + .expect("Failed to create ethrex_sync_pivot_age_seconds"), + pivot_timestamp: register_int_gauge!( + "ethrex_sync_pivot_timestamp", + "Unix timestamp of the current pivot block (use time() - this for age in Grafana)" + ) + .expect("Failed to create ethrex_sync_pivot_timestamp"), + + phase_start_timestamp: register_int_gauge_vec!( + "ethrex_sync_phase_start_timestamp", + "Unix timestamp when each phase began (use time() - this for elapsed in Grafana)", + &["phase"] + ) + .expect("Failed to create ethrex_sync_phase_start_timestamp"), + + // Progress (set periodically from METRICS atomics) + headers_downloaded: register_int_gauge!( + "ethrex_sync_headers_downloaded", + "Headers downloaded so far" + ) + .expect("Failed to create ethrex_sync_headers_downloaded"), + headers_total: register_int_gauge!( + "ethrex_sync_headers_total", + "Total headers to download (pivot block number)" + ) + .expect("Failed to create ethrex_sync_headers_total"), + accounts_downloaded: register_int_gauge!( + "ethrex_sync_accounts_downloaded", + "Account ranges downloaded from peers" + ) + .expect("Failed to create ethrex_sync_accounts_downloaded"), + accounts_inserted: register_int_gauge!( + "ethrex_sync_accounts_inserted", + "Accounts inserted into storage" + ) + .expect("Failed to create ethrex_sync_accounts_inserted"), + storage_downloaded: register_int_gauge!( + "ethrex_sync_storage_downloaded", + "Storage leaves downloaded from peers" + ) + .expect("Failed to create ethrex_sync_storage_downloaded"), + storage_inserted: register_int_gauge!( + "ethrex_sync_storage_inserted", + "Storage leaves inserted into storage" + ) + .expect("Failed to create 
ethrex_sync_storage_inserted"), + state_leaves_healed: register_int_gauge!( + "ethrex_sync_state_leaves_healed", + "State trie leaves healed" + ) + .expect("Failed to create ethrex_sync_state_leaves_healed"), + storage_leaves_healed: register_int_gauge!( + "ethrex_sync_storage_leaves_healed", + "Storage trie leaves healed" + ) + .expect("Failed to create ethrex_sync_storage_leaves_healed"), + bytecodes_downloaded: register_int_gauge!( + "ethrex_sync_bytecodes_downloaded", + "Bytecodes downloaded so far" + ) + .expect("Failed to create ethrex_sync_bytecodes_downloaded"), + bytecodes_total: register_int_gauge!( + "ethrex_sync_bytecodes_total", + "Total bytecodes to download" + ) + .expect("Failed to create ethrex_sync_bytecodes_total"), + + // Outcome counters + pivot_updates: register_int_counter_vec!( + "ethrex_sync_pivot_updates_total", + "Total pivot update attempts by outcome", + &["outcome"] + ) + .expect("Failed to create ethrex_sync_pivot_updates_total"), + storage_requests: register_int_counter_vec!( + "ethrex_sync_storage_requests_total", + "Total storage range requests by outcome", + &["outcome"] + ) + .expect("Failed to create ethrex_sync_storage_requests_total"), + header_resolution: register_int_counter_vec!( + "ethrex_sync_header_resolution_total", + "Total header resolution attempts by outcome", + &["outcome"] + ) + .expect("Failed to create ethrex_sync_header_resolution_total"), + } + } + + // --- Gauge setters (used by p2p sync code directly) --- + + pub fn set_eligible_peers(&self, count: i64) { + self.eligible_peers.set(count); + } + + pub fn set_snap_peers(&self, count: i64) { + self.snap_peers.set(count); + } + + pub fn set_inflight_requests(&self, count: i64) { + self.inflight_requests.set(count); + } + + pub fn set_pivot_age_seconds(&self, age: i64) { + self.pivot_age_seconds.set(age); + } + + pub fn set_current_phase(&self, phase: i64) { + self.stage.set(phase); + } + + // --- Counter incrementers --- + + pub fn inc_pivot_update(&self, 
outcome: &str) { + self.pivot_updates.with_label_values(&[outcome]).inc(); + } + + pub fn inc_storage_request(&self, outcome: &str) { + self.storage_requests.with_label_values(&[outcome]).inc(); + } + + pub fn inc_header_resolution(&self, outcome: &str) { + self.header_resolution.with_label_values(&[outcome]).inc(); + } +} diff --git a/crates/networking/p2p/metrics.rs b/crates/networking/p2p/metrics.rs index e525aed8d06..35950ea4e21 100644 --- a/crates/networking/p2p/metrics.rs +++ b/crates/networking/p2p/metrics.rs @@ -57,6 +57,7 @@ pub struct Metrics { /* Snap Sync */ // Common pub sync_head_block: AtomicU64, + pub pivot_timestamp: AtomicU64, pub sync_head_hash: Arc>, pub current_step: Arc, @@ -708,6 +709,7 @@ impl Default for Metrics { /* Snap Sync */ // Common sync_head_block: AtomicU64::new(0), + pivot_timestamp: AtomicU64::new(0), sync_head_hash: Arc::new(Mutex::new(H256::default())), current_step: Arc::new(CurrentStep(AtomicU8::new(0))), diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 3fc2bac185f..21faaf3d972 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -390,11 +390,29 @@ pub async fn periodically_show_peer_stats_during_syncing( phase_elapsed_str, &phase_metrics(previous_step, &phase_start).await, ); + + // Emit final metrics for completed phase + #[cfg(feature = "metrics")] + push_sync_prometheus_metrics(previous_step); } // Start new phase phase_start_time = std::time::Instant::now(); + // Record phase start timestamp for Grafana elapsed panels + #[cfg(feature = "metrics")] + { + let (_, phase_name) = phase_info(current_step); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + ethrex_metrics::sync::METRICS_SYNC + .phase_start_timestamp + .with_label_values(&[phase_name]) + .set(now as i64); + } + // Capture metrics at phase start phase_start = PhaseCounters::capture_current(); prev_interval = phase_start; 
@@ -416,6 +434,22 @@ pub async fn periodically_show_peer_stats_during_syncing( ) .await; + // Push progress + peer health to Prometheus + #[cfg(feature = "metrics")] + { + push_sync_prometheus_metrics(current_step); + let diag = peer_table.get_peer_diagnostics().await.unwrap_or_default(); + let snap_peers = diag + .iter() + .filter(|p| p.capabilities.iter().any(|c| c.starts_with("snap/"))) + .count(); + let eligible = diag.iter().filter(|p| p.eligible).count(); + let inflight: i64 = diag.iter().map(|p| p.inflight_requests).sum(); + ethrex_metrics::sync::METRICS_SYNC.set_snap_peers(snap_peers as i64); + ethrex_metrics::sync::METRICS_SYNC.set_eligible_peers(eligible as i64); + ethrex_metrics::sync::METRICS_SYNC.set_inflight_requests(inflight); + } + // Update previous interval counters for next rate calculation prev_interval = PhaseCounters::capture_current(); @@ -684,6 +718,78 @@ async fn log_phase_progress( } } +/// Push snap sync progress to Prometheus gauges (from METRICS atomics). +/// Called each polling cycle. Rates are NOT computed here — use rate() in Grafana. 
+#[cfg(feature = "metrics")] +fn push_sync_prometheus_metrics(step: CurrentStepValue) { + use ethrex_metrics::sync::METRICS_SYNC; + use std::sync::atomic::Ordering::Relaxed; + + let (phase_num, _) = phase_info(step); + METRICS_SYNC.stage.set(phase_num as i64); + METRICS_SYNC + .pivot_block + .set(METRICS.sync_head_block.load(Relaxed) as i64); + + // Push raw pivot timestamp — Grafana computes age via time() - timestamp + let pivot_ts = METRICS.pivot_timestamp.load(Relaxed); + if pivot_ts > 0 { + METRICS_SYNC.pivot_timestamp.set(pivot_ts as i64); + } + // Also update pivot_age_seconds for RPC/peer_top consumers + if pivot_ts > 0 { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + METRICS_SYNC + .pivot_age_seconds + .set(now.saturating_sub(pivot_ts) as i64); + } + + match step { + CurrentStepValue::DownloadingHeaders => { + let total = METRICS.sync_head_block.load(Relaxed); + let downloaded = u64::min(METRICS.downloaded_headers.get(), total); + METRICS_SYNC.headers_downloaded.set(downloaded as i64); + METRICS_SYNC.headers_total.set(total as i64); + } + CurrentStepValue::RequestingAccountRanges => { + let downloaded = METRICS.downloaded_account_tries.load(Relaxed); + METRICS_SYNC.accounts_downloaded.set(downloaded as i64); + } + CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => { + let total = METRICS.downloaded_account_tries.load(Relaxed); + let inserted = METRICS.account_tries_inserted.load(Relaxed); + METRICS_SYNC.accounts_downloaded.set(total as i64); + METRICS_SYNC.accounts_inserted.set(inserted as i64); + } + CurrentStepValue::RequestingStorageRanges => { + let downloaded = METRICS.storage_leaves_downloaded.get(); + METRICS_SYNC.storage_downloaded.set(downloaded as i64); + } + CurrentStepValue::InsertingStorageRanges => { + let inserted = METRICS.storage_leaves_inserted.get(); + METRICS_SYNC.storage_inserted.set(inserted as i64); + } + 
CurrentStepValue::HealingState => { + let healed = METRICS.global_state_trie_leafs_healed.load(Relaxed); + METRICS_SYNC.state_leaves_healed.set(healed as i64); + } + CurrentStepValue::HealingStorage => { + let healed = METRICS.global_storage_tries_leafs_healed.load(Relaxed); + METRICS_SYNC.storage_leaves_healed.set(healed as i64); + } + CurrentStepValue::RequestingBytecodes => { + let total = METRICS.bytecodes_to_download.load(Relaxed); + let downloaded = METRICS.downloaded_bytecodes.load(Relaxed); + METRICS_SYNC.bytecodes_downloaded.set(downloaded as i64); + METRICS_SYNC.bytecodes_total.set(total as i64); + } + CurrentStepValue::None => {} + } +} + fn progress_bar(percentage: f64, width: usize) -> String { let clamped_percentage = percentage.clamp(0.0, 100.0); let filled = ((clamped_percentage / 100.0) * width as f64) as usize; diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index a921bc975a4..915cbf35b10 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,7 +1,7 @@ use crate::rlpx::initiator::RLPxInitiator; use crate::{ metrics::{CurrentStepValue, METRICS}, - peer_table::{PeerData, PeerTable, PeerTableServerProtocol as _}, + peer_table::{PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _}, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, @@ -170,6 +170,16 @@ impl PeerHandler { .get_peer_connections(SUPPORTED_ETH_CAPABILITIES.to_vec()) .await?; + let selected_peers: Vec<_> = peer_connection + .iter() + .take(MAX_PEERS_TO_ASK) + .map(|(id, _)| *id) + .collect(); + debug!( + retry = retries, + peers_selected = ?selected_peers, + "request_block_headers: resolving sync head with peers" + ); for (peer_id, mut connection) in peer_connection.into_iter().take(MAX_PEERS_TO_ASK) { match ask_peer_head_number( peer_id, @@ -183,10 +193,16 @@ impl PeerHandler { Ok(number) => { sync_head_number = number; if number != 0 { + #[cfg(feature = "metrics")] 
+ ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("found"); break; } + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("unknown"); } Err(err) => { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_header_resolution("timeout"); debug!( "Sync Log 13: Failed to retrieve sync head block number from peer {peer_id}: {err}" ); @@ -578,6 +594,14 @@ impl PeerHandler { } Ok(None) } + /// Returns diagnostic snapshots for all connected peers (scores, requests, eligibility). + pub async fn read_peer_diagnostics(&self) -> Vec<PeerDiagnostics> { + self.peer_table + .get_peer_diagnostics() + .await + .unwrap_or_default() + } + /// Returns the PeerData for each connected Peer pub async fn read_connected_peers(&mut self) -> Vec<PeerData> { self.peer_table diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 7c33022032f..23f3f841f72 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -187,6 +187,8 @@ pub struct PeerData { score: i64, /// Track the amount of concurrent requests this peer is handling requests: i64, + /// Timestamp (seconds since UNIX epoch) of the last successful response from this peer + pub last_response_time: Option<u64>, } impl PeerData { @@ -204,10 +206,25 @@ connection, score: Default::default(), requests: Default::default(), + last_response_time: None, } } } +/// Diagnostic snapshot of a peer's state, used by admin RPC endpoints. +#[derive(Debug, Clone, serde::Serialize)] +pub struct PeerDiagnostics { + pub peer_id: H256, + pub score: i64, + pub inflight_requests: i64, + pub eligible: bool, + pub capabilities: Vec<String>, + pub ip: IpAddr, + pub client_version: String, + pub connection_direction: String, + pub last_response_time: Option<u64>, +} + /// Result of contact validation. 
#[derive(Debug, Clone)] pub enum ContactValidation { @@ -303,6 +320,7 @@ pub trait PeerTableServerProtocol: Send + Sync { capabilities: Vec, ) -> Response>; fn get_session_info(&self, node_id: H256) -> Response>; + fn get_peer_diagnostics(&self) -> Response>; } #[derive(Debug)] @@ -451,9 +469,14 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordSuccess, _ctx: &Context, ) { - self.peers - .entry(msg.node_id) - .and_modify(|peer_data| peer_data.score = (peer_data.score + 1).min(MAX_SCORE)); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + self.peers.entry(msg.node_id).and_modify(|peer_data| { + peer_data.score = (peer_data.score + 1).min(MAX_SCORE); + peer_data.last_response_time = Some(now); + }); } #[send_handler] @@ -808,6 +831,36 @@ impl PeerTableServer { .or_else(|| self.contacts.get(&msg.node_id)?.session.clone()) } + #[request_handler] + async fn handle_get_peer_diagnostics( + &mut self, + _msg: peer_table_server_protocol::GetPeerDiagnostics, + _ctx: &Context, + ) -> Vec { + self.peers + .iter() + .map(|(id, peer_data)| PeerDiagnostics { + peer_id: *id, + score: peer_data.score, + inflight_requests: peer_data.requests, + eligible: self.can_try_more_requests(&peer_data.score, &peer_data.requests), + capabilities: peer_data + .supported_capabilities + .iter() + .map(|c| format!("{}/{}", c.protocol(), c.version)) + .collect(), + ip: peer_data.node.ip, + client_version: peer_data.node.version.clone().unwrap_or_default(), + connection_direction: if peer_data.is_connection_inbound { + "inbound".to_string() + } else { + "outbound".to_string() + }, + last_response_time: peer_data.last_response_time, + }) + .collect() + } + // === Private helper methods === // Weighting function used to select best peer diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index d8d984ac172..1106c6c101e 100644 --- a/crates/networking/p2p/snap/client.rs +++ 
b/crates/networking/p2p/snap/client.rs @@ -98,6 +98,7 @@ pub async fn request_account_range( account_state_snapshots_dir: &Path, pivot_header: &mut BlockHeader, block_sync_state: &mut SnapBlockSyncState, + diagnostics: &std::sync::Arc>, ) -> Result<(), SnapError> { METRICS .current_step @@ -257,6 +258,7 @@ pub async fn request_account_range( pivot_header.timestamp, peers, block_sync_state, + diagnostics, ) .await .expect("Should be able to update pivot") @@ -1280,6 +1282,7 @@ async fn request_storage_ranges_worker( limit_hash: task.end_hash.unwrap_or(HASH_MAX), response_bytes: MAX_RESPONSE_BYTES, }); + tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request"); let Ok(RLPxMessage::StorageRanges(StorageRanges { id: _, slots, @@ -1290,11 +1293,17 @@ async fn request_storage_ranges_worker( .outgoing_request(request, PEER_REPLY_TIMEOUT) .await else { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); + tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed"); tracing::debug!("Failed to get storage range"); tx.send(empty_task_result).await.ok(); return Ok(()); }; if slots.is_empty() && proof.is_empty() { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty"); tx.send(empty_task_result).await.ok(); tracing::debug!("Received empty storage range"); return Ok(()); @@ -1390,6 +1399,7 @@ async fn request_storage_ranges_worker( } else { (start + account_storages.len(), end, H256::zero()) }; + let slot_count: usize = account_storages.iter().map(|s| s.len()).sum(); let task_result = StorageTaskResult { start_index: start, account_storages, @@ -1398,6 +1408,9 @@ async fn request_storage_ranges_worker( remaining_end, remaining_hash_range: (remaining_start_hash, 
task.end_hash), }; + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received"); tx.send(task_result).await.ok(); Ok::<(), SnapError>(()) } diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 8f1ff36fcf1..6170a7e9849 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -29,7 +29,7 @@ use std::sync::{ use tokio::sync::mpsc::error::SendError; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{debug, error, info}; // Re-export types used by submodules pub use snap_sync::{ @@ -53,6 +53,56 @@ pub enum SyncMode { Snap, } +/// Diagnostic snapshot of the sync state, used by admin RPC endpoints. +#[derive(Debug, Clone, Default, serde::Serialize)] +pub struct SyncDiagnostics { + pub sync_mode: String, + pub current_phase: String, + pub pivot_block_number: Option<u64>, + pub pivot_timestamp: Option<u64>, + pub pivot_age_seconds: Option<u64>, + pub staleness_threshold_seconds: u64, + pub phase_progress: std::collections::HashMap<String, u64>, + pub recent_pivot_changes: std::collections::VecDeque<PivotChangeEvent>, + pub recent_errors: std::collections::VecDeque<SyncErrorEvent>, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct PivotChangeEvent { + pub timestamp: u64, + pub old_pivot_number: u64, + pub new_pivot_number: u64, + pub outcome: String, + pub failure_reason: Option<String>, +} + +#[derive(Debug, Clone, serde::Serialize)] +pub struct SyncErrorEvent { + pub timestamp: u64, + pub error_type: String, + pub error_message: String, + pub recoverable: bool, +} + +impl SyncDiagnostics { + const MAX_PIVOT_CHANGES: usize = 10; + const MAX_ERRORS: usize = 20; + + pub fn push_pivot_change(&mut self, event: PivotChangeEvent) { + if self.recent_pivot_changes.len() >= Self::MAX_PIVOT_CHANGES { + self.recent_pivot_changes.pop_front(); + } + 
self.recent_pivot_changes.push_back(event); + } + + pub fn push_error(&mut self, event: SyncErrorEvent) { + if self.recent_errors.len() >= Self::MAX_ERRORS { + self.recent_errors.pop_front(); + } + self.recent_errors.push_back(event); + } +} + /// Manager in charge the sync process #[derive(Debug)] pub struct Syncer { @@ -66,6 +116,7 @@ pub struct Syncer { /// This string indicates a folder where the snap algorithm will store temporary files that are /// used during the syncing process datadir: PathBuf, + diagnostics: Arc>, } impl Syncer { @@ -75,6 +126,7 @@ impl Syncer { cancel_token: CancellationToken, blockchain: Arc, datadir: PathBuf, + diagnostics: Arc>, ) -> Self { Self { snap_enabled, @@ -82,6 +134,7 @@ impl Syncer { cancel_token, blockchain, datadir, + diagnostics, } } @@ -97,6 +150,7 @@ impl Syncer { let start_time = Instant::now(); match self.sync_cycle(sync_head, store).await { Ok(()) => { + self.diagnostics.write().await.current_phase = "idle".to_string(); info!( time_elapsed_s = start_time.elapsed().as_secs(), %sync_head, @@ -106,7 +160,27 @@ impl Syncer { // If the error is irrecoverable, we exit ethrex Err(error) => { - match error.is_recoverable() { + let recoverable = error.is_recoverable(); + debug!( + error_type = %error, + recoverable = recoverable, + action = if recoverable { "retry" } else { "exit" }, + "Sync cycle error classification" + ); + { + let mut diag = self.diagnostics.write().await; + diag.current_phase = "idle".to_string(); + diag.push_error(SyncErrorEvent { + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + error_type: format!("{:?}", std::mem::discriminant(&error)), + error_message: error.to_string(), + recoverable, + }); + } + match recoverable { false => { // We exit the node, as we can't recover this error error!( @@ -144,6 +218,7 @@ impl Syncer { sync_head, store, &self.datadir, + &self.diagnostics, ) .await; METRICS.disable().await; diff --git 
a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 519f1c73389..90aee7c8054 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -105,6 +105,7 @@ pub async fn sync_cycle_snap( sync_head: H256, store: Store, datadir: &Path, + diagnostics: &Arc>, ) -> Result<(), SyncError> { // Request all block headers between the current head and the sync head // We will begin from the current head so that we download the earliest state first @@ -118,6 +119,11 @@ pub async fn sync_cycle_snap( .get_block_number(current_head) .await? .ok_or(SyncError::BlockNumber(current_head))?; + { + let mut diag = diagnostics.write().await; + diag.current_phase = "headers".to_string(); + diag.sync_mode = "snap".to_string(); + } info!( "Syncing from current head {:?} to sync_head {:?}", current_head, sync_head @@ -236,12 +242,21 @@ pub async fn sync_cycle_snap( .await?; } + // Update diagnostics with header progress + { + let mut diag = diagnostics.write().await; + diag.phase_progress.insert( + "headers_downloaded".to_string(), + block_sync_state.block_hashes.len() as u64, + ); + } + if sync_head_found { break; }; } - snap_sync(peers, &store, &mut block_sync_state, datadir).await?; + snap_sync(peers, &store, &mut block_sync_state, datadir, diagnostics).await?; store.clear_snap_state().await?; snap_enabled.store(false, Ordering::Relaxed); @@ -255,6 +270,7 @@ pub async fn snap_sync( store: &Store, block_sync_state: &mut SnapBlockSyncState, datadir: &Path, + diagnostics: &Arc>, ) -> Result<(), SyncError> { // snap-sync: launch tasks to fetch blocks and state in parallel // - Fetch each block's body and its receipt via eth p2p requests @@ -274,6 +290,7 @@ pub async fn snap_sync( pivot_header.timestamp, peers, block_sync_state, + diagnostics, ) .await?; } @@ -281,6 +298,18 @@ pub async fn snap_sync( "Selected block {} as pivot for snap sync", pivot_header.number ); + { + let mut diag = diagnostics.write().await; + 
diag.pivot_block_number = Some(pivot_header.number); + diag.pivot_timestamp = Some(pivot_header.timestamp); + let pivot_age = current_unix_time().saturating_sub(pivot_header.timestamp); + diag.pivot_age_seconds = Some(pivot_age); + diag.staleness_threshold_seconds = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK; + diag.sync_mode = "snap".to_string(); + METRICS + .pivot_timestamp + .store(pivot_header.timestamp, std::sync::atomic::Ordering::Relaxed); + } let state_root = pivot_header.state_root; let account_state_snapshots_dir = get_account_state_snapshots_dir(datadir); @@ -303,6 +332,7 @@ pub async fn snap_sync( // The function request_account_range writes the leafs into files in // account_state_snapshots_dir + diagnostics.write().await.current_phase = "account_ranges".to_string(); info!("Starting to download account ranges from peers"); request_account_range( peers, @@ -311,10 +341,21 @@ pub async fn snap_sync( account_state_snapshots_dir.as_ref(), &mut pivot_header, block_sync_state, + diagnostics, ) .await?; info!("Finish downloading account ranges from peers"); + { + let mut diag = diagnostics.write().await; + diag.current_phase = "account_insertion".to_string(); + diag.phase_progress.insert( + "account_ranges_downloaded".to_string(), + METRICS + .downloaded_account_tries + .load(std::sync::atomic::Ordering::Relaxed), + ); + } *METRICS.account_tries_insert_start_time.lock().await = Some(SystemTime::now()); METRICS .current_step @@ -341,6 +382,7 @@ pub async fn snap_sync( info!("Original state root: {state_root:?}"); info!("Computed state root after request_account_rages: {computed_state_root:?}"); + diagnostics.write().await.current_phase = "storage_ranges".to_string(); *METRICS.storage_tries_download_start_time.lock().await = Some(SystemTime::now()); // We start downloading the storage leafs. To do so, we need to be sure that the storage root // is correct. 
To do so, we always heal the state trie before requesting storage rates @@ -354,6 +396,7 @@ pub async fn snap_sync( pivot_header.timestamp, peers, block_sync_state, + diagnostics, ) .await?; } @@ -429,6 +472,7 @@ pub async fn snap_sync( info!("Finished request_storage_ranges"); *METRICS.storage_tries_download_end_time.lock().await = Some(SystemTime::now()); + diagnostics.write().await.current_phase = "storage_insertion".to_string(); *METRICS.storage_tries_insert_start_time.lock().await = Some(SystemTime::now()); METRICS .current_step @@ -448,6 +492,7 @@ pub async fn snap_sync( info!("Finished storing storage tries"); } + diagnostics.write().await.current_phase = "healing".to_string(); *METRICS.heal_start_time.lock().await = Some(SystemTime::now()); info!("Starting Healing Process"); let mut global_state_leafs_healed: u64 = 0; @@ -461,6 +506,7 @@ pub async fn snap_sync( pivot_header.timestamp, peers, block_sync_state, + diagnostics, ) .await?; } @@ -506,6 +552,7 @@ pub async fn snap_sync( let mut seen_code_hashes = HashSet::new(); let mut code_hashes_to_download = Vec::new(); + diagnostics.write().await.current_phase = "bytecodes".to_string(); info!("Starting download code hashes from peers"); for entry in std::fs::read_dir(&code_hashes_dir) .map_err(|_| SyncError::CodeHashesSnapshotsDirNotFound)? 
@@ -635,6 +682,7 @@ pub async fn update_pivot( block_timestamp: u64, peers: &mut PeerHandler, block_sync_state: &mut SnapBlockSyncState, + diagnostics: &Arc>, ) -> Result { const MAX_RETRIES_PER_PEER: u64 = 3; const MAX_TOTAL_FAILURES: u64 = 15; @@ -656,6 +704,18 @@ pub async fn update_pivot( loop { if total_failures >= MAX_TOTAL_FAILURES { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("max_failures"); + diagnostics + .write() + .await + .push_pivot_change(super::PivotChangeEvent { + timestamp: current_unix_time(), + old_pivot_number: block_number, + new_pivot_number: new_pivot_block_number, + outcome: "max_failures".to_string(), + failure_reason: Some(format!("Exhausted {MAX_TOTAL_FAILURES} total failures")), + }); return Err(SyncError::PeerHandler( crate::peer_handler::PeerHandlerError::BlockHeaders, )); @@ -679,6 +739,8 @@ pub async fn update_pivot( .await? else { debug!("We tried to get peers during update_pivot, but we found no free peers"); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("no_peers"); consecutive_failures = consecutive_failures.saturating_add(1); total_failures = total_failures.saturating_add(1); continue; @@ -701,6 +763,18 @@ pub async fn update_pivot( } let peer_score = peers.peer_table.get_score(peer_id).await?; + let diag = peers.read_peer_diagnostics().await; + let eligible_count = diag.iter().filter(|p| p.eligible).count(); + let total_count = diag.len(); + debug!( + eligible_peers = eligible_count, + total_peers = total_count, + selected_peer = %peer_id, + peer_score = peer_score, + consecutive_failures = consecutive_failures, + total_failures = total_failures, + "update_pivot: attempting with peer" + ); info!( "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})" ); @@ -714,6 +788,8 @@ pub async fn update_pivot( warn!( "Received None pivot from peer {peer_id} (score after penalizing: {peer_score}). 
Retrying" ); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none"); last_failed_peer = Some(peer_id); consecutive_failures = consecutive_failures.saturating_add(1); total_failures = total_failures.saturating_add(1); @@ -722,7 +798,26 @@ pub async fn update_pivot( // Reward peer peers.peer_table.record_success(peer_id)?; + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success"); info!("Succesfully updated pivot"); + { + let mut diag = diagnostics.write().await; + diag.push_pivot_change(super::PivotChangeEvent { + timestamp: current_unix_time(), + old_pivot_number: block_number, + new_pivot_number: pivot.number, + outcome: "success".to_string(), + failure_reason: None, + }); + diag.pivot_block_number = Some(pivot.number); + diag.pivot_timestamp = Some(pivot.timestamp); + let pivot_age = current_unix_time().saturating_sub(pivot.timestamp); + diag.pivot_age_seconds = Some(pivot_age); + METRICS + .pivot_timestamp + .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed); + } let block_headers = peers .request_block_headers(block_number + 1, pivot.hash()) .await? 
@@ -736,7 +831,21 @@ pub async fn update_pivot( } pub fn block_is_stale(block_header: &BlockHeader) -> bool { - calculate_staleness_timestamp(block_header.timestamp) < current_unix_time() + let threshold = calculate_staleness_timestamp(block_header.timestamp); + let now = current_unix_time(); + let is_stale = threshold < now; + if is_stale { + let pivot_age = now.saturating_sub(block_header.timestamp); + let staleness_limit = (SNAP_LIMIT as u64) * SECONDS_PER_BLOCK; + debug!( + pivot_number = block_header.number, + pivot_timestamp = block_header.timestamp, + pivot_age_seconds = pivot_age, + staleness_threshold_seconds = staleness_limit, + "Pivot block detected as stale" + ); + } + is_stale } pub fn calculate_staleness_timestamp(timestamp: u64) -> u64 { diff --git a/crates/networking/p2p/sync_manager.rs b/crates/networking/p2p/sync_manager.rs index 59e126f8963..c124d581c0d 100644 --- a/crates/networking/p2p/sync_manager.rs +++ b/crates/networking/p2p/sync_manager.rs @@ -18,7 +18,7 @@ use tracing::{error, info, warn}; use crate::{ peer_handler::PeerHandler, - sync::{SyncMode, Syncer}, + sync::{SyncDiagnostics, SyncMode, Syncer}, }; /// Abstraction to interact with the active sync process without disturbing it @@ -30,6 +30,7 @@ pub struct SyncManager { syncer: Arc>, last_fcu_head: Arc>, store: Store, + diagnostics: Arc>, } impl SyncManager { @@ -76,18 +77,21 @@ impl SyncManager { } } + let diagnostics = Arc::new(tokio::sync::RwLock::new(SyncDiagnostics::default())); let syncer = Arc::new(Mutex::new(Syncer::new( peer_handler, snap_enabled.clone(), cancel_token, blockchain, datadir, + diagnostics.clone(), ))); let sync_manager = Self { snap_enabled, syncer, last_fcu_head: Arc::new(Mutex::new(H256::zero())), store: store.clone(), + diagnostics, }; // If the node was in the middle of a sync and then re-started we must resume syncing // Otherwise we will incorreclty assume the node is already synced and work on invalid state @@ -120,6 +124,58 @@ impl SyncManager { 
self.snap_enabled.store(false, Ordering::Relaxed); } + /// Returns a snapshot of the current sync diagnostics with live values. + pub async fn get_sync_diagnostics(&self) -> SyncDiagnostics { + use crate::metrics::METRICS; + use std::sync::atomic::Ordering::Relaxed; + + let mut diag = self.diagnostics.read().await.clone(); + + // Compute live pivot age + if let Some(ts) = diag.pivot_timestamp { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + diag.pivot_age_seconds = Some(now.saturating_sub(ts)); + } + + // Populate live progress from METRICS atomics + let headers = METRICS.downloaded_headers.get(); + let accounts_downloaded = METRICS.downloaded_account_tries.load(Relaxed); + let accounts_inserted = METRICS.account_tries_inserted.load(Relaxed); + let storage_downloaded = METRICS.storage_leaves_downloaded.get(); + let storage_inserted = METRICS.storage_leaves_inserted.get(); + + if headers > 0 { + diag.phase_progress + .insert("headers_downloaded".into(), headers); + } + if accounts_downloaded > 0 { + diag.phase_progress + .insert("accounts_downloaded".into(), accounts_downloaded); + } + if accounts_inserted > 0 { + diag.phase_progress + .insert("accounts_inserted".into(), accounts_inserted); + } + if storage_downloaded > 0 { + diag.phase_progress + .insert("storage_slots_downloaded".into(), storage_downloaded); + } + if storage_inserted > 0 { + diag.phase_progress + .insert("storage_slots_inserted".into(), storage_inserted); + } + + diag + } + + /// Returns a reference to the diagnostics RwLock for updating from the sync code. + pub fn diagnostics(&self) -> &Arc> { + &self.diagnostics + } + /// Updates the last fcu head. 
This may be used on the next sync cycle if needed fn set_head(&self, fcu_head: H256) { if let Ok(mut latest_fcu_head) = self.last_fcu_head.try_lock() { diff --git a/crates/networking/rpc/admin/mod.rs b/crates/networking/rpc/admin/mod.rs index 16e16c6535c..2f3668a6176 100644 --- a/crates/networking/rpc/admin/mod.rs +++ b/crates/networking/rpc/admin/mod.rs @@ -10,7 +10,7 @@ use crate::{ utils::{RpcErr, RpcRequest}, }; mod peers; -pub use peers::{add_peer, peers}; +pub use peers::{add_peer, peer_scores, peers, sync_status}; #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] diff --git a/crates/networking/rpc/admin/peers.rs b/crates/networking/rpc/admin/peers.rs index 795229f5a94..2ee28fbe72b 100644 --- a/crates/networking/rpc/admin/peers.rs +++ b/crates/networking/rpc/admin/peers.rs @@ -152,6 +152,43 @@ async fn peer_is_connected(peer_handler: &mut PeerHandler, enode_url: &str) -> b .any(|peer| peer.node.enode_url() == *enode_url) } +pub async fn peer_scores(context: &mut RpcApiContext) -> Result { + let Some(peer_handler) = &context.peer_handler else { + return Err(RpcErr::Internal("Peer handler not initialized".to_string())); + }; + + let diagnostics = peer_handler.read_peer_diagnostics().await; + let total = diagnostics.len(); + let eligible = diagnostics.iter().filter(|p| p.eligible).count(); + let avg_score = if total > 0 { + diagnostics.iter().map(|p| p.score).sum::() / total as i64 + } else { + 0 + }; + let total_inflight: i64 = diagnostics.iter().map(|p| p.inflight_requests).sum(); + + let response = serde_json::json!({ + "peers": diagnostics, + "summary": { + "total_peers": total, + "eligible_peers": eligible, + "average_score": avg_score, + "total_inflight_requests": total_inflight, + } + }); + + Ok(response) +} + +pub async fn sync_status(context: &mut RpcApiContext) -> Result { + let Some(syncer) = &context.syncer else { + return Err(RpcErr::Internal("Sync manager not initialized".to_string())); + }; + + let diag = 
syncer.get_sync_diagnostics().await; + serde_json::to_value(diag).map_err(|e| RpcErr::Internal(e.to_string())) +} + // TODO: Adapt the test to the new P2P architecture. #[cfg(test)] mod tests { diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 21e38073508..4ab95d3c11d 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -869,6 +869,8 @@ pub async fn map_admin_requests( match req.method.as_str() { "admin_nodeInfo" => admin::node_info(context.storage, &context.node_data).await, "admin_peers" => admin::peers(&mut context).await, + "admin_peerScores" => admin::peer_scores(&mut context).await, + "admin_syncStatus" => admin::sync_status(&mut context).await, "admin_setLogLevel" => admin::set_log_level(req, &context.log_filter_handler), "admin_addPeer" => admin::add_peer(&mut context, req).await, unknown_admin_method => Err(RpcErr::MethodNotFound(unknown_admin_method.to_owned())), diff --git a/metrics/provisioning/grafana/dashboards/common_dashboards/snapsync_dashboard.json b/metrics/provisioning/grafana/dashboards/common_dashboards/snapsync_dashboard.json new file mode 100644 index 00000000000..e0bda1ceb78 --- /dev/null +++ b/metrics/provisioning/grafana/dashboards/common_dashboards/snapsync_dashboard.json @@ -0,0 +1,1927 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "title": "Sync Overview", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 1 + }, + "title": "Sync Stage", + 
"fieldConfig": { + "defaults": { + "mappings": [ + { + "options": { + "0": { + "text": "Idle" + }, + "1": { + "text": "Headers" + }, + "2": { + "text": "Account Ranges" + }, + "3": { + "text": "Account Insertion" + }, + "4": { + "text": "Storage Ranges" + }, + "5": { + "text": "Storage Insertion" + }, + "6": { + "text": "State Healing" + }, + "7": { + "text": "Storage Healing" + }, + "8": { + "text": "Bytecodes" + } + }, + "type": "value" + } + ], + "thresholds": { + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + }, + { + "color": "orange", + "value": 3 + }, + { + "color": "blue", + "value": 6 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_sync_stage{instance=~\"$instance\"}", + "legendFormat": "Stage" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Pivot Block", + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 1 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_sync_pivot_block{instance=~\"$instance\"}", + "legendFormat": "" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Peers", + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 1 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_p2p_peer_count{instance=~\"$instance\"}", + 
"legendFormat": "" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Phase Elapsed", + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 1 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=~\".+\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=~\".+\",instance=~\"$instance\"} > 0", + "legendFormat": "{{phase}}" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Sync Rates Overview", + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 5 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + "drawStyle": "line", + "fillOpacity": 10, + "lineWidth": 2, + "showPoints": "never" + } + } + }, + "options": { + "tooltip": { + "mode": "multi", + "sort": "desc" + }, + "legend": { + "displayMode": "list", + "placement": "bottom", + "calcs": [ + "lastNotNull", + "max" + ] + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_headers_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "headers/s" + }, + { + "expr": "rate(ethrex_sync_accounts_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "accounts dl/s" + }, + { + "expr": "rate(ethrex_sync_accounts_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "accounts ins/s" + }, + { + "expr": "rate(ethrex_sync_storage_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "storage dl/s" + }, + { + "expr": "rate(ethrex_sync_storage_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "storage ins/s" + }, + { + "expr": "rate(ethrex_sync_state_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "state heals/s" 
+ }, + { + "expr": "rate(ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "storage heals/s" + }, + { + "expr": "rate(ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "bytecodes/s" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 12 + }, + "title": "Peer Health", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Eligible Peers", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 13 + }, + "fieldConfig": { + "defaults": { + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 5 + }, + { + "color": "green", + "value": 20 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_sync_eligible_peers{instance=~\"$instance\"}" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Pivot Age", + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 13 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 600 + }, + { + "color": "red", + "value": 1200 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_pivot_timestamp{instance=~\"$instance\"} and ethrex_sync_pivot_timestamp{instance=~\"$instance\"} > 0" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Inflight Requests", 
+ "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 13 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_sync_inflight_requests{instance=~\"$instance\"}" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Pivot Updates", + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 13 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "sum(ethrex_sync_pivot_updates_total{instance=~\"$instance\"})" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 18 + }, + "title": "Headers", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Headers Progress", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 19 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "max": 100, + "unit": "percent", + "decimals": 1, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 33 + }, + { + "color": "green", + "value": 66 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_headers_downloaded{instance=~\"$instance\"} / clamp_min(ethrex_sync_headers_total{instance=~\"$instance\"}, 1) * 100", + "legendFormat": "progress" + }, + { + "expr": "ethrex_sync_headers_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded", + "hide": true 
+ }, + { + "expr": "ethrex_sync_headers_total{instance=~\"$instance\"}", + "legendFormat": "total", + "hide": true + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "", + "gridPos": { + "h": 2, + "w": 6, + "x": 0, + "y": 23 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_headers_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded" + }, + { + "expr": "ethrex_sync_headers_total{instance=~\"$instance\"}", + "legendFormat": "total" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Headers Rate", + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 19 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "unit": "none", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 500 + }, + { + "color": "green", + "value": 1500 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "rate(ethrex_sync_headers_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Headers ETA", + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 19 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + 
] + } + }, + "targets": [ + { + "expr": "(ethrex_sync_headers_total{instance=~\"$instance\"} - ethrex_sync_headers_downloaded{instance=~\"$instance\"}) / clamp_min(rate(ethrex_sync_headers_downloaded{instance=~\"$instance\"}[5m]), 0.001) and rate(ethrex_sync_headers_downloaded{instance=~\"$instance\"}[5m]) > 0", + "legendFormat": "ETA" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Headers Elapsed", + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 19 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=\"BLOCK HEADERS\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=\"BLOCK HEADERS\",instance=~\"$instance\"} > 0", + "legendFormat": "Elapsed" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Headers Rate Over Time", + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 25 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + "drawStyle": "line", + "fillOpacity": 10 + } + } + }, + "options": { + "tooltip": { + "mode": "multi" + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_headers_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "headers/s" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 30 + }, + "title": "Accounts", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Accounts Progress", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 31 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "max": 100, + "unit": "percent", + "decimals": 1, + "thresholds": { + "steps": [ + { 
+ "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 33 + }, + { + "color": "green", + "value": 66 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_accounts_inserted{instance=~\"$instance\"} / clamp_min(ethrex_sync_accounts_downloaded{instance=~\"$instance\"}, 1) * 100", + "legendFormat": "progress" + }, + { + "expr": "ethrex_sync_accounts_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded", + "hide": true + }, + { + "expr": "ethrex_sync_accounts_inserted{instance=~\"$instance\"}", + "legendFormat": "inserted", + "hide": true + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "", + "gridPos": { + "h": 2, + "w": 6, + "x": 0, + "y": 35 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_accounts_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded" + }, + { + "expr": "ethrex_sync_accounts_inserted{instance=~\"$instance\"}", + "legendFormat": "inserted" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Accounts Rate", + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 31 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "unit": "none", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 100 + }, + { + "color": "green", + "value": 500 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { 
+ "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "rate(ethrex_sync_accounts_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Accounts ETA", + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 31 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "(ethrex_sync_accounts_downloaded{instance=~\"$instance\"} - ethrex_sync_accounts_inserted{instance=~\"$instance\"}) / clamp_min(rate(ethrex_sync_accounts_inserted{instance=~\"$instance\"}[5m]), 0.001) and rate(ethrex_sync_accounts_inserted{instance=~\"$instance\"}[5m]) > 0", + "legendFormat": "ETA" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Accounts Elapsed", + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 31 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=\"ACCOUNT RANGES\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=\"ACCOUNT RANGES\",instance=~\"$instance\"} > 0", + "legendFormat": "Elapsed" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Accounts Rate Over Time", + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 37 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + 
"drawStyle": "line", + "fillOpacity": 10 + } + } + }, + "options": { + "tooltip": { + "mode": "multi" + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_accounts_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "download/s" + }, + { + "expr": "rate(ethrex_sync_accounts_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "insert/s" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 42 + }, + "title": "Storage", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Storage Progress", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 43 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "max": 100, + "unit": "percent", + "decimals": 1, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 33 + }, + { + "color": "green", + "value": 66 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_storage_inserted{instance=~\"$instance\"} / clamp_min(ethrex_sync_storage_downloaded{instance=~\"$instance\"}, 1) * 100", + "legendFormat": "progress" + }, + { + "expr": "ethrex_sync_storage_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded", + "hide": true + }, + { + "expr": "ethrex_sync_storage_inserted{instance=~\"$instance\"}", + "legendFormat": "inserted", + "hide": true + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "", + "gridPos": { + "h": 2, + "w": 6, + "x": 0, + "y": 47 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + 
"reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_storage_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded" + }, + { + "expr": "ethrex_sync_storage_inserted{instance=~\"$instance\"}", + "legendFormat": "inserted" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Storage Rate", + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 43 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "unit": "none", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 500 + }, + { + "color": "green", + "value": 2000 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "rate(ethrex_sync_storage_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Storage ETA", + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 43 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "(ethrex_sync_storage_downloaded{instance=~\"$instance\"} - ethrex_sync_storage_inserted{instance=~\"$instance\"}) / clamp_min(rate(ethrex_sync_storage_inserted{instance=~\"$instance\"}[5m]), 0.001) and rate(ethrex_sync_storage_inserted{instance=~\"$instance\"}[5m]) > 0", + "legendFormat": "ETA" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Storage Elapsed", 
+ "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 43 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=\"STORAGE RANGES\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=\"STORAGE RANGES\",instance=~\"$instance\"} > 0", + "legendFormat": "Elapsed" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Storage Rate Over Time", + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 49 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + "drawStyle": "line", + "fillOpacity": 10 + } + } + }, + "options": { + "tooltip": { + "mode": "multi" + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_storage_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "download/s" + }, + { + "expr": "rate(ethrex_sync_storage_inserted{instance=~\"$instance\"}[5m])", + "legendFormat": "insert/s" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 54 + }, + "title": "Healing", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Leaves Healed", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 55 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "ethrex_sync_state_leaves_healed{instance=~\"$instance\"} + ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}", + "legendFormat": "total" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + 
"title": "", + "gridPos": { + "h": 2, + "w": 6, + "x": 0, + "y": 59 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_state_leaves_healed{instance=~\"$instance\"}", + "legendFormat": "state" + }, + { + "expr": "ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}", + "legendFormat": "storage" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Healing Rate", + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 55 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "unit": "none", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 100 + }, + { + "color": "green", + "value": 500 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "rate(ethrex_sync_state_leaves_healed{instance=~\"$instance\"}[5m]) + rate(ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Healing Rate (avg)", + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 55 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_state_leaves_healed{instance=~\"$instance\"}[5m]) + 
rate(ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "heals/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Healing Elapsed", + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 55 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=\"STATE HEALING\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=\"STATE HEALING\",instance=~\"$instance\"} > 0", + "legendFormat": "Elapsed" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Healing Rate Over Time", + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 61 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + "drawStyle": "line", + "fillOpacity": 10 + } + } + }, + "options": { + "tooltip": { + "mode": "multi" + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_state_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "state heals/s" + }, + { + "expr": "rate(ethrex_sync_storage_leaves_healed{instance=~\"$instance\"}[5m])", + "legendFormat": "storage heals/s" + } + ] + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 66 + }, + "title": "Bytecodes", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Bytecodes Progress", + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 67 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "max": 100, + "unit": "percent", + "decimals": 1, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "yellow", + "value": 33 + }, + { + "color": 
"green", + "value": 66 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"} / clamp_min(ethrex_sync_bytecodes_total{instance=~\"$instance\"}, 1) * 100", + "legendFormat": "progress" + }, + { + "expr": "ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded", + "hide": true + }, + { + "expr": "ethrex_sync_bytecodes_total{instance=~\"$instance\"}", + "legendFormat": "total", + "hide": true + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "", + "gridPos": { + "h": 2, + "w": 6, + "x": 0, + "y": 71 + }, + "fieldConfig": { + "defaults": { + "decimals": 0 + } + }, + "options": { + "colorMode": "none", + "graphMode": "none", + "justifyMode": "center", + "textMode": "value_and_name", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}", + "legendFormat": "downloaded" + }, + { + "expr": "ethrex_sync_bytecodes_total{instance=~\"$instance\"}", + "legendFormat": "total" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "gauge", + "title": "Bytecodes Rate", + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 67 + }, + "fieldConfig": { + "defaults": { + "min": 0, + "unit": "none", + "decimals": 0, + "thresholds": { + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "orange", + "value": 50 + }, + { + "color": "green", + "value": 200 + } + ] + }, + "color": { + "mode": "thresholds" + } + } + }, + "options": { + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + }, + "showThresholdLabels": false, + 
"showThresholdMarkers": true, + "orientation": "horizontal" + }, + "targets": [ + { + "expr": "rate(ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "/s" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Bytecodes ETA", + "gridPos": { + "h": 6, + "w": 6, + "x": 12, + "y": 67 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "(ethrex_sync_bytecodes_total{instance=~\"$instance\"} - ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}) / clamp_min(rate(ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}[5m]), 0.001) and rate(ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}[5m]) > 0", + "legendFormat": "ETA" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "stat", + "title": "Bytecodes Elapsed", + "gridPos": { + "h": 6, + "w": 6, + "x": 18, + "y": 67 + }, + "fieldConfig": { + "defaults": { + "unit": "s", + "decimals": 0 + } + }, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ] + } + }, + "targets": [ + { + "expr": "time() - ethrex_sync_phase_start_timestamp{phase=\"BYTECODES\",instance=~\"$instance\"} and ethrex_sync_phase_start_timestamp{phase=\"BYTECODES\",instance=~\"$instance\"} > 0", + "legendFormat": "Elapsed" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "type": "timeseries", + "title": "Bytecodes Rate Over Time", + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 73 + }, + "fieldConfig": { + "defaults": { + "unit": "none", + "custom": { + "drawStyle": "line", + "fillOpacity": 10 + } + } + }, + "options": { + "tooltip": { + "mode": 
"multi" + } + }, + "targets": [ + { + "expr": "rate(ethrex_sync_bytecodes_downloaded{instance=~\"$instance\"}[5m])", + "legendFormat": "codes/s" + } + ] + } + ], + "refresh": "10s", + "schemaVersion": 39, + "tags": [ + "ethrex", + "snapsync" + ], + "templating": { + "list": [ + { + "current": { + "text": "Prometheus", + "value": "${DS_PROMETHEUS}" + }, + "hide": 2, + "name": "DS_PROMETHEUS", + "options": [], + "query": "prometheus", + "refresh": 1, + "type": "datasource" + }, + { + "current": { + "text": "localhost:3701", + "value": "localhost:3701" + }, + "definition": "label_values(ethrex_sync_stage, instance)", + "includeAll": false, + "multi": false, + "name": "instance", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(ethrex_sync_stage, instance)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 2, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "ethrex Snap Sync", + "uid": "ethrex-snapsync", + "version": 1 +} diff --git a/tooling/repl/src/commands/admin.rs b/tooling/repl/src/commands/admin.rs index f1ae2c7d599..e6a79a9dad6 100644 --- a/tooling/repl/src/commands/admin.rs +++ b/tooling/repl/src/commands/admin.rs @@ -48,5 +48,19 @@ pub fn commands() -> Vec { params: ENODE, description: "Adds a peer by enode URL", }, + CommandDef { + namespace: "admin", + name: "peerScores", + rpc_method: "admin_peerScores", + params: NO_PARAMS, + description: "Returns peer diagnostics: scores, inflight requests, eligibility", + }, + CommandDef { + namespace: "admin", + name: "syncStatus", + rpc_method: "admin_syncStatus", + params: NO_PARAMS, + description: "Returns sync diagnostics: phase, pivot, staleness, recent events", + }, ] } diff --git a/tooling/repl/src/formatter.rs b/tooling/repl/src/formatter.rs index b2975a62e13..db274706b3c 100644 --- a/tooling/repl/src/formatter.rs +++ b/tooling/repl/src/formatter.rs @@ -70,48 +70,64 @@ 
fn format_object_box(map: &serde_json::Map, title: &str) -> Strin let rows = flatten_object(map, ""); - let key_w = rows.iter().map(|(k, _)| k.len()).max().unwrap_or(0); - let val_w = rows - .iter() - .map(|(_, v)| v.len()) - .max() - .unwrap_or(0) - .min(MAX_VALUE_DISPLAY_LEN); - let content_w = key_w + 3 + val_w; - let box_w = content_w + 4; // "│ " + content + " │" + // Separate scalar rows from table sections (arrays of objects rendered below the box) + let mut scalar_rows = Vec::new(); + let mut table_sections: Vec<(String, String)> = Vec::new(); + for (key, value) in rows { + if value.starts_with('\n') && value.contains("items)") { + table_sections.push((key, value)); + } else { + scalar_rows.push((key, value)); + } + } let mut out = String::new(); - // Top border - if title.is_empty() { - out.push_str(&format!("┌{}┐\n", "─".repeat(box_w - 2))); - } else { - let fill = (box_w - 2).saturating_sub(title.len() + 1); - out.push_str(&format!("┌─{}{}┐\n", title.bold(), "─".repeat(fill))); + if !scalar_rows.is_empty() { + let key_w = scalar_rows.iter().map(|(k, _)| k.len()).max().unwrap_or(0); + let val_w = scalar_rows + .iter() + .map(|(_, v)| v.len()) + .max() + .unwrap_or(0) + .min(MAX_VALUE_DISPLAY_LEN); + let content_w = key_w + 3 + val_w; + let box_w = content_w + 4; + + if title.is_empty() { + out.push_str(&format!("┌{}┐\n", "─".repeat(box_w - 2))); + } else { + let fill = (box_w - 2).saturating_sub(title.len() + 1); + out.push_str(&format!("┌─{}{}┐\n", title.bold(), "─".repeat(fill))); + } + + for (key, value) in &scalar_rows { + let display_val = truncate_middle(value, val_w); + let key_pad = " ".repeat(key_w.saturating_sub(key.len())); + let val_pad = " ".repeat(val_w.saturating_sub(display_val.len())); + out.push_str(&format!( + "│ {}{} {}{} │\n", + key_pad, + key.cyan(), + colorize_inline(&display_val), + val_pad, + )); + } + + out.push_str(&format!("└{}┘", "─".repeat(box_w - 2))); } - // Rows - for (key, value) in &rows { - let display_val = 
truncate_middle(value, val_w); - let key_pad = " ".repeat(key_w.saturating_sub(key.len())); - let val_pad = " ".repeat(val_w.saturating_sub(display_val.len())); - out.push_str(&format!( - "│ {}{} {}{} │\n", - key_pad, - key.cyan(), - colorize_inline(&display_val), - val_pad, - )); - } - - // Bottom border - out.push_str(&format!("└{}┘", "─".repeat(box_w - 2))); + // Render table sections below the box + for (key, table) in &table_sections { + out.push_str(&format!("\n {}:{}", key.cyan(), table)); + } out } /// Flatten a JSON object into (key, plain-text-value) pairs. /// Nested objects are expanded with dot-separated keys. +/// Arrays of objects are rendered as inline tables. fn flatten_object(map: &serde_json::Map, prefix: &str) -> Vec<(String, String)> { let mut rows = Vec::new(); for (key, value) in map { @@ -124,6 +140,10 @@ fn flatten_object(map: &serde_json::Map, prefix: &str) -> Vec<(St Value::Object(nested) if !nested.is_empty() => { rows.extend(flatten_object(nested, &full_key)); } + Value::Array(arr) if !arr.is_empty() && arr.iter().all(|v| v.is_object()) => { + // Render array of objects as a table + rows.push((full_key, format_object_array_table(arr))); + } Value::Array(arr) => { let items: Vec = arr.iter().map(inline_value).collect(); rows.push((full_key, items.join(", "))); @@ -136,6 +156,85 @@ fn flatten_object(map: &serde_json::Map, prefix: &str) -> Vec<(St rows } +/// Render an array of objects as a compact table with headers. 
+fn format_object_array_table(arr: &[Value]) -> String { + if arr.is_empty() { + return "[]".to_string(); + } + + // Collect all keys from all objects to build columns + let mut columns: Vec = Vec::new(); + for item in arr { + if let Value::Object(map) = item { + for key in map.keys() { + if !columns.contains(key) { + columns.push(key.clone()); + } + } + } + } + + if columns.is_empty() { + return "[]".to_string(); + } + + // Compute column widths + let col_values: Vec> = arr + .iter() + .map(|item| { + columns + .iter() + .map(|col| item.get(col).map(inline_value).unwrap_or_default()) + .collect() + }) + .collect(); + + let col_widths: Vec = columns + .iter() + .enumerate() + .map(|(i, header)| { + let max_val = col_values.iter().map(|row| row[i].len()).max().unwrap_or(0); + header.len().max(max_val).min(30) + }) + .collect(); + + let mut out = String::new(); + + // Header + out.push('\n'); + let header_parts: Vec = columns + .iter() + .zip(&col_widths) + .map(|(h, w)| format!("{:>width$}", h, width = *w)) + .collect(); + out.push_str(&format!(" {}", header_parts.join(" "))); + + // Separator + let sep_parts: Vec = col_widths.iter().map(|w| "─".repeat(*w)).collect(); + out.push_str(&format!("\n {}", sep_parts.join("──"))); + + // Rows + for row in &col_values { + let parts: Vec = row + .iter() + .zip(&col_widths) + .map(|(val, w)| { + let truncated = if val.len() > *w { + let s: String = val.chars().take(*w - 1).collect(); + format!("{s}…") + } else { + val.clone() + }; + format!("{:>width$}", truncated, width = *w) + }) + .collect(); + out.push_str(&format!("\n {}", parts.join(" "))); + } + + out.push_str(&format!("\n ({} items)", arr.len())); + out +} + /// Convert a Value to a plain-text string for table cells. 
fn inline_value(value: &Value) -> String { match value { diff --git a/tooling/sync/Makefile b/tooling/sync/Makefile index 0343373b30d..1f2426bad9c 100644 --- a/tooling/sync/Makefile +++ b/tooling/sync/Makefile @@ -251,6 +251,9 @@ MULTISYNC_BUILD_PROFILE ?= release-with-debug-assertions MULTISYNC_LOCAL_IMAGE ?= ethrex-local:multisync # Branch to track for auto-update mode (defaults to current branch if not set) MULTISYNC_BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD) +# Sync phases that trigger TRACE logging and fast polling in the monitor. +# Empty by default (no extra tracing). Example: MULTISYNC_WATCHED_PHASES=healing +MULTISYNC_WATCHED_PHASES ?= multisync-up: ## Start all networks specified in MULTISYNC_NETWORKS via Docker Compose. $(MULTISYNC_COMPOSE) up -d $(MULTISYNC_SERVICES) @@ -341,4 +344,5 @@ multisync-loop-auto: ## Continuous loop with auto-update: pull latest, build, an --branch "$(MULTISYNC_BRANCH)" \ --build-profile "$(MULTISYNC_BUILD_PROFILE)" \ --image-tag "$(MULTISYNC_LOCAL_IMAGE)" \ - --ethrex-dir "$(ETHREX_DIR)" + --ethrex-dir "$(ETHREX_DIR)" \ + $(if $(MULTISYNC_WATCHED_PHASES),--watched-phases "$(MULTISYNC_WATCHED_PHASES)") diff --git a/tooling/sync/docker-compose.multisync.yaml b/tooling/sync/docker-compose.multisync.yaml index 200f91326a7..99b9205d832 100644 --- a/tooling/sync/docker-compose.multisync.yaml +++ b/tooling/sync/docker-compose.multisync.yaml @@ -28,9 +28,13 @@ x-ethrex-common: ðrex-common image: "${ETHREX_IMAGE:-ghcr.io/lambdaclass/ethrex:main}" pull_policy: "${ETHREX_PULL_POLICY:-always}" + environment: + RUST_LOG: "${RUST_LOG:-info,ethrex_p2p::sync=debug}" ulimits: nofile: 1000000 restart: unless-stopped + # Metrics are exposed via --metrics in each container's command. + # Only mainnet maps to host port 3701 (scraped by centralized Prometheus). 
x-consensus-common: &consensus-common image: sigp/lighthouse:v8.0.1 @@ -70,6 +74,7 @@ services: container_name: ethrex-hoodi ports: - "8545:8545" # RPC + - "3702:3701" # Metrics volumes: - secrets-hoodi:/secrets - ethrex-hoodi:/data @@ -80,6 +85,7 @@ services: --authrpc.jwtsecret /secrets/jwt.hex --syncmode snap --datadir /data + --metrics --metrics.addr 0.0.0.0 --metrics.port 3701 depends_on: setup-jwt-hoodi: condition: service_completed_successfully @@ -117,6 +123,7 @@ services: container_name: ethrex-sepolia ports: - "8546:8545" # RPC on different host port + - "3703:3701" # Metrics volumes: - secrets-sepolia:/secrets - ethrex-sepolia:/data @@ -127,6 +134,7 @@ services: --authrpc.jwtsecret /secrets/jwt.hex --syncmode snap --datadir /data + --metrics --metrics.addr 0.0.0.0 --metrics.port 3701 depends_on: setup-jwt-sepolia: condition: service_completed_successfully @@ -164,6 +172,7 @@ services: container_name: ethrex-mainnet ports: - "8547:8545" # RPC on different host port + - "3701:3701" # Metrics (scraped by centralized Prometheus) volumes: - secrets-mainnet:/secrets - ethrex-mainnet:/data @@ -174,6 +183,7 @@ services: --authrpc.jwtsecret /secrets/jwt.hex --syncmode snap --datadir /data + --metrics --metrics.addr 0.0.0.0 --metrics.port 3701 depends_on: setup-jwt-mainnet: condition: service_completed_successfully @@ -221,6 +231,7 @@ services: --authrpc.jwtsecret /secrets/jwt.hex --syncmode snap --datadir /data + --metrics --metrics.addr 0.0.0.0 --metrics.port 3701 depends_on: setup-jwt-hoodi-2: condition: service_completed_successfully diff --git a/tooling/sync/docker_monitor.py b/tooling/sync/docker_monitor.py index 0ab1c4254c0..586afe73605 100644 --- a/tooling/sync/docker_monitor.py +++ b/tooling/sync/docker_monitor.py @@ -61,6 +61,249 @@ } +# Diagnostics polling configuration +DIAGNOSTICS_NORMAL_INTERVAL = 30 # seconds between polls during normal operation +DIAGNOSTICS_DEGRADED_INTERVAL = 5 # seconds between polls during degradation 
+DIAGNOSTICS_NORMAL_BUFFER_SIZE = 20 # snapshots kept in normal mode +DIAGNOSTICS_DEGRADED_BUFFER_SIZE = 60 # snapshots kept in degraded mode +DEGRADATION_ELIGIBLE_PEERS_THRESHOLD = 5 # trigger if eligible peers below this +DEGRADATION_STALENESS_RATIO = 0.8 # trigger if pivot age > 80% of threshold +DEGRADATION_RECOVERY_TIMEOUT = 60 # seconds of health before leaving degraded mode +# Watched phases: sync phases that warrant closer monitoring. +# When the node enters a watched phase, the monitor bumps the log level to +# TRACE (via admin_setLogLevel) and switches to fast polling (5s intervals). +# This is useful for investigating specific sync stages — e.g. "healing" is +# where pivot-update failures tend to occur. +# +# Default: empty (no phases watched). Set via --watched-phases CLI flag: +# --watched-phases "healing" +# --watched-phases "healing,storage_insertion" +WATCHED_PHASES: set[str] = set() +LOG_LEVEL_NORMAL = "info,ethrex_p2p::sync=debug" +LOG_LEVEL_DEGRADED = "info,ethrex_p2p=trace" + + +class DiagnosticsTracker: + """Polls admin_peerScores and admin_syncStatus, keeps rolling buffer, dumps on degradation.""" + + def __init__(self, instances: list): + self.instances = instances + self.buffers: dict[str, list[dict]] = {inst.name: [] for inst in instances} + self.degraded: dict[str, bool] = {inst.name: False for inst in instances} + self.degraded_since: dict[str, float] = {inst.name: 0 for inst in instances} + self.healthy_since: dict[str, float] = {inst.name: 0 for inst in instances} + self.last_poll: dict[str, float] = {inst.name: 0 for inst in instances} + self.events: list[dict] = [] # degradation events across all networks + self.dumped_for_run: dict[str, bool] = {inst.name: False for inst in instances} + self._last_progress: dict[str, Optional[str]] = {inst.name: None for inst in instances} + self.last_reasons: dict[str, frozenset] = {inst.name: frozenset() for inst in instances} + + def poll_interval(self, name: str) -> float: + return 
DIAGNOSTICS_DEGRADED_INTERVAL if self.degraded[name] else DIAGNOSTICS_NORMAL_INTERVAL + + def buffer_limit(self, name: str) -> int: + return DIAGNOSTICS_DEGRADED_BUFFER_SIZE if self.degraded[name] else DIAGNOSTICS_NORMAL_BUFFER_SIZE + + def should_poll(self, name: str) -> bool: + return (time.time() - self.last_poll[name]) >= self.poll_interval(name) + + def poll(self, inst, force: bool = False) -> None: + """Poll diagnostics RPC endpoints for a single instance.""" + if not force and inst.status in ("success", "failed", "waiting"): + return + if not force and not self.should_poll(inst.name): + return + + self.last_poll[inst.name] = time.time() + peer_scores = rpc_call(inst.rpc_url, "admin_peerScores") + sync_status = rpc_call(inst.rpc_url, "admin_syncStatus") + + if peer_scores is None and sync_status is None: + return # node not reachable, skip + + snapshot = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "epoch": time.time(), + "peer_scores": peer_scores, + "sync_status": sync_status, + } + + buf = self.buffers[inst.name] + buf.append(snapshot) + # Trim buffer to limit + limit = self.buffer_limit(inst.name) + while len(buf) > limit: + buf.pop(0) + + self._check_alert_conditions(inst, snapshot) + + def _check_alert_conditions(self, inst, snapshot: dict) -> None: + """Check for degradation conditions and trigger dump if needed.""" + now = time.time() + name = inst.name + reasons = [] + + # Check eligible peers + if snapshot.get("peer_scores") and isinstance(snapshot["peer_scores"], dict): + summary = snapshot["peer_scores"].get("summary", {}) + eligible = summary.get("eligible_peers", 999) + if eligible < DEGRADATION_ELIGIBLE_PEERS_THRESHOLD: + reasons.append(f"eligible_peers={eligible}") + + # Check sync progress stall + if snapshot.get("sync_status") and isinstance(snapshot["sync_status"], dict): + phase = snapshot["sync_status"].get("current_phase", "idle") + progress_key = str(snapshot["sync_status"].get("phase_progress", {})) + if phase not in 
("idle", ""): + if self._last_progress[name] is not None and self._last_progress[name] == progress_key: + pass # stall detection not yet implemented + self._last_progress[name] = progress_key + + # Check staleness ratio + pivot_age = snapshot["sync_status"].get("pivot_age_seconds") + threshold = snapshot["sync_status"].get("staleness_threshold_seconds", 0) + if pivot_age and threshold and threshold > 0: + ratio = pivot_age / threshold + if ratio > DEGRADATION_STALENESS_RATIO: + reasons.append(f"staleness_ratio={ratio:.2f}") + + # Healing phase is high-risk for pivot failures — increase polling + if phase in WATCHED_PHASES: + reasons.append(f"watched_phase:{phase}") + + reasons_set = frozenset(reasons) + if reasons: + newly_degraded = not self.degraded[name] + reasons_changed = reasons_set != self.last_reasons.get(name, frozenset()) + if newly_degraded: + self.degraded[name] = True + self.degraded_since[name] = now + self.healthy_since[name] = 0 + # Distinguish intentional tracing (watched phase) from real issues + only_watched = all(r.startswith("watched_phase:") for r in reasons) + event = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "network": name, + "event_type": "watched_phase_start" if only_watched else "degradation_start", + "reasons": reasons, + "eligible_peers": snapshot.get("peer_scores", {}).get("summary", {}).get("eligible_peers"), + "phase": snapshot.get("sync_status", {}).get("current_phase"), + } + self.events.append(event) + if only_watched: + print(f"👁️ [{name}] Watched phase active: {', '.join(reasons)} — increasing poll frequency") + else: + print(f"⚠️ [{name}] Degradation detected: {', '.join(reasons)} — increasing poll frequency") + # Bump log level to TRACE for detailed peer comms + if rpc_set_log_level(inst.rpc_url, LOG_LEVEL_DEGRADED): + print(f"🔍 [{name}] Log level bumped to TRACE for peer diagnostics") + else: + print(f"⚠️ [{name}] Failed to bump log level") + elif reasons_changed: + # Already degraded but reasons changed — 
record and log + event = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "network": name, + "event_type": "reasons_changed", + "reasons": reasons, + "phase": snapshot.get("sync_status", {}).get("current_phase"), + } + self.events.append(event) + print(f"🔄 [{name}] Monitor reasons changed: {', '.join(reasons)}") + # Dump snapshots on degradation / watched phase + self._dump_snapshots(name) + else: + # Healthy — check if we can exit degraded mode + if self.degraded[name]: + if self.healthy_since[name] == 0: + self.healthy_since[name] = now + elif (now - self.healthy_since[name]) >= DEGRADATION_RECOVERY_TIMEOUT: + self.degraded[name] = False + self.healthy_since[name] = 0 + event = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "network": name, + "event_type": "monitoring_normal", + } + self.events.append(event) + print(f"✅ [{name}] Monitoring back to normal — resuming default poll frequency") + # Restore log level to normal + if rpc_set_log_level(inst.rpc_url, LOG_LEVEL_NORMAL): + print(f"📝 [{name}] Log level restored to DEBUG") + else: + print(f"⚠️ [{name}] Failed to restore log level") + self.last_reasons[name] = reasons_set + + def on_failure(self, inst, name: str) -> None: + """Called when a network fails — do a final poll and dump snapshots.""" + # Do one last poll to capture the state at failure time + self.poll(inst, force=True) + # Bump log level to capture any post-failure details + rpc_set_log_level(inst.rpc_url, LOG_LEVEL_DEGRADED) + # Always dump on failure, even if previously dumped for degradation + self._dump_snapshots(name, force=True) + event = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "network": name, + "event_type": "failure", + } + self.events.append(event) + + def set_run_id(self, run_id: str) -> None: + """Set the current run ID so snapshots go to the right directory.""" + self.run_id = run_id + # Ensure the directory exists now, not at the end of the run + run_dir = LOGS_DIR / f"run_{run_id}" + 
run_dir.mkdir(parents=True, exist_ok=True) + + def _dump_snapshots(self, name: str, force: bool = False) -> None: + """Dump the rolling buffer to disk.""" + if not force and self.dumped_for_run.get(name): + return + self.dumped_for_run[name] = True + buf = self.buffers[name] + if not buf: + return + if not hasattr(self, 'run_id') or not self.run_id: + return + run_dir = LOGS_DIR / f"run_{self.run_id}" + run_dir.mkdir(parents=True, exist_ok=True) + out_path = run_dir / f"{name}_peer_snapshots.json" + try: + import json + out_path.write_text(json.dumps(buf, indent=2, default=str)) + print(f"📸 [{name}] Dumped {len(buf)} diagnostic snapshots to {out_path}") + except Exception as e: + print(f"⚠️ [{name}] Failed to dump snapshots: {e}") + + def format_degradation_events(self) -> str: + """Format monitor events for the summary.txt.""" + if not self.events: + return "" + lines = ["\n Monitor Events:"] + for ev in self.events: + ts = ev["timestamp"] + net = ev.get("network", "?") + evt = ev.get("event_type", "?") + reasons = ev.get("reasons", []) + detail = f" ({', '.join(reasons)})" if reasons else "" + lines.append(f" {ts} [{net}] {evt}{detail}") + return "\n".join(lines) + + def reset(self) -> None: + """Reset state for a new run.""" + for name in self.buffers: + self.buffers[name] = [] + self.degraded[name] = False + self.degraded_since[name] = 0 + self.healthy_since[name] = 0 + self.last_poll[name] = 0 + self.dumped_for_run[name] = False + self._last_progress[name] = None + self.last_reasons[name] = frozenset() + self.events = [] + + @dataclass class Instance: name: str @@ -274,6 +517,15 @@ def rpc_call(url: str, method: str) -> Optional[Any]: return None +def rpc_set_log_level(url: str, level: str) -> bool: + """Set the node's log level via admin_setLogLevel RPC.""" + try: + resp = requests.post(url, json={"jsonrpc": "2.0", "method": "admin_setLogLevel", "params": [level], "id": 1}, timeout=5).json() + return resp.get("result") is not None and "error" not in resp + 
except Exception: + return False + + def parse_phase_timings(run_id: str, container: str) -> list[tuple[str, str, str]]: """Parse phase completion times from saved container logs. @@ -421,7 +673,7 @@ def save_all_logs(instances: list[Instance], run_id: str, compose_file: str): print(f"📁 Logs saved to {LOGS_DIR}/run_{run_id}/\n") -def log_run_result(run_id: str, run_count: int, instances: list[Instance], hostname: str, branch: str, commit: str, build_profile: str = ""): +def log_run_result(run_id: str, run_count: int, instances: list[Instance], hostname: str, branch: str, commit: str, build_profile: str = "", diagnostics_tracker: Optional['DiagnosticsTracker'] = None): """Append run result to the persistent log file.""" ensure_logs_dir() all_success = all(i.status == "success" for i in instances) @@ -477,6 +729,12 @@ def log_run_result(run_id: str, run_count: int, instances: list[Instance], hostn for name, count, duration in phases: lines.append(f" {name:<{max_name_len}} {duration} ({count})") + # Include degradation events if any + if diagnostics_tracker: + degradation_text = diagnostics_tracker.format_degradation_events() + if degradation_text: + lines.append(degradation_text) + lines.append("") # Append to log file with open(RUN_LOG_FILE, "a") as f: @@ -667,6 +925,7 @@ def update_instance(inst: Instance, timeout_min: int) -> bool: def main(): + global WATCHED_PHASES p = argparse.ArgumentParser(description="Monitor Docker snapsync instances") p.add_argument("--networks", default="hoodi,sepolia,mainnet") p.add_argument("--timeout", type=int, default=SYNC_TIMEOUT) @@ -685,8 +944,13 @@ def main(): help="Docker image tag to build") p.add_argument("--ethrex-dir", default=os.environ.get("ETHREX_DIR", "../.."), help="Path to ethrex repository root") + p.add_argument("--watched-phases", default=",".join(WATCHED_PHASES), + help="Comma-separated sync phases that trigger TRACE logging and fast polling (default: healing)") args = p.parse_args() + # Apply CLI override for 
watched phases + WATCHED_PHASES = {ph.strip() for ph in args.watched_phases.split(",") if ph.strip()} + # Resolve ethrex directory to absolute path ethrex_dir = os.path.abspath(args.ethrex_dir) @@ -699,7 +963,8 @@ def main(): containers = [f"ethrex-{n}" for n in names] instances = [Instance(n, p, c) for n, p, c in zip(names, ports, containers)] - + tracker = DiagnosticsTracker(instances) + # Detect state of already-running containers for inst in instances: if t := container_start_time(inst.container): @@ -731,6 +996,7 @@ def main(): # Get run count from existing logs (persists across restarts) run_count = get_next_run_count() run_id = generate_run_id() + tracker.set_run_id(run_id) print(f"📁 Logs will be saved to {LOGS_DIR.absolute()}") print(f"📝 Run history: {RUN_LOG_FILE.absolute()}") @@ -763,6 +1029,7 @@ def main(): # Reset instances since we restarted for inst in instances: reset_instance(inst) + tracker.reset() time.sleep(30) # Wait for containers to start print(f"{'='*60}\n") @@ -770,6 +1037,12 @@ def main(): last_print = 0 while True: changed = any(update_instance(i, args.timeout) for i in instances) + # Poll diagnostics endpoints + for inst in instances: + tracker.poll(inst) + # Trigger dump on failure + if inst.status == "failed" and changed: + tracker.on_failure(inst, inst.name) if changed or (time.time() - last_print) > STATUS_PRINT_INTERVAL: print_status(instances) last_print = time.time() @@ -779,7 +1052,7 @@ def main(): time.sleep(CHECK_INTERVAL) # Log the run result and save container logs BEFORE any restart save_all_logs(instances, run_id, args.compose_file) - log_run_result(run_id, run_count, instances, hostname, branch, commit, args.build_profile) + log_run_result(run_id, run_count, instances, hostname, branch, commit, args.build_profile, tracker) # Send a single Slack summary notification for the run if not args.no_slack: slack_notify(run_id, run_count, instances, hostname, branch, commit, args.build_profile) @@ -791,6 +1064,7 @@ def main(): # 
Prepare for another run run_count += 1 run_id = generate_run_id() # New run ID for the new cycle + tracker.set_run_id(run_id) # If auto-update is enabled, the loop will pull/build/restart # Otherwise, just restart containers now
diff --git a/tooling/sync/peer_top.py b/tooling/sync/peer_top.py
new file mode 100644
index 00000000000..48a1b49daa6
--- /dev/null
+++ b/tooling/sync/peer_top.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+"""Live peer table viewer — like top for ethrex peers."""
+import os, signal, sys, time
+import requests as req
+
+ENDPOINT = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:18547"
+INTERVAL = float(sys.argv[2]) if len(sys.argv) > 2 else 1.0
+
+# ANSI colors
+RED = "\033[31m"
+GREEN = "\033[32m"
+YELLOW = "\033[33m"
+CYAN = "\033[36m"
+DIM = "\033[2m"
+BOLD = "\033[1m"
+RESET = "\033[0m"
+
+# Track previous scores for delta coloring
+prev_scores: dict[str, int] = {}
+
+
+def fetch(method):  # JSON-RPC POST; returns the "result" field or None on any failure
+    try:
+        r = req.post(
+            ENDPOINT,
+            json={"jsonrpc": "2.0", "method": method, "params": [], "id": 1},
+            timeout=3,
+        )
+        return r.json().get("result")
+    except Exception:
+        return None
+
+
+start_time = time.time()
+
+
+def color_score(peer_id: str, score: int) -> str:
+    """Color the score based on value and delta from previous tick."""
+    prev = prev_scores.get(peer_id)
+    if score <= -30:
+        color = RED
+    elif score <= 0:
+        color = YELLOW
+    else:
+        color = GREEN
+
+    if prev is not None and prev != score:
+        if score > prev:
+            # Score went up — bright green arrow
+            return f"{GREEN}{BOLD}{score:>4} \u2191{RESET}"
+        else:
+            # Score went down — bright red arrow
+            return f"{RED}{BOLD}{score:>4} \u2193{RESET}"
+    return f"{color}{score:>4} {RESET}"
+
+
+def trim_client(client: str, width: int) -> str:
+    """Trim client/version string to width.
+    When very tight, show just the client name."""
+    if len(client) <= width:
+        return client
+    if width < 10:
+        # Space is tight — drop version info, keep the client name
+        name = client.split("/")[0]
+        return name[:width]
+    return client[: width - 1] + "\u2026"
+
+
+def render(term_cols: int):  # builds one full frame as a list of display lines
+    global prev_scores
+    lines = []
+    elapsed = int(time.time() - start_time)
+    h, m, s = elapsed // 3600, (elapsed % 3600) // 60, elapsed % 60
+    now_str = time.strftime("%H:%M:%S")
+    lines.append(
+        f"{BOLD}peer_top{RESET} {DIM}— {now_str} — up {h:02d}:{m:02d}:{s:02d} — {ENDPOINT}{RESET}"
+    )
+    lines.append("")
+    sync = fetch("admin_syncStatus")
+    data = fetch("admin_peerScores")
+
+    if sync:
+        phase = sync.get("current_phase") or "idle"
+        pivot = sync.get("pivot_block_number") or "?"
+        age = sync.get("pivot_age_seconds")
+        threshold = sync.get("staleness_threshold_seconds", 0)
+        progress = sync.get("phase_progress", {})
+        age_str = f"{age}s" if age is not None else "?"  # age 0 is a valid, fresh pivot
+
+        # Color staleness margin
+        if age is not None and threshold:
+            margin_secs = threshold - age
+            if margin_secs < 0:
+                margin_color = RED
+            elif margin_secs < 300:
+                margin_color = YELLOW
+            else:
+                margin_color = GREEN
+            margin = f"{margin_color}({margin_secs}s to stale){RESET}"
+        else:
+            margin = ""
+
+        lines.append(
+            f"{BOLD}Phase:{RESET} {CYAN}{phase}{RESET} "
+            f"{BOLD}Pivot:{RESET} {pivot} "
+            f"{BOLD}Age:{RESET} {age_str} {margin}"
+        )
+        if progress:
+            parts = [f"{k}={v:,}" for k, v in progress.items()]
+            lines.append(f"{DIM}Progress: {', '.join(parts)}{RESET}")
+
+        # Pivot update history
+        pivot_changes = sync.get("recent_pivot_changes", [])
+        if pivot_changes:
+            lines.append("")
+            lines.append(f"{BOLD}Pivot History:{RESET} (last {len(pivot_changes[-5:])})")
+            for pc in pivot_changes[-5:]:  # show last 5
+                ts = pc.get("timestamp", 0)
+                ts_str = time.strftime("%H:%M:%S", time.localtime(ts)) if ts else "?"
+                old_n = pc.get("old_pivot_number", "?")
+                new_n = pc.get("new_pivot_number", "?")
+                outcome = pc.get("outcome", "?")
+                reason = pc.get("failure_reason", "")
+                if outcome == "success":
+                    icon = f"{GREEN}\u2713{RESET}"
+                else:
+                    icon = f"{RED}\u2717{RESET}"
+                if reason:
+                    reason = f" {RED}{reason}{RESET}"
+                lines.append(
+                    f" {icon} {DIM}{ts_str}{RESET} {old_n} \u2192 {new_n} [{outcome}]{reason}"
+                )
+
+        # Recent errors
+        errors = sync.get("recent_errors", [])
+        if errors:
+            lines.append("")
+            lines.append(f"{BOLD}Recent Errors:{RESET} (last {len(errors[-3:])})")
+            for err in errors[-3:]:  # show last 3
+                ts = err.get("timestamp", 0)
+                ts_str = time.strftime("%H:%M:%S", time.localtime(ts)) if ts else "?"
+                msg = err.get("error_message", "?")[:60]
+                recov = f"{GREEN}recoverable{RESET}" if err.get("recoverable") else f"{RED}irrecoverable{RESET}"
+                lines.append(f" {DIM}{ts_str}{RESET} {msg} [{recov}]")
+
+    lines.append("")
+
+    if not data:
+        lines.append(f"{RED}Node not reachable{RESET}")
+        return lines
+
+    s = data["summary"]
+    peers = data["peers"]
+
+    # Color eligible count
+    elig_count = s["eligible_peers"]
+    if elig_count < 5:
+        elig_color = RED
+    elif elig_count < 20:
+        elig_color = YELLOW
+    else:
+        elig_color = GREEN
+
+    lines.append(
+        f"{BOLD}Peers:{RESET} {s['total_peers']} "
+        f"{BOLD}Eligible:{RESET} {elig_color}{elig_count}{RESET} "
+        f"{BOLD}Avg Score:{RESET} {s['average_score']} "
+        f"{BOLD}Inflight:{RESET} {s['total_inflight_requests']}"
+    )
+    lines.append("")
+
+    # Column widths — fixed columns + dynamic Capabilities / Client
+    # Layout: PID Score Reqs Elig Caps Dir Client
+    W_PID, W_SCORE, W_REQS, W_ELIG, W_DIR = 14, 6, 5, 4, 4
+    SEPARATORS = 6  # one space between each of the 7 columns
+    fixed = W_PID + W_SCORE + W_REQS + W_ELIG + W_DIR + SEPARATORS  # = 39
+    # Budget for Caps + Client. Leave 1 char right-margin.
+    budget = max(20, term_cols - fixed - 1)
+    W_CAPS = max(12, min(22, budget - 10))  # caps capped at 22, min 12
+    W_CLIENT = max(8, budget - W_CAPS)
+
+    lines.append(
+        f"{DIM}{'Peer ID':>{W_PID}} {'Score':>{W_SCORE}} {'Reqs':>{W_REQS}}"
+        f" {'Elig':>{W_ELIG}} {'Capabilities':<{W_CAPS}} {'Dir':>{W_DIR}}"
+        f" {'Client':<{W_CLIENT}}{RESET}"
+    )
+    lines.append(f"{DIM}{'-' * (fixed + W_CAPS + W_CLIENT)}{RESET}")
+
+    new_scores = {}
+    for p in sorted(peers, key=lambda x: x["score"], reverse=True):
+        pid_full = p["peer_id"]
+        pid = pid_full[:6] + ".." + pid_full[-4:]
+        score = p["score"]
+        new_scores[pid_full] = score
+
+        score_str = color_score(pid_full, score)
+
+        # Group capabilities by protocol
+        by_proto = {}
+        for c in p["capabilities"]:
+            parts = c.split("/")
+            proto = parts[0]
+            ver = parts[1] if len(parts) > 1 else "?"
+            by_proto.setdefault(proto, []).append(ver)
+        caps = " ".join(f"{k}/{','.join(vs)}" for k, vs in by_proto.items())
+        if len(caps) > W_CAPS:
+            caps = caps[: W_CAPS - 1] + "\u2026"
+        client = trim_client(p["client_version"], W_CLIENT)
+        d = p["connection_direction"][:3]
+
+        elig_char = "\u2713" if p["eligible"] else "\u2717"
+        elig_col = GREEN if p["eligible"] else RED
+        # Visible-width 1, right-aligned in W_ELIG column
+        elig_str = f"{' ' * (W_ELIG - 1)}{elig_col}{elig_char}{RESET}"
+
+        reqs = p["inflight_requests"]
+        reqs_str = f"{YELLOW}{reqs:>{W_REQS}}{RESET}" if reqs > 0 else f"{reqs:>{W_REQS}}"
+
+        lines.append(
+            f"{pid:>{W_PID}} {score_str} {reqs_str}"
+            f" {elig_str} {caps:<{W_CAPS}} {d:>{W_DIR}}"
+            f" {DIM}{client:<{W_CLIENT}}{RESET}"
+        )
+
+    prev_scores = new_scores
+    return lines
+
+
+def cleanup(*_):  # leave the alternate screen buffer, re-show the cursor, exit
+    sys.stdout.write("\033[?1049l\033[?25h")
+    sys.stdout.flush()
+    sys.exit(0)
+
+
+def main():  # enter alt screen, hide cursor, then redraw every INTERVAL seconds
+    signal.signal(signal.SIGINT, cleanup)
+    signal.signal(signal.SIGTERM, cleanup)
+
+    sys.stdout.write("\033[?1049h\033[?25l\033[2J")
+    sys.stdout.flush()
+
+    try:
+        prev_line_count = 0
+        while True:
+            try:
+                size = os.get_terminal_size()
+                term_rows, term_cols = size.lines, size.columns
+            except OSError:
+                term_rows, term_cols = 40, 120
+            lines = render(term_cols)
+            if len(lines) > term_rows - 2:
+                hidden = len(lines) - term_rows + 3
+                lines = lines[: term_rows - 3]
+                lines.append(
+                    f" {DIM}... {hidden} more peers (resize terminal to see all){RESET}"
+                )
+            buf = "\033[H"
+            for line in lines:
+                buf += f"{line}\033[K\n"
+            for _ in range(max(0, prev_line_count - len(lines))):
+                buf += "\033[K\n"
+            sys.stdout.write(buf)
+            sys.stdout.flush()
+            prev_line_count = len(lines)
+            time.sleep(INTERVAL)
+    except Exception:
+        cleanup()
+
+
+if __name__ == "__main__":
+    main()