diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index a921bc975a4..b2da1b4cc62 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -90,6 +90,8 @@ async fn ask_peer_head_number( } Ok(_other_msgs) => Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)), Err(PeerConnectionError::Timeout) => { + // Mark this peer as disposable so it gets pruned and replaced + let _ = peer_table.set_disposable(peer_id); Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id)) } Err(_other_err) => Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)), @@ -448,13 +450,15 @@ impl PeerHandler { warn!( "[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}" ); + let _ = self.peer_table.set_disposable(peer_id); return Ok(None); } } - // Timeouted + // Timeout or invalid response - mark peer as disposable warn!( "[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}..." ); + let _ = self.peer_table.set_disposable(peer_id); Ok(None) } } @@ -536,6 +540,7 @@ impl PeerHandler { "[SYNCING] Didn't receive block bodies from peer, penalizing peer {peer_id}..." ); self.peer_table.record_failure(peer_id)?; + let _ = self.peer_table.set_disposable(peer_id); Ok(None) } } diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index f699b60295a..78a91f05bc9 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -86,9 +86,12 @@ impl KBucket { }) } - /// Find a mutable reference to a contact by node ID within this bucket. + /// Find a mutable reference to a contact by node ID (main or replacement list). fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> { - self.contacts + if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) { + return Some(c); + } + self.replacements .iter_mut() .find(|(id, _)| id == node_id) .map(|(_, c)| c) @@ -909,10 +912,10 @@ impl PeerTableServer { bucket_index(&self.local_node_id, node_id) } - /// Look up a contact by node ID (O(K) within the bucket). + /// Look up a contact by node ID in main or replacement list (O(K) within the bucket). fn get_contact(&self, node_id: &H256) -> Option<&Contact> { let idx = self.bucket_for(node_id)?; - self.buckets[idx].get(node_id) + self.buckets[idx].get_any(node_id) } /// Look up a mutable reference to a contact by node ID. @@ -971,11 +974,15 @@ impl PeerTableServer { None } - /// Iterate over all contacts across all buckets. + /// Iterate over all contacts across all buckets (main and replacement lists). fn iter_contacts(&self) -> impl Iterator { - self.buckets - .iter() - .flat_map(|bucket| bucket.contacts.iter().map(|(id, c)| (id, c))) + self.buckets.iter().flat_map(|bucket| { + bucket + .contacts + .iter() + .chain(bucket.replacements.iter()) + .map(|(id, c)| (id, c)) + }) } // --- Peer selection --- @@ -1011,23 +1018,47 @@ impl PeerTableServer { // --- Contact operations --- + /// Prune disposable contacts from both main and replacement lists. + /// When a main contact is removed, a replacement is automatically promoted. fn prune(&mut self) { - let disposable_contacts: Vec = self - .iter_contacts() - .filter_map(|(id, c)| c.disposable.then_some(*id)) - .collect(); + for bucket in &mut self.buckets { + // Collect disposable contacts from main list + let main_disposable: Vec = bucket + .contacts + .iter() + .filter(|(_, c)| c.disposable) + .map(|(id, _)| *id) + .collect(); + + // Remove from main list and promote replacements + for node_id in main_disposable { + bucket.remove_and_promote(&node_id); + self.discarded_contacts.insert(node_id); + } - for node_id in disposable_contacts { - if let Some(idx) = self.bucket_for(&node_id) { - self.buckets[idx].remove_and_promote(&node_id); + // Remove disposable contacts from replacement list + // (these don't get promoted, just removed) + let replacement_disposable: Vec = bucket + .replacements + .iter() + .filter(|(_, c)| c.disposable) + .map(|(id, _)| *id) + .collect(); + + bucket + .replacements + .retain(|(id, _)| !replacement_disposable.contains(id)); + for node_id in replacement_disposable { self.discarded_contacts.insert(node_id); } } } fn do_get_contact_to_initiate(&mut self) -> Option { + // Check both main contacts and replacements in each bucket. + // Replacements may contain fresher peers that haven't been tried yet. for bucket in &self.buckets { - for (node_id, contact) in &bucket.contacts { + for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) { if !self.peers.contains_key(node_id) && !self.already_tried_peers.contains(node_id) && contact.knows_us diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 519f1c73389..f2cfef9c9d1 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -130,6 +130,9 @@ pub async fn sync_cycle_snap( let mut attempts = 0; loop { + // Prune dead/unresponsive peers periodically to allow replacements to be promoted + let _ = peers.peer_table.prune_table(); + debug!("Requesting Block Headers from {current_head}"); let Some(mut block_headers) = peers