Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 61 additions & 27 deletions crates/full-node/sov-blob-sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ mod in_flight_blob;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};

use async_trait::async_trait;
Expand Down Expand Up @@ -47,6 +46,52 @@ pub fn new_blob_id() -> BlobInternalId {
uuid::Uuid::now_v7().as_u128()
}

/// Tracks the number of in-flight blob submissions, broken down by type (batch vs proof).
///
/// Uses a [`RwLock`] internally so that reads of both counters are consistent
/// (no race between reading total and reading proofs).
#[derive(Debug, Default, Clone)]
pub struct InFlightBlobCounts {
    // Shared handle: cloning the tracker shares the same underlying counters.
    inner: Arc<RwLock<InFlightBlobCountsInner>>,
}

#[derive(Debug, Default)]
struct InFlightBlobCountsInner {
    // Total in-flight blobs (batches + proofs). Invariant: proofs <= total.
    total: usize,
    // Subset of `total` that are proof blobs.
    proofs: usize,
}

impl InFlightBlobCounts {
    /// Increment the count. If `is_proof` is true, both the total and proof counts increase.
    pub fn increment(&self, is_proof: bool) {
        let mut guard = self.inner.write().expect("poisoned");
        guard.total += 1;
        // `usize::from(bool)` is 1 for proofs, 0 for batches.
        guard.proofs += usize::from(is_proof);
    }

    /// Decrement the count. If `is_proof` is true, both the total and proof counts decrease.
    pub fn decrement(&self, is_proof: bool) {
        let mut guard = self.inner.write().expect("poisoned");
        // Saturating: a stray extra decrement clamps at zero instead of panicking.
        guard.total = guard.total.saturating_sub(1);
        guard.proofs = guard.proofs.saturating_sub(usize::from(is_proof));
    }

    /// Returns (total_blobs, total_proofs) as a consistent snapshot.
    pub fn get(&self) -> (usize, usize) {
        let guard = self.inner.read().expect("poisoned");
        (guard.total, guard.proofs)
    }

    /// Total number of in-flight blobs (batches + proofs).
    pub fn total(&self) -> usize {
        let (total, _proofs) = self.get();
        total
    }
}

