diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 78a91f05bc..ef71e8162d 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -58,6 +58,12 @@ const NUMBER_OF_BUCKETS: usize = 256; pub const MAX_NODES_PER_BUCKET: usize = 16; /// Maximum number of replacement entries per k-bucket. const MAX_REPLACEMENTS_PER_BUCKET: usize = 10; +/// Maximum number of entries in the flat connection candidate pool. +/// This pool is separate from the k-bucket routing table and retains +/// more contacts for RLPx connection initiation than the k-bucket +/// structure allows (256 × 16 = 4,096 vs this larger capacity). +/// 10K matches what Reth and Nethermind use for their candidate pools. +const MAX_CONNECTION_POOL_SIZE: usize = 10_000; /// A single k-bucket in the Kademlia routing table. /// Each bucket stores contacts at a specific XOR distance range from the local node. @@ -410,6 +416,12 @@ pub struct PeerTableServer { /// Standalone session store, independent of contacts. /// Allows sessions to be stored even before the contact's ENR is known/parseable. sessions: FxHashMap, + /// Flat pool of discovered contacts for RLPx connection initiation. + /// Decoupled from the k-bucket routing table so that connection initiation + /// has access to a much larger candidate pool than the k-bucket structure + /// allows (k-buckets: 256 × 16 = 4,096 max; this pool: up to 50,000). + /// K-buckets are still used for all Kademlia protocol operations. + connection_pool: IndexMap, } #[actor(protocol = PeerTableServerProtocol)] @@ -428,6 +440,7 @@ impl PeerTableServer { target_peers, store, sessions: Default::default(), + connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE), } } @@ -833,6 +846,8 @@ impl PeerTableServer { _ctx: &Context, ) -> bool { let node_id = msg.node.node_id(); + // Always add to the connection pool + self.insert_to_connection_pool(node_id, msg.node.clone()); if self.contact_exists(&node_id) { return false; } @@ -953,6 +968,18 @@ impl PeerTableServer { result } + /// Insert a node into the flat connection pool for RLPx initiation. + /// Evicts the oldest entry when the pool is at capacity. + fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) { + if self.connection_pool.contains_key(&node_id) { + return; + } + if self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE { + self.connection_pool.shift_remove_index(0); + } + self.connection_pool.insert(node_id, node); + } + /// Look up a contact by node ID in either the main or replacement list. fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> { let idx = self.bucket_for(node_id)?; @@ -1020,6 +1047,7 @@ impl PeerTableServer { /// Prune disposable contacts from both main and replacement lists. /// When a main contact is removed, a replacement is automatically promoted. + /// Also removes discarded contacts from the connection pool. fn prune(&mut self) { for bucket in &mut self.buckets { // Collect disposable contacts from main list @@ -1052,27 +1080,48 @@ impl PeerTableServer { self.discarded_contacts.insert(node_id); } } + + // Remove discarded contacts from the connection pool + self.connection_pool + .retain(|id, _| !self.discarded_contacts.contains(id)); } fn do_get_contact_to_initiate(&mut self) -> Option { - // Check both main contacts and replacements in each bucket. - // Replacements may contain fresher peers that haven't been tried yet. - for bucket in &self.buckets { - for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) { - if !self.peers.contains_key(node_id) - && !self.already_tried_peers.contains(node_id) - && contact.knows_us - && !contact.unwanted - && contact.is_fork_id_valid != Some(false) - { - self.already_tried_peers.insert(*node_id); - return Some(contact.clone()); - } - } + // Draw from the flat connection pool (up to 50K contacts) rather than + // the k-bucket routing table (capped at ~4K). This gives the initiator + // access to a much larger and more diverse set of candidates. + // K-bucket state is checked for filtering when available; contacts not + // in k-buckets are assumed eligible (the RLPx handshake will reject + // incompatible peers). + let eligible: Vec<(H256, Node)> = self + .connection_pool + .iter() + .filter(|(node_id, _)| { + !self.peers.contains_key(*node_id) + && !self.already_tried_peers.contains(*node_id) + && !self.discarded_contacts.contains(*node_id) + && self + .get_contact_or_replacement(node_id) + .map(|c| c.knows_us && !c.unwanted && c.is_fork_id_valid != Some(false)) + .unwrap_or(true) + }) + .map(|(id, node)| (*id, node.clone())) + .collect(); + + if let Some((node_id, node)) = eligible.choose(&mut rand::rngs::OsRng).cloned() { + self.already_tried_peers.insert(node_id); + // Return a Contact from k-buckets if available (full state), + // otherwise construct a minimal one from the pool entry. + let contact = self + .get_contact_or_replacement(&node_id) + .cloned() + .unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4)); + Some(contact) + } else { + tracing::trace!("Resetting list of tried peers."); + self.already_tried_peers.clear(); + None } - tracing::trace!("Resetting list of tried peers."); - self.already_tried_peers.clear(); - None } fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option { @@ -1193,6 +1242,9 @@ impl PeerTableServer { #[cfg(feature = "metrics")] let insert_start = std::time::Instant::now(); + // Always add to the connection pool (regardless of k-bucket capacity) + self.insert_to_connection_pool(node_id, node.clone()); + if self.contact_exists(&node_id) { // Contact already exists (main or replacement list), update protocol if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) { @@ -1223,6 +1275,10 @@ impl PeerTableServer { if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id { continue; } + + // Always add to the connection pool (regardless of k-bucket capacity) + self.insert_to_connection_pool(node_id, node.clone()); + if self.contact_exists(&node_id) { // Check if we need to evaluate fork_id before taking // the mutable borrow.