diff --git a/crates/networking/p2p/sync/full.rs b/crates/networking/p2p/sync/full.rs index d709299ace..58dfdcc679 100644 --- a/crates/networking/p2p/sync/full.rs +++ b/crates/networking/p2p/sync/full.rs @@ -121,54 +121,130 @@ pub async fn sync_cycle_full( end_block_number += 1; start_block_number = start_block_number.max(1); - // Download block bodies and execute full blocks in batches - for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { - let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); - let final_batch = end_block_number == start + batch_size as u64; - // Retrieve batch from DB - if !single_batch { - headers = store + // Pipeline: download block bodies in a background task while the main loop executes. + // This overlaps network I/O with block execution for better throughput. + let (body_tx, mut body_rx) = + tokio::sync::mpsc::channel::, bool), SyncError>>(2); + + // Clone resources for the background download task + let mut download_peers = peers.clone(); + let download_store = store.clone(); + + let download_task = tokio::spawn(async move { + // If single_batch, we already have headers in memory — send them as the one and only batch. + if single_batch { + let final_batch = true; + let mut batch_headers = headers; + let mut blocks = Vec::new(); + while !batch_headers.is_empty() { + let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len()); + let header_batch = &batch_headers[..end]; + match download_peers.request_block_bodies(header_batch).await { + Ok(Some(bodies)) => { + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = batch_headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + Ok(None) => { + let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await; + return; + } + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + } + } + if !blocks.is_empty() { + let _ = body_tx.send(Ok((blocks, final_batch))).await; + } + return; + } + + // Multi-batch path: iterate through all batches, download bodies, and send them. + for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) { + let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize); + let final_batch = end_block_number == start + batch_size as u64; + + let batch_headers = match download_store .read_fullsync_batch(start, batch_size as u64) - .await? + .await + { + Ok(h) => h, + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + }; + let mut batch_headers: Vec<_> = match batch_headers .into_iter() .map(|opt| opt.ok_or(SyncError::MissingFullsyncBatch)) - .collect::, SyncError>>()?; - } - let mut blocks = Vec::new(); - // Request block bodies - // Download block bodies - while !headers.is_empty() { - let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())]; - let bodies = peers - .request_block_bodies(header_batch) - .await? - .ok_or(SyncError::BodiesNotFound)?; - debug!("Obtained: {} block bodies", bodies.len()); - let block_batch = headers - .drain(..bodies.len()) - .zip(bodies) - .map(|(header, body)| Block { header, body }); - blocks.extend(block_batch); - } - if !blocks.is_empty() { - // Execute blocks - info!( - "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", - blocks.len(), - blocks.first().ok_or(SyncError::NoBlocks)?.hash(), - blocks.last().ok_or(SyncError::NoBlocks)?.hash() - ); - add_blocks_in_batch( - blockchain.clone(), - cancel_token.clone(), - blocks, - final_batch, - store.clone(), - ) - .await?; + .collect() + { + Ok(h) => h, + Err(e) => { + let _ = body_tx.send(Err(e)).await; + return; + } + }; + + let mut blocks = Vec::new(); + while !batch_headers.is_empty() { + let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len()); + let header_batch = &batch_headers[..end]; + match download_peers.request_block_bodies(header_batch).await { + Ok(Some(bodies)) => { + debug!("Obtained: {} block bodies", bodies.len()); + let block_batch = batch_headers + .drain(..bodies.len()) + .zip(bodies) + .map(|(header, body)| Block { header, body }); + blocks.extend(block_batch); + } + Ok(None) => { + let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await; + return; + } + Err(e) => { + let _ = body_tx.send(Err(e.into())).await; + return; + } + } + } + if !blocks.is_empty() + && body_tx.send(Ok((blocks, final_batch))).await.is_err() + { + // Receiver dropped (execution loop stopped), stop downloading + return; + } } + }); + + // Main loop: receive downloaded batches and execute them + while let Some(result) = body_rx.recv().await { + let (blocks, final_batch) = result?; + info!( + "Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}", + blocks.len(), + blocks.first().ok_or(SyncError::NoBlocks)?.hash(), + blocks.last().ok_or(SyncError::NoBlocks)?.hash() + ); + add_blocks_in_batch( + blockchain.clone(), + cancel_token.clone(), + blocks, + final_batch, + store.clone(), + ) + .await?; } + // Ensure the download task completes and propagate any panics + download_task.await?; + // Execute pending blocks if !pending_blocks.is_empty() { info!(