Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ use self::{
RefreshDecision, ResolvedDelegatedAccounts, ResolvedPrograms,
},
};
use super::errors::{ChainlinkError, ChainlinkResult};
use super::{
errors::{ChainlinkError, ChainlinkResult},
should_schedule_bank_eviction,
};
use crate::{
chainlink::{
account_still_undelegating_on_chain::account_still_undelegating_on_chain,
Expand Down Expand Up @@ -235,10 +238,15 @@ where
.map(|account| CapacityEvictionProtection {
delegated: account.delegated(),
undelegating: account.undelegating(),
// Cloned base accounts are not ephemeral until after a successful evict
// so this won’t block legitimate cloned-account eviction
ephemeral: account.ephemeral()
&& account.owner() != &Pubkey::default(),
})
.unwrap_or(CapacityEvictionProtection {
delegated: false,
undelegating: false,
ephemeral: false,
})
},
);
Expand Down Expand Up @@ -679,7 +687,11 @@ where
update_slot,
"Dropping subscription update for account that is no longer watched"
);
if self.accounts_bank.get_account(&pubkey).is_some() {
if self
.accounts_bank
.get_account(&pubkey)
.is_some_and(|account| should_schedule_bank_eviction(&account))
{
if let Err(err) = self
.remote_account_provider
.send_removal_update(pubkey)
Expand Down
212 changes: 212 additions & 0 deletions magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,218 @@ async fn test_undelegation_tracking_window_is_protected_from_capacity_eviction()
assert!(remote_account_provider.is_watching(&tracking_pubkey));
}

#[tokio::test]
async fn test_ephemeral_account_is_protected_from_capacity_eviction() {
init_logger();
let validator_keypair = Keypair::new();
const CURRENT_SLOT: u64 = 100;

let protected_pubkey = random_pubkey();
let evictable_pubkey = random_pubkey();
let new_pubkey = random_pubkey();
let account_owner = random_pubkey();

let remote_account = Account {
lamports: 1_000_000,
data: vec![1, 2, 3, 4],
owner: account_owner,
executable: false,
rent_epoch: 0,
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
..
} = setup_with_capacity(
[
(protected_pubkey, remote_account.clone()),
(evictable_pubkey, remote_account.clone()),
(new_pubkey, remote_account),
],
CURRENT_SLOT,
validator_keypair.insecure_clone(),
2,
)
.await;
let mut removed_rx = remote_account_provider
.try_get_removed_account_rx()
.expect("removed account receiver should be available");

remote_account_provider
.acquire_subscription(
&protected_pubkey,
SubscriptionReason::DirectAccount,
)
.await
.expect("failed to subscribe protected candidate");
remote_account_provider
.acquire_subscription(
&evictable_pubkey,
SubscriptionReason::DirectAccount,
)
.await
.expect("failed to subscribe evictable candidate");

let mut ephemeral_account =
AccountSharedData::new(1_000_000, 4, &account_owner);
ephemeral_account.set_ephemeral(true);
accounts_bank.insert(protected_pubkey, ephemeral_account);

remote_account_provider
.acquire_subscription(&new_pubkey, SubscriptionReason::DirectAccount)
.await
.expect("should evict the non-protected LRU candidate");

assert!(remote_account_provider.is_watching(&protected_pubkey));
assert!(remote_account_provider.is_watching(&new_pubkey));
assert!(!remote_account_provider.is_watching(&evictable_pubkey));
assert!(remote_account_provider
.pubsub_client()
.subscriptions_union()
.contains(&protected_pubkey));
assert!(!remote_account_provider
.pubsub_client()
.subscriptions_union()
.contains(&evictable_pubkey));
assert_eq!(removed_rx.try_recv().unwrap(), evictable_pubkey);
}

#[tokio::test]
async fn test_acquire_subscription_skips_ephemeral_account() {
init_logger();
let validator_keypair = Keypair::new();
const CURRENT_SLOT: u64 = 100;

let ephemeral_pubkey = random_pubkey();
let account_owner = random_pubkey();
let remote_account = Account {
lamports: 1_000_000,
data: vec![1, 2, 3, 4],
owner: account_owner,
executable: false,
rent_epoch: 0,
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
..
} = setup(
[(ephemeral_pubkey, remote_account)],
CURRENT_SLOT,
validator_keypair,
)
.await;

let mut ephemeral_account =
AccountSharedData::new(1_000_000, 4, &account_owner);
ephemeral_account.set_ephemeral(true);
accounts_bank.insert(ephemeral_pubkey, ephemeral_account);

remote_account_provider
.acquire_subscription(
&ephemeral_pubkey,
SubscriptionReason::DirectAccount,
)
.await
.expect("ephemeral subscription should no-op successfully");

assert!(!remote_account_provider.is_watching(&ephemeral_pubkey));
assert!(!remote_account_provider
.pubsub_client()
.subscriptions_union()
.contains(&ephemeral_pubkey));
}

#[tokio::test]
async fn test_stale_subscription_update_does_not_notify_removal_for_ephemeral()
{
init_logger();
let validator_keypair = Keypair::new();
const CURRENT_SLOT: u64 = 100;

let ephemeral_pubkey = random_pubkey();
let normal_pubkey = random_pubkey();
let account_owner = random_pubkey();

let FetcherTestCtx {
fetch_cloner,
accounts_bank,
subscription_tx,
..
} = setup([], CURRENT_SLOT, validator_keypair).await;

let mut ephemeral_account =
AccountSharedData::new(1_000_000, 4, &account_owner);
ephemeral_account.set_ephemeral(true);
accounts_bank.insert(ephemeral_pubkey, ephemeral_account);

let normal_account = AccountSharedData::new(1_000_000, 4, &account_owner);
accounts_bank.insert(normal_pubkey, normal_account);

let mut removed_rx = fetch_cloner
.try_get_removed_account_rx()
.expect("removed account receiver should be available");
while removed_rx.try_recv().is_ok() {}

use crate::remote_account_provider::{
RemoteAccount, RemoteAccountUpdateSource,
};

let chain_update = Account {
lamports: 900_000,
data: vec![9, 9, 9, 9],
owner: account_owner,
executable: false,
rent_epoch: 0,
};

subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: ephemeral_pubkey,
account: RemoteAccount::from_fresh_account(
chain_update.clone(),
CURRENT_SLOT,
RemoteAccountUpdateSource::Subscription,
),
source: SubscriptionSource::Account,
})
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(50)).await;
assert!(matches!(
removed_rx.try_recv(),
Err(tokio::sync::mpsc::error::TryRecvError::Empty)
));

subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: normal_pubkey,
account: RemoteAccount::from_fresh_account(
chain_update,
CURRENT_SLOT,
RemoteAccountUpdateSource::Subscription,
),
source: SubscriptionSource::Account,
})
.await
.unwrap();

let removed_pubkey = tokio::time::timeout(Duration::from_secs(1), async {
loop {
if let Ok(pubkey) = removed_rx.try_recv() {
return pubkey;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("normal account should enqueue removal notification");
assert_eq!(removed_pubkey, normal_pubkey);
}

#[tokio::test]
async fn test_delegated_cleanup_keeps_undelegation_tracking_subscription() {
init_logger();
Expand Down
94 changes: 85 additions & 9 deletions magicblock-chainlink/src/chainlink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
}
}

pub fn should_schedule_bank_eviction(account: &AccountSharedData) -> bool {
!account.ephemeral() && !account.delegated() && !account.undelegating()
}

impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
InnerChainlink<T, U, V, C>
{
Expand Down Expand Up @@ -293,14 +297,13 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
// the overhead of building and submitting a doomed tx)
let should_evict = match accounts_bank.get_account(&pubkey) {
Some(account) => {
let undelegating = account.undelegating();
let delegated = account.delegated();
let evict = !undelegating && !delegated;
let evict = should_schedule_bank_eviction(&account);
if !evict {
trace!(
pubkey = %pubkey,
undelegating,
delegated,
ephemeral = account.ephemeral(),
undelegating = account.undelegating(),
delegated = account.delegated(),
owner = %account.owner(),
"Ignoring removal notification because bank \
state is protected; no EvictAccount \
Expand All @@ -311,9 +314,8 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
}
None => false,
};
// Skipping a delegated/undelegating LRU candidate is not a
// removal event; protected bank state must not be translated
// into a downstream bank eviction.
// Skipping protected bank state is not a removal event; it
// must not be translated into a downstream bank eviction.
if !should_evict {
continue;
}
Expand Down Expand Up @@ -606,7 +608,8 @@ mod tests {
use tokio::sync::mpsc;

use super::{
errors::ChainlinkError, InnerChainlink, ReplicationModeAwareChainlink,
errors::ChainlinkError, should_schedule_bank_eviction, InnerChainlink,
ReplicationModeAwareChainlink,
};
use crate::{
accounts_bank::mock::AccountsBankStub,
Expand Down Expand Up @@ -788,6 +791,79 @@ mod tests {
handle.await.unwrap();
}

#[test]
fn test_should_schedule_bank_eviction() {
let owner = Pubkey::new_unique();
let mut account = AccountSharedData::new(1, 0, &owner);
assert!(should_schedule_bank_eviction(&account));

account.set_delegated(true);
assert!(!should_schedule_bank_eviction(&account));

account.set_delegated(false);
account.set_undelegating(true);
assert!(!should_schedule_bank_eviction(&account));

account.set_undelegating(false);
account.set_ephemeral(true);
assert!(!should_schedule_bank_eviction(&account));
}

#[tokio::test]
async fn test_subscribe_account_removals_skips_ephemeral_and_evicts_normal()
{
init_logger();

let accounts_bank = Arc::new(AccountsBankStub::default());
let cloner = Arc::new(ClonerStub::new(accounts_bank.clone()));
let (removed_tx, removed_rx) = mpsc::channel(8);
let remote_account_provider = test_remote_account_provider().await;

let ephemeral_pubkey = Pubkey::new_unique();
let normal_pubkey = Pubkey::new_unique();
let owner = Pubkey::new_unique();

let mut ephemeral_account =
AccountSharedData::new(1_000_000, 0, &owner);
ephemeral_account.set_ephemeral(true);
accounts_bank.insert(ephemeral_pubkey, ephemeral_account);

let normal_account = AccountSharedData::new(1_000_000, 0, &owner);
accounts_bank.insert(normal_pubkey, normal_account);

let handle = InnerChainlink::<
ChainRpcClientMock,
ChainPubsubClientMock,
AccountsBankStub,
ClonerStub,
>::subscribe_account_removals(
&accounts_bank,
&cloner,
&remote_account_provider,
removed_rx,
);

removed_tx.send(ephemeral_pubkey).await.unwrap();
removed_tx.send(normal_pubkey).await.unwrap();

tokio::time::timeout(Duration::from_secs(1), async {
loop {
if accounts_bank.get_account(&normal_pubkey).is_none() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("normal removal notification should submit eviction");

assert!(accounts_bank.get_account(&ephemeral_pubkey).is_some());
assert!(accounts_bank.get_account(&normal_pubkey).is_none());

drop(removed_tx);
handle.await.unwrap();
}

#[tokio::test]
async fn test_subscribe_account_removals_skips_evict_when_account_is_watched_again(
) {
Expand Down
Loading
Loading