From 9a29c69c11b84c18f10513adb1129d9892fdfbd5 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 24 Apr 2026 14:39:30 +0000 Subject: [PATCH 1/2] feat: [DSM-103] Actual priority queue for long executions Instead of a binary prioritized / opportunistic flag, explicitly (record and) prioritize long executions based on number of slices executed, AP and round when the long execution started. This ensures that we don't starve low priority canisters (which may happen with bounded AP and just the right distribution across execution cores). Also switch from persisting SubnetSchedule spread across individual canister states to persisting it as part of the subnet's SystemMetadata. --- rs/execution_environment/benches/scheduler.rs | 2 + rs/execution_environment/src/scheduler.rs | 5 +- .../src/scheduler/round_schedule.rs | 133 ++++++++++-------- .../src/scheduler/tests.rs | 3 + .../src/scheduler/tests/dts.rs | 60 +++----- .../src/scheduler/tests/metrics.rs | 3 +- .../src/scheduler/tests/scheduling.rs | 85 ++++------- .../def/state/metadata/v1/metadata.proto | 10 ++ .../src/gen/state/state.metadata.v1.rs | 15 ++ .../src/metadata_state/proto.rs | 46 +++++- .../src/metadata_state/subnet_schedule.rs | 23 +-- .../metadata_state/subnet_schedule/tests.rs | 5 +- .../src/metadata_state/tests.rs | 80 +++++++---- rs/state_machine_tests/src/lib.rs | 1 + rs/state_manager/src/checkpoint.rs | 10 +- rs/state_manager/src/split/tests.rs | 4 + rs/state_manager/src/tip.rs | 6 +- 17 files changed, 294 insertions(+), 197 deletions(-) diff --git a/rs/execution_environment/benches/scheduler.rs b/rs/execution_environment/benches/scheduler.rs index 67117d46e3ad..e35b98ed767b 100644 --- a/rs/execution_environment/benches/scheduler.rs +++ b/rs/execution_environment/benches/scheduler.rs @@ -78,6 +78,7 @@ fn main() { &executed_canisters, &executed_canisters, &BTreeSet::new(), + ExecutionRound::from(1), ); }); }); @@ -89,6 +90,7 @@ fn main() { &executed_canisters, &executed_canisters, &BTreeSet::new(), + ExecutionRound::from(1), ); group.bench_function("finish_round", |bench| { diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 666341ce2742..efd60fafd565 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -553,6 +553,7 @@ impl SchedulerImpl { &executed_canisters, &canisters_with_completed_messages, &low_cycle_balance_canisters, + current_round, ); round_limits.instructions -= as_round_instructions( @@ -2202,10 +2203,10 @@ fn abort_canister( cost_schedule: CanisterCyclesCostSchedule, ) { if exec_env.abort_canister(canister, log, cost_schedule) { - // Reset the priority credit to zero. + // Reset `executed_slices` to zero. subnet_schedule .get_mut(canister.canister_id()) - .priority_credit = Default::default(); + .executed_slices = 0; } } diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 2f057af306df..27f0da3d6c30 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -8,9 +8,7 @@ use ic_config::flag_status::FlagStatus; use ic_logger::{ReplicaLogger, error}; use ic_replicated_state::canister_state::NextExecution; use ic_replicated_state::{CanisterPriority, CanisterState, ReplicatedState}; -use ic_types::{ - AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode, NumInstructions, -}; +use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, NumInstructions}; use ic_utils::iter::left_outer_join; use more_asserts::debug_assert_gt; use num_traits::SaturatingSub; @@ -30,7 +28,8 @@ const ZERO: AccumulatedPriority = AccumulatedPriority::new(0); const ONE_PERCENT: AccumulatedPriority = AccumulatedPriority::new(1 * MULTIPLIER); /// 100% in accumulated priority. -const ONE_HUNDRED_PERCENT: AccumulatedPriority = AccumulatedPriority::new(100 * MULTIPLIER); +pub(super) const ONE_HUNDRED_PERCENT: AccumulatedPriority = + AccumulatedPriority::new(100 * MULTIPLIER); const fn from_ca(ca: ComputeAllocation) -> AccumulatedPriority { AccumulatedPriority::new(ca.as_percent() as i64 * MULTIPLIER) @@ -47,21 +46,32 @@ pub(super) struct CanisterRoundState { accumulated_priority: AccumulatedPriority, /// Copy of the canister's `SchedulerState::compute_allocation`. compute_allocation: AccumulatedPriority, - /// Copy of the canister's `CanisterPriority::long_execution_mode`. - long_execution_mode: LongExecutionMode, - /// Whether the canister has a long execution. - has_long_execution: bool, + /// Number of DTS slices executed so far in the current long execution. + executed_slices: i64, + /// The round when the current long execution started. `None` means the canister + /// is not in a long execution. + long_execution_start_round: Option, } impl CanisterRoundState { pub fn new(canister: &CanisterState, canister_priority: &CanisterPriority) -> Self { + // Ensure that `long_execution_start_round` matches the canister state. + debug_assert_eq!( + canister.has_long_execution(), + canister_priority.long_execution_start_round.is_some(), + "canister: {:?}, task_queue: {:?}, canister_priority: {:?}", + canister.canister_id(), + canister.system_state.task_queue, + canister_priority, + ); + let compute_allocation = from_ca(canister.compute_allocation()); Self { canister_id: canister.canister_id(), accumulated_priority: canister_priority.accumulated_priority + compute_allocation, compute_allocation, - long_execution_mode: canister_priority.long_execution_mode, - has_long_execution: canister.has_long_execution(), + executed_slices: canister_priority.executed_slices, + long_execution_start_round: canister_priority.long_execution_start_round, } } @@ -72,17 +82,34 @@ impl CanisterRoundState { impl Ord for CanisterRoundState { fn cmp(&self, other: &Self) -> Ordering { - // Sort by: - // 1. Long execution mode, reversed (Prioritized -> Opportunistic) - other - .long_execution_mode - .cmp(&self.long_execution_mode) - // 2. Long execution (long execution -> new execution) - .then(other.has_long_execution.cmp(&self.has_long_execution)) - // 3. Accumulated priority, descending. - .then(other.accumulated_priority.cmp(&self.accumulated_priority)) - // 4. Canister ID, ascending. - .then(self.canister_id.cmp(&other.canister_id)) + // First, sort long executions before new. + match ( + self.long_execution_start_round, + other.long_execution_start_round, + ) { + (Some(_), None) => Ordering::Less, + (None, Some(_)) => Ordering::Greater, + + // Among new executions, sort by AP descending and break ties by canister ID. + (None, None) => other + .accumulated_priority + .cmp(&self.accumulated_priority) + .then_with(|| self.canister_id.cmp(&other.canister_id)), + + // Among long executions, sort by executed slices; AP descending; start round + // ascending; then break ties by canister ID. + // + // An aborted execution (executed slices == 0) is considered to have the same + // priority as a newly started long execution (executed slices == 1). This is to + // avoid starvation of aborted executions. + (Some(self_start_round), Some(other_start_round)) => other + .executed_slices + .max(1) + .cmp(&self.executed_slices.max(1)) + .then_with(|| other.accumulated_priority.cmp(&self.accumulated_priority)) + .then_with(|| self_start_round.cmp(&other_start_round)) + .then_with(|| self.canister_id.cmp(&other.canister_id)), + } } } @@ -269,7 +296,7 @@ impl RoundSchedule { state: &mut ReplicatedState, is_first_iteration: bool, ) -> IterationSchedule { - let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut(); + let (canister_states, _) = state.canisters_and_schedule_mut(); // Collect all active canisters and their next executions. let canister_next_executions: BTreeMap<_, _> = canister_states @@ -345,11 +372,6 @@ impl RoundSchedule { .take(long_execution_cores) .for_each(|canister| { observe_scheduled_as_first(canister); - - // And set prioritized long execution mode for the first `long_execution_cores` - // canisters. - subnet_schedule.get_mut(*canister).long_execution_mode = - LongExecutionMode::Prioritized; }); schedule .iter() @@ -376,16 +398,20 @@ impl RoundSchedule { executed_canisters: &BTreeSet, canisters_with_completed_messages: &BTreeSet, low_cycle_balance_canisters: &BTreeSet, + current_round: ExecutionRound, ) { self.executed_canisters.extend(executed_canisters); self.canisters_with_completed_messages .extend(canisters_with_completed_messages); - for canister_id in canisters_with_completed_messages { - // If a canister has completed a long execution, reset its long execution mode. + // If a canister has completed a long execution, clear its start round. + // + // A canister may run out of cycles while in a long execution (e.g. if making + // calls). Also include low cycle balance canisters. + for canister_id in canisters_with_completed_messages.union(low_cycle_balance_canisters) { state .canister_priority_mut(*canister_id) - .long_execution_mode = LongExecutionMode::Opportunistic; + .long_execution_start_round = None; } for canister_id in executed_canisters.union(low_cycle_balance_canisters) { @@ -394,9 +420,17 @@ impl RoundSchedule { .map(|canister| canister.next_execution()) .unwrap_or(NextExecution::None) { - // Either all messages or a full long execution slice completed. - NextExecution::None | NextExecution::ContinueLong => { + // Completed all messages. + NextExecution::None => { + self.fully_executed_canisters.insert(*canister_id); + } + // Completed a long execution slice. + NextExecution::ContinueLong => { self.fully_executed_canisters.insert(*canister_id); + state + .canister_priority_mut(*canister_id) + .long_execution_start_round + .get_or_insert(current_round); } NextExecution::StartNew => {} NextExecution::ContinueInstallCode => { @@ -420,10 +454,10 @@ impl RoundSchedule { ) { let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut(); - // Charge canisters for full executions in this round. + // Update fully executed canisters' priorities. for canister_id in self.fully_executed_canisters.iter() { let canister_priority = subnet_schedule.get_mut(*canister_id); - canister_priority.priority_credit += ONE_HUNDRED_PERCENT; + canister_priority.executed_slices += 1; canister_priority.last_full_execution_round = current_round; } @@ -438,20 +472,20 @@ impl RoundSchedule { canister_priority.accumulated_priority += from_ca(canister.compute_allocation()); - // Apply the priority credit if not in the same long execution as at the - // beginning of the round. - if canister_priority.priority_credit != ZERO + // On message completion (or short execution), charge for the executed slices. + if canister_priority.executed_slices > 0 && (!canister.has_long_execution() || self .canisters_with_completed_messages .contains(&canister.canister_id())) { canister_priority.accumulated_priority -= - std::mem::take(&mut canister_priority.priority_credit); + ONE_HUNDRED_PERCENT * canister_priority.executed_slices; + canister_priority.executed_slices = 0; } - free_allocation -= - canister_priority.accumulated_priority - canister_priority.priority_credit; + free_allocation -= canister_priority.accumulated_priority + - ONE_HUNDRED_PERCENT * canister_priority.executed_slices; } self.grant_heap_delta_and_install_code_credits(state, metrics); @@ -615,7 +649,6 @@ impl RoundSchedule { for &canister_id in canister_states.keys() { let canister_priority = subnet_schedule.get_mut(canister_id); canister_priority.accumulated_priority = Default::default(); - canister_priority.priority_credit = Default::default(); } } @@ -672,7 +705,7 @@ impl RoundSchedule { // De-facto compute allocation includes bonus allocation let factual = rs.compute_allocation + free_capacity_per_canister; // Count long executions and sum up their compute allocation. - if rs.has_long_execution { + if rs.long_execution_start_round.is_some() { long_executions_compute_allocation += factual; number_of_long_executions += 1; } @@ -706,7 +739,7 @@ impl RoundSchedule { ); round_states.sort(); - let round_schedule = RoundSchedule::new( + RoundSchedule::new( scheduler_cores, heap_delta_rate_limit, rate_limiting_of_heap_delta, @@ -723,19 +756,7 @@ impl RoundSchedule { .take(number_of_long_executions) .map(|rs| rs.canister_id) .collect(), - ); - - for canister_id in round_schedule - .ordered_long_execution_canister_ids - .iter() - .take(long_execution_cores) - { - state - .canister_priority_mut(*canister_id) - .long_execution_mode = LongExecutionMode::Prioritized; - } - - round_schedule + ) } } diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index b2cf1107a28b..e6f523856e4d 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -67,6 +67,9 @@ fn state_sync_clears_paused_execution_registry() { // state with the clean copy. The registry still holds the orphaned paused // execution entry. test.state_mut().put_canister_state(clean_canister); + let canister_priority = test.state_mut().canister_priority_mut(canister); + canister_priority.long_execution_start_round = None; + canister_priority.executed_slices = 0; assert!(!test.canister_state(canister).has_long_execution()); // Execute another round. The scheduler detects that no canister has a paused diff --git a/rs/execution_environment/src/scheduler/tests/dts.rs b/rs/execution_environment/src/scheduler/tests/dts.rs index 7f5852627ad1..91d4b074208d 100644 --- a/rs/execution_environment/src/scheduler/tests/dts.rs +++ b/rs/execution_environment/src/scheduler/tests/dts.rs @@ -9,7 +9,6 @@ use candid::Encode; use ic_config::subnet_config::SchedulerConfig; use ic_management_canister_types_private::{CanisterIdRecord, Method}; use ic_registry_subnet_type::SubnetType; -use ic_types::LongExecutionMode; use ic_types::methods::SystemMethod; use ic_types_test_utils::ids::canister_test_id; use more_asserts::assert_le; @@ -205,26 +204,14 @@ fn dts_long_execution_aborted_after_checkpoint() { // Canister has a paused execution and non-zero priority credit. assert!(test.canister_state(canister).has_paused_execution()); - assert_ne!( - test.state() - .canister_priority(&canister) - .priority_credit - .get(), - 0 - ); + assert_ne!(test.state().canister_priority(&canister).executed_slices, 0); test.execute_round(ExecutionRoundType::CheckpointRound); // After a checkpoint round, the canister has an aborted execution and zero // priority credit. assert!(test.canister_state(canister).has_aborted_execution()); - assert_eq!( - test.state() - .canister_priority(&canister) - .priority_credit - .get(), - 0 - ); + assert_eq!(test.state().canister_priority(&canister).executed_slices, 0); // Complete the long execution. for _ in 0..3 { @@ -238,13 +225,7 @@ fn dts_long_execution_aborted_after_checkpoint() { // After completion, there is no paused or aborted execution. And the priority // credit is again zero. assert!(!test.canister_state(canister).has_long_execution()); - assert_eq!( - test.state() - .canister_priority(&canister) - .priority_credit - .get(), - 0 - ); + assert_eq!(test.state().canister_priority(&canister).executed_slices, 0); // 2 + 3 slices were executed. assert_eq!(test.scheduler().metrics.round.slices.get_sample_sum(), 5.0); @@ -324,11 +305,11 @@ fn respect_max_paused_executions( let priority = subnet_schedule.get(&canister.canister_id()); if canister.has_paused_execution() { // All paused executions have non-zero priority credit. - assert_ne!(priority.priority_credit.get(), 0); + assert_ne!(priority.executed_slices, 0); true } else { // All aborted (or not started) executions have zero priority credit. - assert_eq!(priority.priority_credit.get(), 0); + assert_eq!(priority.executed_slices, 0); false } }) @@ -778,28 +759,31 @@ fn abort_paused_executions_keeps_highest_priority() { test.send_ingress(mid, ingress(100)); test.execute_round(ExecutionRoundType::OrdinaryRound); - // The low compute allocation canister is classified as `Prioritized`, as it has - // executed a second slice. + // The low compute allocation canister started execution a round earlier. assert_eq!( - test.state().canister_priority(&low).long_execution_mode, - LongExecutionMode::Prioritized + test.state() + .canister_priority(&low) + .long_execution_start_round, + Some(ExecutionRound::new(0)) ); - // The high and mid compute allocation canisters are only `Opportunistic`. + // The high and mid compute allocation canisters started a round later. assert_eq!( - test.state().canister_priority(&high).long_execution_mode, - LongExecutionMode::Opportunistic + test.state() + .canister_priority(&high) + .long_execution_start_round, + Some(ExecutionRound::new(1)) ); assert_eq!( - test.state().canister_priority(&mid).long_execution_mode, - LongExecutionMode::Opportunistic + test.state() + .canister_priority(&mid) + .long_execution_start_round, + Some(ExecutionRound::new(1)) ); - // The low compute allocation canister keeps its paused execution (its - // `Prioritized` mode actually gives it the highest priority). + // `low` has the most executed slices (highest priority among long + // executions), `high` has the highest CA (second priority). `mid` is + // aborted. assert!(test.canister_state(low).has_paused_execution()); - // The high compute allocation canister comes in second, so it's still paused. assert!(test.canister_state(high).has_paused_execution()); - // The medium compute allocation canister had the lowest priority, so it was - // aborted. assert!(test.canister_state(mid).has_aborted_execution()); } diff --git a/rs/execution_environment/src/scheduler/tests/metrics.rs b/rs/execution_environment/src/scheduler/tests/metrics.rs index 6b1dc77e1438..965903952978 100644 --- a/rs/execution_environment/src/scheduler/tests/metrics.rs +++ b/rs/execution_environment/src/scheduler/tests/metrics.rs @@ -229,8 +229,7 @@ fn can_record_metrics_for_a_round() { // Bump up the round number. test.execute_round(ExecutionRoundType::OrdinaryRound); - // For allocation violation to happen, the canister age should be more than `100/9 = 11 rounds` - // plus 2 rounds already executed. + // Advance 11 rounds (relative to the 2 rounds already executed). test.advance_to_round(ExecutionRound::from(11 + 2)); test.execute_round(ExecutionRoundType::OrdinaryRound); diff --git a/rs/execution_environment/src/scheduler/tests/scheduling.rs b/rs/execution_environment/src/scheduler/tests/scheduling.rs index ff3b72594519..fb670427060f 100644 --- a/rs/execution_environment/src/scheduler/tests/scheduling.rs +++ b/rs/execution_environment/src/scheduler/tests/scheduling.rs @@ -4,15 +4,14 @@ use super::test_utilities::{ SchedulerTest, SchedulerTestBuilder, ingress, instructions, on_response, other_side, }; use super::*; -use assert_matches::assert_matches; use ic_config::subnet_config::{SchedulerConfig, SubnetConfig}; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::testing::CanisterQueuesTesting; +use ic_types::ComputeAllocation; use ic_types::ingress::IngressStatus; use ic_types::methods::SystemMethod; -use ic_types::{ComputeAllocation, LongExecutionMode}; use ic_types_cycles::Cycles; -use more_asserts::{assert_ge, assert_le, assert_lt}; +use more_asserts::{assert_ge, assert_gt, assert_le}; use std::cmp::min; use std::collections::HashMap; use std::convert::TryFrom; @@ -283,71 +282,45 @@ fn scheduler_long_execution_progress_across_checkpoints() { }) .build(); - let penalized_long_id = test.create_canister(); - let other_long_id = test.create_canister(); + let long_id = test.create_canister(); let mut canister_ids = vec![]; for _ in 0..num_canisters { let canister_id = test.create_canister(); + test.send_ingress(canister_id, ingress(slice_instructions)); canister_ids.push(canister_id); } - // Penalize canister for a long execution. - let message_id = test.send_ingress(penalized_long_id, ingress(message_instructions)); - assert_eq!(test.ingress_state(&message_id), IngressState::Received); - for i in 0..message_instructions / slice_instructions { - // Without short executions, all idle canister will be equally executed. - if let Some(canister_id) = canister_ids.get(i as usize % num_canisters) { - test.send_ingress(*canister_id, ingress(slice_instructions)); - } - test.execute_round(ExecutionRoundType::OrdinaryRound); - } - assert_matches!(test.ingress_state(&message_id), IngressState::Failed(_)); - // Assert penalized canister accumulated priority is lower. - let penalized = test.state().canister_priority(&penalized_long_id); - let other = test.state().canister_priority(&other_long_id); - assert_lt!(penalized.accumulated_priority, other.accumulated_priority); - - // Start another long execution on the penalized canister. - test.send_ingress(penalized_long_id, ingress(message_instructions)); - test.execute_round(ExecutionRoundType::OrdinaryRound); - test.execute_round(ExecutionRoundType::OrdinaryRound); - // Assert the LEM is prioritized. - let penalized = test.state().canister_priority(&penalized_long_id); - assert_eq!( - penalized.long_execution_mode, - LongExecutionMode::Prioritized - ); - - // Start a long execution on another non-penalized canister. - test.send_ingress(other_long_id, ingress(message_instructions)); + // Start a long execution. + test.send_ingress(long_id, ingress(message_instructions)); test.execute_round(ExecutionRoundType::OrdinaryRound); test.execute_round(ExecutionRoundType::OrdinaryRound); - // Assert the LEM is opportunistic. - let other = test.state().canister_priority(&other_long_id); - assert_eq!(other.long_execution_mode, LongExecutionMode::Opportunistic); + // Assert that there's long execution progress. + let priority = test.state().canister_priority(&long_id); + assert!(priority.long_execution_start_round.is_some()); + let start_round = priority.long_execution_start_round; - // Abort both canisters on checkpoint. + // Abort the long execution on checkpoint. test.execute_round(ExecutionRoundType::CheckpointRound); - // Assert penalized canister accumulated priority is still lower. - let penalized = test.state().canister_priority(&penalized_long_id); - let other = test.state().canister_priority(&other_long_id); - assert_lt!(penalized.accumulated_priority, other.accumulated_priority); + // Assert that the long execution canister still has the same start round. + let priority = test.state().canister_priority(&long_id); + assert_eq!(priority.long_execution_start_round, start_round); - let penalized = test.state().canister_state(&penalized_long_id).unwrap(); - let penalized_executed_before = penalized.system_state.canister_metrics().executed(); + let canister = test.state().canister_state(&long_id).unwrap(); + let executed_before = canister.system_state.canister_metrics().executed(); + assert_gt!(executed_before, 0); - // Send a bunch of messages. + // Send a bunch of messages to the other canisters. for canister_id in &canister_ids { test.send_ingress(*canister_id, ingress(slice_instructions)); } - // Assert that after the checkpoint the penalized canister continues its long execution. + // After the checkpoint, the long execution continues. test.execute_round(ExecutionRoundType::OrdinaryRound); - let penalized = test.state().canister_state(&penalized_long_id).unwrap(); + let canister = test.state().canister_state(&long_id).unwrap(); assert_eq!( - penalized_executed_before + 1, - penalized.system_state.canister_metrics().executed() + executed_before + 1, + canister.system_state.canister_metrics().executed() ); } @@ -975,7 +948,7 @@ fn inner_round_first_execution_is_not_a_full_execution() { let mut total_priority_credit = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += canister_priority.priority_credit.get(); + total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; } // The accumulated priority invariant should be respected. assert_eq!(total_accumulated_priority - total_priority_credit, 0); @@ -1034,7 +1007,7 @@ fn inner_round_long_execution_is_a_full_execution() { let mut total_priority_credit = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += canister_priority.priority_credit.get(); + total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; } // The accumulated priority invariant should be respected. assert_eq!(total_accumulated_priority - total_priority_credit, 0); @@ -1088,7 +1061,7 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: let mut total_priority_credit = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += canister_priority.priority_credit.get(); + total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; } prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); @@ -1133,7 +1106,7 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: let mut total_priority_credit = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += canister_priority.priority_credit.get(); + total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; } prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); } @@ -1189,13 +1162,13 @@ fn charge_idle_canisters_for_full_execution_round() { let mut total_priority_credit = 0; for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { // Assert there is no divergency in accumulated priorities. - let priority = - canister_priority.accumulated_priority - canister_priority.priority_credit; + let priority = canister_priority.accumulated_priority + - ONE_HUNDRED_PERCENT * canister_priority.executed_slices; assert_le!(priority.get(), 100 * MULTIPLIER); assert_ge!(priority.get(), -100 * MULTIPLIER); total_accumulated_priority += canister_priority.accumulated_priority.get(); - total_priority_credit += canister_priority.priority_credit.get(); + total_priority_credit += ONE_HUNDRED_PERCENT.get() * canister_priority.executed_slices; } // The accumulated priority invariant should be respected. assert_eq!(total_accumulated_priority - total_priority_credit, 0); diff --git a/rs/protobuf/def/state/metadata/v1/metadata.proto b/rs/protobuf/def/state/metadata/v1/metadata.proto index a00dd2fe0152..467ec8723529 100644 --- a/rs/protobuf/def/state/metadata/v1/metadata.proto +++ b/rs/protobuf/def/state/metadata/v1/metadata.proto @@ -408,6 +408,16 @@ message SystemMetadata { repeated ApiBoundaryNodeEntry api_boundary_nodes = 21; registry.subnet.v1.ResourceLimits own_resource_limits = 24; + + repeated CanisterPriority subnet_schedule = 25; +} + +message CanisterPriority { + types.v1.CanisterId canister_id = 1; + int64 accumulated_priority = 2; + int64 executed_slices = 3; + optional uint64 long_execution_start_round = 4; + uint64 last_full_execution_round = 5; } message StableMemory { diff --git a/rs/protobuf/src/gen/state/state.metadata.v1.rs b/rs/protobuf/src/gen/state/state.metadata.v1.rs index 72cfce0a2222..e9fd823aaf7d 100644 --- a/rs/protobuf/src/gen/state/state.metadata.v1.rs +++ b/rs/protobuf/src/gen/state/state.metadata.v1.rs @@ -589,6 +589,21 @@ pub struct SystemMetadata { #[prost(message, optional, tag = "24")] pub own_resource_limits: ::core::option::Option, + #[prost(message, repeated, tag = "25")] + pub subnet_schedule: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CanisterPriority { + #[prost(message, optional, tag = "1")] + pub canister_id: ::core::option::Option, + #[prost(int64, tag = "2")] + pub accumulated_priority: i64, + #[prost(int64, tag = "3")] + pub executed_slices: i64, + #[prost(uint64, optional, tag = "4")] + pub long_execution_start_round: ::core::option::Option, + #[prost(uint64, tag = "5")] + pub last_full_execution_round: u64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct StableMemory { diff --git a/rs/replicated_state/src/metadata_state/proto.rs b/rs/replicated_state/src/metadata_state/proto.rs index 1a1c61add50d..72e0b2a248d7 100644 --- a/rs/replicated_state/src/metadata_state/proto.rs +++ b/rs/replicated_state/src/metadata_state/proto.rs @@ -1,4 +1,5 @@ use super::*; +use crate::CanisterPriority; use ic_base_types::subnet_id_try_from_option; use ic_protobuf::registry::subnet::v1::CanisterCyclesCostSchedule as CanisterCyclesCostScheduleProto; use ic_protobuf::state::system_metadata::v1::ThresholdSignatureAgreementsEntry; @@ -13,7 +14,7 @@ use ic_protobuf::{ }, types::v1 as pb_types, }; -use ic_types::subnet_id_try_from_protobuf; +use ic_types::{AccumulatedPriority, ExecutionRound, subnet_id_try_from_protobuf}; impl From<&NetworkTopology> for pb_metadata::NetworkTopology { fn from(item: &NetworkTopology) -> Self { @@ -369,12 +370,28 @@ impl From<&SystemMetadata> for pb_metadata::SystemMetadata { ) .collect(), blockmaker_metrics_time_series: Some((&item.blockmaker_metrics_time_series).into()), + subnet_schedule: item + .subnet_schedule + .iter() + .map(|(canister_id, priority)| pb_metadata::CanisterPriority { + canister_id: Some(pb_types::CanisterId::from(*canister_id)), + accumulated_priority: priority.accumulated_priority.get(), + executed_slices: priority.executed_slices, + long_execution_start_round: priority + .long_execution_start_round + .map(|round| round.get()), + last_full_execution_round: priority.last_full_execution_round.get(), + }) + .collect(), } } } /// Decodes a `SystemMetadata` proto. The metrics are provided as a side-channel /// for recording errors without being forced to return `Err(_)`. +/// +/// `fallback_subnet_schedule` is used for backward compatibility when loading +/// old checkpoints that don't have the `subnet_schedule` field in the proto. impl TryFrom<( pb_metadata::SystemMetadata, @@ -385,7 +402,7 @@ impl type Error = ProxyDecodeError; fn try_from( - (item, subnet_schedule, metrics): ( + (item, fallback_subnet_schedule, metrics): ( pb_metadata::SystemMetadata, SubnetSchedule, &dyn CheckpointLoadingMetrics, @@ -399,6 +416,31 @@ impl ); } + let subnet_schedule = if !item.subnet_schedule.is_empty() { + let mut priorities = BTreeMap::new(); + for entry in &item.subnet_schedule { + let canister_id = CanisterId::try_from(entry.canister_id.clone().ok_or( + ProxyDecodeError::MissingField("CanisterPriority::canister_id"), + )?)?; + priorities.insert( + canister_id, + CanisterPriority { + accumulated_priority: AccumulatedPriority::new(entry.accumulated_priority), + executed_slices: entry.executed_slices, + long_execution_start_round: entry + .long_execution_start_round + .map(ExecutionRound::new), + last_full_execution_round: ExecutionRound::new( + entry.last_full_execution_round, + ), + }, + ); + } + SubnetSchedule::new(priorities) + } else { + fallback_subnet_schedule + }; + let canister_allocation_ranges: CanisterIdRanges = match item.canister_allocation_ranges { Some(canister_allocation_ranges) => canister_allocation_ranges.try_into()?, None => Default::default(), diff --git a/rs/replicated_state/src/metadata_state/subnet_schedule.rs b/rs/replicated_state/src/metadata_state/subnet_schedule.rs index 2734ebd9b30f..a0bb5cbcb622 100644 --- a/rs/replicated_state/src/metadata_state/subnet_schedule.rs +++ b/rs/replicated_state/src/metadata_state/subnet_schedule.rs @@ -1,4 +1,4 @@ -use ic_types::{AccumulatedPriority, CanisterId, ExecutionRound, LongExecutionMode}; +use ic_types::{AccumulatedPriority, CanisterId, ExecutionRound}; use ic_validate_eq::ValidateEq; use std::collections::BTreeMap; @@ -13,16 +13,17 @@ pub struct CanisterPriority { /// in the vector d that corresponds to this canister. pub accumulated_priority: AccumulatedPriority, - /// Keeps the current priority credit of this Canister, accumulated during long - /// executions. + /// Number of DTS slices executed so far in the current long execution, if any. + /// (Also used transiently by `finish_round()` to charge for full executions.) /// - /// During long executions, the Canister is temporarily credited with priority - /// to slightly boost the long execution priority. Only when the long execution - /// is done, then the `accumulated_priority` is decreased by the `priority_credit`. - pub priority_credit: AccumulatedPriority, + /// During a long execution, this is incremented for each DTS slice executed. + /// In the meantime, the canister accumulates priority normally. It is only + /// charged for these slices when the long execution completes. + pub executed_slices: i64, - /// Long execution mode: Opportunistic (default) or Prioritized - pub long_execution_mode: LongExecutionMode, + /// The round when the current long execution started. `None` means the canister + /// is not in a long execution. + pub long_execution_start_round: Option, /// The last full round that a canister got the chance to execute. This /// means that the canister was given the first pulse in the round or @@ -35,8 +36,8 @@ impl CanisterPriority { /// subnet schedule. pub const DEFAULT: CanisterPriority = CanisterPriority { accumulated_priority: AccumulatedPriority::new(0), - priority_credit: AccumulatedPriority::new(0), - long_execution_mode: LongExecutionMode::Opportunistic, + executed_slices: 0, + long_execution_start_round: None, last_full_execution_round: ExecutionRound::new(0), }; } diff --git a/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs b/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs index 922274f47b2e..2bd4ccda7c88 100644 --- a/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs +++ b/rs/replicated_state/src/metadata_state/subnet_schedule/tests.rs @@ -1,4 +1,5 @@ use super::*; +use ic_types::LongExecutionMode; use strum::IntoEnumIterator; #[test] @@ -35,8 +36,8 @@ fn get() { fn validate_eq() { let some_priority = CanisterPriority { accumulated_priority: AccumulatedPriority::new(1), - priority_credit: AccumulatedPriority::new(2), - long_execution_mode: LongExecutionMode::Opportunistic, + executed_slices: 2, + long_execution_start_round: Some(ExecutionRound::new(3)), last_full_execution_round: ExecutionRound::new(4), }; let canister_id1 = CanisterId::from_u64(1); diff --git a/rs/replicated_state/src/metadata_state/tests.rs b/rs/replicated_state/src/metadata_state/tests.rs index f6682ea4653e..8a1789db8884 100644 --- a/rs/replicated_state/src/metadata_state/tests.rs +++ b/rs/replicated_state/src/metadata_state/tests.rs @@ -4,8 +4,8 @@ use super::subnet_call_context_manager::{ SubnetCallContext, SubnetCallContextManager, ThresholdArguments, }; use super::*; -use crate::InputQueueType; use crate::testing::{CanisterQueuesTesting, StreamTesting}; +use crate::{CanisterPriority, InputQueueType}; use assert_matches::assert_matches; use ic_crypto_test_utils_canister_threshold_sigs::{ CanisterThresholdSigTestEnvironment, IDkgParticipants, generate_ecdsa_presig_quadruple, @@ -370,6 +370,32 @@ fn system_metadata_roundtrip_encoding() { system_metadata.bitcoin_get_successors_follow_up_responses = btreemap! { 10.into() => vec![vec![1], vec![2]] }; + // Observe two `BlockmakerMetrics` on successive days. + system_metadata.blockmaker_metrics_time_series.observe( + Time::from_nanos_since_unix_epoch(0), + &BlockmakerMetrics { + blockmaker: node_test_id(1), + failed_blockmakers: vec![node_test_id(2)], + }, + ); + system_metadata.blockmaker_metrics_time_series.observe( + Time::from_nanos_since_unix_epoch(0) + Duration::from_secs(24 * 3600), + &BlockmakerMetrics { + blockmaker: node_test_id(3), + failed_blockmakers: vec![node_test_id(4)], + }, + ); + + // Add scheduling priority for a canister. + *system_metadata + .subnet_schedule + .get_mut(CanisterId::from_u64(1)) = CanisterPriority { + accumulated_priority: 100.into(), + executed_slices: 2, + long_execution_start_round: Some(3.into()), + last_full_execution_round: 4.into(), + }; + // Validates that a roundtrip encode-decode results in the same `SystemMetadata`. fn validate_roundtrip_encoding(system_metadata: &SystemMetadata) { let proto = pb::SystemMetadata::from(system_metadata); @@ -377,7 +403,7 @@ fn system_metadata_roundtrip_encoding() { *system_metadata, ( proto, - system_metadata.subnet_schedule.clone(), + SubnetSchedule::default(), &DummyMetrics as &dyn CheckpointLoadingMetrics ) .try_into() @@ -399,30 +425,36 @@ fn system_metadata_roundtrip_encoding() { // Set `last_generated_canister_id` to valid, but migrated canister ID. system_metadata.last_generated_canister_id = Some(15.into()); validate_roundtrip_encoding(&system_metadata); +} - // Observe two `BlockmakerMetrics` on successive days. - system_metadata.blockmaker_metrics_time_series.observe( - Time::from_nanos_since_unix_epoch(0), - &BlockmakerMetrics { - blockmaker: node_test_id(1), - failed_blockmakers: vec![node_test_id(2)], - }, - ); - system_metadata.blockmaker_metrics_time_series.observe( - Time::from_nanos_since_unix_epoch(0) + Duration::from_secs(24 * 3600), - &BlockmakerMetrics { - blockmaker: node_test_id(3), - failed_blockmakers: vec![node_test_id(4)], - }, - ); - validate_roundtrip_encoding(&system_metadata); +#[test] +fn subnet_schedule_backward_compatibility() { + // Old encoding: `SystemMetadata` without `subnet_schedule`, plus a + // `SubnetSchedule` aggregated from canister states. + let system_metadata = SystemMetadata::new(SUBNET_0, SubnetType::Application); + let mut subnet_schedule = SubnetSchedule::default(); + *subnet_schedule.get_mut(CanisterId::from_u64(1)) = CanisterPriority { + accumulated_priority: 100.into(), + executed_slices: 2, + long_execution_start_round: Some(3.into()), + last_full_execution_round: 4.into(), + }; - // Add scheduling priority for a canister. - system_metadata - .subnet_schedule - .get_mut(CanisterId::from_u64(1)) - .accumulated_priority = 1.into(); - validate_roundtrip_encoding(&system_metadata); + // Expected decoded `SystemMetadata` has populated `subnet_schedule`. + let mut expected = system_metadata.clone(); + expected.subnet_schedule = subnet_schedule.clone(); + + let proto = pb_metadata::SystemMetadata::from(&system_metadata); + assert_eq!( + expected, + ( + proto, + subnet_schedule, + &DummyMetrics as &dyn CheckpointLoadingMetrics + ) + .try_into() + .unwrap() + ); } #[test] diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 5f0b06f2e370..3ec2e81bd8d0 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -3412,6 +3412,7 @@ impl StateMachine { self.checkpointed_tick(); let (h, mut state) = self.state_manager.take_tip(); state.put_canister_state(source_state.canister_state(&canister_id).unwrap().clone()); + *state.canister_priority_mut(canister_id) = *source_state.canister_priority(&canister_id); self.state_manager .commit_and_certify(state, CertificationScope::Full, None); self.state_manager.flush_hash_channel(); diff --git a/rs/state_manager/src/checkpoint.rs b/rs/state_manager/src/checkpoint.rs index 53b8347f3eda..9a1fae3bb3f9 100644 --- a/rs/state_manager/src/checkpoint.rs +++ b/rs/state_manager/src/checkpoint.rs @@ -888,8 +888,14 @@ pub fn load_canister_state( }; let priority = CanisterPriority { accumulated_priority: canister_state_bits.accumulated_priority, - priority_credit: canister_state_bits.priority_credit, - long_execution_mode: canister_state_bits.long_execution_mode, + // We can only be loading an aborted execution, so zero executed slices. + executed_slices: 0, + // Set a long execution start round where necessary. + long_execution_start_round: if canister_state.has_long_execution() { + Some(0.into()) + } else { + None + }, last_full_execution_round: canister_state_bits.last_full_execution_round, }; diff --git a/rs/state_manager/src/split/tests.rs b/rs/state_manager/src/split/tests.rs index c1ba6fd48e8e..9b6f96c3af07 100644 --- a/rs/state_manager/src/split/tests.rs +++ b/rs/state_manager/src/split/tests.rs @@ -299,6 +299,10 @@ fn split_subnet_b_helper(new_subnet_batch_time_delta: Option) { assert_eq!(SUBNET_A, deserialize_split_from(&root, height_b)); } else if file == SYSTEM_METADATA_FILE { let mut expected = SystemMetadata::new(SUBNET_B, SubnetType::Application); + // Populate default priorities for all canisters. + for canister_id in [CANISTER_1, CANISTER_2, CANISTER_3] { + expected.subnet_schedule.get_mut(canister_id); + } // `batch_time` should be the provided `new_subnet_batch_time` (if `Some`); or // else the original subnet's `batch_time`. expected.batch_time = new_subnet_batch_time.unwrap_or(batch_time); diff --git a/rs/state_manager/src/tip.rs b/rs/state_manager/src/tip.rs index bda1e9772647..3aa02b88c9cf 100644 --- a/rs/state_manager/src/tip.rs +++ b/rs/state_manager/src/tip.rs @@ -1216,8 +1216,10 @@ fn serialize_canister_protos_to_checkpoint_readwrite( controllers: canister_state.system_state.controllers.clone(), last_full_execution_round: canister_priority.last_full_execution_round, compute_allocation: canister_state.compute_allocation(), - priority_credit: canister_priority.priority_credit, - long_execution_mode: canister_priority.long_execution_mode, + // Any long execution must have been aborted, priority credit is always zero. + priority_credit: 0.into(), + // Value is ignored when loading. + long_execution_mode: Default::default(), accumulated_priority: canister_priority.accumulated_priority, memory_allocation: canister_state.system_state.memory_allocation, wasm_memory_threshold: canister_state.system_state.wasm_memory_threshold, From 57435b70fbf2a8a885540c4704725b5a122eea25 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Mon, 27 Apr 2026 15:05:28 +0000 Subject: [PATCH 2/2] Fix `//rs/tests/consensus:subnet_splitting_test_colocate`: * Stop populating `SystemMetadata::subnet_schedule` on "subnet B" during a split to be serialized into individual `canister.pbuf` files; it is actually being written to `system_metadata.pbuf` now. * And stop writing the `CanisterPriority` into every `canister.pbuf`, we no longer read it from there. --- rs/replicated_state/src/metadata_state.rs | 5 ----- rs/replicated_state/tests/replicated_state.rs | 3 --- rs/state_manager/src/split/tests.rs | 4 ---- rs/state_manager/src/tip.rs | 17 ++++++----------- 4 files changed, 6 insertions(+), 23 deletions(-) diff --git a/rs/replicated_state/src/metadata_state.rs b/rs/replicated_state/src/metadata_state.rs index da789d137ca6..36af3fa33b13 100644 --- a/rs/replicated_state/src/metadata_state.rs +++ b/rs/replicated_state/src/metadata_state.rs @@ -721,11 +721,6 @@ impl SystemMetadata { // Preserve ingress history. res.ingress_history = self.ingress_history; - // "Preserve" the subnet schedule. This is actually persisted per-canister, as - // part of the respective canister state, so will only actually be retained for - // local canisters. - res.subnet_schedule = self.subnet_schedule; - // Ensure monotonic time for migrated canisters: apply `new_subnet_batch_time` // if specified and not smaller than `self.batch_time`; else, default to // `self.batch_time`. diff --git a/rs/replicated_state/tests/replicated_state.rs b/rs/replicated_state/tests/replicated_state.rs index 8cabf62ba118..75400b49c91b 100644 --- a/rs/replicated_state/tests/replicated_state.rs +++ b/rs/replicated_state/tests/replicated_state.rs @@ -1224,9 +1224,6 @@ fn split() { expected.metadata.ingress_history = fixture.state.metadata.ingress_history.clone(); // And the split marker should be set. expected.metadata.split_from = Some(SUBNET_A); - // The logic preserves the full subnet schedule, even though only the priority - // of `CANISTER_2` will be persisted along with the canister state. - expected.metadata.subnet_schedule = fixture.state.metadata.subnet_schedule; // Otherwise, the state should be the same. assert_eq!(expected, state_b); diff --git a/rs/state_manager/src/split/tests.rs b/rs/state_manager/src/split/tests.rs index 9b6f96c3af07..c1ba6fd48e8e 100644 --- a/rs/state_manager/src/split/tests.rs +++ b/rs/state_manager/src/split/tests.rs @@ -299,10 +299,6 @@ fn split_subnet_b_helper(new_subnet_batch_time_delta: Option) { assert_eq!(SUBNET_A, deserialize_split_from(&root, height_b)); } else if file == SYSTEM_METADATA_FILE { let mut expected = SystemMetadata::new(SUBNET_B, SubnetType::Application); - // Populate default priorities for all canisters. - for canister_id in [CANISTER_1, CANISTER_2, CANISTER_3] { - expected.subnet_schedule.get_mut(canister_id); - } // `batch_time` should be the provided `new_subnet_batch_time` (if `Some`); or // else the original subnet's `batch_time`. expected.batch_time = new_subnet_batch_time.unwrap_or(batch_time); diff --git a/rs/state_manager/src/tip.rs b/rs/state_manager/src/tip.rs index 3aa02b88c9cf..b734d0f9405b 100644 --- a/rs/state_manager/src/tip.rs +++ b/rs/state_manager/src/tip.rs @@ -26,9 +26,7 @@ use ic_replicated_state::page_map::{ MAX_NUMBER_OF_FILES, MergeCandidate, PAGE_SIZE, PageAllocatorFileDescriptor, StorageLayout, StorageResult, }; -use ic_replicated_state::{ - CanisterPriority, CanisterState, NumWasmPages, PageMap, ReplicatedState, -}; +use ic_replicated_state::{CanisterState, NumWasmPages, PageMap, ReplicatedState}; use ic_state_layout::{ CanisterSnapshotBits, CanisterStateBits, CheckpointLayout, ExecutionStateBits, PageMapLayout, ReadOnly, RwPolicy, StateLayout, TipHandler, WasmFile, error::LayoutError, @@ -1079,11 +1077,7 @@ fn serialize_protos_to_checkpoint_readwrite( })?; let results = parallel_map(thread_pool, state.canisters_iter(), |canister_state| { - serialize_canister_protos_to_checkpoint_readwrite( - canister_state, - state.canister_priority(&canister_state.canister_id()), - checkpoint_readwrite, - )?; + serialize_canister_protos_to_checkpoint_readwrite(canister_state, checkpoint_readwrite)?; for canister_snapshot in canister_state.canister_snapshots.iter() { serialize_snapshot_protos_to_checkpoint_readwrite( canister_snapshot.0, @@ -1183,7 +1177,6 @@ fn serialize_snapshot_wasm_binary( fn serialize_canister_protos_to_checkpoint_readwrite( canister_state: &CanisterState, - canister_priority: &CanisterPriority, checkpoint_readwrite: &CheckpointLayout>, ) -> Result<(), CheckpointError> { let canister_id = canister_state.canister_id(); @@ -1214,13 +1207,15 @@ fn serialize_canister_protos_to_checkpoint_readwrite( canister_layout.canister().serialize( CanisterStateBits { controllers: canister_state.system_state.controllers.clone(), - last_full_execution_round: canister_priority.last_full_execution_round, + // Ignored after the first checkpoint load during an upgrade. + last_full_execution_round: 0.into(), compute_allocation: canister_state.compute_allocation(), // Any long execution must have been aborted, priority credit is always zero. priority_credit: 0.into(), // Value is ignored when loading. long_execution_mode: Default::default(), - accumulated_priority: canister_priority.accumulated_priority, + // Ignored after the first checkpoint load during an upgrade. + accumulated_priority: Default::default(), memory_allocation: canister_state.system_state.memory_allocation, wasm_memory_threshold: canister_state.system_state.wasm_memory_threshold, freeze_threshold: canister_state.system_state.freeze_threshold,