From bbfbeb7727e46fc497a2670ff72ffa968f6c6029 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Tue, 21 Apr 2026 12:55:47 +0200 Subject: [PATCH 1/7] Revert "revert(l1): revert "reintroduce proper Kademlia k-bucket routing table" (#6505)" This reverts commit c41226b1a50c026642ad31bc0a641a2b49196a64. --- cmd/ethrex/initializers.rs | 3 +- cmd/ethrex/l2/initializers.rs | 6 +- crates/blockchain/metrics/p2p.rs | 4 +- crates/networking/p2p/discv4/server.rs | 13 +- crates/networking/p2p/discv5/server.rs | 34 +- crates/networking/p2p/peer_table.rs | 669 +++++++++++++----- crates/networking/rpc/test_utils.rs | 2 +- .../common_dashboards/p2p_packets.json | 161 +++++ .../p2p/discovery/discv5_server_tests.rs | 6 +- 9 files changed, 710 insertions(+), 188 deletions(-) diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 3c284e15538..87765b9aeaa 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -530,7 +530,8 @@ pub async fn init_l1( let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer); - let peer_table = PeerTableServer::spawn(opts.target_peers, store.clone()); + let peer_table = + PeerTableServer::spawn(local_p2p_node.node_id(), opts.target_peers, store.clone()); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index 1cc4887eeda..3672b4b46c7 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -241,7 +241,11 @@ pub async fn init_l2( if !opts.sequencer_opts.based { blockchain.set_synced(); } - let peer_table = PeerTableServer::spawn(opts.node_opts.target_peers, store.clone()); + let peer_table = PeerTableServer::spawn( + local_p2p_node.node_id(), + opts.node_opts.target_peers, + store.clone(), + ); let p2p_context = P2PContext::new( local_p2p_node.clone(), network_config, diff --git a/crates/blockchain/metrics/p2p.rs b/crates/blockchain/metrics/p2p.rs index 14244ad7d3a..4ab303038b3 100644 --- a/crates/blockchain/metrics/p2p.rs +++ b/crates/blockchain/metrics/p2p.rs @@ -98,7 +98,7 @@ impl MetricsP2P { kademlia_insert_contact_duration: Histogram::with_opts( HistogramOpts::new( "ethrex_kademlia_insert_contact_duration_seconds", - "Duration of peer table contact insertion operations", + "Duration of Kademlia insert_contact operations", ) .buckets(vec![ 0.000_001, 0.000_005, 0.000_01, 0.000_05, 0.000_1, 0.000_5, 0.001, 0.01, @@ -108,7 +108,7 @@ impl MetricsP2P { kademlia_iter_contacts_duration: Histogram::with_opts( HistogramOpts::new( "ethrex_kademlia_iter_contacts_duration_seconds", - "Duration of peer table full-scan operations", + "Duration of Kademlia iter_contacts full-scan operations", ) .buckets(vec![ 0.000_01, 0.000_05, 0.000_1, 0.000_5, 0.001, 0.005, 0.01, 0.05, 0.1, diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 3241b4e5e36..46e5adf487d 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -139,11 +139,7 @@ impl DiscoveryServer { "Adding bootnodes" ); - peer_table.new_contacts( - bootnodes.clone(), - local_node.node_id(), - DiscoveryProtocol::Discv4, - )?; + peer_table.new_contacts(bootnodes.clone(), DiscoveryProtocol::Discv4)?; for bootnode in &bootnodes { discovery_server.send_ping(bootnode).await?; @@ -608,11 +604,8 @@ impl DiscoveryServer { } let nodes = neighbors_message.nodes; - self.peer_table.new_contacts( - nodes, - self.local_node.node_id(), - DiscoveryProtocol::Discv4, - )?; + 
self.peer_table + .new_contacts(nodes, DiscoveryProtocol::Discv4)?; Ok(()) } diff --git a/crates/networking/p2p/discv5/server.rs b/crates/networking/p2p/discv5/server.rs index cac54a89420..d024ade4b53 100644 --- a/crates/networking/p2p/discv5/server.rs +++ b/crates/networking/p2p/discv5/server.rs @@ -201,7 +201,7 @@ impl DiscoveryServer { count = bootnodes.len(), "Adding bootnodes" ); - peer_table.new_contacts(bootnodes, local_node.node_id(), DiscoveryProtocol::Discv5)?; + peer_table.new_contacts(bootnodes, DiscoveryProtocol::Discv5)?; Ok(discovery_server.start()) } @@ -529,8 +529,7 @@ impl DiscoveryServer { // Add the peer to the peer table if let Some(record) = &authdata.record { - self.peer_table - .new_contact_records(vec![record.clone()], self.local_node.node_id())?; + self.peer_table.new_contact_records(vec![record.clone()])?; } // Derive session keys (we are the recipient, node B) @@ -725,10 +724,7 @@ impl DiscoveryServer { // Per spec, distance 0 means the node itself — include the local ENR explicitly. let mut nodes = self .peer_table - .get_nodes_at_distances( - self.local_node.node_id(), - find_node_message.distances.clone(), - ) + .get_nodes_at_distances(find_node_message.distances.clone()) .await?; if find_node_message.distances.contains(&0) { nodes.push(self.local_node_record.clone()); @@ -776,8 +772,7 @@ impl DiscoveryServer { nodes_message: NodesMessage, ) -> Result<(), DiscoveryServerError> { // TODO(#3746): check that we requested neighbors from the node - self.peer_table - .new_contact_records(nodes_message.nodes, self.local_node.node_id())?; + self.peer_table.new_contact_records(nodes_message.nodes)?; Ok(()) } @@ -1338,12 +1333,14 @@ mod tests { ).expect("Bad enode url"); let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:30303").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1379,12 +1376,14 @@ mod tests { let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); // Use port 0 to let the OS assign an available port + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1463,14 +1462,13 @@ mod tests { let remote_node_id = remote_node.node_id(); let peer_table = PeerTableServer::spawn( + local_node.node_id(), 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ); // Add the remote node as a contact with its ENR record - peer_table - .new_contact_records(vec![remote_record], local_node.node_id()) - .unwrap(); + peer_table.new_contact_records(vec![remote_record]).unwrap(); // Set up a session for the remote node (required for send_ordinary) let session = Session { @@ -1567,12 +1565,14 @@ mod tests { let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); let original_seq = local_node_record.seq; + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: 
Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1624,12 +1624,14 @@ mod tests { let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1673,12 +1675,14 @@ mod tests { let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); let original_seq = local_node_record.seq; + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1727,12 +1731,14 @@ mod tests { let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1785,12 +1791,14 @@ mod tests { let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), @@ -1836,12 +1844,14 @@ mod tests { let signer = SecretKey::new(&mut rand::rngs::OsRng); let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); + let local_node_id = local_node.node_id(); let mut server = DiscoveryServer { local_node, local_node_record, signer, udp_socket: Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()), peer_table: PeerTableServer::spawn( + local_node_id, 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ), diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 256e6ce9f73..1d1281489bc 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -17,9 +17,9 @@ use crate::{ utils::distance, }; use bytes::Bytes; -use ethrex_common::H256; +use ethrex_common::{H256, U256}; use ethrex_storage::Store; -use indexmap::{IndexMap, map::Entry}; +use indexmap::IndexMap; use rand::seq::{IteratorRandom, SliceRandom}; use rustc_hash::{FxHashMap, FxHashSet}; use spawned_concurrency::{ @@ -47,13 +47,115 @@ const REQUESTS_WEIGHT: i64 = 1; const MAX_CONCURRENT_REQUESTS_PER_PEER: i64 = 100; /// The target number of RLPx connections to reach. pub const TARGET_PEERS: usize = 100; -/// The target number of contacts to maintain in peer_table. -const TARGET_CONTACTS: usize = 100_000; /// Maximum number of ENRs to return in a FindNode response (discv4 compatible). 
 pub(crate) const MAX_NODES_IN_NEIGHBORS_PACKET: usize = 16;
 /// Maximum number of ENRs to return in a discv5 FindNode response.
 const MAX_ENRS_PER_FINDNODE_RESPONSE: usize = 16;
 
+/// Number of k-buckets in the Kademlia routing table (one per bit of the 256-bit node ID).
+const NUMBER_OF_BUCKETS: usize = 256;
+/// Maximum number of contacts per k-bucket (Kademlia k parameter).
+pub const MAX_NODES_PER_BUCKET: usize = 16;
+/// Maximum number of replacement entries per k-bucket.
+const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
+
+/// A single k-bucket in the Kademlia routing table.
+/// Each bucket stores contacts at a specific XOR distance range from the local node.
+#[derive(Debug, Clone, Default)]
+pub struct KBucket {
+    pub(crate) contacts: Vec<(H256, Contact)>,
+    pub(crate) replacements: Vec<(H256, Contact)>,
+}
+
+impl KBucket {
+    /// Find a contact by node ID in the main list.
+    fn get(&self, node_id: &H256) -> Option<&Contact> {
+        self.contacts
+            .iter()
+            .find(|(id, _)| id == node_id)
+            .map(|(_, c)| c)
+    }
+
+    /// Find a contact by node ID in either the main or replacement list.
+    fn get_any(&self, node_id: &H256) -> Option<&Contact> {
+        self.get(node_id).or_else(|| {
+            self.replacements
+                .iter()
+                .find(|(id, _)| id == node_id)
+                .map(|(_, c)| c)
+        })
+    }
+
+    /// Find a mutable reference to a contact by node ID within this bucket.
+    fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
+        self.contacts
+            .iter_mut()
+            .find(|(id, _)| id == node_id)
+            .map(|(_, c)| c)
+    }
+
+    /// Check if a contact exists in this bucket (main or replacement list).
+    fn contains(&self, node_id: &H256) -> bool {
+        self.contacts.iter().any(|(id, _)| id == node_id)
+            || self.replacements.iter().any(|(id, _)| id == node_id)
+    }
+
+    /// Insert a contact into the bucket. Returns true if inserted into main list.
+    /// If the bucket is full, the contact is added to the replacement list instead.
+    fn insert(&mut self, node_id: H256, contact: Contact) -> bool {
+        if self.contacts.len() < MAX_NODES_PER_BUCKET {
+            self.contacts.push((node_id, contact));
+            true
+        } else {
+            self.insert_replacement(node_id, contact);
+            false
+        }
+    }
+
+    /// Add a contact to the replacement list, evicting the oldest if full.
+    fn insert_replacement(&mut self, node_id: H256, contact: Contact) {
+        if self.replacements.len() >= MAX_REPLACEMENTS_PER_BUCKET {
+            self.replacements.remove(0);
+        }
+        self.replacements.push((node_id, contact));
+    }
+
+    /// Remove a contact from the main list and promote a replacement if available.
+    /// Returns the promoted replacement's node ID, if any.
+    fn remove_and_promote(&mut self, node_id: &H256) -> Option<H256> {
+        let idx = self.contacts.iter().position(|(id, _)| id == node_id)?;
+        self.contacts.remove(idx);
+        if !self.replacements.is_empty() {
+            let (replacement_id, replacement) = self.replacements.remove(0);
+            self.contacts.push((replacement_id, replacement));
+            Some(replacement_id)
+        } else {
+            None
+        }
+    }
+}
+
+/// Computes the bucket index for a node relative to the local node.
+/// Uses XOR distance: bucket = floor(log2(XOR(local, remote))), i.e. the
+/// position of the highest set bit minus 1.
+/// Returns None for the local node itself (XOR = 0).
+fn bucket_index(local_node_id: &H256, node_id: &H256) -> Option<usize> {
+    let xor = *local_node_id ^ *node_id;
+    let dist = U256::from_big_endian(xor.as_bytes());
+    if dist.is_zero() {
+        None
+    } else {
+        Some(dist.bits() - 1)
+    }
+}
+
+/// Computes the raw XOR distance between two node IDs.
+/// Used for comparing relative closeness: a is closer to target than b
+/// iff xor_distance(target, a) < xor_distance(target, b).
+fn xor_distance(a: &H256, b: &H256) -> H256 {
+    *a ^ *b
+}
+
 /// Identifies which discovery protocol was used to find a contact.
 /// This allows protocol-specific lookups to only query compatible contacts.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -237,17 +339,9 @@ pub enum ContactValidation {
 #[protocol]
 pub trait PeerTableServerProtocol: Send + Sync {
     // Send (cast) methods
-    fn new_contacts(
-        &self,
-        nodes: Vec<Node>,
-        local_node_id: H256,
-        protocol: DiscoveryProtocol,
-    ) -> Result<(), ActorError>;
-    fn new_contact_records(
-        &self,
-        node_records: Vec<NodeRecord>,
-        local_node_id: H256,
-    ) -> Result<(), ActorError>;
+    fn new_contacts(&self, nodes: Vec<Node>, protocol: DiscoveryProtocol)
+        -> Result<(), ActorError>;
+    fn new_contact_records(&self, node_records: Vec<NodeRecord>) -> Result<(), ActorError>;
     fn new_connected_peer(
         &self,
         node: Node,
@@ -314,11 +408,7 @@ pub trait PeerTableServerProtocol: Send + Sync {
     fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response<bool>;
     fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response<ContactValidation>;
     fn get_closest_nodes(&self, node_id: H256) -> Response<Vec<Node>>;
-    fn get_nodes_at_distances(
-        &self,
-        local_node_id: H256,
-        distances: Vec<u32>,
-    ) -> Response<Vec<NodeRecord>>;
+    fn get_nodes_at_distances(&self, distances: Vec<u32>) -> Response<Vec<NodeRecord>>;
     fn get_peers_data(&self) -> Response<Vec<PeerData>>;
     fn get_random_peer(
         &self,
@@ -330,7 +420,8 @@ pub trait PeerTableServerProtocol: Send + Sync {
 
 #[derive(Debug)]
 pub struct PeerTableServer {
-    contacts: IndexMap<H256, Contact>,
+    local_node_id: H256,
+    buckets: Vec<KBucket>,
     peers: IndexMap<H256, PeerData>,
     already_tried_peers: FxHashSet<H256>,
     discarded_contacts: FxHashSet<H256>,
@@ -343,13 +434,14 @@ pub struct PeerTableServer {
 
 #[actor(protocol = PeerTableServerProtocol)]
 impl PeerTableServer {
-    pub fn spawn(target_peers: usize, store: Store) -> PeerTable {
-        PeerTableServer::new(target_peers, store).start()
+    pub fn spawn(local_node_id: H256, target_peers: usize, store: Store) -> PeerTable {
+        PeerTableServer::new(local_node_id, target_peers, store).start()
     }
 
-    pub(crate) fn new(target_peers: usize, store: Store) -> Self {
+    pub(crate) fn new(local_node_id: H256, target_peers: usize, store: Store) -> Self {
         Self {
-            contacts: Default::default(),
+            local_node_id,
+            buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS],
             peers: Default::default(),
             already_tried_peers: Default::default(),
             discarded_contacts: Default::default(),
@@ -376,8 +468,7 @@ impl PeerTableServer {
         msg: peer_table_server_protocol::NewContacts,
         _ctx: &Context,
     ) {
-        self.do_new_contacts(msg.nodes, msg.local_node_id, msg.protocol)
-            .await;
+        self.do_new_contacts(msg.nodes, msg.protocol).await;
     }
 
     #[send_handler]
@@ -386,8 +477,7 @@ impl PeerTableServer {
         msg: peer_table_server_protocol::NewContactRecords,
         _ctx: &Context,
     ) {
-        self.do_new_contact_records(msg.node_records, msg.local_node_id)
-            .await;
+        self.do_new_contact_records(msg.node_records).await;
     }
 
     #[send_handler]
@@ -410,7 +500,7 @@ impl PeerTableServer {
         // Store in the standalone sessions map (always succeeds, no contact required).
         self.sessions.insert(msg.node_id, msg.session.clone());
         // Also update the contact's cached session if the contact exists.
- if let Some(contact) = self.contacts.get_mut(&msg.node_id) { + if let Some(contact) = self.get_contact_mut(&msg.node_id) { contact.session = Some(msg.session); } } @@ -452,9 +542,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetUnwanted, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.unwanted = true); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.unwanted = true; + } } #[send_handler] @@ -463,9 +553,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetIsForkIdValid, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.is_fork_id_valid = Some(msg.valid)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.is_fork_id_valid = Some(msg.valid); + } } #[send_handler] @@ -512,9 +602,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordPingSent, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.record_ping_sent(msg.ping_id)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.record_ping_sent(msg.ping_id); + } } #[send_handler] @@ -523,16 +613,15 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordPongReceived, _ctx: &Context, ) { - self.contacts.entry(msg.node_id).and_modify(|contact| { - if contact + if let Some(contact) = self.get_contact_mut(&msg.node_id) + && contact .ping_id .as_ref() .map(|value| *value == msg.ping_id) .unwrap_or(false) - { - contact.ping_id = None - } - }); + { + contact.ping_id = None; + } } #[send_handler] @@ -541,9 +630,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordEnrRequestSent, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.record_enr_request_sent(msg.request_hash)); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.record_enr_request_sent(msg.request_hash); + } } #[send_handler] @@ -552,9 +641,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::RecordEnrResponseReceived, _ctx: &Context, ) { - self.contacts.entry(msg.node_id).and_modify(|contact| { + if let Some(contact) = self.get_contact_mut(&msg.node_id) { contact.record_enr_response_received(msg.request_hash, msg.record); - }); + } } #[send_handler] @@ -563,9 +652,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::SetDisposable, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.disposable = true); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.disposable = true; + } } #[send_handler] @@ -574,9 +663,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::IncrementFindNodeSent, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|contact| contact.n_find_node_sent += 1); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.n_find_node_sent += 1; + } } #[send_handler] @@ -585,9 +674,9 @@ impl PeerTableServer { msg: peer_table_server_protocol::MarkKnowsUs, _ctx: &Context, ) { - self.contacts - .entry(msg.node_id) - .and_modify(|c| c.knows_us = true); + if let Some(contact) = self.get_contact_mut(&msg.node_id) { + contact.knows_us = true; + } } #[send_handler] @@ -634,7 +723,7 @@ impl PeerTableServer { _msg: peer_table_server_protocol::TargetReached, _ctx: &Context, ) -> bool { - self.contacts.len() >= TARGET_CONTACTS && self.peers.len() >= self.target_peers + self.peers.len() >= self.target_peers } #[request_handler] @@ -688,7 +777,7 @@ impl PeerTableServer { msg: 
peer_table_server_protocol::GetContact,
         _ctx: &Context,
     ) -> Option<Box<Contact>> {
-        self.contacts.get(&msg.node_id).cloned().map(Box::new)
+        self.get_contact(&msg.node_id).cloned().map(Box::new)
     }
 
     #[request_handler]
@@ -777,13 +866,16 @@ impl PeerTableServer {
         msg: peer_table_server_protocol::InsertIfNew,
         _ctx: &Context,
     ) -> bool {
-        match self.contacts.entry(msg.node.node_id()) {
-            Entry::Occupied(_) => false,
-            Entry::Vacant(entry) => {
-                METRICS.record_new_discovery().await;
-                entry.insert(Contact::new(msg.node, msg.protocol));
-                true
-            }
+        let node_id = msg.node.node_id();
+        if self.contact_exists(&node_id) {
+            return false;
+        }
+        let contact = Contact::new(msg.node, msg.protocol);
+        if self.insert_contact(node_id, contact) {
+            METRICS.record_new_discovery().await;
+            true
+        } else {
+            false
         }
     }
 
@@ -811,7 +903,7 @@ impl PeerTableServer {
         msg: peer_table_server_protocol::GetNodesAtDistances,
         _ctx: &Context,
     ) -> Vec<NodeRecord> {
-        self.do_get_nodes_at_distances(msg.local_node_id, &msg.distances)
+        self.do_get_nodes_at_distances(&msg.distances)
     }
 
     #[request_handler]
@@ -842,7 +934,7 @@ impl PeerTableServer {
         self.sessions
             .get(&msg.node_id)
             .cloned()
-            .or_else(|| self.contacts.get(&msg.node_id)?.session.clone())
+            .or_else(|| self.get_contact(&msg.node_id)?.session.clone())
     }
 
     #[request_handler]
@@ -877,13 +969,88 @@ impl PeerTableServer {
 
     // === Private helper methods ===
 
-    // Weighting function used to select best peer
+    // --- K-bucket accessors ---
+
+    /// Get the bucket index for a node ID, or None if it's the local node.
+    fn bucket_for(&self, node_id: &H256) -> Option<usize> {
+        bucket_index(&self.local_node_id, node_id)
+    }
+
+    /// Look up a contact by node ID (O(K) within the bucket).
+    fn get_contact(&self, node_id: &H256) -> Option<&Contact> {
+        let idx = self.bucket_for(node_id)?;
+        self.buckets[idx].get(node_id)
+    }
+
+    /// Look up a mutable reference to a contact by node ID.
+    fn get_contact_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
+        let idx = self.bucket_for(node_id)?;
+        self.buckets[idx].get_mut(node_id)
+    }
+
+    /// Check if a contact exists in any bucket (main or replacement list).
+    fn contact_exists(&self, node_id: &H256) -> bool {
+        let Some(idx) = self.bucket_for(node_id) else {
+            return false;
+        };
+        self.buckets[idx].contains(node_id)
+    }
+
+    /// Insert a contact into the appropriate k-bucket. Returns true if inserted
+    /// into the main list, false if the node went to the replacement list or is
+    /// the local node.
+    fn insert_contact(&mut self, node_id: H256, contact: Contact) -> bool {
+        #[cfg(feature = "metrics")]
+        let start = std::time::Instant::now();
+
+        let Some(idx) = self.bucket_for(&node_id) else {
+            return false;
+        };
+        let result = self.buckets[idx].insert(node_id, contact);
+
+        #[cfg(feature = "metrics")]
+        {
+            use ethrex_metrics::p2p::METRICS_P2P;
+            METRICS_P2P.observe_insert_contact_duration(start.elapsed().as_secs_f64());
+        }
+
+        result
+    }
+
+    /// Look up a contact by node ID in either the main or replacement list.
+    fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
+        let idx = self.bucket_for(node_id)?;
+        self.buckets[idx].get_any(node_id)
+    }
+
+    /// Look up a mutable reference in either the main or replacement list.
+    fn get_contact_or_replacement_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
+        let idx = self.bucket_for(node_id)?;
+        let bucket = &mut self.buckets[idx];
+        // Search main list first, then replacement list.
+        // Done inline to avoid borrow-checker issues with or_else closures.
+        if let Some(pos) = bucket.contacts.iter().position(|(id, _)| id == node_id) {
+            return Some(&mut bucket.contacts[pos].1);
+        }
+        if let Some(pos) = bucket.replacements.iter().position(|(id, _)| id == node_id) {
+            return Some(&mut bucket.replacements[pos].1);
+        }
+        None
+    }
+
+    /// Iterate over all contacts across all buckets.
+    fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
+        self.buckets
+            .iter()
+            .flat_map(|bucket| bucket.contacts.iter().map(|(id, c)| (id, c)))
+    }
+
+    // --- Peer selection ---
+
     fn weight_peer(&self, score: &i64, requests: &i64) -> i64 {
         score * SCORE_WEIGHT - requests * REQUESTS_WEIGHT
     }
 
-    // Returns if the peer has room for more connections given the current score
-    // and amount of inflight requests
     fn can_try_more_requests(&self, score: &i64, requests: &i64) -> bool {
         let score_ratio = (score - MIN_SCORE) as f64 / (MAX_SCORE - MIN_SCORE) as f64;
         let max_requests = (MAX_CONCURRENT_REQUESTS_PER_PEER as f64 * score_ratio).max(1.0);
@@ -920,30 +1087,34 @@ impl PeerTableServer {
             .map(|(k, _, _, v)| (k, v))
     }
 
+    // --- Contact operations ---
+
     fn prune(&mut self) {
-        let disposable_contacts = self
-            .contacts
-            .iter()
-            .filter_map(|(c_id, c)| c.disposable.then_some(*c_id))
-            .collect::<Vec<_>>();
+        let disposable_contacts: Vec<H256> = self
+            .iter_contacts()
+            .filter_map(|(id, c)| c.disposable.then_some(*id))
+            .collect();
 
-        for contact_to_discard_id in disposable_contacts {
-            self.contacts.swap_remove(&contact_to_discard_id);
-            self.discarded_contacts.insert(contact_to_discard_id);
+        for node_id in disposable_contacts {
+            if let Some(idx) = self.bucket_for(&node_id) {
+                self.buckets[idx].remove_and_promote(&node_id);
+                self.discarded_contacts.insert(node_id);
+            }
         }
     }
 
     fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
-        for contact in self.contacts.values() {
-            let node_id = contact.node.node_id();
-            if !self.peers.contains_key(&node_id)
-                && !self.already_tried_peers.contains(&node_id)
-                && contact.knows_us
-                && !contact.unwanted
-                && contact.is_fork_id_valid != Some(false)
-            {
-                self.already_tried_peers.insert(node_id);
-                return Some(contact.clone());
+        for bucket in &self.buckets {
+            for (node_id, contact) in &bucket.contacts {
+                if !self.peers.contains_key(node_id)
+                    && !self.already_tried_peers.contains(node_id)
+                    && contact.knows_us
+                    && !contact.unwanted
+                    && contact.is_fork_id_valid != Some(false)
+                {
+                    self.already_tried_peers.insert(*node_id);
+                    return Some(contact.clone());
+                }
             }
         }
         tracing::trace!("Resetting list of tried peers.");
@@ -952,13 +1123,13 @@ impl PeerTableServer {
     }
 
     fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option<Contact> {
-        self.contacts
-            .values()
-            .filter(|c| {
+        self.iter_contacts()
+            .filter(|(_, c)| {
                 c.supports_protocol(protocol)
                     && c.n_find_node_sent < MAX_FIND_NODE_PER_PEER
                     && !c.disposable
             })
+            .map(|(_, c)| c)
             .collect::<Vec<_>>()
             .choose(&mut rand::rngs::OsRng)
             .cloned()
@@ -967,15 +1138,15 @@ impl PeerTableServer {
 
     /// Get contact for ENR lookup (discv4 only)
     fn do_get_contact_for_enr_lookup(&mut self) -> Option<Contact> {
-        self.contacts
-            .values()
-            .filter(|c| {
+        self.iter_contacts()
+            .filter(|(_, c)| {
                 c.is_discv4
                     && c.was_validated()
                    && !c.has_pending_enr_request()
                    && c.record.is_none()
                    && !c.disposable
            })
+            .map(|(_, c)| c)
            .collect::<Vec<_>>()
            .choose(&mut rand::rngs::OsRng)
            .cloned()
@@ -987,19 +1158,19 @@ impl PeerTableServer {
     fn do_get_contact_for_validation(
         &mut self,
         revalidation_interval: Duration,
         protocol: DiscoveryProtocol,
     ) -> Option<Box<Contact>> {
-        self.contacts
-            .values()
-            .filter(|c| {
+        self.iter_contacts()
+            .filter(|(_, c)| {
                 c.supports_protocol(protocol)
                     && Self::is_validation_needed(c, revalidation_interval)
             })
+            .map(|(_, c)| c)
             .choose(&mut rand::rngs::OsRng)
             .cloned()
             .map(Box::new)
     }
 
     fn do_validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> ContactValidation {
-        let Some(contact) = self.contacts.get(&node_id) else {
+        let Some(contact) = self.get_contact(&node_id) else {
             return ContactValidation::UnknownContact;
         };
         if !contact.was_validated() {
@@ -1014,24 +1185,22 @@ impl PeerTableServer {
         ContactValidation::Valid(Box::new(contact.clone()))
     }
 
-    /// Get closest nodes for discv4 (returns Vec<Node>)
+    /// Get closest nodes using raw XOR distance for accurate ordering.
     fn do_get_closest_nodes(&self, node_id: H256) -> Vec<Node> {
         #[cfg(feature = "metrics")]
         let scan_start = std::time::Instant::now();
 
-        let mut nodes: Vec<(Node, usize)> = vec![];
+        let mut nodes: Vec<(Node, H256)> = vec![];
 
-        for (contact_id, contact) in &self.contacts {
-            let dist = Self::distance(&node_id, contact_id);
+        for (contact_id, contact) in self.iter_contacts() {
+            let dist = xor_distance(&node_id, contact_id);
             if nodes.len() < MAX_NODES_IN_NEIGHBORS_PACKET {
                 nodes.push((contact.node.clone(), dist));
-            } else {
-                for (i, (_, d)) in &mut nodes.iter().enumerate() {
-                    if dist < *d {
-                        nodes[i] = (contact.node.clone(), dist);
-                        break;
-                    }
-                }
+            } else if let Some((farthest_idx, _)) =
+                nodes.iter().enumerate().max_by_key(|(_, (_, d))| *d)
+                && dist < nodes[farthest_idx].1
+            {
+                nodes[farthest_idx] = (contact.node.clone(), dist);
             }
         }
 
@@ -1048,11 +1217,10 @@ impl PeerTableServer {
     /// Uses the discv5 spec log-distance: `floor(log2(XOR))` for non-zero XOR.
     /// Distance 0 is reserved for the local node itself (handled by the caller),
     /// so contacts start at distance >= 1.
-    fn do_get_nodes_at_distances(&self, local_node_id: H256, distances: &[u32]) -> Vec<NodeRecord> {
-        self.contacts
-            .iter()
+    fn do_get_nodes_at_distances(&self, distances: &[u32]) -> Vec<NodeRecord> {
+        self.iter_contacts()
             .filter_map(|(contact_id, contact)| {
-                let dist = distance(&local_node_id, contact_id) as u32;
+                let dist = distance(&self.local_node_id, contact_id) as u32;
                 if distances.contains(&dist) {
                     contact.record.clone()
                 } else {
@@ -1063,74 +1231,63 @@ impl PeerTableServer {
             .collect()
     }
 
-    async fn do_new_contacts(
-        &mut self,
-        nodes: Vec<Node>,
-        local_node_id: H256,
-        protocol: DiscoveryProtocol,
-    ) {
+    async fn do_new_contacts(&mut self, nodes: Vec<Node>, protocol: DiscoveryProtocol) {
         for node in nodes {
             let node_id = node.node_id();
-            if self.discarded_contacts.contains(&node_id) || node_id == local_node_id {
+            if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
                 continue;
             }
 
             #[cfg(feature = "metrics")]
             let insert_start = std::time::Instant::now();
 
-            let is_new = match self.contacts.entry(node_id) {
-                Entry::Vacant(vacant_entry) => {
-                    vacant_entry.insert(Contact::new(node, protocol));
-                    true
+            if self.contact_exists(&node_id) {
+                // Contact already exists (main or replacement list), update protocol
+                if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
+                    contact.add_protocol(protocol);
                 }
-                Entry::Occupied(mut occupied_entry) => {
-                    // Contact already exists, just add the protocol
-                    occupied_entry.get_mut().add_protocol(protocol);
-                    false
+            } else {
+                let contact = Contact::new(node, protocol);
+                if self.insert_contact(node_id, contact) {
+                    METRICS.record_new_discovery().await;
                 }
-            };
+            }
 
             #[cfg(feature = "metrics")]
             {
                 use ethrex_metrics::p2p::METRICS_P2P;
                 METRICS_P2P.observe_insert_contact_duration(insert_start.elapsed().as_secs_f64());
            }
-
-            if is_new {
-                METRICS.record_new_discovery().await;
-            }
         }
     }
 
-    async fn do_new_contact_records(&mut self, node_records: Vec<NodeRecord>, local_node_id: H256) {
+    async fn do_new_contact_records(&mut self, node_records: Vec<NodeRecord>) {
         for node_record in node_records {
             if !node_record.verify_signature() {
                 continue;
             }
             if let Ok(node) = Node::from_enr(&node_record) {
                 let node_id = node.node_id();
-                if self.discarded_contacts.contains(&node_id) || node_id == local_node_id {
+                if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
                     continue;
                 }
-                match self.contacts.entry(node_id) {
-                    Entry::Vacant(vacant_entry) => {
-                        let is_fork_id_valid =
-                            Self::evaluate_fork_id(&node_record, &self.store).await;
-                        let mut contact = Contact::new(node, DiscoveryProtocol::Discv5);
-                        contact.is_fork_id_valid = is_fork_id_valid;
-                        contact.record = Some(node_record);
-                        vacant_entry.insert(contact);
-                        METRICS.record_new_discovery().await;
-                    }
-                    Entry::Occupied(mut occupied_entry) => {
-                        let should_update = match occupied_entry.get().record.as_ref() {
+                if self.contact_exists(&node_id) {
+                    // Check if we need to evaluate fork_id before taking
+                    // the mutable borrow.
+                    let should_update = self
+                        .get_contact_or_replacement(&node_id)
+                        .map(|c| match c.record.as_ref() {
                             None => true,
                             Some(r) => node_record.seq > r.seq,
-                        };
-                        let contact = occupied_entry.get_mut();
+                        })
+                        .unwrap_or(false);
+                    let is_fork_id_valid = if should_update {
+                        Self::evaluate_fork_id(&node_record, &self.store).await
+                    } else {
+                        None
+                    };
+                    if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
                         contact.add_protocol(DiscoveryProtocol::Discv5);
                         if should_update {
-                            let is_fork_id_valid =
-                                Self::evaluate_fork_id(&node_record, &self.store).await;
                             if contact.node.ip != node.ip
                                 || contact.node.udp_port != node.udp_port
                             {
                                 contact.validation_timestamp = None;
@@ -1141,6 +1298,14 @@ impl PeerTableServer {
                             contact.is_fork_id_valid = is_fork_id_valid;
                         }
                     }
+                } else {
+                    let is_fork_id_valid = Self::evaluate_fork_id(&node_record, &self.store).await;
+                    let mut contact = Contact::new(node, DiscoveryProtocol::Discv5);
+                    contact.is_fork_id_valid = is_fork_id_valid;
+                    contact.record = Some(node_record);
+                    if self.insert_contact(node_id, contact) {
+                        METRICS.record_new_discovery().await;
+                    }
                 }
             }
         }
@@ -1209,10 +1374,6 @@ impl PeerTableServer {
         peers.choose(&mut rand::rngs::OsRng).cloned()
     }
 
-    fn distance(node_id_1: &H256, node_id_2: &H256) -> usize {
-        distance(node_id_1, node_id_2)
-    }
-
     fn is_validation_needed(contact: &Contact, revalidation_interval: Duration) -> bool {
         if contact.disposable {
             return false;
@@ -1238,3 +1399,195 @@ impl PeerTableServer {
 }
 
 pub type PeerTable = ActorRef<PeerTableServer>;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use ethrex_common::H512;
+    use std::net::Ipv4Addr;
+
+    /// Helper: build a dummy contact with a unique node derived from `seed`.
+ fn dummy_contact(seed: u8) -> (H256, Contact) { + let pk = H512::from_low_u64_be(seed as u64 + 1); + let node = Node::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, seed)), 30303, 30303, pk); + let node_id = node.node_id(); + let contact = Contact::new(node, DiscoveryProtocol::Discv4); + (node_id, contact) + } + + // --- KBucket::insert --- + + #[test] + fn insert_into_empty_bucket() { + let mut bucket = KBucket::default(); + let (id, contact) = dummy_contact(1); + assert!(bucket.insert(id, contact)); + assert_eq!(bucket.contacts.len(), 1); + assert!(bucket.replacements.is_empty()); + } + + #[test] + fn insert_fills_bucket_then_goes_to_replacements() { + let mut bucket = KBucket::default(); + + // Fill the main list to capacity. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, contact) = dummy_contact(i); + assert!(bucket.insert(id, contact), "contact {i} should go to main"); + } + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + + // The next insert should go to the replacement list. + let (id, contact) = dummy_contact(200); + assert!(!bucket.insert(id, contact)); + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + assert_eq!(bucket.replacements.len(), 1); + } + + // --- KBucket::contains --- + + #[test] + fn contains_checks_main_and_replacement() { + let mut bucket = KBucket::default(); + + let (id_main, contact_main) = dummy_contact(1); + bucket.insert(id_main, contact_main); + assert!(bucket.contains(&id_main)); + + // Fill bucket so next goes to replacement. + for i in 2..=(MAX_NODES_PER_BUCKET as u8) { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + let (id_repl, contact_repl) = dummy_contact(100); + bucket.insert(id_repl, contact_repl); + + assert!(bucket.contains(&id_repl)); + assert!(!bucket.contains(&H256::zero())); + } + + // --- KBucket::get / get_any --- + + #[test] + fn get_returns_main_list_only() { + let mut bucket = KBucket::default(); + let (id, contact) = dummy_contact(1); + bucket.insert(id, contact); + assert!(bucket.get(&id).is_some()); + assert!(bucket.get(&H256::zero()).is_none()); + } + + #[test] + fn get_any_returns_from_replacement() { + let mut bucket = KBucket::default(); + // Fill main list. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + // Insert into replacements. + let (id_repl, c_repl) = dummy_contact(200); + bucket.insert(id_repl, c_repl); + + assert!(bucket.get(&id_repl).is_none()); // not in main + assert!(bucket.get_any(&id_repl).is_some()); // found via replacement + } + + // --- KBucket::remove_and_promote --- + + #[test] + fn remove_and_promote_with_replacement() { + let mut bucket = KBucket::default(); + + // Fill main list. + let mut main_ids = Vec::new(); + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + main_ids.push(id); + bucket.insert(id, c); + } + + // Add a replacement. + let (repl_id, repl_contact) = dummy_contact(200); + bucket.insert(repl_id, repl_contact); + + // Remove a main contact — the replacement should be promoted. 
+ let promoted = bucket.remove_and_promote(&main_ids[0]); + assert_eq!(promoted, Some(repl_id)); + assert_eq!(bucket.contacts.len(), MAX_NODES_PER_BUCKET); + assert!(bucket.replacements.is_empty()); + assert!(!bucket.contains(&main_ids[0])); + assert!(bucket.contains(&repl_id)); + } + + #[test] + fn remove_and_promote_without_replacement() { + let mut bucket = KBucket::default(); + let (id, c) = dummy_contact(1); + bucket.insert(id, c); + + let promoted = bucket.remove_and_promote(&id); + assert!(promoted.is_none()); + assert!(bucket.contacts.is_empty()); + } + + #[test] + fn remove_nonexistent_returns_none() { + let mut bucket = KBucket::default(); + assert!(bucket.remove_and_promote(&H256::zero()).is_none()); + } + + // --- Replacement eviction --- + + #[test] + fn replacement_list_evicts_oldest_when_full() { + let mut bucket = KBucket::default(); + // Fill main list. + for i in 0..MAX_NODES_PER_BUCKET as u8 { + let (id, c) = dummy_contact(i); + bucket.insert(id, c); + } + + // Fill replacement list beyond capacity. + let mut repl_ids = Vec::new(); + for i in 0..(MAX_REPLACEMENTS_PER_BUCKET + 2) as u8 { + let seed = 100 + i; + let (id, c) = dummy_contact(seed); + repl_ids.push(id); + bucket.insert(id, c); + } + + assert_eq!(bucket.replacements.len(), MAX_REPLACEMENTS_PER_BUCKET); + // The oldest two should have been evicted. + assert!(!bucket.contains(&repl_ids[0])); + assert!(!bucket.contains(&repl_ids[1])); + // The most recent ones should still be there. + assert!(bucket.contains(repl_ids.last().unwrap())); + } + + // --- bucket_index --- + + #[test] + fn bucket_index_self_is_none() { + let id = H256::random(); + assert_eq!(bucket_index(&id, &id), None); + } + + #[test] + fn bucket_index_minimal_distance() { + let local = H256::zero(); + // XOR distance = 1 → highest bit is bit 0 → bucket 0 + let mut remote = H256::zero(); + remote.0[31] = 1; + assert_eq!(bucket_index(&local, &remote), Some(0)); + } + + #[test] + fn bucket_index_maximal_distance() { + let local = H256::zero(); + // XOR distance has highest bit at position 255 → bucket 255 + let mut remote = H256::zero(); + remote.0[0] = 0x80; + assert_eq!(bucket_index(&local, &remote), Some(255)); + } +} diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs index 5c08b670d00..a656bc406be 100644 --- a/crates/networking/rpc/test_utils.rs +++ b/crates/networking/rpc/test_utils.rs @@ -316,7 +316,7 @@ pub async fn dummy_sync_manager() -> SyncManager { /// Creates a dummy PeerHandler for tests where interacting with peers is not needed /// This should only be used in tests as it won't be able to interact with the node's connected peers pub async fn dummy_peer_handler(store: Store) -> PeerHandler { - let peer_table = PeerTableServer::spawn(TARGET_PEERS, store); + let peer_table = PeerTableServer::spawn(H256::random(), TARGET_PEERS, store); PeerHandler::new(peer_table.clone(), dummy_actor(peer_table).await) } diff --git a/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json b/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json index d7592afa210..8d6db6bdd26 100644 --- a/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json +++ b/metrics/provisioning/grafana/dashboards/common_dashboards/p2p_packets.json @@ -619,6 +619,167 @@ } ] } + , + { + "title": "Kademlia Table", + "type": "row", + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 63 }, + "id": 103, + "collapsed": false + }, + { + "title": "insert_contact Duration (p50 / p99 / max)", + 
"description": "Duration of Kademlia insert_contact operations", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, "w": 12, "x": 0, "y": 64 }, + "id": 30, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "s" + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(ethrex_kademlia_insert_contact_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(ethrex_kademlia_insert_contact_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p99", + "refId": "B" + } + ] + }, + { + "title": "iter_contacts Full-Scan Duration (p50 / p99 / max)", + "description": "Duration of full Kademlia table scans (e.g. get_closest_nodes)", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, "w": 12, "x": 12, "y": 64 }, + "id": 31, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "s" + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, rate(ethrex_kademlia_iter_contacts_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p50", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "histogram_quantile(0.99, rate(ethrex_kademlia_iter_contacts_duration_seconds_bucket{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval]))", + "legendFormat": "p99", + "refId": "B" + } + ] + }, + { + "title": "Kademlia Operations Rate", + "description": "Rate of insert_contact and iter_contacts operations per second", + "type": "timeseries", + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 74 }, + "id": 32, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "ops/s", + "drawStyle": "line", 
+ "fillOpacity": 20, + "gradientMode": "scheme", + "lineInterpolation": "smooth", + "lineWidth": 2, + "pointSize": 5, + "showPoints": "never", + "spanNulls": false, + "stacking": { "group": "A", "mode": "none" }, + "thresholdsStyle": { "mode": "off" } + }, + "unit": "ops" + }, + "overrides": [ + { "matcher": { "id": "byName", "options": "insert_contact" }, "properties": [{ "id": "color", "value": { "fixedColor": "green", "mode": "fixed" } }] }, + { "matcher": { "id": "byName", "options": "iter_contacts" }, "properties": [{ "id": "color", "value": { "fixedColor": "blue", "mode": "fixed" } }] } + ] + }, + "options": { + "legend": { "calcs": ["mean", "max", "lastNotNull"], "displayMode": "table", "placement": "bottom" }, + "tooltip": { "mode": "multi", "sort": "desc" } + }, + "targets": [ + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "rate(ethrex_kademlia_insert_contact_duration_seconds_count{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval])", + "legendFormat": "insert_contact", + "refId": "A" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "editorMode": "code", + "expr": "rate(ethrex_kademlia_iter_contacts_duration_seconds_count{instance=~\"$instance(:\\\\d+)?$\"}[$__rate_interval])", + "legendFormat": "iter_contacts", + "refId": "B" + } + ] + } ], "schemaVersion": 39, "tags": ["ethrex", "p2p"], diff --git a/test/tests/p2p/discovery/discv5_server_tests.rs b/test/tests/p2p/discovery/discv5_server_tests.rs index 8d81a3d70c7..07eae9e0634 100644 --- a/test/tests/p2p/discovery/discv5_server_tests.rs +++ b/test/tests/p2p/discovery/discv5_server_tests.rs @@ -22,6 +22,7 @@ async fn test_server(peer_table: Option) -> DiscoveryServer { let local_node_record = NodeRecord::from_node(&local_node, 1, &signer).unwrap(); let peer_table = peer_table.unwrap_or_else(|| { PeerTableServer::spawn( + local_node.node_id(), 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ) @@ -155,13 +156,12 @@ async fn test_enr_update_request_on_pong() { let remote_node_id = remote_node.node_id(); let peer_table = PeerTableServer::spawn( + local_node.node_id(), 10, Store::new("", EngineType::InMemory).expect("Failed to create store"), ); - peer_table - .new_contact_records(vec![remote_record], local_node.node_id()) - .unwrap(); + peer_table.new_contact_records(vec![remote_record]).unwrap(); let session = Session { outbound_key: [0u8; 16], From 000747cb797b9b576b7b5a2bce68b798fba5359e Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Thu, 16 Apr 2026 19:48:45 +0200 Subject: [PATCH 2/7] fix(p2p): Mark unresponsive peers as disposable to prevent snapsync stalls Fixes snapsync failures where peer count stays constant and sync eventually fails with "Failed to receive block headers" after hours of operation. Root cause: After PR #6458 introduced Kademlia k-buckets, peers that became unresponsive during sync weren't marked as disposable, so they remained in the routing table indefinitely. New peers went into replacement lists but were never promoted because dead peers weren't pruned. 
Changes: - Enhanced prune() to remove disposable contacts from both main and replacement lists, with automatic promotion of replacements - Mark peers as disposable when they timeout during RLPx operations (block headers, block bodies, sync head requests) - Added periodic pruning in the snap_sync main loop to ensure dead peers are regularly removed and replaced Evidence from CI artifacts showed peer count stuck at 6 throughout 3h35m sync before failure. This fix enables peer rotation so healthy peers from replacement lists can take over when active peers become unresponsive. --- crates/networking/p2p/peer_handler.rs | 7 ++++- crates/networking/p2p/peer_table.rs | 36 ++++++++++++++++++++----- crates/networking/p2p/sync/snap_sync.rs | 3 +++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index c8f920e7ea9..6be5d6e08f3 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -90,6 +90,8 @@ async fn ask_peer_head_number( } Ok(_other_msgs) => Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)), Err(PeerConnectionError::Timeout) => { + // Mark this peer as disposable so it gets pruned and replaced + let _ = peer_table.set_disposable(peer_id); Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id)) } Err(_other_err) => Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)), @@ -464,13 +466,15 @@ impl PeerHandler { warn!( "[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}" ); + let _ = self.peer_table.set_disposable(peer_id); return Ok(None); } } - // Timeouted + // Timeout or invalid response - mark peer as disposable warn!( "[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}..." ); + let _ = self.peer_table.set_disposable(peer_id); Ok(None) } } @@ -552,6 +556,7 @@ impl PeerHandler { "[SYNCING] Didn't receive block bodies from peer, penalizing peer {peer_id}..." ); self.peer_table.record_failure(peer_id)?; + let _ = self.peer_table.set_disposable(peer_id); Ok(None) } } diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 1d1281489bc..097fbd98b85 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -1089,15 +1089,37 @@ impl PeerTableServer { // --- Contact operations --- + /// Prune disposable contacts from both main and replacement lists. + /// When a main contact is removed, a replacement is automatically promoted. 
 fn prune(&mut self) {
-        let disposable_contacts: Vec<H256> = self
-            .iter_contacts()
-            .filter_map(|(id, c)| c.disposable.then_some(*id))
-            .collect();
+        for bucket in &mut self.buckets {
+            // Collect disposable contacts from main list
+            let main_disposable: Vec<H256> = bucket
+                .contacts
+                .iter()
+                .filter(|(_, c)| c.disposable)
+                .map(|(id, _)| *id)
+                .collect();
+
+            // Remove from main list and promote replacements
+            for node_id in main_disposable {
+                bucket.remove_and_promote(&node_id);
+                self.discarded_contacts.insert(node_id);
+            }
 
-        for node_id in disposable_contacts {
-            if let Some(idx) = self.bucket_for(&node_id) {
-                self.buckets[idx].remove_and_promote(&node_id);
+            // Remove disposable contacts from replacement list
+            // (these don't get promoted, just removed)
+            let replacement_disposable: Vec<H256> = bucket
+                .replacements
+                .iter()
+                .filter(|(_, c)| c.disposable)
+                .map(|(id, _)| *id)
+                .collect();
+
+            bucket
+                .replacements
+                .retain(|(id, _)| !replacement_disposable.contains(id));
+            for node_id in replacement_disposable {
                 self.discarded_contacts.insert(node_id);
             }
         }
     }
diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs
index 45d43990a81..8e267632273 100644
--- a/crates/networking/p2p/sync/snap_sync.rs
+++ b/crates/networking/p2p/sync/snap_sync.rs
@@ -136,6 +136,9 @@ pub async fn sync_cycle_snap(
     let mut attempts = 0;
 
     loop {
+        // Prune dead/unresponsive peers periodically to allow replacements to be promoted
+        let _ = peers.peer_table.prune_table();
+
         debug!("Requesting Block Headers from {current_head}");
 
         let Some(mut block_headers) = peers

From 44a257ef4b960f88f32fb7a8d477ff520a594575 Mon Sep 17 00:00:00 2001
From: Esteve Soler Arderiu
Date: Fri, 17 Apr 2026 14:28:42 +0200
Subject: [PATCH 3/7] fix(l1): include replacement contacts in peer discovery
 and iteration

The Kademlia k-bucket implementation only iterated over main bucket
contacts, ignoring replacement entries. This caused peer starvation
because dead contacts in the main list were never replaced by fresher
peers from the replacement list.

Fix iter_contacts() and do_get_contact_to_initiate() to also check
replacement contacts, allowing the node to discover and connect to
peers that were previously invisible to the peer selection logic.
---
 crates/networking/p2p/peer_table.rs | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs
index 097fbd98b85..03cfe7f0079 100644
--- a/crates/networking/p2p/peer_table.rs
+++ b/crates/networking/p2p/peer_table.rs
@@ -1038,11 +1038,15 @@ impl PeerTableServer {
         None
     }
 
-    /// Iterate over all contacts across all buckets.
+    /// Iterate over all contacts across all buckets (main and replacement lists).
     fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
-        self.buckets
-            .iter()
-            .flat_map(|bucket| bucket.contacts.iter().map(|(id, c)| (id, c)))
+        self.buckets.iter().flat_map(|bucket| {
+            bucket
+                .contacts
+                .iter()
+                .chain(bucket.replacements.iter())
+                .map(|(id, c)| (id, c))
+        })
     }
 
     // --- Peer selection ---
@@ -1126,8 +1130,10 @@ impl PeerTableServer {
     }
 
     fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
+        // Check both main contacts and replacements in each bucket.
+        // Replacements may contain fresher peers that haven't been tried yet.
for bucket in &self.buckets { - for (node_id, contact) in &bucket.contacts { + for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) { if !self.peers.contains_key(node_id) && !self.already_tried_peers.contains(node_id) && contact.knows_us From 8484e9a461941f295967037f21a2c31de90c60e1 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Fri, 17 Apr 2026 14:36:32 +0200 Subject: [PATCH 4/7] fix(l1): make contact lookup and mutation cover replacement lists KBucket::get_mut and get_contact only searched the main contact list, so any state mutation (set_disposable, ping tracking, find_node count, mark_knows_us) silently failed for contacts in the replacement list. Since iter_contacts and do_get_contact_to_initiate now return replacement contacts, this caused phantom contacts that were visible to selection but invisible to updates. Update get_contact to use get_any (main + replacements) and get_mut to search both lists, ensuring all contact state mutations work regardless of which list holds the contact. --- crates/networking/p2p/peer_table.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 03cfe7f0079..8f931c3660c 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -86,9 +86,12 @@ impl KBucket { }) } - /// Find a mutable reference to a contact by node ID within this bucket. + /// Find a mutable reference to a contact by node ID (main or replacement list). fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> { - self.contacts + if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) { + return Some(c); + } + self.replacements .iter_mut() .find(|(id, _)| id == node_id) .map(|(_, c)| c) @@ -976,10 +979,10 @@ impl PeerTableServer { bucket_index(&self.local_node_id, node_id) } - /// Look up a contact by node ID (O(K) within the bucket). + /// Look up a contact by node ID in main or replacement list (O(K) within the bucket). fn get_contact(&self, node_id: &H256) -> Option<&Contact> { let idx = self.bucket_for(node_id)?; - self.buckets[idx].get(node_id) + self.buckets[idx].get_any(node_id) } /// Look up a mutable reference to a contact by node ID. From d455042c1e17949cd9023098a26888709d5df06a Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Mon, 20 Apr 2026 13:41:54 +0200 Subject: [PATCH 5/7] perf(p2p): add flat connection pool decoupled from Kademlia routing table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a separate IndexMap connection pool (capacity 50K) for RLPx connection initiation, decoupled from the k-bucket routing table (which is limited to 256 × 16 = 4,096 contacts by Kademlia design). All discovered contacts are inserted into both the k-buckets (for Kademlia protocol operations like FindNode/GetClosestNodes) and the connection pool (for peer connection initiation). This restores the large candidate pool that existed before the k-bucket migration while preserving correct Kademlia routing semantics. 
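As a rough sketch of the pool's insert/eviction behavior described above
(standalone; u64 keys and String values stand in for the real H256 node IDs
and Node entries):

    use indexmap::IndexMap;

    const MAX_CONNECTION_POOL_SIZE: usize = 50_000;

    /// IndexMap preserves insertion order, so index 0 is always the oldest
    /// surviving entry; evicting it gives oldest-first semantics.
    fn insert_to_pool(pool: &mut IndexMap<u64, String>, id: u64, node: String) {
        if pool.contains_key(&id) {
            return; // already tracked; keep its original position/age
        }
        if pool.len() >= MAX_CONNECTION_POOL_SIZE {
            pool.shift_remove_index(0); // drop the oldest entry, preserving order
        }
        pool.insert(id, node);
    }

Note that shift_remove_index(0) is O(n) for an IndexMap, which is acceptable
here since eviction only happens once the pool has reached capacity.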
The connection pool is:
- Populated on every contact discovery (discv4, discv5, insert_if_new)
- Cleaned during prune() when contacts are marked disposable
- Capped at 50K entries with oldest-first eviction
- Used with random selection and k-bucket state filtering
---
 crates/networking/p2p/peer_table.rs | 89 +++++++++++++++++++++++------
 1 file changed, 72 insertions(+), 17 deletions(-)

diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs
index 8f931c3660c..58c594f7f20 100644
--- a/crates/networking/p2p/peer_table.rs
+++ b/crates/networking/p2p/peer_table.rs
@@ -58,6 +58,11 @@ const NUMBER_OF_BUCKETS: usize = 256;
 pub const MAX_NODES_PER_BUCKET: usize = 16;
 /// Maximum number of replacement entries per k-bucket.
 const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
+/// Maximum number of entries in the flat connection candidate pool.
+/// This pool is separate from the k-bucket routing table and retains
+/// more contacts for RLPx connection initiation than the k-bucket
+/// structure allows (256 × 16 = 4,096 vs this larger capacity).
+const MAX_CONNECTION_POOL_SIZE: usize = 50_000;
 
 /// A single k-bucket in the Kademlia routing table.
 /// Each bucket stores contacts at a specific XOR distance range from the local node.
@@ -433,6 +438,12 @@ pub struct PeerTableServer {
     /// Standalone session store, independent of contacts.
     /// Allows sessions to be stored even before the contact's ENR is known/parseable.
     sessions: FxHashMap<H256, Session>,
+    /// Flat pool of discovered contacts for RLPx connection initiation.
+    /// Decoupled from the k-bucket routing table so that connection initiation
+    /// has access to a much larger candidate pool than the k-bucket structure
+    /// allows (k-buckets: 256 × 16 = 4,096 max; this pool: up to 50,000).
+    /// K-buckets are still used for all Kademlia protocol operations.
+    connection_pool: IndexMap<H256, Node>,
 }
 
 #[actor(protocol = PeerTableServerProtocol)]
@@ -451,6 +462,7 @@ impl PeerTableServer {
             target_peers,
             store,
             sessions: Default::default(),
+            connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE),
         }
     }
 
@@ -870,6 +882,8 @@ impl PeerTableServer {
         _ctx: &Context,
     ) -> bool {
         let node_id = msg.node.node_id();
+        // Always add to the connection pool
+        self.insert_to_connection_pool(node_id, msg.node.clone());
         if self.contact_exists(&node_id) {
             return false;
         }
@@ -1020,6 +1034,18 @@ impl PeerTableServer {
         result
     }
 
+    /// Insert a node into the flat connection pool for RLPx initiation.
+    /// Evicts the oldest entry when the pool is at capacity.
+    fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) {
+        if self.connection_pool.contains_key(&node_id) {
+            return;
+        }
+        if self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE {
+            self.connection_pool.shift_remove_index(0);
+        }
+        self.connection_pool.insert(node_id, node);
+    }
+
     /// Look up a contact by node ID in either the main or replacement list.
     fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
         let idx = self.bucket_for(node_id)?;
@@ -1098,6 +1124,7 @@ impl PeerTableServer {
 
     /// Prune disposable contacts from both main and replacement lists.
     /// When a main contact is removed, a replacement is automatically promoted.
+    /// Also removes discarded contacts from the connection pool.
@@ -1098,6 +1124,7 @@ impl PeerTableServer {
 
     /// Prune disposable contacts from both main and replacement lists.
     /// When a main contact is removed, a replacement is automatically promoted.
+    /// Also removes discarded contacts from the connection pool.
     fn prune(&mut self) {
         for bucket in &mut self.buckets {
             // Collect disposable contacts from main list
@@ -1130,27 +1157,48 @@ impl PeerTableServer {
                 self.discarded_contacts.insert(node_id);
             }
         }
+
+        // Remove discarded contacts from the connection pool
+        self.connection_pool
+            .retain(|id, _| !self.discarded_contacts.contains(id));
     }
 
     fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
-        // Check both main contacts and replacements in each bucket.
-        // Replacements may contain fresher peers that haven't been tried yet.
-        for bucket in &self.buckets {
-            for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) {
-                if !self.peers.contains_key(node_id)
-                    && !self.already_tried_peers.contains(node_id)
-                    && contact.knows_us
-                    && !contact.unwanted
-                    && contact.is_fork_id_valid != Some(false)
-                {
-                    self.already_tried_peers.insert(*node_id);
-                    return Some(contact.clone());
-                }
-            }
+        // Draw from the flat connection pool (up to 50K contacts) rather than
+        // the k-bucket routing table (capped at ~4K). This gives the initiator
+        // access to a much larger and more diverse set of candidates.
+        // K-bucket state is checked for filtering when available; contacts not
+        // in k-buckets are assumed eligible (the RLPx handshake will reject
+        // incompatible peers).
+        let eligible: Vec<(H256, Node)> = self
+            .connection_pool
+            .iter()
+            .filter(|(node_id, _)| {
+                !self.peers.contains_key(*node_id)
+                    && !self.already_tried_peers.contains(*node_id)
+                    && !self.discarded_contacts.contains(*node_id)
+                    && self
+                        .get_contact_or_replacement(node_id)
+                        .map(|c| c.knows_us && !c.unwanted && c.is_fork_id_valid != Some(false))
+                        .unwrap_or(true)
+            })
+            .map(|(id, node)| (*id, node.clone()))
+            .collect();
+
+        if let Some((node_id, node)) = eligible.choose(&mut rand::rngs::OsRng).cloned() {
+            self.already_tried_peers.insert(node_id);
+            // Return a Contact from k-buckets if available (full state),
+            // otherwise construct a minimal one from the pool entry.
+            let contact = self
+                .get_contact_or_replacement(&node_id)
+                .cloned()
+                .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
+            Some(contact)
+        } else {
+            tracing::trace!("Resetting list of tried peers.");
+            self.already_tried_peers.clear();
+            None
         }
-        }
-        tracing::trace!("Resetting list of tried peers.");
-        self.already_tried_peers.clear();
-        None
     }
 
     fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option<Contact> {
@@ -1271,6 +1319,9 @@ impl PeerTableServer {
         #[cfg(feature = "metrics")]
         let insert_start = std::time::Instant::now();
 
+        // Always add to the connection pool (regardless of k-bucket capacity)
+        self.insert_to_connection_pool(node_id, node.clone());
+
         if self.contact_exists(&node_id) {
             // Contact already exists (main or replacement list), update protocol
             if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
@@ -1301,6 +1352,10 @@ impl PeerTableServer {
             if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
                 continue;
             }
+
+            // Always add to the connection pool (regardless of k-bucket capacity)
+            self.insert_to_connection_pool(node_id, node.clone());
+
             if self.contact_exists(&node_id) {
                 // Check if we need to evaluate fork_id before taking
                 // the mutable borrow.
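The prune() pass above calls KBucket::remove_and_promote, which the visible hunks only invoke. A rough sketch of the behavior implied by the call site (simplified types; inferred, not taken from the crate):

    struct KBucket {
        contacts: Vec<(u64, String)>,     // (node_id, contact), simplified
        replacements: Vec<(u64, String)>,
    }

    impl KBucket {
        /// Drop a main-list contact and promote the oldest replacement
        /// into the freed slot, keeping the bucket as full as possible.
        fn remove_and_promote(&mut self, node_id: &u64) {
            if let Some(pos) = self.contacts.iter().position(|(id, _)| id == node_id) {
                self.contacts.remove(pos);
                if !self.replacements.is_empty() {
                    self.contacts.push(self.replacements.remove(0));
                }
            }
        }
    }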
From 0449f6ea6ca660921c6fb222d9393eb31e3eced6 Mon Sep 17 00:00:00 2001
From: Esteve Soler Arderiu
Date: Mon, 20 Apr 2026 13:52:45 +0200
Subject: [PATCH 6/7] chore(p2p): reduce connection pool cap from 50K to 10K

Matches the candidate pool size used by Reth and Nethermind.
---
 crates/networking/p2p/peer_table.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs
index 58c594f7f20..975e7cc1ad4 100644
--- a/crates/networking/p2p/peer_table.rs
+++ b/crates/networking/p2p/peer_table.rs
@@ -62,7 +62,8 @@ const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
 /// This pool is separate from the k-bucket routing table and retains
 /// more contacts for RLPx connection initiation than the k-bucket
 /// structure allows (256 × 16 = 4,096 vs this larger capacity).
-const MAX_CONNECTION_POOL_SIZE: usize = 50_000;
+/// 10K matches what Reth and Nethermind use for their candidate pools.
+const MAX_CONNECTION_POOL_SIZE: usize = 10_000;
 
 /// A single k-bucket in the Kademlia routing table.
 /// Each bucket stores contacts at a specific XOR distance range from the local node.

From 144d1642829d59197daef057a3622b4eef7c3b21 Mon Sep 17 00:00:00 2001
From: Esteve Soler Arderiu
Date: Tue, 21 Apr 2026 12:37:09 +0200
Subject: [PATCH 7/7] perf(p2p): O(1) random index probe and remove permanent blacklist
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Replace the O(n) collect-then-choose in do_get_contact_to_initiate with
  random index probing on the IndexMap (rand % len for the start, then a
  forward scan): O(1) in the common case where most entries are eligible,
  and at worst a single pass over the pool. The old approach scanned all
  10K pool entries, cloned eligible ones into a Vec, then randomly
  picked — blocking the peer_table actor and starving snap sync's
  get_best_peer calls.
- Replace collect-then-choose in do_get_contact_for_lookup with
  IteratorRandom::choose (single-pass reservoir sampling, zero alloc).
- Remove the discarded_contacts permanent blacklist entirely. Contacts
  pruned from k-buckets now remain in the connection pool so they can be
  retried — the RLPx handshake rejects truly incompatible peers.
  Previously, a single timeout permanently blacklisted a contact from
  both the pool and re-discovery.
---
 crates/networking/p2p/peer_table.rs | 90 ++++++++++++-----------------
 1 file changed, 37 insertions(+), 53 deletions(-)

diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs
index 975e7cc1ad4..6cc9c4de2ad 100644
--- a/crates/networking/p2p/peer_table.rs
+++ b/crates/networking/p2p/peer_table.rs
@@ -433,7 +433,6 @@ pub struct PeerTableServer {
     buckets: Vec<KBucket>,
     peers: IndexMap,
     already_tried_peers: FxHashSet<H256>,
-    discarded_contacts: FxHashSet<H256>,
     target_peers: usize,
     store: Store,
     /// Standalone session store, independent of contacts.
@@ -459,7 +458,6 @@ impl PeerTableServer {
             buckets: vec![KBucket::default(); NUMBER_OF_BUCKETS],
             peers: Default::default(),
             already_tried_peers: Default::default(),
-            discarded_contacts: Default::default(),
             target_peers,
             store,
             sessions: Default::default(),
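Ahead of the selection hunks below, a standalone demonstration of the IteratorRandom::choose named in the second bullet (rand 0.8-style API assumed; the filter is illustrative):

    use rand::seq::IteratorRandom;

    fn main() {
        // Single-pass reservoir sampling: picks one element uniformly at
        // random from the iterator without collecting candidates into a Vec.
        let picked = (0..1_000)
            .filter(|n| n % 7 == 0)
            .choose(&mut rand::rngs::OsRng);
        println!("{picked:?}");
    }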
@@ -1125,7 +1123,8 @@ impl PeerTableServer {
 
     /// Prune disposable contacts from both main and replacement lists.
     /// When a main contact is removed, a replacement is automatically promoted.
-    /// Also removes discarded contacts from the connection pool.
+    /// Pruned contacts remain in the connection pool so they can be retried
+    /// later — the RLPx handshake will reject them if they're truly bad.
     fn prune(&mut self) {
         for bucket in &mut self.buckets {
             // Collect disposable contacts from main list
@@ -1139,67 +1138,54 @@ impl PeerTableServer {
             // Remove from main list and promote replacements
             for node_id in main_disposable {
                 bucket.remove_and_promote(&node_id);
-                self.discarded_contacts.insert(node_id);
             }
 
             // Remove disposable contacts from replacement list
             // (these don't get promoted, just removed)
-            let replacement_disposable: Vec<H256> = bucket
-                .replacements
-                .iter()
-                .filter(|(_, c)| c.disposable)
-                .map(|(id, _)| *id)
-                .collect();
-
-            bucket
-                .replacements
-                .retain(|(id, _)| !replacement_disposable.contains(id));
-            for node_id in replacement_disposable {
-                self.discarded_contacts.insert(node_id);
-            }
+            bucket.replacements.retain(|(_, c)| !c.disposable);
         }
-
-        // Remove discarded contacts from the connection pool
-        self.connection_pool
-            .retain(|id, _| !self.discarded_contacts.contains(id));
     }
 
     fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
-        // Draw from the flat connection pool (up to 50K contacts) rather than
-        // the k-bucket routing table (capped at ~4K). This gives the initiator
-        // access to a much larger and more diverse set of candidates.
-        // K-bucket state is checked for filtering when available; contacts not
-        // in k-buckets are assumed eligible (the RLPx handshake will reject
-        // incompatible peers).
-        let eligible: Vec<(H256, Node)> = self
-            .connection_pool
-            .iter()
-            .filter(|(node_id, _)| {
-                !self.peers.contains_key(*node_id)
-                    && !self.already_tried_peers.contains(*node_id)
-                    && !self.discarded_contacts.contains(*node_id)
-                    && self
-                        .get_contact_or_replacement(node_id)
-                        .map(|c| c.knows_us && !c.unwanted && c.is_fork_id_valid != Some(false))
-                        .unwrap_or(true)
-            })
-            .map(|(id, node)| (*id, node.clone()))
-            .collect();
+        // Draw from the flat connection pool using O(1) random index probing.
+        // Pick a random start index and scan forward (wrapping) until we find
+        // an eligible candidate or complete a full loop.
+        let pool_len = self.connection_pool.len();
+        if pool_len == 0 {
+            return None;
+        }
 
-        if let Some((node_id, node)) = eligible.choose(&mut rand::rngs::OsRng).cloned() {
+        let start = rand::random::<usize>() % pool_len;
+        for offset in 0..pool_len {
+            let idx = (start + offset) % pool_len;
+            let Some((node_id, node)) = self.connection_pool.get_index(idx) else {
+                continue;
+            };
+            let node_id = *node_id;
+
+            if self.peers.contains_key(&node_id)
+                || self.already_tried_peers.contains(&node_id)
+                || self
+                    .get_contact_or_replacement(&node_id)
+                    .map(|c| !c.knows_us || c.unwanted || c.is_fork_id_valid == Some(false))
+                    .unwrap_or(false)
+            {
+                continue;
+            }
+
+            let node = node.clone();
             self.already_tried_peers.insert(node_id);
-            // Return a Contact from k-buckets if available (full state),
-            // otherwise construct a minimal one from the pool entry.
             let contact = self
                 .get_contact_or_replacement(&node_id)
                 .cloned()
                 .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
-            Some(contact)
-        } else {
-            tracing::trace!("Resetting list of tried peers.");
-            self.already_tried_peers.clear();
-            None
+            return Some(contact);
         }
+
+        // Exhausted all candidates — reset tried set for next cycle.
+        tracing::trace!("Resetting list of tried peers.");
+        self.already_tried_peers.clear();
+        None
     }
 
     fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option<Contact> {
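A property of the wrapping probe above that is easy to confirm in isolation: stepping through (start + offset) % len for offset in 0..len visits every index exactly once, so no candidate is skipped before the tried-set reset fires. A throwaway check (not from the patch):

    fn main() {
        let (len, start) = (10usize, 7usize); // 7 stands in for rand::random::<usize>() % len
        let mut visited: Vec<usize> = (0..len).map(|off| (start + off) % len).collect();
        visited.sort_unstable();
        assert_eq!(visited, (0..len).collect::<Vec<_>>());
    }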
@@ -1210,10 +1196,8 @@ impl PeerTableServer {
                     && !c.disposable
             })
             .map(|(_, c)| c)
-            .collect::<Vec<_>>()
             .choose(&mut rand::rngs::OsRng)
             .cloned()
-            .cloned()
     }
 
     /// Get contact for ENR lookup (discv4 only)
@@ -1314,7 +1298,7 @@ impl PeerTableServer {
     async fn do_new_contacts(&mut self, nodes: Vec<Node>, protocol: DiscoveryProtocol) {
         for node in nodes {
             let node_id = node.node_id();
-            if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
+            if node_id == self.local_node_id {
                 continue;
             }
@@ -1350,7 +1334,7 @@ impl PeerTableServer {
         if let Ok(node) = Node::from_enr(&node_record) {
             let node_id = node.node_id();
-            if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
+            if node_id == self.local_node_id {
                 continue;
             }