Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rs/rust_canisters/statesync_test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use candid::CandidType;
use candid::{CandidType, Principal};
use serde::{Deserialize, Serialize};

#[derive(Copy, Clone, CandidType, Deserialize, Serialize)]
#[derive(Clone, Debug, CandidType, Deserialize, Serialize)]
pub enum CanisterCreationStatus {
#[serde(rename = "idle")]
Idle,
#[serde(rename = "in_progress")]
InProgress(u64),
#[serde(rename = "done")]
Done(u64),
Done(Vec<Principal>),
}
47 changes: 40 additions & 7 deletions rs/rust_canisters/statesync_test/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use candid::Nat;
use futures::{StreamExt, stream};
use ic_cdk::futures::spawn;
use ic_cdk::management_canister::{
ProvisionalCreateCanisterWithCyclesArgs, provisional_create_canister_with_cycles,
CanisterSettings, ProvisionalCreateCanisterWithCyclesArgs, UpdateSettingsArgs,
provisional_create_canister_with_cycles, update_settings,
};
use ic_cdk::stable::{
WASM_PAGE_SIZE_IN_BYTES as PAGE_SIZE, stable_grow, stable_size, stable_write,
Expand Down Expand Up @@ -88,12 +90,13 @@ async fn read_state(index: usize) -> Result<u8, String> {

fn set_canister_creation_status(n: u64) -> bool {
let mut canister_creation_status_guard = CANISTER_CREATION_STATUS.lock().unwrap();
match *canister_creation_status_guard {
match &*canister_creation_status_guard {
CanisterCreationStatus::Idle => {
*canister_creation_status_guard = CanisterCreationStatus::InProgress(n);
true
}
CanisterCreationStatus::InProgress(num_canisters) => {
let num_canisters = *num_canisters;
if n == num_canisters {
false
} else {
Expand All @@ -102,7 +105,8 @@ fn set_canister_creation_status(n: u64) -> bool {
);
}
}
CanisterCreationStatus::Done(num_canisters) => {
CanisterCreationStatus::Done(canister_ids) => {
let num_canisters = canister_ids.len() as u64;
if n == num_canisters {
false
} else {
Expand Down Expand Up @@ -133,24 +137,53 @@ async fn create_many_canisters(n: u64) {
};
provisional_create_canister_with_cycles(&create_args)
.await
.expect("Failed to create canister");
.expect("Failed to create canister")
.canister_id
};
futs.push(fut);
}

stream::iter(futs)
let canister_ids = stream::iter(futs)
.buffer_unordered(500) // limit concurrency to 500 (inter-canister queue capacity)
.collect::<Vec<_>>()
.await;

let mut canister_creation_status_guard = CANISTER_CREATION_STATUS.lock().unwrap();
*canister_creation_status_guard = CanisterCreationStatus::Done(n);
*canister_creation_status_guard = CanisterCreationStatus::Done(canister_ids);
});
}

#[update]
async fn update_many_canisters() {
let canister_ids = match &*CANISTER_CREATION_STATUS.lock().unwrap() {
CanisterCreationStatus::Done(ids) => ids.clone(),
_ => ic_cdk::trap("Canister creation is not done yet"),
};

#[allow(clippy::disallowed_methods)]
spawn(async move {
stream::iter(canister_ids.into_iter().cycle().enumerate().map(
|(i, canister_id)| async move {
update_settings(&UpdateSettingsArgs {
canister_id,
settings: CanisterSettings {
freezing_threshold: Some(Nat::from(2592000_u64 + i as u64)),
..Default::default()
},
})
.await
.expect("Failed to update settings");
},
))
.buffer_unordered(500) // limit concurrency to 500 (inter-canister queue capacity)
.for_each(|()| async {})
.await;
});
}

#[query]
fn canister_creation_status() -> CanisterCreationStatus {
*CANISTER_CREATION_STATUS.lock().unwrap()
CANISTER_CREATION_STATUS.lock().unwrap().clone()
}

fn main() {}
Expand Down
3 changes: 2 additions & 1 deletion rs/rust_canisters/statesync_test/statesync_test.did
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
type canister_creation_status = variant {
idle;
in_progress : nat64;
done : nat64;
done : vec principal;
};

service : {
change_state : (nat32) -> (variant { Ok : nat64; Err : text });
read_state : (nat64) -> (variant { Ok : nat8; Err : text }) query;
write_random_data : (nat64, nat64, nat64) -> (variant { Ok : null; Err : text });
create_many_canisters : (nat64) -> ();
update_many_canisters : () -> ();
canister_creation_status : () -> (canister_creation_status) query;
}
48 changes: 48 additions & 0 deletions rs/rust_canisters/statesync_test/test/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,54 @@ fn test_create_many_canisters() {

// We created `num_canisters` in addition to the seed canister.
assert_eq!(env.num_running_canisters(), num_canisters + 1);

let created_canister_ids = match canister_creation_status() {
CanisterCreationStatus::Done(ids) => ids,
s => panic!("Expected Done, got {s:?}"),
};
assert_eq!(created_canister_ids.len(), num_canisters as usize);

let result = env
.execute_ingress(
seed_canister_id,
"update_many_canisters",
Encode!(&()).unwrap(),
)
.unwrap();
let _ = assert_reply(result);

let initial_versions: Vec<(_, u64)> = created_canister_ids
.iter()
.copied()
.map(|canister_id| {
let canister_id_ic = CanisterId::unchecked_from_principal(canister_id.into());
let version = env
.canister_status_query_as(seed_canister_id.into(), canister_id_ic)
.unwrap()
.unwrap()
.version();
(canister_id, version)
})
.collect();

loop {
let all_at_version =
initial_versions
.iter()
.copied()
.all(|(canister_id, initial_version)| {
let canister_id = CanisterId::unchecked_from_principal(canister_id.into());
env.canister_status_query_as(seed_canister_id.into(), canister_id)
.unwrap()
.unwrap()
.version()
>= initial_version + 2
});
if all_at_version {
break;
}
env.tick();
}
Comment thread
mraszyk marked this conversation as resolved.
Outdated
}

fn assert_reply(res: WasmResult) -> Vec<u8> {
Expand Down
19 changes: 19 additions & 0 deletions rs/state_machine_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4733,6 +4733,25 @@ impl StateMachine {
})
}

/// Queries the `canister_status` endpoint on the management canister of the specified sender.
/// Use this if the `canister_id` is controlled by `sender`.
pub fn canister_status_query_as(
&self,
sender: PrincipalId,
canister_id: CanisterId,
) -> Result<Result<CanisterStatusResultV2, String>, UserError> {
self.query_as(
sender,
CanisterId::ic_00(),
"canister_status",
(CanisterIdRecord::from(canister_id)).encode(),
)
.map(|wasm_result| match wasm_result {
WasmResult::Reply(reply) => Ok(Decode!(&reply, CanisterStatusResultV2).unwrap()),
WasmResult::Reject(reject_msg) => Err(reject_msg),
})
}

/// Deletes the canister with the specified ID.
pub fn delete_canister(&self, canister_id: CanisterId) -> Result<WasmResult, UserError> {
self.execute_ingress(
Expand Down
81 changes: 40 additions & 41 deletions rs/tests/message_routing/rejoin_test_lib/rejoin_test_lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use candid::{Decode, Encode, Principal};
use canister_test::{Canister, Runtime, Wasm};
use futures::future::join_all;
use ic_agent::Agent;
use ic_system_test_driver::driver::test_env::TestEnv;
use ic_system_test_driver::driver::test_env_api::get_dependency_path_from_env;
use ic_system_test_driver::driver::test_env_api::retry_async;
use ic_system_test_driver::driver::test_env_api::{HasPublicApiUrl, HasVm, IcNodeSnapshot};
use ic_system_test_driver::util::{MetricsFetcher, UniversalCanister, block_on, runtime_from_url};
use ic_types::PrincipalId;
use ic_universal_canister::wasm;
use ic_utils::interfaces::management_canister::ManagementCanister;
use slog::Logger;
use slog::info;
use statesync_test::CanisterCreationStatus;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -303,24 +300,6 @@ async fn deploy_seed_canister(
seed_canister_id
}

async fn deploy_busy_canister(agent: &Agent, effective_canister_id: PrincipalId, logger: &Logger) {
let universal_canister =
UniversalCanister::new_with_retries(agent, effective_canister_id, logger).await;
universal_canister
.update(
wasm()
.set_heartbeat(
wasm()
.instruction_counter_is_at_least(1_800_000_000)
.build(),
)
.reply()
.build(),
)
.await
.expect("Failed to set up a busy canister.");
}

async fn deploy_canisters_for_long_rounds(
logger: &slog::Logger,
nodes: Vec<IcNodeSnapshot>,
Expand Down Expand Up @@ -354,7 +333,7 @@ async fn deploy_canisters_for_long_rounds(
num_canisters_per_seed_canister * num_seed_canisters,
);
let mut create_many_canisters_futs = vec![];
for seed_canister_id in seed_canisters {
for seed_canister_id in seed_canisters.iter() {
let seed_canister_id_str = seed_canister_id.to_string();
info!(
logger,
Expand All @@ -366,7 +345,7 @@ async fn deploy_canisters_for_long_rounds(
let bytes = Encode!(&num_canisters_per_seed_canister)
.expect("Failed to candid encode argument for a seed canister");
let res = agent
.update(&seed_canister_id, "create_many_canisters")
.update(seed_canister_id, "create_many_canisters")
.with_arg(bytes)
.call_and_wait()
.await;
Expand All @@ -388,7 +367,7 @@ async fn deploy_canisters_for_long_rounds(
loop {
let bytes = Encode!(&()).expect("Failed to candid encode unit type");
let res = agent
.query(&seed_canister_id, "canister_creation_status")
.query(seed_canister_id, "canister_creation_status")
.with_arg(bytes)
.call()
.await;
Expand All @@ -409,7 +388,12 @@ async fn deploy_canisters_for_long_rounds(
"Canister creation on seed canister {seed_canister_id_str:?} is in progress ({n}). Retrying canister_creation_status query ...",
);
}
CanisterCreationStatus::Done(_) => {
CanisterCreationStatus::Done(canister_ids) => {
info!(
logger,
"Canister creation on seed canister {seed_canister_id_str:?} is done ({} canisters created).",
canister_ids.len(),
);
break;
}
}
Expand All @@ -428,26 +412,41 @@ async fn deploy_canisters_for_long_rounds(
}
join_all(create_many_canisters_futs).await;

// We deploy 8 "busy" canisters: this way,
// there are 2 canisters per each of the 4 scheduler threads
// and thus every thread executes 2 x 1.8B = 3.6B instructions.
let num_busy_canisters = 8;
info!(
logger,
"Deploying {} busy canisters on a node {} ({}) ...",
num_busy_canisters,
init_node.node_id,
init_node.get_public_url()
"Calling update_many_canisters on all seed canisters ..."
);
let mut create_busy_canisters_futs = vec![];
for _ in 0..num_busy_canisters {
create_busy_canisters_futs.push(deploy_busy_canister(
&agent,
init_node.effective_canister_id(),
logger,
));
let mut update_many_canisters_futs = vec![];
for seed_canister_id in seed_canisters.iter() {
let seed_canister_id_str = seed_canister_id.to_string();
let agent = agent.clone();
let fut = async move {
loop {
let bytes = Encode!(&()).expect("Failed to candid encode unit type");
let res = agent
.update(seed_canister_id, "update_many_canisters")
.with_arg(bytes)
.call_and_wait()
.await;
match res {
Ok(_) => break,
Err(err) => {
info!(
logger,
"Calling update_many_canisters on seed canister {seed_canister_id_str:?} failed because {err:?}. Retrying ...",
);
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
info!(
logger,
"Successfully called update_many_canisters on seed canister {seed_canister_id_str:?}.",
);
};
update_many_canisters_futs.push(fut);
}
join_all(create_busy_canisters_futs).await;
join_all(update_many_canisters_futs).await;
}

fn no_state_clone_count(node: IcNodeSnapshot, logger: &slog::Logger) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion rs/tests/message_routing/rejoin_test_long_rounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Runbook::
. setup the testnet of 3f + 1 nodes with f = 4 (like on mainnet)
. pick a random node and install 4 "seed" canisters through it (the state sync test canister is used as "seed")
. create 100,000 canisters via the "seed" canisters (in parallel)
. deploy 8 "busy" canisters (universal canister with heartbeats executing 1.8B instructions)
. make the "seed" canisters cycle through those 100,000 canisters (in parallel) and keep changing their canister state
. pick the slowest node required for consensus in terms of batch processing time and kill that node
. wait for the subnet producing a CUP
. start the killed node
Expand Down
Loading