/// Hooks for [`BlobSender`] events.
///
/// We guarantee at-least-once delivery of all events.
Expand Down Expand Up @@ -82,7 +127,7 @@ pub struct BlobSender<Da: DaService, H, FM: FinalizationManager> {
shutdown_sender: watch::Sender<()>,
da: Da,
finalization_manager: FM,
nb_of_concurrent_blob_submissions: Arc<AtomicUsize>,
in_flight_counts: InFlightBlobCounts,
blob_processing_timeout: Duration,
blob_sender_channel: Option<broadcast::Sender<BlobExecutionStatus<Da::Spec>>>,
ledger_pool_interval: Duration,
Expand All @@ -104,7 +149,7 @@ where
blob_processing_timeout: Duration,
blob_sender_channel: Option<broadcast::Sender<BlobExecutionStatus<Da::Spec>>>,
completed_blobs_to_send: Vec<(BlobToSend, BlobInternalId)>,
nb_of_concurrent_blob_submissions: Arc<AtomicUsize>,
in_flight_counts: InFlightBlobCounts,
) -> anyhow::Result<(Self, JoinHandle<()>)> {
Self::new_with_task_intervals(
da,
Expand All @@ -116,7 +161,7 @@ where
blob_sender_channel,
LEDGER_POLL_INTERVAL,
completed_blobs_to_send,
nb_of_concurrent_blob_submissions,
in_flight_counts,
)
.await
}
Expand All @@ -131,7 +176,7 @@ where
blob_sender_channel: Option<broadcast::Sender<BlobExecutionStatus<Da::Spec>>>,
ledger_pool_interval: Duration,
completed_blobs_to_send: Vec<(BlobToSend, BlobInternalId)>,
nb_of_concurrent_blob_submissions: Arc<AtomicUsize>,
in_flight_counts: InFlightBlobCounts,
) -> anyhow::Result<(Self, JoinHandle<()>)> {
let shutdown_receiver = shutdown_sender.subscribe();
let db = Arc::new(BlobSenderDb::new(storage_path).await?);
Expand Down Expand Up @@ -163,7 +208,7 @@ where
shutdown_sender,
da,
finalization_manager,
nb_of_concurrent_blob_submissions,
in_flight_counts,
blob_processing_timeout,
blob_sender_channel,
ledger_pool_interval,
Expand All @@ -177,18 +222,12 @@ where

/// Number of concurrent blob submissions in flight.
pub fn nb_of_concurrent_blob_submissions(&self) -> usize {
self.nb_of_concurrent_blob_submissions
.load(Ordering::Relaxed)
}

/// Returns a handle to the (atomic) number of blob submissions currently in flight.
pub fn nb_of_in_flight_blobs_handle(&self) -> Arc<AtomicUsize> {
self.nb_of_concurrent_blob_submissions.clone()
self.in_flight_counts.total()
}

fn inc_nb_of_concurrent_blob_submissions(&self) {
self.nb_of_concurrent_blob_submissions
.fetch_add(1, Ordering::Relaxed);
/// Returns a handle to the in-flight blob counts.
pub fn in_flight_counts(&self) -> InFlightBlobCounts {
self.in_flight_counts.clone()
}

/// Returns a reference to the [`BlobSenderHooks`] instance.
Expand Down Expand Up @@ -285,7 +324,7 @@ where
db: self.db.clone(),
hooks: self.hooks.clone(),
in_flight_blobs: self.in_flight_blobs.clone(),
nb_of_concurrent_blob_submissions: self.nb_of_concurrent_blob_submissions.clone(),
in_flight_counts: self.in_flight_counts.clone(),
blob_processing_timeout: self.blob_processing_timeout,
ledger_pool_interval: self.ledger_pool_interval,
blob_sender_channel: self.blob_sender_channel.clone(),
Expand All @@ -294,7 +333,7 @@ where

let shutdown_receiver = self.shutdown_receiver.clone();

self.inc_nb_of_concurrent_blob_submissions();
self.in_flight_counts.increment(!is_batch);
let handle = tokio::task::spawn({
let state = task_state;
let blob = blob.clone();
Expand All @@ -314,7 +353,7 @@ where
info!(%blob_id, "BlobSender: Shutting down task");
}
}
state.dec_nb_of_concurrent_blob_submissions();
state.in_flight_counts.decrement(!is_batch);
}
});

Expand Down Expand Up @@ -455,19 +494,14 @@ struct TaskState<Da: DaService, FM: FinalizationManager> {
db: Arc<BlobSenderDb>,
hooks: Arc<dyn BlobSenderHooks<Da = Da::Spec>>,
in_flight_blobs: Arc<Mutex<HashMap<BlobInternalId, InFlightBlob<Da::Spec>>>>,
nb_of_concurrent_blob_submissions: Arc<AtomicUsize>,
in_flight_counts: InFlightBlobCounts,
blob_processing_timeout: Duration,
ledger_pool_interval: Duration,
blob_sender_channel: Option<broadcast::Sender<BlobExecutionStatus<Da::Spec>>>,
shutdown_sender: watch::Sender<()>,
}

