diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 078e12a87a9..a586598fb3d 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -2302,6 +2302,102 @@ impl Blockchain { Ok(()) } + /// Adds multiple blocks using pipelined execution in sub-batches. + /// + /// Instead of running all blocks sequentially then doing one giant merkle at the end + /// (as `add_blocks_in_batch` does), this method processes blocks in sub-batches using + /// the block pipeline (`add_block_pipeline`). Each block gets concurrent + /// execution + merkleization, with warmer prefetching and 16 parallel shard workers. + /// + /// The trade-off: per-block merkleization instead of one collapsed batch, but the + /// pipeline overlap (merkle runs concurrently with execution) should more than compensate. + /// + /// Sub-batch boundaries are used for progress logging and yielding to the async runtime. + pub async fn add_blocks_in_pipeline_batches( + &self, + blocks: Vec<Block>, + sub_batch_size: usize, + cancellation_token: CancellationToken, + ) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> { + let blocks_len = blocks.len(); + let mut last_valid_hash = H256::default(); + let mut total_gas_used: u64 = 0; + let mut transactions_count: usize = 0; + + let interval = Instant::now(); + + for (sub_batch_idx, sub_batch) in blocks.chunks(sub_batch_size).enumerate() { + if cancellation_token.is_cancelled() { + info!("Received shutdown signal, aborting"); + return Err((ChainError::Custom(String::from("shutdown signal")), None)); + } + + for block in sub_batch { + let block_hash = block.hash(); + self.add_block_pipeline(block.clone(), None).map_err(|e| { + ( + e, + Some(BatchBlockProcessingFailure { + failed_block_hash: block_hash, + last_valid_hash, + }), + ) + })?; + last_valid_hash = block_hash; + total_gas_used += block.header.gas_used; + transactions_count += block.body.transactions.len(); + } + + let blocks_done = (sub_batch_idx + 1) * sub_batch_size; + let 
blocks_done = blocks_done.min(blocks_len); + debug!( + "Pipeline sub-batch {}: processed {}/{} blocks", + sub_batch_idx + 1, + blocks_done, + blocks_len, + ); + + // Yield to the async runtime between sub-batches + tokio::task::yield_now().await; + } + + let last_block = blocks + .last() + .ok_or_else(|| (ChainError::Custom("Last block not found".into()), None))?; + + let last_block_number = last_block.header.number; + let last_block_gas_limit = last_block.header.gas_limit; + + let elapsed_seconds = interval.elapsed().as_secs_f64(); + let throughput = if elapsed_seconds > 0.0 && total_gas_used != 0 { + let as_gigas = (total_gas_used as f64) / 1e9; + as_gigas / elapsed_seconds + } else { + 0.0 + }; + + metrics!( + METRICS_BLOCKS.set_block_number(last_block_number); + METRICS_BLOCKS.set_latest_block_gas_limit(last_block_gas_limit as f64); + METRICS_BLOCKS.set_latest_gas_used(total_gas_used as f64 / blocks_len as f64); + METRICS_BLOCKS.set_latest_gigagas(throughput); + ); + + if self.options.perf_logs_enabled { + info!( + "[METRICS] Pipeline executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s", + blocks_len, + last_block_number, + last_block_gas_limit, + transactions_count, + total_gas_used, + throughput + ); + } + + Ok(()) + } + /// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid #[cfg(feature = "c-kzg")] pub async fn add_blob_transaction_to_pool( diff --git a/crates/networking/p2p/snap/constants.rs b/crates/networking/p2p/snap/constants.rs index 28732ca65cd..a1b4b0cd32f 100644 --- a/crates/networking/p2p/snap/constants.rs +++ b/crates/networking/p2p/snap/constants.rs @@ -108,6 +108,10 @@ pub const MIN_FULL_BLOCKS: u64 = 10_000; /// Number of blocks to execute in a single batch during full sync. pub const EXECUTE_BATCH_SIZE_DEFAULT: usize = 1024; +/// Number of blocks per sub-batch when using pipelined execution during full sync. 
+/// Each sub-batch processes blocks through the pipeline (concurrent exec + merkle). +pub const PIPELINE_SUB_BATCH_SIZE_DEFAULT: usize = 64; + /// Average time between blocks (used for timestamp-based calculations). pub const SECONDS_PER_BLOCK: u64 = 12; diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 8f1ff36fcf1..f559f45b00d 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -11,7 +11,7 @@ mod snap_sync; use crate::metrics::METRICS; use crate::peer_handler::{PeerHandler, PeerHandlerError}; -use crate::snap::constants::EXECUTE_BATCH_SIZE_DEFAULT; +use crate::snap::constants::{EXECUTE_BATCH_SIZE_DEFAULT, PIPELINE_SUB_BATCH_SIZE_DEFAULT}; use crate::utils::delete_leaves_folder; use ethrex_blockchain::{Blockchain, error::ChainError}; use ethrex_common::H256; @@ -40,10 +40,12 @@ pub use snap_sync::{ #[cfg(feature = "sync-test")] lazy_static::lazy_static! { static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT); + static ref PIPELINE_SUB_BATCH_SIZE: usize = std::env::var("PIPELINE_SUB_BATCH_SIZE").map(|var| var.parse().expect("Pipeline sub-batch size environmental variable is not a number")).unwrap_or(PIPELINE_SUB_BATCH_SIZE_DEFAULT); } #[cfg(not(feature = "sync-test"))] lazy_static::lazy_static! 
{ static ref EXECUTE_BATCH_SIZE: usize = EXECUTE_BATCH_SIZE_DEFAULT; + static ref PIPELINE_SUB_BATCH_SIZE: usize = PIPELINE_SUB_BATCH_SIZE_DEFAULT; } #[derive(Debug, PartialEq, Clone, Default)] diff --git a/crates/networking/p2p/sync/full.rs b/crates/networking/p2p/sync/full.rs index aaf6c145fa4..887c5b916e3 100644 --- a/crates/networking/p2p/sync/full.rs +++ b/crates/networking/p2p/sync/full.rs @@ -17,7 +17,7 @@ use tracing::{debug, info, warn}; use crate::peer_handler::{BlockRequestOrder, PeerHandler}; use crate::snap::constants::{MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_FETCH_ATTEMPTS}; -use super::{EXECUTE_BATCH_SIZE, SyncError}; +use super::{EXECUTE_BATCH_SIZE, PIPELINE_SUB_BATCH_SIZE, SyncError}; /// Performs full sync cycle - fetches and executes all blocks between current head and sync head /// @@ -266,37 +266,19 @@ async fn add_blocks_in_batch( Ok(()) } -/// Executes the given blocks and stores them -/// If sync_head_found is true, they will be executed one by one -/// If sync_head_found is false, they will be executed in a single batch +/// Executes the given blocks and stores them using pipelined execution. +/// +/// Blocks are processed in sub-batches (configured via `PIPELINE_SUB_BATCH_SIZE`), +/// where each block goes through the pipeline with concurrent execution + merkleization. +/// For the final batch (near sync head), this also stores each block's state individually. 
async fn add_blocks( blockchain: Arc<Blockchain>, blocks: Vec<Block>, - sync_head_found: bool, + _final_batch: bool, cancel_token: CancellationToken, ) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> { - // If we found the sync head, run the blocks sequentially to store all the blocks's state - if sync_head_found { - tokio::task::spawn_blocking(move || { - let mut last_valid_hash = H256::default(); - for block in blocks { - let block_hash = block.hash(); - blockchain.add_block_pipeline(block, None).map_err(|e| { - ( - e, - Some(BatchBlockProcessingFailure { - last_valid_hash, - failed_block_hash: block_hash, - }), - ) - })?; - last_valid_hash = block_hash; - } - Ok(()) - }) + let sub_batch_size = *PIPELINE_SUB_BATCH_SIZE; + blockchain + .add_blocks_in_pipeline_batches(blocks, sub_batch_size, cancel_token) + .await - .map_err(|e| (ChainError::Custom(e.to_string()), None))? - } else { - blockchain.add_blocks_in_batch(blocks, cancel_token).await - } }