Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,102 @@ impl Blockchain {
Ok(())
}

/// Adds multiple blocks using pipelined execution in sub-batches.
///
/// Instead of running all blocks sequentially then doing one giant merkle at the end
/// (as `add_blocks_in_batch` does), this method processes blocks in sub-batches using
/// the block pipeline (`add_block_pipeline`). Each block gets concurrent
/// execution + merkleization, with warmer prefetching and 16 parallel shard workers.
///
/// The trade-off: per-block merkleization instead of one collapsed batch, but the
/// pipeline overlap (merkle runs concurrently with execution) should more than compensate.
///
/// Sub-batch boundaries are used for progress logging and yielding to the async runtime.
/// Adds multiple blocks using pipelined execution in sub-batches.
///
/// Instead of running all blocks sequentially then doing one giant merkle at the end
/// (as `add_blocks_in_batch` does), this method processes blocks in sub-batches using
/// the block pipeline (`add_block_pipeline`): each block gets concurrent
/// execution + merkleization.
///
/// Sub-batch boundaries are where cancellation is checked, progress is logged,
/// and control is yielded back to the async runtime.
///
/// # Errors
/// On a failed block, returns the `ChainError` together with a
/// `BatchBlockProcessingFailure` carrying the failing block's hash and the last
/// valid hash, so callers can rewind/resume. Cancellation returns a
/// `ChainError::Custom("shutdown signal")` with no failure info. An empty
/// `blocks` input returns `ChainError::Custom("Last block not found")`.
pub async fn add_blocks_in_pipeline_batches(
    &self,
    blocks: Vec<Block>,
    sub_batch_size: usize,
    cancellation_token: CancellationToken,
) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
    // `slice::chunks` panics on a chunk size of 0; clamp so a misconfigured
    // sub-batch size degrades to one-block sub-batches instead of panicking.
    let sub_batch_size = sub_batch_size.max(1);

    let blocks_len = blocks.len();
    let mut last_valid_hash = H256::default();
    let mut total_gas_used: u64 = 0;
    let mut transactions_count: usize = 0;

    let interval = Instant::now();

    for (sub_batch_idx, sub_batch) in blocks.chunks(sub_batch_size).enumerate() {
        // Cancellation is only honored at sub-batch boundaries, so a sub-batch
        // is never left half-processed between checks.
        if cancellation_token.is_cancelled() {
            info!("Received shutdown signal, aborting");
            return Err((ChainError::Custom(String::from("shutdown signal")), None));
        }

        for block in sub_batch {
            let block_hash = block.hash();
            self.add_block_pipeline(block.clone(), None).map_err(|e| {
                (
                    e,
                    Some(BatchBlockProcessingFailure {
                        failed_block_hash: block_hash,
                        last_valid_hash,
                    }),
                )
            })?;
            last_valid_hash = block_hash;
            // Saturating: a sum over many blocks must never wrap/panic the sync.
            total_gas_used = total_gas_used.saturating_add(block.header.gas_used);
            transactions_count += block.body.transactions.len();
        }

        // The final chunk may be shorter than `sub_batch_size`; cap the
        // progress counter at the true total.
        let blocks_done = (sub_batch_idx + 1)
            .saturating_mul(sub_batch_size)
            .min(blocks_len);
        debug!(
            "Pipeline sub-batch {}: processed {}/{} blocks",
            sub_batch_idx + 1,
            blocks_done,
            blocks_len,
        );

        // Yield so other runtime tasks are not starved by this CPU-heavy loop.
        tokio::task::yield_now().await;
    }

    // Empty input errors out here (before any division by `blocks_len` below).
    let last_block = blocks
        .last()
        .ok_or_else(|| (ChainError::Custom("Last block not found".into()), None))?;

    let last_block_number = last_block.header.number;
    let last_block_gas_limit = last_block.header.gas_limit;

    // Throughput in Gigagas/s, guarded against zero elapsed time / zero gas.
    let elapsed_seconds = interval.elapsed().as_secs_f64();
    let throughput = if elapsed_seconds > 0.0 && total_gas_used != 0 {
        let as_gigas = (total_gas_used as f64) / 1e9;
        as_gigas / elapsed_seconds
    } else {
        0.0
    };

    metrics!(
        METRICS_BLOCKS.set_block_number(last_block_number);
        METRICS_BLOCKS.set_latest_block_gas_limit(last_block_gas_limit as f64);
        METRICS_BLOCKS.set_latest_gas_used(total_gas_used as f64 / blocks_len as f64);
        METRICS_BLOCKS.set_latest_gigagas(throughput);
    );

    if self.options.perf_logs_enabled {
        info!(
            "[METRICS] Pipeline executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s",
            blocks_len,
            last_block_number,
            last_block_gas_limit,
            transactions_count,
            total_gas_used,
            throughput
        );
    }

    Ok(())
}

