diff --git a/minter/src/consolidate/mod.rs b/minter/src/consolidate/mod.rs
index 3f96cb94..e4b87f35 100644
--- a/minter/src/consolidate/mod.rs
+++ b/minter/src/consolidate/mod.rs
@@ -11,11 +11,11 @@ use crate::{
         mutate_state, read_state,
     },
     transaction::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction},
+    utils::chunks::IntoChunksExt,
 };
 use canlog::log;
 use cksol_types_internal::log::Priority;
 use icrc_ledger_types::icrc1::account::Account;
-use itertools::Itertools;
 use sol_rpc_types::{Lamport, Slot};
 use solana_hash::Hash;
 use solana_signature::Signature;
@@ -36,21 +36,20 @@ pub async fn consolidate_deposits<R: Runtime>(runtime: R) {
         Err(_) => return,
     };
-    let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
-    let more_to_process =
-        all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
+    let (batches, more_to_process) = read_state(|s| {
+        let all_deposits = group_deposits_by_account(s.deposits_to_consolidate());
+        let more_to_process =
+            all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
+        let batches = all_deposits
+            .iter()
+            .into_chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
+            .take_chunks(MAX_CONCURRENT_RPC_CALLS);
+        (batches, more_to_process)
+    });
 
     let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
         runtime.set_timer(Duration::ZERO, consolidate_deposits);
     });
 
-    let batches: Vec<Vec<_>> = all_deposits
-        .into_iter()
-        .chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
-        .into_iter()
-        .take(MAX_CONCURRENT_RPC_CALLS)
-        .map(Iterator::collect)
-        .collect();
-
     if batches.is_empty() {
         // Nothing to process
         scopeguard::ScopeGuard::into_inner(reschedule);
diff --git a/minter/src/monitor/mod.rs b/minter/src/monitor/mod.rs
index b42e1a23..c8c49337 100644
--- a/minter/src/monitor/mod.rs
+++ b/minter/src/monitor/mod.rs
@@ -14,12 +14,12 @@ use crate::{
         SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses,
         submit_transaction,
     },
+    utils::chunks::IntoChunksExt,
 };
 use canlog::log;
 use cksol_types_internal::log::Priority;
 use ic_cdk_management_canister::SignCallError;
 use icrc_ledger_types::icrc1::account::Account;
-use itertools::Itertools;
 use sol_rpc_types::Slot;
 use solana_signature::Signature;
 use solana_transaction::Transaction;
@@ -46,23 +46,28 @@ pub async fn finalize_transactions<R: Runtime>(runtime: R) {
         Err(_) => return,
     };
 
-    let all_transactions: BTreeMap<Signature, Slot> = read_state(|state| {
-        state
-            .submitted_transactions()
-            .iter()
-            .map(|(sig, tx)| (*sig, tx.slot))
-            .collect()
+    let (batches, more_to_process, slots) = read_state(|state| {
+        let submitted = state.submitted_transactions();
+        let more_to_process =
+            submitted.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK;
+        let slots: BTreeMap<Signature, Slot> =
+            submitted.iter().map(|(sig, tx)| (*sig, tx.slot)).collect();
+        let batches = slots
+            .keys()
+            .into_chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
+            .take_chunks(MAX_CONCURRENT_RPC_CALLS);
+        (batches, more_to_process, slots)
     });
 
-    if all_transactions.is_empty() {
-        return;
-    }
-
-    let more_to_process =
-        all_transactions.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK;
     let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
         runtime.set_timer(Duration::ZERO, finalize_transactions);
     });
+    if batches.is_empty() {
+        // Nothing to process
+        scopeguard::ScopeGuard::into_inner(reschedule);
+        return;
+    }
+
     // Fetch the current slot before checking statuses: if a transaction finalizes
     // after we snapshot the slot, the status check will see it as finalized rather
     // than missing, so it will never be incorrectly marked as expired.
@@ -74,8 +79,7 @@ pub async fn finalize_transactions<R: Runtime>(runtime: R) {
         }
     };
 
