7 changes: 6 additions & 1 deletion crates/networking/p2p/peer_handler.rs
@@ -90,6 +90,8 @@ async fn ask_peer_head_number(
}
Ok(_other_msgs) => Err(PeerHandlerError::UnexpectedResponseFromPeer(peer_id)),
Err(PeerConnectionError::Timeout) => {
// Mark this peer as disposable so it gets pruned and replaced
let _ = peer_table.set_disposable(peer_id);
Err(PeerHandlerError::ReceiveMessageFromPeerTimeout(peer_id))
}
Err(_other_err) => Err(PeerHandlerError::ReceiveMessageFromPeer(peer_id)),
@@ -448,13 +450,15 @@ impl PeerHandler {
warn!(
"[SYNCING] Received empty/invalid headers from peer, penalizing peer {peer_id}"
);
let _ = self.peer_table.set_disposable(peer_id);
return Ok(None);
}
}
// Timeouted
// Timeout or invalid response - mark peer as disposable
warn!(
"[SYNCING] Didn't receive block headers from peer, penalizing peer {peer_id}..."
);
let _ = self.peer_table.set_disposable(peer_id);
Ok(None)
}
}
@@ -536,6 +540,7 @@ impl PeerHandler {
"[SYNCING] Didn't receive block bodies from peer, penalizing peer {peer_id}..."
);
self.peer_table.record_failure(peer_id)?;
let _ = self.peer_table.set_disposable(peer_id);
Comment on lines 540 to +543
**P2: `record_failure` + `set_disposable` on every block-body miss**

A single bad block-body response (empty body, wrong count, or timeout) now permanently disposes the peer via `set_disposable`, on top of the pre-existing `record_failure` score penalty. Since `set_disposable` is the stronger action (it prevents the peer from ever re-entering the main bucket), calling `record_failure` first is harmless but redundant. More importantly, transient failures (a brief network hiccup, a peer momentarily behind) will now permanently remove the peer from the routing table after a single miss, which may be too aggressive and could shrink the effective peer pool over a long sync. Consider requiring either multiple consecutive failures or a combined threshold before marking a peer disposable, similar to how `record_critical_failure` is used for misbehaving peers.

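One way to realize the threshold the comment above suggests is a consecutive-failure counter that only flips `disposable` after several misses. This is a minimal sketch, not the ethrex implementation: the names `failure_count`, `DISPOSE_AFTER_FAILURES`, and `record_success` are hypothetical, and the real `Contact` type carries more state.

```rust
/// Consecutive failures tolerated before a peer is marked disposable
/// (hypothetical constant, chosen for illustration only).
const DISPOSE_AFTER_FAILURES: u32 = 3;

#[derive(Default)]
struct Contact {
    /// Consecutive failed requests since the last success.
    failure_count: u32,
    disposable: bool,
}

impl Contact {
    /// Record one failed request; only after the threshold of
    /// consecutive misses is the peer marked disposable.
    fn record_failure(&mut self) {
        self.failure_count += 1;
        if self.failure_count >= DISPOSE_AFTER_FAILURES {
            self.disposable = true;
        }
    }

    /// A successful response resets the consecutive-failure streak,
    /// so transient hiccups never accumulate into disposal.
    fn record_success(&mut self) {
        self.failure_count = 0;
    }
}
```

With this shape, the call sites in `peer_handler.rs` would invoke only `record_failure` and let the counter decide when disposal happens.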

Ok(None)
}
}
63 changes: 47 additions & 16 deletions crates/networking/p2p/peer_table.rs
@@ -86,9 +86,12 @@ impl KBucket {
})
}

/// Find a mutable reference to a contact by node ID within this bucket.
/// Find a mutable reference to a contact by node ID (main or replacement list).
fn get_mut(&mut self, node_id: &H256) -> Option<&mut Contact> {
self.contacts
if let Some((_, c)) = self.contacts.iter_mut().find(|(id, _)| id == node_id) {
return Some(c);
}
self.replacements
.iter_mut()
.find(|(id, _)| id == node_id)
.map(|(_, c)| c)
@@ -909,10 +912,10 @@ impl PeerTableServer {
bucket_index(&self.local_node_id, node_id)
}

/// Look up a contact by node ID (O(K) within the bucket).
/// Look up a contact by node ID in main or replacement list (O(K) within the bucket).
fn get_contact(&self, node_id: &H256) -> Option<&Contact> {
let idx = self.bucket_for(node_id)?;
self.buckets[idx].get(node_id)
self.buckets[idx].get_any(node_id)
}

/// Look up a mutable reference to a contact by node ID.
@@ -971,11 +974,15 @@ impl PeerTableServer {
None
}

/// Iterate over all contacts across all buckets.
/// Iterate over all contacts across all buckets (main and replacement lists).
fn iter_contacts(&self) -> impl Iterator<Item = (&H256, &Contact)> {
self.buckets
.iter()
.flat_map(|bucket| bucket.contacts.iter().map(|(id, c)| (id, c)))
self.buckets.iter().flat_map(|bucket| {
bucket
.contacts
.iter()
.chain(bucket.replacements.iter())
.map(|(id, c)| (id, c))
})
Comment on lines 974 to +985
**P2: `iter_contacts` expansion silently broadens multiple consumers**

`iter_contacts` is used by five functions beyond the `set_disposable` path that directly motivates this fix: `do_get_contact_for_lookup`, `do_get_contact_for_enr_lookup`, `do_get_contact_to_revalidate`, `do_find_closest_nodes`, and `do_get_nodes_at_distances`. Replacement contacts will now be revalidated, sent FindNode queries, and returned in neighbor advertisements. For `do_find_closest_nodes` this is benign (candidates are still capped at 16), but revalidating or sending FindNode to a replacement that was intentionally held back (because its bucket was full) may generate extra traffic and slightly change discovery convergence behavior. This is worth a targeted comment acknowledging the broader scope.

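An alternative that avoids the silent broadening described above is to keep two iterators with explicit scopes, so discovery-facing callers opt out of replacements. This is a simplified sketch, assuming stand-in `Bucket`/`Contact` types; the method names `iter_main_contacts`/`iter_all_contacts` are hypothetical, not the ethrex API.

```rust
struct Contact {
    id: u64, // stand-in for the real H256 node ID
}

struct Bucket {
    contacts: Vec<Contact>,
    replacements: Vec<Contact>,
}

struct Table {
    buckets: Vec<Bucket>,
}

impl Table {
    /// Main-list contacts only: what revalidation, FindNode, and
    /// neighbor advertisements would keep using.
    fn iter_main_contacts(&self) -> impl Iterator<Item = &Contact> {
        self.buckets.iter().flat_map(|b| b.contacts.iter())
    }

    /// Main and replacement contacts: what pruning/disposal needs,
    /// since disposable peers can live in either list.
    fn iter_all_contacts(&self) -> impl Iterator<Item = &Contact> {
        self.buckets
            .iter()
            .flat_map(|b| b.contacts.iter().chain(b.replacements.iter()))
    }
}
```

With this split, only `prune` and `set_disposable` lookups would switch to the wider iterator, leaving the five discovery consumers untouched.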

}

// --- Peer selection ---
@@ -1011,23 +1018,47 @@ impl PeerTableServer {

// --- Contact operations ---

/// Prune disposable contacts from both main and replacement lists.
/// When a main contact is removed, a replacement is automatically promoted.
fn prune(&mut self) {
let disposable_contacts: Vec<H256> = self
.iter_contacts()
.filter_map(|(id, c)| c.disposable.then_some(*id))
.collect();
for bucket in &mut self.buckets {
// Collect disposable contacts from main list
let main_disposable: Vec<H256> = bucket
.contacts
.iter()
.filter(|(_, c)| c.disposable)
.map(|(id, _)| *id)
.collect();

// Remove from main list and promote replacements
for node_id in main_disposable {
bucket.remove_and_promote(&node_id);
self.discarded_contacts.insert(node_id);
}

for node_id in disposable_contacts {
if let Some(idx) = self.bucket_for(&node_id) {
self.buckets[idx].remove_and_promote(&node_id);
// Remove disposable contacts from replacement list
// (these don't get promoted, just removed)
let replacement_disposable: Vec<H256> = bucket
.replacements
.iter()
.filter(|(_, c)| c.disposable)
.map(|(id, _)| *id)
.collect();

bucket
.replacements
.retain(|(id, _)| !replacement_disposable.contains(id));
Comment on lines +1041 to +1050
**P2: O(n²) `Vec::contains` in `retain` loop**

`replacement_disposable` is a `Vec`, so `replacement_disposable.contains(id)` inside `retain` is O(n) per element, making the full retain O(n²). For `MAX_REPLACEMENTS_PER_BUCKET = 10` this is negligible, but converting to a `FxHashSet` keeps this consistent with the codebase's other hash-set patterns and is a trivial change.

```rust
let replacement_disposable: FxHashSet<H256> = bucket
    .replacements
    .iter()
    .filter(|(_, c)| c.disposable)
    .map(|(id, _)| *id)
    .collect();

bucket
    .replacements
    .retain(|(id, _)| !replacement_disposable.contains(id));
```

for node_id in replacement_disposable {
self.discarded_contacts.insert(node_id);
}
}
}

fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
// Check both main contacts and replacements in each bucket.
// Replacements may contain fresher peers that haven't been tried yet.
for bucket in &self.buckets {
for (node_id, contact) in &bucket.contacts {
for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) {
if !self.peers.contains_key(node_id)
&& !self.already_tried_peers.contains(node_id)
&& contact.knows_us
3 changes: 3 additions & 0 deletions crates/networking/p2p/sync/snap_sync.rs
@@ -130,6 +130,9 @@ pub async fn sync_cycle_snap(
let mut attempts = 0;

loop {
// Prune dead/unresponsive peers periodically to allow replacements to be promoted
let _ = peers.peer_table.prune_table();

debug!("Requesting Block Headers from {current_head}");

let Some(mut block_headers) = peers
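The `prune_table` call added above runs on every iteration of the sync loop. If that ever proved too frequent, a simple interval gate would bound how often pruning happens. This is only a sketch of the idea; `PruneTimer` and `PRUNE_INTERVAL` are hypothetical names, not part of this PR.

```rust
use std::time::{Duration, Instant};

/// Minimum time between prune passes (illustrative value).
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

struct PruneTimer {
    last: Option<Instant>,
}

impl PruneTimer {
    fn new() -> Self {
        // No prune yet, so the first check fires immediately.
        Self { last: None }
    }

    /// Returns true (and resets the timer) when a prune is due.
    fn due(&mut self) -> bool {
        match self.last {
            Some(t) if t.elapsed() < PRUNE_INTERVAL => false,
            _ => {
                self.last = Some(Instant::now());
                true
            }
        }
    }
}
```

Inside the loop this would become `if timer.due() { let _ = peers.peer_table.prune_table(); }`, keeping the promotion of replacements periodic without pruning on every header request.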