Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 73 additions & 17 deletions crates/networking/p2p/peer_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const NUMBER_OF_BUCKETS: usize = 256;
pub const MAX_NODES_PER_BUCKET: usize = 16;
/// Maximum number of replacement entries per k-bucket.
const MAX_REPLACEMENTS_PER_BUCKET: usize = 10;
/// Maximum number of entries in the flat connection candidate pool.
/// This pool is separate from the k-bucket routing table and retains
/// more contacts for RLPx connection initiation than the k-bucket
/// structure allows (256 × 16 = 4,096 vs this larger capacity).
/// 10K matches what Reth and Nethermind use for their candidate pools.
const MAX_CONNECTION_POOL_SIZE: usize = 10_000;

/// A single k-bucket in the Kademlia routing table.
/// Each bucket stores contacts at a specific XOR distance range from the local node.
Expand Down Expand Up @@ -410,6 +416,12 @@ pub struct PeerTableServer {
/// Standalone session store, independent of contacts.
/// Allows sessions to be stored even before the contact's ENR is known/parseable.
sessions: FxHashMap<H256, Session>,
/// Flat pool of discovered contacts for RLPx connection initiation.
/// Decoupled from the k-bucket routing table so that connection initiation
/// has access to a much larger candidate pool than the k-bucket structure
/// allows (k-buckets: 256 × 16 = 4,096 max; this pool: up to 10,000).
/// K-buckets are still used for all Kademlia protocol operations.
connection_pool: IndexMap<H256, Node>,
}

#[actor(protocol = PeerTableServerProtocol)]
Expand All @@ -428,6 +440,7 @@ impl PeerTableServer {
target_peers,
store,
sessions: Default::default(),
connection_pool: IndexMap::with_capacity(MAX_CONNECTION_POOL_SIZE),
}
}

Expand Down Expand Up @@ -833,6 +846,8 @@ impl PeerTableServer {
_ctx: &Context<Self>,
) -> bool {
let node_id = msg.node.node_id();
// Always add to the connection pool
self.insert_to_connection_pool(node_id, msg.node.clone());
if self.contact_exists(&node_id) {
return false;
}
Expand Down Expand Up @@ -953,6 +968,18 @@ impl PeerTableServer {
result
}

/// Insert a node into the flat connection pool for RLPx initiation.
/// Evicts the oldest entry when the pool is at capacity.
fn insert_to_connection_pool(&mut self, node_id: H256, node: Node) {
    // Already tracked: leave the entry (and its insertion-order position) alone.
    if self.connection_pool.contains_key(&node_id) {
        return;
    }
    // At capacity: drop the oldest (front) entry before admitting the new one.
    // NOTE(review): `shift_remove_index(0)` is O(len) but preserves FIFO
    // eviction order; `swap_remove_index` would be O(1) but break ordering.
    while self.connection_pool.len() >= MAX_CONNECTION_POOL_SIZE {
        self.connection_pool.shift_remove_index(0);
    }
    self.connection_pool.insert(node_id, node);
}

/// Look up a contact by node ID in either the main or replacement list.
fn get_contact_or_replacement(&self, node_id: &H256) -> Option<&Contact> {
let idx = self.bucket_for(node_id)?;
Expand Down Expand Up @@ -1020,6 +1047,7 @@ impl PeerTableServer {

/// Prune disposable contacts from both main and replacement lists.
/// When a main contact is removed, a replacement is automatically promoted.
/// Also removes discarded contacts from the connection pool.
fn prune(&mut self) {
for bucket in &mut self.buckets {
// Collect disposable contacts from main list
Expand Down Expand Up @@ -1052,27 +1080,48 @@ impl PeerTableServer {
self.discarded_contacts.insert(node_id);
}
}

// Remove discarded contacts from the connection pool
self.connection_pool
.retain(|id, _| !self.discarded_contacts.contains(id));
}

fn do_get_contact_to_initiate(&mut self) -> Option<Contact> {
// Check both main contacts and replacements in each bucket.
// Replacements may contain fresher peers that haven't been tried yet.
for bucket in &self.buckets {
for (node_id, contact) in bucket.contacts.iter().chain(bucket.replacements.iter()) {
if !self.peers.contains_key(node_id)
&& !self.already_tried_peers.contains(node_id)
&& contact.knows_us
&& !contact.unwanted
&& contact.is_fork_id_valid != Some(false)
{
self.already_tried_peers.insert(*node_id);
return Some(contact.clone());
}
}
// Draw from the flat connection pool (up to 10K contacts) rather than
// the k-bucket routing table (capped at ~4K). This gives the initiator
// access to a much larger and more diverse set of candidates.
// K-bucket state is checked for filtering when available; contacts not
// in k-buckets are assumed eligible (the RLPx handshake will reject
// incompatible peers).
let eligible: Vec<(H256, Node)> = self
.connection_pool
.iter()
.filter(|(node_id, _)| {
!self.peers.contains_key(*node_id)
&& !self.already_tried_peers.contains(*node_id)
&& !self.discarded_contacts.contains(*node_id)
&& self
.get_contact_or_replacement(node_id)
.map(|c| c.knows_us && !c.unwanted && c.is_fork_id_valid != Some(false))
.unwrap_or(true)
})
.map(|(id, node)| (*id, node.clone()))
.collect();

if let Some((node_id, node)) = eligible.choose(&mut rand::rngs::OsRng).cloned() {
self.already_tried_peers.insert(node_id);
// Return a Contact from k-buckets if available (full state),
// otherwise construct a minimal one from the pool entry.
let contact = self
.get_contact_or_replacement(&node_id)
.cloned()
.unwrap_or_else(|| Contact::new(node, DiscoveryProtocol::Discv4));
Some(contact)
} else {
tracing::trace!("Resetting list of tried peers.");
self.already_tried_peers.clear();
None
}
tracing::trace!("Resetting list of tried peers.");
self.already_tried_peers.clear();
None
}

fn do_get_contact_for_lookup(&self, protocol: DiscoveryProtocol) -> Option<Contact> {
Expand Down Expand Up @@ -1193,6 +1242,9 @@ impl PeerTableServer {
#[cfg(feature = "metrics")]
let insert_start = std::time::Instant::now();

// Always add to the connection pool (regardless of k-bucket capacity)
self.insert_to_connection_pool(node_id, node.clone());

if self.contact_exists(&node_id) {
// Contact already exists (main or replacement list), update protocol
if let Some(contact) = self.get_contact_or_replacement_mut(&node_id) {
Expand Down Expand Up @@ -1223,6 +1275,10 @@ impl PeerTableServer {
if self.discarded_contacts.contains(&node_id) || node_id == self.local_node_id {
continue;
}

// Always add to the connection pool (regardless of k-bucket capacity)
self.insert_to_connection_pool(node_id, node.clone());

if self.contact_exists(&node_id) {
// Check if we need to evaluate fork_id before taking
// the mutable borrow.
Expand Down
Loading