-    let signatures: Vec<Signature> = all_transactions.keys().copied().collect();
-    let statuses = check_transaction_statuses(&runtime, signatures).await;
+    let statuses = check_transaction_statuses(&runtime, batches).await;
 
     for (signature, error) in &statuses.errored {
         log!(
@@ -107,7 +111,7 @@ pub async fn finalize_transactions<R: Runtime>(runtime: R) {
     }
 
     for signature in &statuses.not_found {
-        if all_transactions[signature] + MAX_BLOCKHASH_AGE < current_slot {
+        if slots[signature] + MAX_BLOCKHASH_AGE < current_slot {
             mutate_state(|state| {
                 process_event(
                     state,
@@ -172,16 +176,8 @@ struct TransactionStatuses {
 
 async fn check_transaction_statuses<R: Runtime>(
     runtime: &R,
-    signatures: Vec<Signature>,
+    batches: Vec<Vec<Signature>>,
 ) -> TransactionStatuses {
-    let batches: Vec<Vec<Signature>> = signatures
-        .into_iter()
-        .chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
-        .into_iter()
-        .take(MAX_CONCURRENT_RPC_CALLS)
-        .map(Iterator::collect)
-        .collect();
-
     let mut result = TransactionStatuses {
         succeeded: BTreeSet::new(),
         errored: BTreeMap::new(),
diff --git a/minter/src/utils/chunks/mod.rs b/minter/src/utils/chunks/mod.rs
new file mode 100644
index 00000000..03b0ec05
--- /dev/null
+++ b/minter/src/utils/chunks/mod.rs
@@ -0,0 +1,62 @@
+use itertools::Itertools;
+
+#[cfg(test)]
+mod tests;
+
+/// A partially-applied chunking operation over an iterator.
+///
+/// Produced by [`IntoChunksExt::into_chunks`]; call [`take_chunks`] to
+/// finish.
+///
+/// [`take_chunks`]: Chunked::take_chunks
+pub struct Chunked<I> {
+    iter: I,
+    chunk_size: usize,
+}
+
+impl<'a, T, I> Chunked<I>
+where
+    I: Iterator<Item = &'a T>,
+    T: Clone + 'a,
+{
+    /// Collects at most `max_chunks` chunks, discarding any remaining items.
+    pub fn take_chunks(self, max_chunks: usize) -> Vec<Vec<T>> {
+        let chunked = self.iter.chunks(self.chunk_size);
+        chunked
+            .into_iter()
+            .take(max_chunks)
+            .map(|chunk| chunk.cloned().collect())
+            .collect()
+    }
+}
+
+/// Extends iterators of references with staged chunked collection.
+///
+/// # Example
+///
+/// ```ignore
+/// let data = vec![1, 2, 3, 4, 5, 6, 7];
+/// let chunks = data.iter().into_chunks(3).take_chunks(2);
+/// assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6]]);
+/// ```
+pub trait IntoChunksExt<'a, T: 'a>: Sized {
+    /// Begins a chunked collection with the given chunk size.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `chunk_size` is zero.
+    fn into_chunks(self, chunk_size: usize) -> Chunked<Self>;
+}
+
+impl<'a, T: 'a, I> IntoChunksExt<'a, T> for I
+where
+    I: Iterator<Item = &'a T>,
+{
+    fn into_chunks(self, chunk_size: usize) -> Chunked<Self> {
+        assert!(chunk_size > 0, "chunk_size must be greater than zero");
+        Chunked {
+            iter: self,
+            chunk_size,
+        }
+    }
+}
diff --git a/minter/src/utils/chunks/tests.rs b/minter/src/utils/chunks/tests.rs
new file mode 100644
index 00000000..dea68db5
--- /dev/null
+++ b/minter/src/utils/chunks/tests.rs
@@ -0,0 +1,43 @@
+use super::*;
+
+#[test]
+fn should_split_into_chunks_of_given_size() {
+    let data: Vec<u32> = (1..=7).collect();
+    let chunks = data.iter().into_chunks(3).take_chunks(10);
+    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
+}
+
+#[test]
+fn should_limit_to_max_chunks() {
+    let data: Vec<u32> = (1..=9).collect();
+    let chunks = data.iter().into_chunks(3).take_chunks(2);
+    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6]]);
+}
+
+#[test]
+fn should_return_empty_for_empty_input() {
+    let data: Vec<u32> = vec![];
+    let chunks = data.iter().into_chunks(3).take_chunks(5);
+    assert!(chunks.is_empty());
+}
+
+#[test]
+fn should_return_empty_when_max_chunks_is_zero() {
+    let data: Vec<u32> = (1..=9).collect();
+    let chunks = data.iter().into_chunks(3).take_chunks(0);
+    assert!(chunks.is_empty());
+}
+
+#[test]
+fn should_handle_chunk_size_larger_than_input() {
+    let data: Vec<u32> = vec![1, 2, 3];
+    let chunks = data.iter().into_chunks(10).take_chunks(5);
+    assert_eq!(chunks, vec![vec![1, 2, 3]]);
+}
+
+#[test]
+#[should_panic(expected = "chunk_size must be greater than zero")]
+fn should_panic_on_zero_chunk_size() {
+    let data: Vec<u32> = vec![1, 2, 3];
+    let _ = data.iter().into_chunks(0);
+}
diff --git a/minter/src/utils/mod.rs b/minter/src/utils/mod.rs
index cc87b6c5..a8fd6e42 100644
--- a/minter/src/utils/mod.rs
+++ b/minter/src/utils/mod.rs
@@ -1 +1,2 @@
+pub mod chunks;
 pub mod insertion_ordered_map;
diff --git a/minter/src/withdraw/mod.rs b/minter/src/withdraw/mod.rs
index 122d9c34..1745f651 100644
--- a/minter/src/withdraw/mod.rs
+++ b/minter/src/withdraw/mod.rs
@@ -8,7 +8,6 @@ use solana_address::Address;
 
 use canlog::log;
 use cksol_types_internal::log::Priority;
-use itertools::Itertools;
 use sol_rpc_types::Slot;
 use solana_hash::Hash;
 
@@ -26,6 +25,7 @@ use crate::{
         mutate_state, read_state,
     },
     transaction::{get_recent_slot_and_blockhash, submit_transaction},
+    utils::chunks::IntoChunksExt,
 };
 
 pub const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1);
@@ -102,9 +102,10 @@ pub async fn process_pending_withdrawals<R: Runtime>(runtime: R) {
         }
     };
 
-    let (affordable_requests, num_pending_withdrawals) = read_state(|state| {
+    let (batches, more_to_process, needs_consolidation) = read_state(|state| {
         let mut available_balance = state.balance();
         let pending = state.pending_withdrawal_requests();
+        let num_pending = pending.len();
 
         let affordable: Vec<_> = pending
             .values()
@@ -119,10 +120,16 @@ pub async fn process_pending_withdrawals<R: Runtime>(runtime: R) {
             .map(|t| t.request.clone())
             .collect();
 
-        (affordable, pending.len())
+        let more_to_process = affordable.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX;
+        let needs_consolidation = affordable.len() < num_pending;
+        let batches = affordable
+            .iter()
+            .into_chunks(MAX_WITHDRAWALS_PER_TX)
+            .take_chunks(MAX_CONCURRENT_RPC_CALLS);
+        (batches, more_to_process, needs_consolidation)
    });
 
-    if affordable_requests.len() < num_pending_withdrawals {
+    if needs_consolidation {
         log!(
             Priority::Info,
             "Insufficient minter balance for some withdrawal requests, scheduling consolidation"
@@ -129,20 +136,10 @@ pub async fn process_pending_withdrawals<R: Runtime>(runtime: R) {
         );
         runtime.set_timer(Duration::ZERO, consolidate_deposits);
     }
-    let more_to_process =
-        affordable_requests.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX;
     let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
         runtime.set_timer(Duration::ZERO, process_pending_withdrawals);
     });
 
-    let batches: Vec<Vec<_>> = affordable_requests
-        .into_iter()
-        .chunks(MAX_WITHDRAWALS_PER_TX)
-        .into_iter()
-        .take(MAX_CONCURRENT_RPC_CALLS)
-        .map(Iterator::collect)
-        .collect();
-
     if batches.is_empty() {
         // Nothing to process
         scopeguard::ScopeGuard::into_inner(reschedule);