diff --git a/rust/src/manager/wallet_manager/actor.rs b/rust/src/manager/wallet_manager/actor.rs index b8cd0efe2..586ad1361 100644 --- a/rust/src/manager/wallet_manager/actor.rs +++ b/rust/src/manager/wallet_manager/actor.rs @@ -71,6 +71,8 @@ pub struct WalletActor { seed: u64, transaction_watchers: HashMap>, + scan_task: Option, + scan_generation: u64, // cached values, source of truth is the redb database saved with wallet metadata last_scan_finished: Option, @@ -174,6 +176,8 @@ impl WalletActor { last_height_fetched: None, state: ActorState::Initial, transaction_watchers: HashMap::default(), + scan_task: None, + scan_generation: 0, db, }) } @@ -937,8 +941,12 @@ impl WalletActor { pub async fn stop_all_scans(&mut self) { debug!("stop_all_scans"); + if let Some(handle) = self.scan_task.take() { + handle.abort(); + } + self.scan_generation += 1; + self.state = ActorState::Initial; self.transaction_watchers = HashMap::default(); - // TODO: stop the wallet scans too, need to save the task handle when we start the scan } async fn remove_watcher_for_txn(&mut self, tx_id: Txid) { @@ -947,6 +955,12 @@ impl WalletActor { } pub async fn perform_scan_for_single_tx_id(&mut self, tx_id: Txid) -> ActorResult<()> { + // if stop_all_scans was called, watchers are cleared — skip the scan + if !self.transaction_watchers.contains_key(&tx_id) { + debug!("skipping single tx scan, watcher already removed for {tx_id}"); + return Produces::ok(()); + } + let start = UNIX_EPOCH.elapsed().unwrap().as_secs(); let _ = self.update_height().await?.await; @@ -962,6 +976,7 @@ impl WalletActor { debug!("done scan for spk in {}s", now - start); let addr = self.addr.clone(); + let generation = self.scan_generation; self.addr.send_fut(async move { let scan_result = node_client.sync(&graph, sync_request).await; @@ -969,7 +984,7 @@ impl WalletActor { debug!("done single txn id sync scan in {}s", now - start); // save updated txns and send to frontend - send!(addr.update_sync_state_and_send_transactions(scan_result)); + send!(addr.update_sync_state_and_send_transactions(scan_result, generation)); }); Produces::ok(()) @@ -1150,7 +1165,8 @@ impl WalletActor { let (full_scan_request, graph, node_client) = self.get_for_full_scan().await?; let addr = self.addr.clone(); - self.addr.send_fut(async move { + let generation = self.scan_generation; + let handle = cove_tokio::task::spawn(async move { let start = UNIX_EPOCH.elapsed().unwrap().as_secs(); let full_scan_result = node_client @@ -1161,11 +1177,16 @@ impl WalletActor { debug!("[initial] done initial full scan in {}s", now - start); // update wallet state - let _ = call!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE)).await; + let _ = + call!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE, generation)) + .await; // perform next scan send!(addr.maybe_perform_expanded_full_scan()); }); + if let Some(old) = self.scan_task.replace(handle.abort_handle()) { + old.abort(); + } Produces::ok(()) } @@ -1177,7 +1198,8 @@ impl WalletActor { let (full_scan_request, graph, node_client) = self.get_for_full_scan().await?; let addr = self.addr.clone(); - self.addr.send_fut(async move { + let generation = self.scan_generation; + let handle = cove_tokio::task::spawn(async move { let start = UNIX_EPOCH.elapsed().unwrap().as_secs(); let full_scan_result = node_client @@ -1188,8 +1210,11 @@ impl WalletActor { debug!("[expanded] done expanded full scan in {}s", now - start); // update wallet state - send!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE)); + send!(addr.handle_full_scan_complete(full_scan_result, FULL_SCAN_TYPE, generation)); }); + if let Some(old) = self.scan_task.replace(handle.abort_handle()) { + old.abort(); + } Produces::ok(()) } @@ -1217,7 +1242,8 @@ impl WalletActor { let graph = self.wallet.bdk.tx_graph().clone(); let addr = self.addr.clone(); - self.addr.send_fut(async move { + let generation = self.scan_generation; + let handle = cove_tokio::task::spawn(async move { let scan_result = node_client.start_wallet_scan(&graph, full_scan_request, GAP_LIMIT as usize).await; @@ -1225,8 +1251,11 @@ impl WalletActor { debug!("done incremental scan in {}s", now - start); // update wallet state - send!(addr.handle_incremental_scan_complete(scan_result)); + send!(addr.handle_incremental_scan_complete(scan_result, generation)); }); + if let Some(old) = self.scan_task.replace(handle.abort_handle()) { + old.abort(); + } Produces::ok(()) } @@ -1235,7 +1264,13 @@ impl WalletActor { &mut self, full_scan_result: Result, crate::node::client::Error>, full_scan_type: FullScanType, + generation: u64, ) -> ActorResult<()> { + if generation != self.scan_generation { + debug!("dropping stale full scan result (gen {generation} != {})", self.scan_generation); + return Produces::ok(()); + } + debug!("applying full scan result for {full_scan_type:?}"); match full_scan_result { @@ -1269,7 +1304,16 @@ impl WalletActor { async fn handle_incremental_scan_complete( &mut self, scan_result: Result, crate::node::client::Error>, + generation: u64, ) -> ActorResult<()> { + if generation != self.scan_generation { + debug!( + "dropping stale incremental scan result (gen {generation} != {})", + self.scan_generation + ); + return Produces::ok(()); + } + if scan_result.is_err() { self.state = ActorState::FailedIncrementalScan; } @@ -1287,7 +1331,16 @@ impl WalletActor { async fn update_sync_state_and_send_transactions( &mut self, scan_result: Result, + generation: u64, ) -> ActorResult<()> { + if generation != self.scan_generation { + debug!( + "dropping stale single tx scan result (gen {generation} != {})", + self.scan_generation + ); + return Produces::ok(()); + } + if scan_result.is_err() { self.state = ActorState::FailedSyncScan; } @@ -1438,6 +1491,9 @@ impl WalletActor { impl Drop for WalletActor { fn drop(&mut self) { debug!("[DROP] Wallet Actor for {}", self.wallet.id); + if let Some(handle) = self.scan_task.take() { + handle.abort(); + } } }