impl<Da: DaService, FM: FinalizationManager> TaskState<Da, FM> {
fn dec_nb_of_concurrent_blob_submissions(&self) {
self.nb_of_concurrent_blob_submissions
.fetch_sub(1, Ordering::Relaxed);
}

async fn remove_blob_or_err(&self, blob_id: BlobInternalId) -> anyhow::Result<()> {
let res = self.db.remove(blob_id).await;
if let Err(err) = &res {
Expand Down Expand Up @@ -761,7 +795,7 @@ impl<Da: DaService, FM: FinalizationManager> TaskState<Da, FM> {
match blob_status.blob_selector_status {
Some(BlobSelectorStatus::Accepted)
| Some(BlobSelectorStatus::Discarded(
BlobDiscardReason::SequenceNumberTooLow,
BlobDiscardReason::SequenceNumberTooLow { .. },
)) => {
trace!(%blob_id, %receipt, ?blob_status, "Removing blob form the blob sender");

Expand Down
11 changes: 6 additions & 5 deletions crates/full-node/sov-blob-sender/tests/blob_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use sov_rollup_interface::da::BlobReaderTrait;
use sov_rollup_interface::node::da::DaService;
use sov_rollup_interface::stf::BlobDiscardReason;
use sov_test_utils::logging::LogCollector;
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;
use tokio::sync::{broadcast, watch, RwLock};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -327,7 +326,10 @@ async fn blobs_with_seq_nr_too_low_are_not_resubmitted() -> anyhow::Result<()> {
Duration::from_secs(20),
&deps,
Some(status_sender),
BlobSelectorStatus::Discarded(BlobDiscardReason::SequenceNumberTooLow),
BlobSelectorStatus::Discarded(BlobDiscardReason::SequenceNumberTooLow {
found: 0,
expected: 1,
}),
)
.await;
let nb_of_blobs = 1;
Expand Down Expand Up @@ -366,7 +368,7 @@ async fn blobs_with_seq_nr_too_low_are_not_resubmitted() -> anyhow::Result<()> {
) && matches!(
status.blob_selector_status,
Some(BlobSelectorStatus::Discarded(
BlobDiscardReason::SequenceNumberTooLow
BlobDiscardReason::SequenceNumberTooLow { .. }
))
)
},
Expand Down Expand Up @@ -496,7 +498,6 @@ async fn create_blob_sender(

let hooks = TestHooks {};

let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0));
let (blob_sender, handle) = BlobSender::new_with_task_intervals(
deps.da.clone(),
finalization_manager,
Expand All @@ -507,7 +508,7 @@ async fn create_blob_sender(
blob_status_sender,
Duration::from_millis(1000),
Default::default(),
nb_of_concurrent_blob_submissions,
Default::default(),
)
.await
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
let (executor_events_sender, executor_events_receiver) =
ExecutorEventsSender::new(shutdown_sender.clone(), db_cache);

let in_flight_blobs = blob_sender.nb_of_in_flight_blobs();
let in_flight_counts = blob_sender.in_flight_counts();

let (forced_tx_batch_notifier, _) = broadcast::channel(1);
let rollup_exec_config = RollupBlockExecutorConfig {
Expand Down Expand Up @@ -168,7 +168,7 @@ where
shutdown_sender.clone(),
executor_events_sender,
next_sequence_number,
in_flight_blobs,
in_flight_counts,
stop_at_rollup_height,
rollup_exec_config.clone(),
cached_txs.write_handle(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use sov_blob_sender::BlobExecutionStatus;
use sov_blob_sender::{BlobInternalId, BlobSender, BlobToSend};
use sov_blob_sender::{BlobInternalId, BlobSender, BlobToSend, InFlightBlobCounts};
use sov_blob_storage::{PreferredBatchData, PreferredProofData};
use sov_db::ledger_db::LedgerDb;
use sov_modules_api::TxHash;
use sov_rollup_interface::node::da::DaService;
use std::{
path::Path,
sync::{atomic::AtomicUsize, Arc},
};
use std::{path::Path, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::watch;
use tokio::task::JoinHandle;
Expand All @@ -22,7 +19,7 @@ use crate::{common::TxStatusBlobSenderHooks, TxStatusManager};
/// Wrapper around [`BlobSender`] with preferred-blob-specific logic.
pub struct PreferredBlobSender<Da: DaService> {
    /// The wrapped sender; `None` for replica roles (PgSync/DaOnly) that never submit blobs.
    inner: Option<BlobSender<Da, TxStatusBlobSenderHooks<Da::Spec>, LedgerDb>>,
    /// Shared counters of blob submissions currently in flight (batches and proofs).
    in_flight_counts: InFlightBlobCounts,
}

impl<Da: DaService> PreferredBlobSender<Da> {
Expand All @@ -37,12 +34,12 @@ impl<Da: DaService> PreferredBlobSender<Da> {
blobs_sender_channel: broadcast::Sender<BlobExecutionStatus<Da::Spec>>,
seq_role: SequencerRole,
) -> anyhow::Result<(Self, Option<JoinHandle<()>>)> {
let nb_of_concurrent_blob_submissions = Arc::new(AtomicUsize::new(0));
let in_flight_counts = InFlightBlobCounts::default();
match seq_role {
SequencerRole::PgSyncReplica | SequencerRole::DaOnlyReplica => Ok((
Self {
inner: None,
nb_of_concurrent_blob_submissions,
in_flight_counts,
},
None,
)),
Expand All @@ -63,14 +60,14 @@ impl<Da: DaService> PreferredBlobSender<Da> {
blob_processing_timeout,
Some(blobs_sender_channel),
blobs_to_send,
nb_of_concurrent_blob_submissions.clone(),
in_flight_counts.clone(),
)
.await?;

Ok((
Self {
inner: Some(inner),
nb_of_concurrent_blob_submissions,
in_flight_counts,
},
Some(blob_sender_handle),
))
Expand Down Expand Up @@ -132,8 +129,8 @@ impl<Da: DaService> PreferredBlobSender<Da> {
Ok(())
}

pub(crate) fn nb_of_in_flight_blobs(&self) -> Arc<AtomicUsize> {
self.nb_of_concurrent_blob_submissions.clone()
pub(crate) fn in_flight_counts(&self) -> InFlightBlobCounts {
self.in_flight_counts.clone()
}

pub(crate) async fn add_txs(&self, blob_id: BlobInternalId, tx_hashes: Arc<Vec<TxHash>>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sov_state::pinned_cache::PinnedCache;
use sov_state::{NativeStorage, Storage};
use std::num::NonZero;
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{oneshot, watch};
use tracing::{debug, info, warn};
Expand All @@ -48,6 +48,10 @@ const COMFORTABLE_SIZE_LIMIT_MULTIPLIER: u64 = 99;
const COMFORTABLE_SIZE_LIMIT_DIVISOR: u64 = 100;

const COMFORTABLE_IN_FLIGHT_BLOBS: usize = 5;
/// The minimum number of batches that have to be in flight for the sequencer to consider it *un*comfortable to produce a new batch.
/// Without this value, the sequencer will skip batch production when there are only proofs in flight, and since proofs aren't processed
/// until the next batch arrives this creates a deadlock.
const COMFORTABLE_IN_FLIGHT_BATCHES: usize = 2;

const METRICS_BATCH_SIZE: usize = 32;

Expand Down Expand Up @@ -82,7 +86,7 @@ where
pub(crate) batch_execution_time_limit_micros: u64,
pub(crate) batch_size_tracker: BatchSizeTracker,
pub(crate) is_ready: Result<(), SequencerNotReadyDetails>,
pub(crate) in_flight_blobs: Arc<AtomicUsize>,
pub(crate) in_flight_counts: sov_blob_sender::InFlightBlobCounts,
pub(crate) executor_events_sender: ExecutorEventsSender<S, Rt>,
// We track two sequence numbers: the sequence number of the current open batch, and the next unassigned sequence number.
// This is because we might need to assign a sequence number to some proofs while a batch is in progress,
Expand Down Expand Up @@ -183,7 +187,7 @@ where
Rt: Runtime<S>,
{
pub(crate) fn nb_of_concurrent_blob_submissions(&self) -> usize {
self.in_flight_blobs.load(Ordering::Acquire)
self.in_flight_counts.total()
}

pub(crate) async fn overwrite_next_sequence_number_for_recovery(
Expand Down Expand Up @@ -391,11 +395,15 @@ where
return;
}

let in_flight_blobs = self.in_flight_blobs.load(Ordering::Relaxed);
if in_flight_blobs >= COMFORTABLE_IN_FLIGHT_BLOBS {
tracing::trace!(
let (in_flight_blobs, in_flight_proofs) = self.in_flight_counts.get();
let in_flight_batches = in_flight_blobs.saturating_sub(in_flight_proofs);
if in_flight_blobs >= COMFORTABLE_IN_FLIGHT_BLOBS
&& in_flight_batches >= COMFORTABLE_IN_FLIGHT_BATCHES
{
tracing::info!(
current_in_flight = %in_flight_blobs,
max_comfortable = %COMFORTABLE_IN_FLIGHT_BLOBS,
current_sequence_number_of_open_batch = %self.sequence_number_of_open_batch.unwrap_or(0),
"Skipping batch production due too many in flight blobs");
return;
}
Expand Down
Loading
Loading