diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index cf6b81de5..2bf3695de 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -64,7 +64,10 @@ use self::{ RefreshDecision, ResolvedDelegatedAccounts, ResolvedPrograms, }, }; -use super::errors::{ChainlinkError, ChainlinkResult}; +use super::{ + errors::{ChainlinkError, ChainlinkResult}, + should_schedule_bank_eviction, +}; use crate::{ chainlink::{ account_still_undelegating_on_chain::account_still_undelegating_on_chain, @@ -235,10 +238,15 @@ where .map(|account| CapacityEvictionProtection { delegated: account.delegated(), undelegating: account.undelegating(), + // Cloned base accounts are not ephemeral until after a successful evict + // so this won’t block legitimate cloned-account eviction + ephemeral: account.ephemeral() + && account.owner() != &Pubkey::default(), }) .unwrap_or(CapacityEvictionProtection { delegated: false, undelegating: false, + ephemeral: false, }) }, ); @@ -679,7 +687,11 @@ where update_slot, "Dropping subscription update for account that is no longer watched" ); - if self.accounts_bank.get_account(&pubkey).is_some() { + if self + .accounts_bank + .get_account(&pubkey) + .is_some_and(|account| should_schedule_bank_eviction(&account)) + { if let Err(err) = self .remote_account_provider .send_removal_update(pubkey) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs index 46d6e71a4..1f0a97428 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs @@ -1591,6 +1591,218 @@ async fn test_undelegation_tracking_window_is_protected_from_capacity_eviction() assert!(remote_account_provider.is_watching(&tracking_pubkey)); } +#[tokio::test] +async fn test_ephemeral_account_is_protected_from_capacity_eviction() { + init_logger(); + let validator_keypair = Keypair::new(); + const CURRENT_SLOT: u64 = 100; + + let protected_pubkey = random_pubkey(); + let evictable_pubkey = random_pubkey(); + let new_pubkey = random_pubkey(); + let account_owner = random_pubkey(); + + let remote_account = Account { + lamports: 1_000_000, + data: vec![1, 2, 3, 4], + owner: account_owner, + executable: false, + rent_epoch: 0, + }; + + let FetcherTestCtx { + remote_account_provider, + accounts_bank, + .. + } = setup_with_capacity( + [ + (protected_pubkey, remote_account.clone()), + (evictable_pubkey, remote_account.clone()), + (new_pubkey, remote_account), + ], + CURRENT_SLOT, + validator_keypair.insecure_clone(), + 2, + ) + .await; + let mut removed_rx = remote_account_provider + .try_get_removed_account_rx() + .expect("removed account receiver should be available"); + + remote_account_provider + .acquire_subscription( + &protected_pubkey, + SubscriptionReason::DirectAccount, + ) + .await + .expect("failed to subscribe protected candidate"); + remote_account_provider + .acquire_subscription( + &evictable_pubkey, + SubscriptionReason::DirectAccount, + ) + .await + .expect("failed to subscribe evictable candidate"); + + let mut ephemeral_account = + AccountSharedData::new(1_000_000, 4, &account_owner); + ephemeral_account.set_ephemeral(true); + accounts_bank.insert(protected_pubkey, ephemeral_account); + + remote_account_provider + .acquire_subscription(&new_pubkey, SubscriptionReason::DirectAccount) + .await + .expect("should evict the non-protected LRU candidate"); + + assert!(remote_account_provider.is_watching(&protected_pubkey)); + assert!(remote_account_provider.is_watching(&new_pubkey)); + assert!(!remote_account_provider.is_watching(&evictable_pubkey)); + assert!(remote_account_provider + .pubsub_client() + .subscriptions_union() + .contains(&protected_pubkey)); + assert!(!remote_account_provider + .pubsub_client() + .subscriptions_union() + .contains(&evictable_pubkey)); + assert_eq!(removed_rx.try_recv().unwrap(), evictable_pubkey); +} + +#[tokio::test] +async fn test_acquire_subscription_skips_ephemeral_account() { + init_logger(); + let validator_keypair = Keypair::new(); + const CURRENT_SLOT: u64 = 100; + + let ephemeral_pubkey = random_pubkey(); + let account_owner = random_pubkey(); + let remote_account = Account { + lamports: 1_000_000, + data: vec![1, 2, 3, 4], + owner: account_owner, + executable: false, + rent_epoch: 0, + }; + + let FetcherTestCtx { + remote_account_provider, + accounts_bank, + .. + } = setup( + [(ephemeral_pubkey, remote_account)], + CURRENT_SLOT, + validator_keypair, + ) + .await; + + let mut ephemeral_account = + AccountSharedData::new(1_000_000, 4, &account_owner); + ephemeral_account.set_ephemeral(true); + accounts_bank.insert(ephemeral_pubkey, ephemeral_account); + + remote_account_provider + .acquire_subscription( + &ephemeral_pubkey, + SubscriptionReason::DirectAccount, + ) + .await + .expect("ephemeral subscription should no-op successfully"); + + assert!(!remote_account_provider.is_watching(&ephemeral_pubkey)); + assert!(!remote_account_provider + .pubsub_client() + .subscriptions_union() + .contains(&ephemeral_pubkey)); +} + +#[tokio::test] +async fn test_stale_subscription_update_does_not_notify_removal_for_ephemeral() +{ + init_logger(); + let validator_keypair = Keypair::new(); + const CURRENT_SLOT: u64 = 100; + + let ephemeral_pubkey = random_pubkey(); + let normal_pubkey = random_pubkey(); + let account_owner = random_pubkey(); + + let FetcherTestCtx { + fetch_cloner, + accounts_bank, + subscription_tx, + .. + } = setup([], CURRENT_SLOT, validator_keypair).await; + + let mut ephemeral_account = + AccountSharedData::new(1_000_000, 4, &account_owner); + ephemeral_account.set_ephemeral(true); + accounts_bank.insert(ephemeral_pubkey, ephemeral_account); + + let normal_account = AccountSharedData::new(1_000_000, 4, &account_owner); + accounts_bank.insert(normal_pubkey, normal_account); + + let mut removed_rx = fetch_cloner + .try_get_removed_account_rx() + .expect("removed account receiver should be available"); + while removed_rx.try_recv().is_ok() {} + + use crate::remote_account_provider::{ + RemoteAccount, RemoteAccountUpdateSource, + }; + + let chain_update = Account { + lamports: 900_000, + data: vec![9, 9, 9, 9], + owner: account_owner, + executable: false, + rent_epoch: 0, + }; + + subscription_tx + .send(ForwardedSubscriptionUpdate { + pubkey: ephemeral_pubkey, + account: RemoteAccount::from_fresh_account( + chain_update.clone(), + CURRENT_SLOT, + RemoteAccountUpdateSource::Subscription, + ), + source: SubscriptionSource::Account, + }) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(matches!( + removed_rx.try_recv(), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) + )); + + subscription_tx + .send(ForwardedSubscriptionUpdate { + pubkey: normal_pubkey, + account: RemoteAccount::from_fresh_account( + chain_update, + CURRENT_SLOT, + RemoteAccountUpdateSource::Subscription, + ), + source: SubscriptionSource::Account, + }) + .await + .unwrap(); + + let removed_pubkey = tokio::time::timeout(Duration::from_secs(1), async { + loop { + if let Ok(pubkey) = removed_rx.try_recv() { + return pubkey; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("normal account should enqueue removal notification"); + assert_eq!(removed_pubkey, normal_pubkey); +} + #[tokio::test] async fn test_delegated_cleanup_keeps_undelegation_tracking_subscription() { init_logger(); diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index a70103a75..a7b9029a1 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -188,6 +188,10 @@ impl } } +pub fn should_schedule_bank_eviction(account: &AccountSharedData) -> bool { + !account.ephemeral() && !account.delegated() && !account.undelegating() +} + impl InnerChainlink { @@ -293,14 +297,13 @@ impl // the overhead of building and submitting a doomed tx) let should_evict = match accounts_bank.get_account(&pubkey) { Some(account) => { - let undelegating = account.undelegating(); - let delegated = account.delegated(); - let evict = !undelegating && !delegated; + let evict = should_schedule_bank_eviction(&account); if !evict { trace!( pubkey = %pubkey, - undelegating, - delegated, + ephemeral = account.ephemeral(), + undelegating = account.undelegating(), + delegated = account.delegated(), owner = %account.owner(), "Ignoring removal notification because bank \ state is protected; no EvictAccount \ @@ -311,9 +314,8 @@ impl } None => false, }; - // Skipping a delegated/undelegating LRU candidate is not a - // removal event; protected bank state must not be translated - // into a downstream bank eviction. + // Skipping protected bank state is not a removal event; it + // must not be translated into a downstream bank eviction. if !should_evict { continue; } @@ -606,7 +608,8 @@ mod tests { use tokio::sync::mpsc; use super::{ - errors::ChainlinkError, InnerChainlink, ReplicationModeAwareChainlink, + errors::ChainlinkError, should_schedule_bank_eviction, InnerChainlink, + ReplicationModeAwareChainlink, }; use crate::{ accounts_bank::mock::AccountsBankStub, @@ -788,6 +791,79 @@ mod tests { handle.await.unwrap(); } + #[test] + fn test_should_schedule_bank_eviction() { + let owner = Pubkey::new_unique(); + let mut account = AccountSharedData::new(1, 0, &owner); + assert!(should_schedule_bank_eviction(&account)); + + account.set_delegated(true); + assert!(!should_schedule_bank_eviction(&account)); + + account.set_delegated(false); + account.set_undelegating(true); + assert!(!should_schedule_bank_eviction(&account)); + + account.set_undelegating(false); + account.set_ephemeral(true); + assert!(!should_schedule_bank_eviction(&account)); + } + + #[tokio::test] + async fn test_subscribe_account_removals_skips_ephemeral_and_evicts_normal() + { + init_logger(); + + let accounts_bank = Arc::new(AccountsBankStub::default()); + let cloner = Arc::new(ClonerStub::new(accounts_bank.clone())); + let (removed_tx, removed_rx) = mpsc::channel(8); + let remote_account_provider = test_remote_account_provider().await; + + let ephemeral_pubkey = Pubkey::new_unique(); + let normal_pubkey = Pubkey::new_unique(); + let owner = Pubkey::new_unique(); + + let mut ephemeral_account = + AccountSharedData::new(1_000_000, 0, &owner); + ephemeral_account.set_ephemeral(true); + accounts_bank.insert(ephemeral_pubkey, ephemeral_account); + + let normal_account = AccountSharedData::new(1_000_000, 0, &owner); + accounts_bank.insert(normal_pubkey, normal_account); + + let handle = InnerChainlink::< + ChainRpcClientMock, + ChainPubsubClientMock, + AccountsBankStub, + ClonerStub, + >::subscribe_account_removals( + &accounts_bank, + &cloner, + &remote_account_provider, + removed_rx, + ); + + removed_tx.send(ephemeral_pubkey).await.unwrap(); + removed_tx.send(normal_pubkey).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if accounts_bank.get_account(&normal_pubkey).is_none() { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("normal removal notification should submit eviction"); + + assert!(accounts_bank.get_account(&ephemeral_pubkey).is_some()); + assert!(accounts_bank.get_account(&normal_pubkey).is_none()); + + drop(removed_tx); + handle.await.unwrap(); + } + #[tokio::test] async fn test_subscribe_account_removals_skips_evict_when_account_is_watched_again( ) { diff --git a/magicblock-chainlink/src/remote_account_provider/lru_cache.rs b/magicblock-chainlink/src/remote_account_provider/lru_cache.rs index 055c42b95..a0d4aacc1 100644 --- a/magicblock-chainlink/src/remote_account_provider/lru_cache.rs +++ b/magicblock-chainlink/src/remote_account_provider/lru_cache.rs @@ -377,6 +377,30 @@ mod tests { assert!(!cache.contains(&pubkey3)); } + #[tokio::test] + async fn test_lru_cache_skips_ephemeral_protected_candidate_and_evicts_next( + ) { + let capacity = NonZeroUsize::new(3).unwrap(); + let cache = AccountsLruCache::new(capacity); + + let ephemeral_pubkey = Pubkey::new_unique(); + let pubkey2 = Pubkey::new_unique(); + let pubkey3 = Pubkey::new_unique(); + let pubkey4 = Pubkey::new_unique(); + + cache.add(ephemeral_pubkey); + cache.add(pubkey2); + cache.add(pubkey3); + + let outcome = + cache.add_with_evict_filter(pubkey4, |pk| *pk != ephemeral_pubkey); + + assert_eq!(outcome, AddAccountOutcome::Evicted(pubkey2)); + assert!(cache.contains(&ephemeral_pubkey)); + assert!(cache.contains(&pubkey4)); + assert!(!cache.contains(&pubkey2)); + } + #[test] fn test_never_evicted_accounts() { let capacity = NonZeroUsize::new(3).unwrap(); diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 4eecf1285..81737ce04 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -342,11 +342,12 @@ pub(crate) enum SubscriptionReleaseMode { pub(crate) struct CapacityEvictionProtection { pub delegated: bool, pub undelegating: bool, + pub ephemeral: bool, } impl CapacityEvictionProtection { pub fn is_protected(self) -> bool { - self.delegated || self.undelegating + self.delegated || self.undelegating || self.ephemeral } } @@ -861,6 +862,7 @@ impl RemoteAccountProvider { CapacityEvictionProtection { delegated: false, undelegating: false, + ephemeral: false, }, ) } @@ -1466,6 +1468,10 @@ impl RemoteAccountProvider { reason: SubscriptionReason, ) -> RemoteAccountProviderResult<()> { // 1. First realize subscription + if self.capacity_eviction_protection_for(pubkey).ephemeral { + trace!(pubkey = %pubkey, "Skipping subscription for ephemeral account"); + return Ok(()); + } self.pubsub_client.subscribe(*pubkey, None).await?; // 2. Add to LRU cache