Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions rust/src/manager/wallet_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct WalletActor {

seed: u64,
transaction_watchers: HashMap<Txid, Addr<TransactionWatcher>>,
scan_task: Option<tokio::task::AbortHandle>,
scan_generation: u64,

// cached values, source of truth is the redb database saved with wallet metadata
last_scan_finished: Option<Duration>,
Expand Down Expand Up @@ -174,6 +176,8 @@ impl WalletActor {
last_height_fetched: None,
state: ActorState::Initial,
transaction_watchers: HashMap::default(),
scan_task: None,
scan_generation: 0,
db,
})
}
Expand Down Expand Up @@ -937,8 +941,12 @@ impl WalletActor {

pub async fn stop_all_scans(&mut self) {
debug!("stop_all_scans");
if let Some(handle) = self.scan_task.take() {
Comment thread
Sandipmandal25 marked this conversation as resolved.
handle.abort();
}
self.scan_generation += 1;
self.state = ActorState::Initial;
self.transaction_watchers = HashMap::default();
Comment thread
Sandipmandal25 marked this conversation as resolved.
// TODO: stop the wallet scans too, need to save the task handle when we start the scan
}
Comment thread
Sandipmandal25 marked this conversation as resolved.

async fn remove_watcher_for_txn(&mut self, tx_id: Txid) {
Expand All @@ -947,6 +955,12 @@ impl WalletActor {
}

pub async fn perform_scan_for_single_tx_id(&mut self, tx_id: Txid) -> ActorResult<()> {
// if stop_all_scans was called, watchers are cleared — skip the scan
if !self.transaction_watchers.contains_key(&tx_id) {
debug!("skipping single tx scan, watcher already removed for {tx_id}");
return Produces::ok(());
}

let start = UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = self.update_height().await?.await;

Expand All @@ -962,14 +976,15 @@ impl WalletActor {
debug!("done scan for spk in {}s", now - start);

let addr = self.addr.clone();
let generation = self.scan_generation;
self.addr.send_fut(async move {
let scan_result = node_client.sync(&graph, sync_request).await;

let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
debug!("done single txn id sync scan in {}s", now - start);

// save updated txns and send to frontend
send!(addr.update_sync_state_and_send_transactions(scan_result));
send!(addr.update_sync_state_and_send_transactions(scan_result, generation));
});

Produces::ok(())
Expand Down Expand Up @@ -1150,7 +1165,8 @@ impl WalletActor {
let (full_scan_request, graph, node_client) = self.get_for_full_scan().await?;

let addr = self.addr.clone();
self.addr.send_fut(async move {
let generation = self.scan_generation;
let handle = cove_tokio::task::spawn(async move {
let start = UNIX_EPOCH.elapsed().unwrap().as_secs();

let full_scan_result = node_client
Expand All @@ -1161,11 +1177,16 @@ impl WalletActor {
debug!("[initial] done initial full scan in {}s", now - start);

// update wallet state
let _ = call!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE)).await;
let _ =
call!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE, generation))
.await;

// perform next scan
send!(addr.maybe_perform_expanded_full_scan());
});
if let Some(old) = self.scan_task.replace(handle.abort_handle()) {
old.abort();
}

Produces::ok(())
}
Expand All @@ -1177,7 +1198,8 @@ impl WalletActor {
let (full_scan_request, graph, node_client) = self.get_for_full_scan().await?;

let addr = self.addr.clone();
self.addr.send_fut(async move {
let generation = self.scan_generation;
let handle = cove_tokio::task::spawn(async move {
let start = UNIX_EPOCH.elapsed().unwrap().as_secs();

let full_scan_result = node_client
Expand All @@ -1188,8 +1210,11 @@ impl WalletActor {
debug!("[expanded] done expanded full scan in {}s", now - start);

// update wallet state
send!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE));
send!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE, generation));
});
if let Some(old) = self.scan_task.replace(handle.abort_handle()) {
old.abort();
}

Produces::ok(())
}
Expand Down Expand Up @@ -1217,16 +1242,20 @@ impl WalletActor {
let graph = self.wallet.bdk.tx_graph().clone();

let addr = self.addr.clone();
self.addr.send_fut(async move {
let generation = self.scan_generation;
let handle = cove_tokio::task::spawn(async move {
let scan_result =
node_client.start_wallet_scan(&graph, full_scan_request, GAP_LIMIT as usize).await;

let now = UNIX_EPOCH.elapsed().unwrap().as_secs();
debug!("done incremental scan in {}s", now - start);

// update wallet state
send!(addr.handle_incremental_scan_complete(scan_result));
send!(addr.handle_incremental_scan_complete(scan_result, generation));
});
if let Some(old) = self.scan_task.replace(handle.abort_handle()) {
old.abort();
}

Produces::ok(())
}
Expand All @@ -1235,7 +1264,13 @@ impl WalletActor {
&mut self,
full_scan_result: Result<FullScanResponse<KeychainKind>, crate::node::client::Error>,
full_scan_type: FullScanType,
generation: u64,
) -> ActorResult<()> {
if generation != self.scan_generation {
debug!("dropping stale full scan result (gen {generation} != {})", self.scan_generation);
return Produces::ok(());
}

debug!("applying full scan result for {full_scan_type:?}");

match full_scan_result {
Expand Down Expand Up @@ -1269,7 +1304,16 @@ impl WalletActor {
async fn handle_incremental_scan_complete(
&mut self,
scan_result: Result<FullScanResponse<KeychainKind>, crate::node::client::Error>,
generation: u64,
) -> ActorResult<()> {
if generation != self.scan_generation {
debug!(
"dropping stale incremental scan result (gen {generation} != {})",
self.scan_generation
);
return Produces::ok(());
}

if scan_result.is_err() {
self.state = ActorState::FailedIncrementalScan;
}
Expand All @@ -1287,7 +1331,16 @@ impl WalletActor {
async fn update_sync_state_and_send_transactions(
&mut self,
scan_result: Result<SyncResponse, crate::node::client::Error>,
generation: u64,
) -> ActorResult<()> {
if generation != self.scan_generation {
debug!(
"dropping stale single tx scan result (gen {generation} != {})",
self.scan_generation
);
return Produces::ok(());
}

if scan_result.is_err() {
self.state = ActorState::FailedSyncScan;
}
Expand Down Expand Up @@ -1438,6 +1491,9 @@ impl WalletActor {
impl Drop for WalletActor {
fn drop(&mut self) {
debug!("[DROP] Wallet Actor for {}", self.wallet.id);
if let Some(handle) = self.scan_task.take() {
handle.abort();
}
}
}

Expand Down