diff --git a/crates/networking/p2p/sync/full.rs b/crates/networking/p2p/sync/full.rs index bbae0ae1b8..d709299ace 100644 --- a/crates/networking/p2p/sync/full.rs +++ b/crates/networking/p2p/sync/full.rs @@ -7,7 +7,10 @@ use std::cmp::min; use std::sync::Arc; use std::time::Duration; -use ethrex_blockchain::{BatchBlockProcessingFailure, Blockchain, error::ChainError}; +use ethrex_blockchain::{ + BatchBlockProcessingFailure, Blockchain, + error::{ChainError, InvalidBlockError}, +}; use ethrex_common::{H256, types::Block}; use ethrex_storage::Store; use tokio::time::Instant; @@ -270,7 +273,9 @@ async fn add_blocks_in_batch( /// Executes the given blocks and stores them /// If sync_head_found is true, they will be executed one by one -/// If sync_head_found is false, they will be executed in a single batch +/// If sync_head_found is false, they will be executed in a single batch, +/// falling back to one-by-one pipeline execution if the batch fails with +/// a post-execution error (works around batch-mode state corruption bugs). async fn add_blocks( blockchain: Arc, blocks: Vec, @@ -279,26 +284,81 @@ async fn add_blocks( ) -> Result<(), (ChainError, Option)> { // If we found the sync head, run the blocks sequentially to store all the blocks's state if sync_head_found { - tokio::task::spawn_blocking(move || { - let mut last_valid_hash = H256::default(); - for block in blocks { - let block_hash = block.hash(); - blockchain.add_block_pipeline(block, None).map_err(|e| { - ( - e, - Some(BatchBlockProcessingFailure { - last_valid_hash, - failed_block_hash: block_hash, - }), - ) - })?; - last_valid_hash = block_hash; - } - Ok(()) - }) + return run_blocks_pipeline(blockchain, blocks).await; + } + + // Try batch execution first (faster). + // We clone blocks because add_blocks_in_batch takes ownership but we need + // them for the fallback. The clone cost is negligible (~1-5ms) vs batch + // execution time (median ~29s on hoodi). + match blockchain + .add_blocks_in_batch(blocks.clone(), cancel_token) .await - .map_err(|e| (ChainError::Custom(e.to_string()), None))? - } else { - blockchain.add_blocks_in_batch(blocks, cancel_token).await + { + Ok(()) => Ok(()), + Err((ChainError::InvalidBlock(ref err), ref batch_failure)) + if is_post_execution_error(err) => + { + // Batch execution can produce incorrect results due to cross-block + // state cache pollution (e.g. `mark_modified` setting `exists = true` + // leaking across block boundaries). Fall back to single-block pipeline + // execution which uses fresh state per block. + let failed_block_info = batch_failure + .as_ref() + .and_then(|f| { + blocks + .iter() + .find(|b| b.hash() == f.failed_block_hash) + .map(|b| format!("block {} ({})", b.header.number, f.failed_block_hash)) + }) + .unwrap_or_else(|| "unknown block".to_string()); + warn!( + "Batch execution failed at {failed_block_info} with: {err}. \ + Retrying batch with per-block pipeline execution." + ); + run_blocks_pipeline(blockchain, blocks).await + } + Err(e) => Err(e), } } + +/// Returns true for errors that arise from EVM execution and could differ +/// between batch mode (shared VM state) and single-block pipeline mode. +/// Pre-execution validation errors (header, body, structural) would fail +/// identically in both modes, so retrying them is pointless. +fn is_post_execution_error(err: &InvalidBlockError) -> bool { + matches!( + err, + InvalidBlockError::GasUsedMismatch(_, _) + | InvalidBlockError::StateRootMismatch + | InvalidBlockError::ReceiptsRootMismatch + | InvalidBlockError::RequestsHashMismatch + | InvalidBlockError::BlockAccessListHashMismatch + | InvalidBlockError::BlobGasUsedMismatch + ) +} + +async fn run_blocks_pipeline( + blockchain: Arc, + blocks: Vec, +) -> Result<(), (ChainError, Option)> { + tokio::task::spawn_blocking(move || { + let mut last_valid_hash = H256::default(); + for block in blocks { + let block_hash = block.hash(); + blockchain.add_block_pipeline(block, None).map_err(|e| { + ( + e, + Some(BatchBlockProcessingFailure { + last_valid_hash, + failed_block_hash: block_hash, + }), + ) + })?; + last_valid_hash = block_hash; + } + Ok(()) + }) + .await + .map_err(|e| (ChainError::Custom(e.to_string()), None))? +}