/// Add a blob transaction and its blobs bundle to the mempool checking that the transaction is valid
#[cfg(feature = "c-kzg")]
pub async fn add_blob_transaction_to_pool(
Expand Down
4 changes: 4 additions & 0 deletions crates/networking/p2p/snap/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub const MIN_FULL_BLOCKS: u64 = 10_000;
/// Number of blocks to execute in a single batch during full sync.
pub const EXECUTE_BATCH_SIZE_DEFAULT: usize = 1024;

/// Number of blocks per sub-batch when using pipelined execution during full sync.
/// Each sub-batch processes blocks through the pipeline (concurrent exec + merkle).
/// With the `sync-test` feature enabled this default can be overridden at runtime
/// via the `PIPELINE_SUB_BATCH_SIZE` environment variable.
pub const PIPELINE_SUB_BATCH_SIZE_DEFAULT: usize = 64;

/// Average time between blocks (used for timestamp-based calculations).
pub const SECONDS_PER_BLOCK: u64 = 12;

Expand Down
4 changes: 3 additions & 1 deletion crates/networking/p2p/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod snap_sync;

use crate::metrics::METRICS;
use crate::peer_handler::{PeerHandler, PeerHandlerError};
use crate::snap::constants::EXECUTE_BATCH_SIZE_DEFAULT;
use crate::snap::constants::{EXECUTE_BATCH_SIZE_DEFAULT, PIPELINE_SUB_BATCH_SIZE_DEFAULT};
use crate::utils::delete_leaves_folder;
use ethrex_blockchain::{Blockchain, error::ChainError};
use ethrex_common::H256;
Expand Down Expand Up @@ -40,10 +40,12 @@ pub use snap_sync::{
// Full-sync batch sizes. With the `sync-test` feature they are read once
// (lazily) from environment variables, panicking at first use if the value
// is present but not a valid number.
#[cfg(feature = "sync-test")]
lazy_static::lazy_static! {
static ref EXECUTE_BATCH_SIZE: usize = std::env::var("EXECUTE_BATCH_SIZE").map(|var| var.parse().expect("Execute batch size environmental variable is not a number")).unwrap_or(EXECUTE_BATCH_SIZE_DEFAULT);
static ref PIPELINE_SUB_BATCH_SIZE: usize = std::env::var("PIPELINE_SUB_BATCH_SIZE").map(|var| var.parse().expect("Pipeline sub-batch size environmental variable is not a number")).unwrap_or(PIPELINE_SUB_BATCH_SIZE_DEFAULT);
}
// Without `sync-test`, the sizes are fixed to the compile-time defaults.
#[cfg(not(feature = "sync-test"))]
lazy_static::lazy_static! {
static ref EXECUTE_BATCH_SIZE: usize = EXECUTE_BATCH_SIZE_DEFAULT;
static ref PIPELINE_SUB_BATCH_SIZE: usize = PIPELINE_SUB_BATCH_SIZE_DEFAULT;
}

#[derive(Debug, PartialEq, Clone, Default)]
Expand Down
38 changes: 10 additions & 28 deletions crates/networking/p2p/sync/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::{debug, info, warn};
use crate::peer_handler::{BlockRequestOrder, PeerHandler};
use crate::snap::constants::{MAX_BLOCK_BODIES_TO_REQUEST, MAX_HEADER_FETCH_ATTEMPTS};

use super::{EXECUTE_BATCH_SIZE, SyncError};
use super::{EXECUTE_BATCH_SIZE, PIPELINE_SUB_BATCH_SIZE, SyncError};

/// Performs full sync cycle - fetches and executes all blocks between current head and sync head
///
Expand Down Expand Up @@ -266,37 +266,19 @@ async fn add_blocks_in_batch(
Ok(())
}

/// Executes the given blocks and stores them
/// If sync_head_found is true, they will be executed one by one
/// If sync_head_found is false, they will be executed in a single batch
/// Executes the given blocks and stores them using pipelined execution.
///
/// Blocks are processed in sub-batches (configured via `PIPELINE_SUB_BATCH_SIZE`),
/// where each block goes through the pipeline with concurrent execution + merkleization.
/// For the final batch (near sync head), this also stores each block's state individually.
async fn add_blocks(
blockchain: Arc<Blockchain>,
blocks: Vec<Block>,
sync_head_found: bool,
_final_batch: bool,
cancel_token: CancellationToken,
) -> Result<(), (ChainError, Option<BatchBlockProcessingFailure>)> {
// If we found the sync head, run the blocks sequentially to store all the blocks's state
if sync_head_found {
tokio::task::spawn_blocking(move || {
let mut last_valid_hash = H256::default();
for block in blocks {
let block_hash = block.hash();
blockchain.add_block_pipeline(block, None).map_err(|e| {
(
e,
Some(BatchBlockProcessingFailure {
last_valid_hash,
failed_block_hash: block_hash,
}),
)
})?;
last_valid_hash = block_hash;
}
Ok(())
})
let sub_batch_size = *PIPELINE_SUB_BATCH_SIZE;
blockchain
.add_blocks_in_pipeline_batches(blocks, sub_batch_size, cancel_token)
.await
.map_err(|e| (ChainError::Custom(e.to_string()), None))?
} else {
blockchain.add_blocks_in_batch(blocks, cancel_token).await
}
}
Loading