From 12f71cf9219b0b6913ba01bf6c9ac7c82ac29426 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Thu, 16 Apr 2026 12:37:23 +0200 Subject: [PATCH 1/2] feat(sync): pipeline body download with execution (#6479) Overlap block body downloads with block execution during full sync by using a background tokio task and an mpsc channel (buffer of 2 batches). The download task iterates through batches, fetches headers from the store, downloads bodies from peers, constructs Block objects, and sends them through the channel. The main loop receives completed batches and executes them immediately. This hides network I/O latency behind execution time for an estimated 12-15% throughput improvement. Closes #6479 --- crates/networking/p2p/sync/full.rs | 160 +++++++++++++++++++++-------- 1 file changed, 118 insertions(+), 42 deletions(-) diff --git a/crates/networking/p2p/sync/full.rs b/crates/networking/p2p/sync/full.rs index aaf6c145fa4..ef02f1ffdf9 100644 --- a/crates/networking/p2p/sync/full.rs +++ b/crates/networking/p2p/sync/full.rs @@ -116,54 +116,130 @@ pub async fn sync_cycle_full( end_block_number += 1; start_block_number = start_block_number.max(1); - // Download block bodies and execute full blocks in batches - for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { - let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); - let final_batch = end_block_number == start + batch_size as u64; - // Retrieve batch from DB - if !single_batch { - headers = store + // Pipeline: download block bodies in a background task while the main loop executes. + // This overlaps network I/O with block execution for better throughput. + let (body_tx, mut body_rx) = + tokio::sync::mpsc::channel::, bool), SyncError>>(2); + + // Clone resources for the background download task + let mut download_peers = peers.clone(); + let download_store = store.clone(); + + let download_task = tokio::spawn(async move { + // If single_batch, we already have headers in memory — send them as the one and only batch. + if single_batch { + let final_batch = true; + let mut batch_headers = headers; + let mut blocks = Vec::new(); + while !batch_headers.is_empty() { + let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len()); + let header_batch = &batch_headers[..end]; + match download_peers.request_block_bodies(header_batch).await { + Ok(Some(bodies)) => { + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = batch_headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + Ok(None) => { + let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await; + return; + } + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + } + } + if !blocks.is_empty() { + let _ = body_tx.send(Ok((blocks, final_batch))).await; + } + return; + } + + // Multi-batch path: iterate through all batches, download bodies, and send them. + for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { + let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); + let final_batch = end_block_number == start + batch_size as u64; + + let batch_headers = match download_store .read_fullsync_batch(start, batch_size as u64) - .await? + .await + { + Ok(h) => h, + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + }; + let mut batch_headers: Vec<_> = match batch_headers .into_iter() .map(|opt| opt.ok_or(SyncError::MissingFullsyncBatch)) - .collect::, SyncError>>()?; - } - let mut blocks = Vec::new(); - // Request block bodies - // Download block bodies - while !headers.is_empty() { - let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())]; - let bodies = peers - .request_block_bodies(header_batch) - .await? - .ok_or(SyncError::BodiesNotFound)?; - debug!("Obtained: {} block bodies", bodies.len()); - let block_batch = headers - .drain(..bodies.len()) - .zip(bodies) - .map(|(header, body)| Block { header, body }); - blocks.extend(block_batch); - } - if !blocks.is_empty() { - // Execute blocks - info!( - "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", - blocks.len(), - blocks.first().ok_or(SyncError::NoBlocks)?.hash(), - blocks.last().ok_or(SyncError::NoBlocks)?.hash() - ); - add_blocks_in_batch( - blockchain.clone(), - cancel_token.clone(), - blocks, - final_batch, - store.clone(), - ) - .await?; + .collect() + { + Ok(h) => h, + Err(e) => { + let _ = body_tx.send(Err(e)).await; + return; + } + }; + + let mut blocks = Vec::new(); + while !batch_headers.is_empty() { + let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len()); + let header_batch = &batch_headers[..end]; + match download_peers.request_block_bodies(header_batch).await { + Ok(Some(bodies)) => { + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = batch_headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + Ok(None) => { + let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await; + return; + } + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + } + } + if !blocks.is_empty() { + if body_tx.send(Ok((blocks, final_batch))).await.is_err() { + // Receiver dropped (execution loop stopped), stop downloading + return; + } + } } + }); + + // Main loop: receive downloaded batches and execute them + while let Some(result) = body_rx.recv().await { + let (blocks, final_batch) = result?; + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + blocks.len(), + blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + add_blocks_in_batch( + blockchain.clone(), + cancel_token.clone(), + blocks, + final_batch, + store.clone(), + ) + .await?; } + // Ensure the download task completes and propagate any panics + download_task.await?; + // Execute pending blocks if !pending_blocks.is_empty() { info!( From f355cc7cafd339e3f7b23cc5245b046faec512c7 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Thu, 16 Apr 2026 13:06:22 +0200 Subject: [PATCH 2/2] fix(l1): resolve clippy warning in pipelined body download --- crates/networking/p2p/sync/full.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/sync/full.rs b/crates/networking/p2p/sync/full.rs index ef02f1ffdf9..6afb82d153c 100644 --- a/crates/networking/p2p/sync/full.rs +++ b/crates/networking/p2p/sync/full.rs @@ -209,11 +209,11 @@ pub async fn sync_cycle_full( } } } - if !blocks.is_empty() { - if body_tx.send(Ok((blocks, final_batch))).await.is_err() { - // Receiver dropped (execution loop stopped), stop downloading - return; - } + if !blocks.is_empty() + && body_tx.send(Ok((blocks, final_batch))).await.is_err() + { + // Receiver dropped (execution loop stopped), stop downloading + return; } } });