Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions minter/src/consolidate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use crate::{
mutate_state, read_state,
},
transaction::{SubmitTransactionError, get_recent_slot_and_blockhash, submit_transaction},
utils::chunks::IntoChunksExt,
};
use canlog::log;
use cksol_types_internal::log::Priority;
use icrc_ledger_types::icrc1::account::Account;
use itertools::Itertools;
use sol_rpc_types::{Lamport, Slot};
use solana_hash::Hash;
use solana_signature::Signature;
Expand All @@ -36,21 +36,20 @@ pub async fn consolidate_deposits<R: CanisterRuntime>(runtime: R) {
Err(_) => return,
};

let all_deposits = read_state(|s| group_deposits_by_account(s.deposits_to_consolidate()));
let more_to_process =
all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
let (batches, more_to_process) = read_state(|s| {
let all_deposits = group_deposits_by_account(s.deposits_to_consolidate());
let more_to_process =
all_deposits.len() > MAX_CONCURRENT_RPC_CALLS * MAX_TRANSFERS_PER_CONSOLIDATION;
let batches = all_deposits
.iter()
.into_chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
.take_chunks(MAX_CONCURRENT_RPC_CALLS);
(batches, more_to_process)
});
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, consolidate_deposits);
});

let batches: Vec<Vec<_>> = all_deposits
.into_iter()
.chunks(MAX_TRANSFERS_PER_CONSOLIDATION)
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(Iterator::collect)
.collect();

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
Expand Down
46 changes: 21 additions & 25 deletions minter/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ use crate::{
SubmitTransactionError, get_recent_slot_and_blockhash, get_signature_statuses,
submit_transaction,
},
utils::chunks::IntoChunksExt,
};
use canlog::log;
use cksol_types_internal::log::Priority;
use ic_cdk_management_canister::SignCallError;
use icrc_ledger_types::icrc1::account::Account;
use itertools::Itertools;
use sol_rpc_types::Slot;
use solana_signature::Signature;
use solana_transaction::Transaction;
Expand All @@ -46,23 +46,28 @@ pub async fn finalize_transactions<R: CanisterRuntime>(runtime: R) {
Err(_) => return,
};

let all_transactions: BTreeMap<Signature, Slot> = read_state(|state| {
state
.submitted_transactions()
.iter()
.map(|(sig, tx)| (*sig, tx.slot))
.collect()
let (batches, more_to_process, slots) = read_state(|state| {
let submitted = state.submitted_transactions();
let more_to_process =
submitted.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK;
let slots: BTreeMap<Signature, Slot> =
submitted.iter().map(|(sig, tx)| (*sig, tx.slot)).collect();
let batches = slots
.keys()
.into_chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
.take_chunks(MAX_CONCURRENT_RPC_CALLS);
(batches, more_to_process, slots)
});
if all_transactions.is_empty() {
return;
}

let more_to_process =
all_transactions.len() > MAX_CONCURRENT_RPC_CALLS * MAX_SIGNATURES_PER_STATUS_CHECK;
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, finalize_transactions);
});

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
return;
}

// Fetch the current slot before checking statuses: if a transaction finalizes
// after we snapshot the slot, the status check will see it as finalized rather
// than missing, so it will never be incorrectly marked as expired.
Expand All @@ -74,8 +79,7 @@ pub async fn finalize_transactions<R: CanisterRuntime>(runtime: R) {
}
};

let signatures: Vec<Signature> = all_transactions.keys().copied().collect();
let statuses = check_transaction_statuses(&runtime, signatures).await;
let statuses = check_transaction_statuses(&runtime, batches).await;

