diff --git a/crates/full-node/sov-blob-sender/src/lib.rs b/crates/full-node/sov-blob-sender/src/lib.rs index 8c5788da83..8d0d8e9c9d 100644 --- a/crates/full-node/sov-blob-sender/src/lib.rs +++ b/crates/full-node/sov-blob-sender/src/lib.rs @@ -3,8 +3,7 @@ mod in_flight_blob; use std::collections::HashMap; use std::path::Path; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; use async_trait::async_trait; @@ -47,6 +46,52 @@ pub fn new_blob_id() -> BlobInternalId { uuid::Uuid::now_v7().as_u128() } +/// Tracks the number of in-flight blob submissions, broken down by type (batch vs proof). +/// +/// Uses a [`RwLock`] internally so that reads of both counters are consistent +/// (no race between reading total and reading proofs). +#[derive(Debug, Default, Clone)] +pub struct InFlightBlobCounts { + inner: Arc>, +} + +#[derive(Debug, Default)] +struct InFlightBlobCountsInner { + total: usize, + proofs: usize, +} + +impl InFlightBlobCounts { + /// Increment the count. If `is_proof` is true, both the total and proof counts increase. + pub fn increment(&self, is_proof: bool) { + let mut inner = self.inner.write().expect("poisoned"); + inner.total += 1; + if is_proof { + inner.proofs += 1; + } + } + + /// Decrement the count. If `is_proof` is true, both the total and proof counts decrease. + pub fn decrement(&self, is_proof: bool) { + let mut inner = self.inner.write().expect("poisoned"); + inner.total = inner.total.saturating_sub(1); + if is_proof { + inner.proofs = inner.proofs.saturating_sub(1); + } + } + + /// Returns (total_blobs, total_proofs) as a consistent snapshot. + pub fn get(&self) -> (usize, usize) { + let inner = self.inner.read().expect("poisoned"); + (inner.total, inner.proofs) + } + + /// Total number of in-flight blobs (batches + proofs). + pub fn total(&self) -> usize { + self.inner.read().expect("poisoned").total + } +} + /// Hooks for [`BlobSender`] events. 
/// /// We guarantee at-least-once delivery of all events. @@ -82,7 +127,7 @@ pub struct BlobSender { shutdown_sender: watch::Sender<()>, da: Da, finalization_manager: FM, - nb_of_concurrent_blob_submissions: Arc, + in_flight_counts: InFlightBlobCounts, blob_processing_timeout: Duration, blob_sender_channel: Option>>, ledger_pool_interval: Duration, @@ -104,7 +149,7 @@ where blob_processing_timeout: Duration, blob_sender_channel: Option>>, completed_blobs_to_send: Vec<(BlobToSend, BlobInternalId)>, - nb_of_concurrent_blob_submissions: Arc, + in_flight_counts: InFlightBlobCounts, ) -> anyhow::Result<(Self, JoinHandle<()>)> { Self::new_with_task_intervals( da, @@ -116,7 +161,7 @@ where blob_sender_channel, LEDGER_POLL_INTERVAL, completed_blobs_to_send, - nb_of_concurrent_blob_submissions, + in_flight_counts, ) .await } @@ -131,7 +176,7 @@ where blob_sender_channel: Option>>, ledger_pool_interval: Duration, completed_blobs_to_send: Vec<(BlobToSend, BlobInternalId)>, - nb_of_concurrent_blob_submissions: Arc, + in_flight_counts: InFlightBlobCounts, ) -> anyhow::Result<(Self, JoinHandle<()>)> { let shutdown_receiver = shutdown_sender.subscribe(); let db = Arc::new(BlobSenderDb::new(storage_path).await?); @@ -163,7 +208,7 @@ where shutdown_sender, da, finalization_manager, - nb_of_concurrent_blob_submissions, + in_flight_counts, blob_processing_timeout, blob_sender_channel, ledger_pool_interval, @@ -177,18 +222,12 @@ where /// Number of concurrent blob submissions in flight. pub fn nb_of_concurrent_blob_submissions(&self) -> usize { - self.nb_of_concurrent_blob_submissions - .load(Ordering::Relaxed) - } - - /// Returns a handle to the (atomic) number of blob submissions currently in flight. 
- pub fn nb_of_in_flight_blobs_handle(&self) -> Arc { - self.nb_of_concurrent_blob_submissions.clone() + self.in_flight_counts.total() } - fn inc_nb_of_concurrent_blob_submissions(&self) { - self.nb_of_concurrent_blob_submissions - .fetch_add(1, Ordering::Relaxed); + /// Returns a handle to the in-flight blob counts. + pub fn in_flight_counts(&self) -> InFlightBlobCounts { + self.in_flight_counts.clone() } /// Returns a reference to the [`BlobSenderHooks`] instance. @@ -285,7 +324,7 @@ where db: self.db.clone(), hooks: self.hooks.clone(), in_flight_blobs: self.in_flight_blobs.clone(), - nb_of_concurrent_blob_submissions: self.nb_of_concurrent_blob_submissions.clone(), + in_flight_counts: self.in_flight_counts.clone(), blob_processing_timeout: self.blob_processing_timeout, ledger_pool_interval: self.ledger_pool_interval, blob_sender_channel: self.blob_sender_channel.clone(), @@ -294,7 +333,7 @@ where let shutdown_receiver = self.shutdown_receiver.clone(); - self.inc_nb_of_concurrent_blob_submissions(); + self.in_flight_counts.increment(!is_batch); let handle = tokio::task::spawn({ let state = task_state; let blob = blob.clone(); @@ -314,7 +353,7 @@ where info!(%blob_id, "BlobSender: Shutting down task"); } } - state.dec_nb_of_concurrent_blob_submissions(); + state.in_flight_counts.decrement(!is_batch); } }); @@ -455,7 +494,7 @@ struct TaskState { db: Arc, hooks: Arc>, in_flight_blobs: Arc>>>, - nb_of_concurrent_blob_submissions: Arc, + in_flight_counts: InFlightBlobCounts, blob_processing_timeout: Duration, ledger_pool_interval: Duration, blob_sender_channel: Option>>, @@ -463,11 +502,6 @@ struct TaskState { } impl TaskState { - fn dec_nb_of_concurrent_blob_submissions(&self) { - self.nb_of_concurrent_blob_submissions - .fetch_sub(1, Ordering::Relaxed); - } - async fn remove_blob_or_err(&self, blob_id: BlobInternalId) -> anyhow::Result<()> { let res = self.db.remove(blob_id).await; if let Err(err) = &res { @@ -761,7 +795,7 @@ impl TaskState { match 
blob_status.blob_selector_status { Some(BlobSelectorStatus::Accepted) | Some(BlobSelectorStatus::Discarded( - BlobDiscardReason::SequenceNumberTooLow, + BlobDiscardReason::SequenceNumberTooLow { .. }, )) => { trace!(%blob_id, %receipt, ?blob_status, "Removing blob form the blob sender"); diff --git a/crates/full-node/sov-blob-sender/tests/blob_sender.rs b/crates/full-node/sov-blob-sender/tests/blob_sender.rs index ae54427c0d..e84faa076a 100644 --- a/crates/full-node/sov-blob-sender/tests/blob_sender.rs +++ b/crates/full-node/sov-blob-sender/tests/blob_sender.rs @@ -15,7 +15,6 @@ use sov_rollup_interface::da::BlobReaderTrait; use sov_rollup_interface::node::da::DaService; use sov_rollup_interface::stf::BlobDiscardReason; use sov_test_utils::logging::LogCollector; -use std::sync::atomic::AtomicUsize; use tempfile::TempDir; use tokio::sync::{broadcast, watch, RwLock}; use tokio::task::JoinHandle; @@ -327,7 +326,10 @@ async fn blobs_with_seq_nr_too_low_are_not_resubmitted() -> anyhow::Result<()> { Duration::from_secs(20), &deps, Some(status_sender), - BlobSelectorStatus::Discarded(BlobDiscardReason::SequenceNumberTooLow), + BlobSelectorStatus::Discarded(BlobDiscardReason::SequenceNumberTooLow { + found: 0, + expected: 1, + }), ) .await; let nb_of_blobs = 1; @@ -366,7 +368,7 @@ async fn blobs_with_seq_nr_too_low_are_not_resubmitted() -> anyhow::Result<()> { ) && matches!( status.blob_selector_status, Some(BlobSelectorStatus::Discarded( - BlobDiscardReason::SequenceNumberTooLow + BlobDiscardReason::SequenceNumberTooLow { .. 
} )) ) }, @@ -496,7 +498,6 @@ async fn create_blob_sender( let hooks = TestHooks {}; - let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0)); let (blob_sender, handle) = BlobSender::new_with_task_intervals( deps.da.clone(), finalization_manager, @@ -507,7 +508,7 @@ async fn create_blob_sender( blob_status_sender, Duration::from_millis(1000), Default::default(), - nb_of_concurrent_blob_submissions, + Default::default(), ) .await .unwrap(); diff --git a/crates/full-node/sov-sequencer/src/preferred/initialization.rs b/crates/full-node/sov-sequencer/src/preferred/initialization.rs index e55082c7fe..58e5bc8675 100644 --- a/crates/full-node/sov-sequencer/src/preferred/initialization.rs +++ b/crates/full-node/sov-sequencer/src/preferred/initialization.rs @@ -128,7 +128,7 @@ where let (executor_events_sender, executor_events_receiver) = ExecutorEventsSender::new(shutdown_sender.clone(), db_cache); - let in_flight_blobs = blob_sender.nb_of_in_flight_blobs(); + let in_flight_counts = blob_sender.in_flight_counts(); let (forced_tx_batch_notifier, _) = broadcast::channel(1); let rollup_exec_config = RollupBlockExecutorConfig { @@ -168,7 +168,7 @@ where shutdown_sender.clone(), executor_events_sender, next_sequence_number, - in_flight_blobs, + in_flight_counts, stop_at_rollup_height, rollup_exec_config.clone(), cached_txs.write_handle(), diff --git a/crates/full-node/sov-sequencer/src/preferred/preferred_blob_sender.rs b/crates/full-node/sov-sequencer/src/preferred/preferred_blob_sender.rs index 2ed0138c6b..b905fd6125 100644 --- a/crates/full-node/sov-sequencer/src/preferred/preferred_blob_sender.rs +++ b/crates/full-node/sov-sequencer/src/preferred/preferred_blob_sender.rs @@ -1,13 +1,10 @@ use sov_blob_sender::BlobExecutionStatus; -use sov_blob_sender::{BlobInternalId, BlobSender, BlobToSend}; +use sov_blob_sender::{BlobInternalId, BlobSender, BlobToSend, InFlightBlobCounts}; use sov_blob_storage::{PreferredBatchData, PreferredProofData}; use 
sov_db::ledger_db::LedgerDb; use sov_modules_api::TxHash; use sov_rollup_interface::node::da::DaService; -use std::{ - path::Path, - sync::{atomic::AtomicUsize, Arc}, -}; +use std::{path::Path, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::watch; use tokio::task::JoinHandle; @@ -22,7 +19,7 @@ use crate::{common::TxStatusBlobSenderHooks, TxStatusManager}; /// Wrapper around [`BlobSender`] with preferred blob -specific logic. pub struct PreferredBlobSender { inner: Option, LedgerDb>>, - nb_of_concurrent_blob_submissions: Arc, + in_flight_counts: InFlightBlobCounts, } impl PreferredBlobSender { @@ -37,12 +34,12 @@ impl PreferredBlobSender { blobs_sender_channel: broadcast::Sender>, seq_role: SequencerRole, ) -> anyhow::Result<(Self, Option>)> { - let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0)); + let in_flight_counts = InFlightBlobCounts::default(); match seq_role { SequencerRole::PgSyncReplica | SequencerRole::DaOnlyReplica => Ok(( Self { inner: None, - nb_of_concurrent_blob_submissions, + in_flight_counts, }, None, )), @@ -63,14 +60,14 @@ impl PreferredBlobSender { blob_processing_timeout, Some(blobs_sender_channel), blobs_to_send, - nb_of_concurrent_blob_submissions.clone(), + in_flight_counts.clone(), ) .await?; Ok(( Self { inner: Some(inner), - nb_of_concurrent_blob_submissions, + in_flight_counts, }, Some(blob_sender_handle), )) @@ -132,8 +129,8 @@ impl PreferredBlobSender { Ok(()) } - pub(crate) fn nb_of_in_flight_blobs(&self) -> Arc { - self.nb_of_concurrent_blob_submissions.clone() + pub(crate) fn in_flight_counts(&self) -> InFlightBlobCounts { + self.in_flight_counts.clone() } pub(crate) async fn add_txs(&self, blob_id: BlobInternalId, tx_hashes: Arc>) { diff --git a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/inner.rs b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/inner.rs index 20ab9e7e77..feec088c49 100644 --- 
a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/inner.rs +++ b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/inner.rs @@ -36,7 +36,7 @@ use sov_state::pinned_cache::PinnedCache; use sov_state::{NativeStorage, Storage}; use std::num::NonZero; use std::ops::Deref; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use tokio::sync::{oneshot, watch}; use tracing::{debug, info, warn}; @@ -48,6 +48,10 @@ const COMFORTABLE_SIZE_LIMIT_MULTIPLIER: u64 = 99; const COMFORTABLE_SIZE_LIMIT_DIVISOR: u64 = 100; const COMFORTABLE_IN_FLIGHT_BLOBS: usize = 5; +/// The minimum number of batches that have to be in flight for the sequencer to consider it *un*comfortable to produce a new batch. +/// Without this value, the sequencer will skip batch production when there are only proofs in flight, and since proofs aren't processed +/// until the next batch arrives this creates a deadlock. +const COMFORTABLE_IN_FLIGHT_BATCHES: usize = 2; const METRICS_BATCH_SIZE: usize = 32; @@ -82,7 +86,7 @@ where pub(crate) batch_execution_time_limit_micros: u64, pub(crate) batch_size_tracker: BatchSizeTracker, pub(crate) is_ready: Result<(), SequencerNotReadyDetails>, - pub(crate) in_flight_blobs: Arc, + pub(crate) in_flight_counts: sov_blob_sender::InFlightBlobCounts, pub(crate) executor_events_sender: ExecutorEventsSender, // We track two sequence numbers: the sequence number of the current open batch, and the next unassigned sequence number.
// This is because we might need to assign a sequence number to some proofs while a batch is in progress, @@ -183,7 +187,7 @@ where Rt: Runtime, { pub(crate) fn nb_of_concurrent_blob_submissions(&self) -> usize { - self.in_flight_blobs.load(Ordering::Acquire) + self.in_flight_counts.total() } pub(crate) async fn overwrite_next_sequence_number_for_recovery( @@ -391,11 +395,15 @@ where return; } - let in_flight_blobs = self.in_flight_blobs.load(Ordering::Relaxed); - if in_flight_blobs >= COMFORTABLE_IN_FLIGHT_BLOBS { - tracing::trace!( + let (in_flight_blobs, in_flight_proofs) = self.in_flight_counts.get(); + let in_flight_batches = in_flight_blobs.saturating_sub(in_flight_proofs); + if in_flight_blobs >= COMFORTABLE_IN_FLIGHT_BLOBS + && in_flight_batches >= COMFORTABLE_IN_FLIGHT_BATCHES + { + tracing::info!( current_in_flight = %in_flight_blobs, max_comfortable = %COMFORTABLE_IN_FLIGHT_BLOBS, + current_sequence_number_of_open_batch = %self.sequence_number_of_open_batch.unwrap_or(0), "Skipping batch production due too many in flight blobs"); return; } diff --git a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/mod.rs b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/mod.rs index 9ae3638773..2d9da8f9c4 100644 --- a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/mod.rs +++ b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/mod.rs @@ -32,7 +32,7 @@ use sov_modules_api::VersionReader; use sov_modules_api::{FullyBakedTx, Runtime, Spec, StateUpdateInfo}; use sov_state::Storage; use std::collections::BTreeMap; -use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize}; +use std::sync::atomic::{AtomicU32, AtomicU64}; use std::sync::Arc; pub(crate) use sync_state::*; use tokio::sync::broadcast; @@ -168,7 +168,7 @@ pub(crate) fn create( shutdown_sender: watch::Sender<()>, executor_events_sender: ExecutorEventsSender, sequence_number_of_next_blob: SequenceNumber, - in_flight_blobs: Arc, + 
in_flight_counts: sov_blob_sender::InFlightBlobCounts, stop_at_rollup_height: Option, rollup_exec_config: RollupBlockExecutorConfig, tx_cache_writer: TxResultWriter, @@ -216,7 +216,7 @@ where executor_events_sender, sequence_number_of_open_batch: None, next_unassigned_sequence_number: sequence_number_of_next_blob, - in_flight_blobs, + in_flight_counts, has_finished_startup: false, metrics: Vec::with_capacity(128), is_ready, diff --git a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/sync_state.rs b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/sync_state.rs index b21a63f4e5..8c5a5c3b24 100644 --- a/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/sync_state.rs +++ b/crates/full-node/sov-sequencer/src/preferred/sync_sequencer_state/sync_state.rs @@ -856,6 +856,7 @@ where ) { let mut inner = self.get_inner_with_timing(reason).await; let sequence_number = inner.take_sequence_number_for_proof(); + tracing::info!("Took sequence number {sequence_number} for proof blob {blob_id}"); let proof_bytes = proof_bytes(&data.0, sequence_number).expect("Serialization to vec is infallible"); inner diff --git a/crates/full-node/sov-sequencer/src/standard/mod.rs b/crates/full-node/sov-sequencer/src/standard/mod.rs index d975eb4ab7..c4823276dd 100644 --- a/crates/full-node/sov-sequencer/src/standard/mod.rs +++ b/crates/full-node/sov-sequencer/src/standard/mod.rs @@ -34,7 +34,6 @@ use std::marker::PhantomData; use std::net::IpAddr; use std::num::NonZero; use std::path::Path; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use thiserror::Error; use tokio::sync::{watch, Mutex}; @@ -159,7 +158,6 @@ where "Standard sequencer require DaService to be configured with submitting support", )?; - let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0)); let (blob_sender, blob_sender_handle) = BlobSender::new( da, ledger_db.clone(), @@ -169,7 +167,7 @@ where Duration::from_secs(config.blob_processing_timeout_secs), 
None, Default::default(), - nb_of_concurrent_blob_submissions, + Default::default(), ) .await?; diff --git a/crates/full-node/sov-sequencer/src/test_stateless.rs b/crates/full-node/sov-sequencer/src/test_stateless.rs index 4325bef72c..3fa5f24d28 100644 --- a/crates/full-node/sov-sequencer/src/test_stateless.rs +++ b/crates/full-node/sov-sequencer/src/test_stateless.rs @@ -16,7 +16,6 @@ use sov_rollup_interface::{StateUpdateInfo, TxHash}; use std::marker::PhantomData; use std::net::IpAddr; use std::path::Path; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tokio::sync::{watch, Mutex}; use tokio::task::JoinHandle; @@ -79,7 +78,6 @@ where ))); let tx_status_manager = TxStatusManager::default(); - let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0)); let seq = Self { inner: inner.into(), blob_sender: Arc::new(Mutex::new( @@ -92,7 +90,7 @@ where Duration::from_secs(config.blob_processing_timeout_secs), None, Default::default(), - nb_of_concurrent_blob_submissions, + Default::default(), ) .await? .0, diff --git a/crates/full-node/sov-sequencer/tests/integration/preferred_with_proofs.rs b/crates/full-node/sov-sequencer/tests/integration/preferred_with_proofs.rs index 80a99e0b34..1b2312c381 100644 --- a/crates/full-node/sov-sequencer/tests/integration/preferred_with_proofs.rs +++ b/crates/full-node/sov-sequencer/tests/integration/preferred_with_proofs.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use sov_mock_da::BlockProducingConfig; use sov_mock_zkvm::crypto::private_key::Ed25519PrivateKey; use sov_modules_api::{RawTx, Runtime}; @@ -143,3 +145,139 @@ async fn flaky_test_proof_generation_doesnt_break_sequencer() -> anyhow::Result< Ok(()) } + +/// Reproduces sovereign-labs/sovereign-sdk#2558: proof blobs are discarded +/// with SequenceNumberTooLow after a sequencer resync. 
+/// +/// The test generates proofs, forces a resync by burst-producing DA blocks, +/// then verifies that proofs continue to be generated and land on the ledger +/// after the resync completes. +/// +/// Run with `RUST_LOG=sov_blob_storage::capabilities=info` to see discard +/// messages confirming the root cause. +#[tokio::test(flavor = "multi_thread")] +async fn test_proof_blobs_survive_resync() -> anyhow::Result<()> { + let _log_guard = sov_test_utils::initialize_logging(); + let (test_rollup, admin) = create_test_rollup_with_prover().await; + + test_rollup.produce_enough_finalized_slots().await; + test_rollup.wait_for_sequencer_ready().await?; + + let client = test_rollup.api_client().clone(); + let mut slot_subscription = client.subscribe_slots().await?; + let mut aggregated_proofs = client.subscribe_aggregated_proof().await?; + + // Phase 1: Generate proofs until we have at least 2 visible on the ledger. + let mut proofs_before_resync = 0usize; + let mut tx_generation = 0u64; + + for _ in 0..50 { + let tx = tx_set_value(&admin.private_key, tx_generation, tx_generation); + if client.send_raw_tx_to_sequencer(&tx).await.is_ok() { + tx_generation += 1; + } + + test_rollup.da_service.produce_block_now().await?; + let _slot = slot_subscription.next().await.unwrap()?; + + while let Ok(Some(Ok(_proof))) = + tokio::time::timeout(Duration::from_millis(150), aggregated_proofs.next()).await + { + proofs_before_resync += 1; + } + + if proofs_before_resync >= 2 { + break; + } + } + assert!( + proofs_before_resync >= 2, + "Expected at least 2 proofs before resync, got {proofs_before_resync}" + ); + eprintln!("[test] Phase 1 complete: {proofs_before_resync} proofs generated before resync"); + + // Phase 2: Force a resync by shutting down, producing DA blocks while + // offline, then restarting. This is deterministic and doesn't depend on + // timing — the node restarts behind DA and must resync. 
+ let builder = test_rollup.shutdown().await?; + let rollup_storage_path = builder.storage_path(); + + // Produce blocks while the rollup is offline. + // The DA service persists in sqlite, so blocks accumulate while the node is down. + let da_for_offline = sov_mock_da::storable::StorableMockDaService::from_config( + sov_mock_da::MockDaConfig { + connection_string: sov_mock_da::MockDaConfig::sqlite_in_dir(rollup_storage_path.path()) + .unwrap(), + sender_address: sov_mock_da::MockAddress::new([0; 32]), + finalization_blocks: 0, + block_producing: BlockProducingConfig::Manual, + da_layer: None, + randomization: None, + failure_behavior: Default::default(), + }, + tokio::sync::watch::channel(()).1, + ) + .await; + for _ in 0..20 { + da_for_offline.produce_block_now().await?; + } + drop(da_for_offline); + eprintln!("[test] Phase 2: produced 20 DA blocks while rollup was offline"); + + // Restart — the node will be behind DA and must resync. + let test_rollup = builder.start().await?; + test_rollup.wait_for_node_synced().await?; + test_rollup.wait_for_sequencer_ready().await?; + eprintln!("[test] Sequencer recovered from resync"); + + // Phase 3: Continue producing blocks and verify proofs keep landing. + // Use the new rollup's client since the old one points to a dead port. 
+ let client = test_rollup.api_client().clone(); + let mut slot_subscription = client.subscribe_slots().await?; + let mut aggregated_proofs = client.subscribe_aggregated_proof().await?; + let mut proofs_after_resync = 0usize; + + for i in 0..50 { + let tx = tx_set_value(&admin.private_key, tx_generation, tx_generation); + match client.send_raw_tx_to_sequencer(&tx).await { + Ok(_) => tx_generation += 1, + Err(e) => { + eprintln!("[test] Phase 3 tx submit failed (iter {i}): {e}"); + } + } + + test_rollup.da_service.produce_block_now().await?; + if slot_subscription.next().await.is_none() { + slot_subscription = client.subscribe_slots().await?; + continue; + } + + while let Ok(Some(Ok(_proof))) = + tokio::time::timeout(Duration::from_millis(150), aggregated_proofs.next()).await + { + proofs_after_resync += 1; + eprintln!("[test] Proof {proofs_after_resync} arrived after resync (iter {i})"); + } + + if proofs_after_resync >= 2 { + break; + } + } + + // If this assertion fails, proof blobs were likely discarded during resync. + // Run with RUST_LOG=sov_blob_storage::capabilities=info to see: + // "Discarding blob ... reason=SequenceNumberTooLow" + assert!( + proofs_after_resync >= 2, + "Expected at least 2 proofs after resync, got {proofs_after_resync}. \ + Proof blobs were likely discarded with SequenceNumberTooLow during resync \ + (see sovereign-labs/sovereign-sdk#2558). \ + Run with RUST_LOG=sov_blob_storage::capabilities=info to confirm." 
+ ); + + eprintln!( + "[test] PASS: {proofs_after_resync} proofs after resync, {proofs_before_resync} before" + ); + + Ok(()) +} diff --git a/crates/module-system/module-implementations/sov-blob-storage/src/capabilities.rs b/crates/module-system/module-implementations/sov-blob-storage/src/capabilities.rs index 4a589a7445..38b9fe047f 100644 --- a/crates/module-system/module-implementations/sov-blob-storage/src/capabilities.rs +++ b/crates/module-system/module-implementations/sov-blob-storage/src/capabilities.rs @@ -678,7 +678,10 @@ impl BlobStorage { discarded_blobs, preferred_sender, blob.id, - BlobDiscardReason::SequenceNumberTooLow, + BlobDiscardReason::SequenceNumberTooLow { + found: blob.inner.sequence_number(), + expected: next_sequence_number, + }, ); false } diff --git a/crates/rollup-interface/src/state_machine/stf/mod.rs b/crates/rollup-interface/src/state_machine/stf/mod.rs index 3d8602c7bf..69f61679ce 100644 --- a/crates/rollup-interface/src/state_machine/stf/mod.rs +++ b/crates/rollup-interface/src/state_machine/stf/mod.rs @@ -285,7 +285,8 @@ type ProofReceipts = )] pub enum BlobDiscardReason { /// The sequencer sent a blob with an old sequencer number that we've already processed. - SequenceNumberTooLow, + #[allow(missing_docs)] + SequenceNumberTooLow { found: u64, expected: u64 }, /// Sender doesn't have enough staked sequencer funds SenderInsufficientStake, /// The max amount of unregistered blobs allowed to be processed per slot