Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 118 additions & 42 deletions crates/networking/p2p/sync/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,54 +121,130 @@ pub async fn sync_cycle_full(
end_block_number += 1;
start_block_number = start_block_number.max(1);

// Download block bodies and execute full blocks in batches
for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) {
let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize);
let final_batch = end_block_number == start + batch_size as u64;
// Retrieve batch from DB
if !single_batch {
headers = store
// Pipeline: download block bodies in a background task while the main loop executes.
// This overlaps network I/O with block execution for better throughput.
let (body_tx, mut body_rx) =
tokio::sync::mpsc::channel::<Result<(Vec<Block>, bool), SyncError>>(2);

// Clone resources for the background download task
let mut download_peers = peers.clone();
let download_store = store.clone();

let download_task = tokio::spawn(async move {
// If single_batch, we already have headers in memory — send them as the one and only batch.
if single_batch {
let final_batch = true;
let mut batch_headers = headers;
let mut blocks = Vec::new();
while !batch_headers.is_empty() {
let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len());
let header_batch = &batch_headers[..end];
match download_peers.request_block_bodies(header_batch).await {
Ok(Some(bodies)) => {
debug!("Obtained: {} block bodies", bodies.len());
let block_batch = batch_headers
.drain(..bodies.len())
.zip(bodies)
.map(|(header, body)| Block { header, body });
blocks.extend(block_batch);
}
Ok(None) => {
let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await;
return;
}
Err(e) => {
let _ = body_tx.send(Err(e.into())).await;
return;
}
}
}
if !blocks.is_empty() {
let _ = body_tx.send(Ok((blocks, final_batch))).await;
}
return;
}

// Multi-batch path: iterate through all batches, download bodies, and send them.
for start in (start_block_number..end_block_number).step_by(*EXECUTE_BATCH_SIZE) {
let batch_size = EXECUTE_BATCH_SIZE.min((end_block_number - start) as usize);
let final_batch = end_block_number == start + batch_size as u64;

let batch_headers = match download_store
.read_fullsync_batch(start, batch_size as u64)
.await?
.await
{
Ok(h) => h,
Err(e) => {
let _ = body_tx.send(Err(e.into())).await;
return;
}
};
let mut batch_headers: Vec<_> = match batch_headers
.into_iter()
.map(|opt| opt.ok_or(SyncError::MissingFullsyncBatch))
.collect::<Result<Vec<_>, SyncError>>()?;
}
let mut blocks = Vec::new();
// Request block bodies
// Download block bodies
while !headers.is_empty() {
let header_batch = &headers[..min(MAX_BLOCK_BODIES_TO_REQUEST, headers.len())];
let bodies = peers
.request_block_bodies(header_batch)
.await?
.ok_or(SyncError::BodiesNotFound)?;
debug!("Obtained: {} block bodies", bodies.len());
let block_batch = headers
.drain(..bodies.len())
.zip(bodies)
.map(|(header, body)| Block { header, body });
blocks.extend(block_batch);
}
if !blocks.is_empty() {
// Execute blocks
info!(
"Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}",
blocks.len(),
blocks.first().ok_or(SyncError::NoBlocks)?.hash(),
blocks.last().ok_or(SyncError::NoBlocks)?.hash()
);
add_blocks_in_batch(
blockchain.clone(),
cancel_token.clone(),
blocks,
final_batch,
store.clone(),
)
.await?;
.collect()
{
Ok(h) => h,
Err(e) => {
let _ = body_tx.send(Err(e)).await;
return;
}
};

let mut blocks = Vec::new();
while !batch_headers.is_empty() {
let end = min(MAX_BLOCK_BODIES_TO_REQUEST, batch_headers.len());
let header_batch = &batch_headers[..end];
match download_peers.request_block_bodies(header_batch).await {
Ok(Some(bodies)) => {
debug!("Obtained: {} block bodies", bodies.len());
let block_batch = batch_headers
.drain(..bodies.len())
.zip(bodies)
.map(|(header, body)| Block { header, body });
blocks.extend(block_batch);
}
Ok(None) => {
let _ = body_tx.send(Err(SyncError::BodiesNotFound)).await;
return;
}
Err(e) => {
let _ = body_tx.send(Err(e.into())).await;
return;
}
}
}
if !blocks.is_empty()
&& body_tx.send(Ok((blocks, final_batch))).await.is_err()
{
// Receiver dropped (execution loop stopped), stop downloading
return;
}
}
});

// Main loop: receive downloaded batches and execute them
while let Some(result) = body_rx.recv().await {
let (blocks, final_batch) = result?;
info!(
"Executing {} blocks for full sync. First block hash: {:#?} Last block hash: {:#?}",
blocks.len(),
blocks.first().ok_or(SyncError::NoBlocks)?.hash(),
blocks.last().ok_or(SyncError::NoBlocks)?.hash()
);
add_blocks_in_batch(
blockchain.clone(),
cancel_token.clone(),
blocks,
final_batch,
store.clone(),
)
.await?;
}

// Ensure the download task completes and propagate any panics
download_task.await?;

// Execute pending blocks
if !pending_blocks.is_empty() {
info!(
Expand Down
Loading