for (signature, error) in &statuses.errored {
log!(
Expand Down Expand Up @@ -107,7 +111,7 @@ pub async fn finalize_transactions<R: CanisterRuntime>(runtime: R) {
}

for signature in &statuses.not_found {
if all_transactions[signature] + MAX_BLOCKHASH_AGE < current_slot {
if slots[signature] + MAX_BLOCKHASH_AGE < current_slot {
mutate_state(|state| {
process_event(
state,
Expand Down Expand Up @@ -172,16 +176,8 @@ struct TransactionStatuses {

async fn check_transaction_statuses<R: CanisterRuntime>(
runtime: &R,
signatures: Vec<Signature>,
batches: Vec<Vec<Signature>>,
) -> TransactionStatuses {
let batches: Vec<Vec<_>> = signatures
.into_iter()
.chunks(MAX_SIGNATURES_PER_STATUS_CHECK)
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(Iterator::collect)
.collect();

let mut result = TransactionStatuses {
succeeded: BTreeSet::new(),
errored: BTreeMap::new(),
Expand Down
62 changes: 62 additions & 0 deletions minter/src/utils/chunks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use itertools::Itertools;

#[cfg(test)]
mod tests;

/// A partially-applied chunking operation over an iterator.
///
/// Produced by [`IntoChunksExt::into_chunks`]; call [`take_chunks`] to
/// finish.
///
/// [`take_chunks`]: Chunked::take_chunks
pub struct Chunked<I> {
    // The underlying iterator whose items will be grouped into chunks.
    iter: I,
    // Number of items per chunk; guaranteed non-zero by `into_chunks`.
    chunk_size: usize,
}

impl<'a, T, I> Chunked<I>
where
    I: Iterator<Item = &'a T>,
    T: Clone + 'a,
{
    /// Collects at most `max_chunks` chunks of cloned items, discarding
    /// whatever the underlying iterator still holds once the limit is hit.
    pub fn take_chunks(self, max_chunks: usize) -> Vec<Vec<T>> {
        let Chunked {
            mut iter,
            chunk_size,
        } = self;
        let mut batches = Vec::new();
        while batches.len() < max_chunks {
            // Pull up to one chunk's worth of items, cloning each reference.
            let batch: Vec<T> = iter.by_ref().take(chunk_size).cloned().collect();
            if batch.is_empty() {
                // Source exhausted: the final chunk (if any) was already pushed.
                break;
            }
            batches.push(batch);
        }
        batches
    }
Comment on lines +17 to +30
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current Chunked/IntoChunksExt design only supports Iterator<Item = &T> and builds chunks by cloning (chunk.cloned().collect()). This forces callers with owned data (e.g., Vec<(Account, (Lamport, Vec<...>))>) to iterate by reference and clone just to batch. Consider adding an owning variant/impl that chunks Iterator<Item = T> by moving items, so batch construction can remain allocation-efficient.

Copilot uses AI. Check for mistakes.
}

/// Extends iterators of references with staged chunked collection.
///
/// The chunking is "staged": `into_chunks` fixes the chunk size, and the
/// returned [`Chunked`] value is finished with [`Chunked::take_chunks`],
/// which bounds how many chunks are materialized.
///
/// # Example
///
/// ```ignore
/// let data = vec![1, 2, 3, 4, 5, 6, 7];
/// let chunks = data.iter().into_chunks(3).take_chunks(2);
/// assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6]]);
/// ```
pub trait IntoChunksExt<'a, T: 'a>: Sized {
    /// Begins a chunked collection with the given chunk size.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self>;
}

impl<'a, T: 'a, I> IntoChunksExt<'a, T> for I
where
    I: Iterator<Item = &'a T>,
{
    fn into_chunks(self, chunk_size: usize) -> Chunked<Self> {
        // Fail fast: a zero chunk size would make chunking meaningless.
        if chunk_size == 0 {
            panic!("chunk_size must be greater than zero");
        }
        let iter = self;
        Chunked { iter, chunk_size }
    }
}
Comment on lines +42 to +62
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IntoChunksExt only supports iterators over references and requires cloning items when collecting chunks. That forces call sites to clone even when they already own a Vec<T> and could chunk by value. Consider adding an owned variant (e.g., implement for Iterator<Item = T> returning Vec<Vec<T>> without cloning) to avoid extra copies/allocations in batch-building hot paths.

Copilot uses AI. Check for mistakes.
43 changes: 43 additions & 0 deletions minter/src/utils/chunks/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use super::*;

#[test]
fn should_split_into_chunks_of_given_size() {
    // Seven items at chunk size 3 yield two full chunks plus a remainder.
    let input = vec![1, 2, 3, 4, 5, 6, 7];
    let batches = input.iter().into_chunks(3).take_chunks(10);
    let expected: Vec<Vec<i32>> = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]];
    assert_eq!(batches, expected);
}

#[test]
fn should_limit_to_max_chunks() {
    // Nine items would form three chunks, but only two are requested;
    // the trailing chunk is dropped.
    let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
    let batches = input.iter().into_chunks(3).take_chunks(2);
    let expected: Vec<Vec<i32>> = vec![vec![1, 2, 3], vec![4, 5, 6]];
    assert_eq!(batches, expected);
}

#[test]
fn should_return_empty_for_empty_input() {
    // No items means no chunks, regardless of the limits.
    let input: Vec<i32> = Vec::new();
    let batches = input.iter().into_chunks(3).take_chunks(5);
    assert!(batches.is_empty());
}

#[test]
fn should_return_empty_when_max_chunks_is_zero() {
    // A zero chunk budget drops everything, even with items available.
    let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
    let batches = input.iter().into_chunks(3).take_chunks(0);
    assert!(batches.is_empty());
}

#[test]
fn should_handle_chunk_size_larger_than_input() {
    // When the chunk size exceeds the input, everything lands in one chunk.
    let input = vec![1, 2, 3];
    let batches = input.iter().into_chunks(10).take_chunks(5);
    let expected: Vec<Vec<i32>> = vec![vec![1, 2, 3]];
    assert_eq!(batches, expected);
}

#[test]
#[should_panic(expected = "chunk_size must be greater than zero")]
fn should_panic_on_zero_chunk_size() {
    // Staging a chunked collection with a zero size must fail fast.
    let items = [1, 2, 3];
    let _ = items.iter().into_chunks(0);
}
1 change: 1 addition & 0 deletions minter/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod chunks;
pub mod insertion_ordered_map;
25 changes: 11 additions & 14 deletions minter/src/withdraw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use solana_address::Address;
use canlog::log;
use cksol_types_internal::log::Priority;

use itertools::Itertools;
use sol_rpc_types::Slot;
use solana_hash::Hash;

Expand All @@ -26,6 +25,7 @@ use crate::{
mutate_state, read_state,
},
transaction::{get_recent_slot_and_blockhash, submit_transaction},
utils::chunks::IntoChunksExt,
};

pub const WITHDRAWAL_PROCESSING_DELAY: Duration = Duration::from_mins(1);
Expand Down Expand Up @@ -102,9 +102,10 @@ pub async fn process_pending_withdrawals<R: CanisterRuntime>(runtime: R) {
}
};

let (affordable_requests, num_pending_withdrawals) = read_state(|state| {
let (batches, more_to_process, needs_consolidation) = read_state(|state| {
let mut available_balance = state.balance();
let pending = state.pending_withdrawal_requests();
let num_pending = pending.len();

let affordable: Vec<_> = pending
.values()
Expand All @@ -119,31 +120,27 @@ pub async fn process_pending_withdrawals<R: CanisterRuntime>(runtime: R) {
.map(|t| t.request.clone())
.collect();

(affordable, pending.len())
let more_to_process = affordable.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX;
let needs_consolidation = affordable.len() < num_pending;
let batches = affordable
.iter()
.into_chunks(MAX_WITHDRAWALS_PER_TX)
.take_chunks(MAX_CONCURRENT_RPC_CALLS);
(batches, more_to_process, needs_consolidation)
Comment on lines +125 to +129
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

affordable is already built by cloning requests from state, and then affordable.iter().into_chunks(...).take_chunks(...) clones each WithdrawalRequest again when building batches. This adds avoidable allocations/copies for every processed request. Consider chunking the owned affordable vector by value (moving items into batches) or extending the chunking utility to support Iterator<Item = T> so these batches can be built without a second clone.

Copilot uses AI. Check for mistakes.
});

if affordable_requests.len() < num_pending_withdrawals {
if needs_consolidation {
log!(
Priority::Info,
"Insufficient minter balance for some withdrawal requests, scheduling consolidation"
);
runtime.set_timer(Duration::ZERO, consolidate_deposits);
}

let more_to_process =
affordable_requests.len() > MAX_CONCURRENT_RPC_CALLS * MAX_WITHDRAWALS_PER_TX;
let reschedule = scopeguard::guard(runtime.clone(), |runtime| {
runtime.set_timer(Duration::ZERO, process_pending_withdrawals);
});

let batches: Vec<Vec<_>> = affordable_requests
.into_iter()
.chunks(MAX_WITHDRAWALS_PER_TX)
.into_iter()
.take(MAX_CONCURRENT_RPC_CALLS)
.map(Iterator::collect)
.collect();

if batches.is_empty() {
// Nothing to process
scopeguard::ScopeGuard::into_inner(reschedule);
Expand Down
Loading