Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rs/execution_environment/benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ fn main() {
&executed_canisters,
&executed_canisters,
&BTreeSet::new(),
ExecutionRound::from(1),
);
});
});
Expand All @@ -89,6 +90,7 @@ fn main() {
&executed_canisters,
&executed_canisters,
&BTreeSet::new(),
ExecutionRound::from(1),
);

group.bench_function("finish_round", |bench| {
Expand Down
5 changes: 3 additions & 2 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ impl SchedulerImpl {
&executed_canisters,
&canisters_with_completed_messages,
&low_cycle_balance_canisters,
current_round,
);

round_limits.instructions -= as_round_instructions(
Expand Down Expand Up @@ -2210,10 +2211,10 @@ fn abort_canister(
cost_schedule: CanisterCyclesCostSchedule,
) {
if exec_env.abort_canister(canister, log, cost_schedule) {
// Reset the priority credit to zero.
// Reset `executed_slices` to zero.
subnet_schedule
.get_mut(canister.canister_id())
.priority_credit = Default::default();
.executed_slices = 0;
}
}

Expand Down
133 changes: 77 additions & 56 deletions rs/execution_environment/src/scheduler/round_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use ic_config::flag_status::FlagStatus;
use ic_logger::{ReplicaLogger, error};
use ic_replicated_state::canister_state::NextExecution;
use ic_replicated_state::{CanisterPriority, CanisterState, ReplicatedState};
use ic_types::{
AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode, NumInstructions,
};
use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, NumInstructions};
use ic_utils::iter::left_outer_join;
use more_asserts::debug_assert_gt;
use num_traits::SaturatingSub;
Expand All @@ -30,7 +28,8 @@ const ZERO: AccumulatedPriority = AccumulatedPriority::new(0);
const ONE_PERCENT: AccumulatedPriority = AccumulatedPriority::new(1 * MULTIPLIER);

/// 100% in accumulated priority.
const ONE_HUNDRED_PERCENT: AccumulatedPriority = AccumulatedPriority::new(100 * MULTIPLIER);
pub(super) const ONE_HUNDRED_PERCENT: AccumulatedPriority =
AccumulatedPriority::new(100 * MULTIPLIER);

const fn from_ca(ca: ComputeAllocation) -> AccumulatedPriority {
AccumulatedPriority::new(ca.as_percent() as i64 * MULTIPLIER)
Expand All @@ -47,21 +46,32 @@ pub(super) struct CanisterRoundState {
accumulated_priority: AccumulatedPriority,
/// Copy of the canister's `SchedulerState::compute_allocation`.
compute_allocation: AccumulatedPriority,
/// Copy of the canister's `CanisterPriority::long_execution_mode`.
long_execution_mode: LongExecutionMode,
/// Whether the canister has a long execution.
has_long_execution: bool,
/// Number of DTS slices executed so far in the current long execution.
executed_slices: i64,
/// The round when the current long execution started. `None` means the canister
/// is not in a long execution.
long_execution_start_round: Option<ExecutionRound>,
}

impl CanisterRoundState {
pub fn new(canister: &CanisterState, canister_priority: &CanisterPriority) -> Self {
// Ensure that `long_execution_start_round` matches the canister state.
debug_assert_eq!(
canister.has_long_execution(),
canister_priority.long_execution_start_round.is_some(),
"canister: {:?}, task_queue: {:?}, canister_priority: {:?}",
canister.canister_id(),
canister.system_state.task_queue,
canister_priority,
);

let compute_allocation = from_ca(canister.compute_allocation());
Self {
canister_id: canister.canister_id(),
accumulated_priority: canister_priority.accumulated_priority + compute_allocation,
compute_allocation,
long_execution_mode: canister_priority.long_execution_mode,
has_long_execution: canister.has_long_execution(),
executed_slices: canister_priority.executed_slices,
long_execution_start_round: canister_priority.long_execution_start_round,
}
}

Expand All @@ -72,17 +82,34 @@ impl CanisterRoundState {

impl Ord for CanisterRoundState {
fn cmp(&self, other: &Self) -> Ordering {
// Sort by:
// 1. Long execution mode, reversed (Prioritized -> Opportunistic)
other
.long_execution_mode
.cmp(&self.long_execution_mode)
// 2. Long execution (long execution -> new execution)
.then(other.has_long_execution.cmp(&self.has_long_execution))
// 3. Accumulated priority, descending.
.then(other.accumulated_priority.cmp(&self.accumulated_priority))
// 4. Canister ID, ascending.
.then(self.canister_id.cmp(&other.canister_id))
// First, sort long executions before new.
match (
self.long_execution_start_round,
other.long_execution_start_round,
) {
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,

// Among new executions, sort by AP descending and break ties by canister ID ascending.
(None, None) => other
.accumulated_priority
.cmp(&self.accumulated_priority)
.then_with(|| self.canister_id.cmp(&other.canister_id)),

// Among long executions, sort by executed slices descending; AP descending; start
// round ascending; then break ties by canister ID ascending.
//
// An aborted execution (executed slices == 0) is considered to have the same
// priority as a newly started long execution (executed slices == 1). This is to
// avoid starvation of aborted executions.
(Some(self_start_round), Some(other_start_round)) => other
.executed_slices
.max(1)
.cmp(&self.executed_slices.max(1))
.then_with(|| other.accumulated_priority.cmp(&self.accumulated_priority))
.then_with(|| self_start_round.cmp(&other_start_round))
.then_with(|| self.canister_id.cmp(&other.canister_id)),
}
}
}

Expand Down Expand Up @@ -269,7 +296,7 @@ impl RoundSchedule {
state: &mut ReplicatedState,
is_first_iteration: bool,
) -> IterationSchedule {
let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut();
let (canister_states, _) = state.canisters_and_schedule_mut();

// Collect all active canisters and their next executions.
let canister_next_executions: BTreeMap<_, _> = canister_states
Expand Down Expand Up @@ -345,11 +372,6 @@ impl RoundSchedule {
.take(long_execution_cores)
.for_each(|canister| {
observe_scheduled_as_first(canister);

// And set prioritized long execution mode for the first `long_execution_cores`
// canisters.
subnet_schedule.get_mut(*canister).long_execution_mode =
LongExecutionMode::Prioritized;
});
schedule
.iter()
Expand All @@ -376,16 +398,20 @@ impl RoundSchedule {
executed_canisters: &BTreeSet<CanisterId>,
canisters_with_completed_messages: &BTreeSet<CanisterId>,
low_cycle_balance_canisters: &BTreeSet<CanisterId>,
current_round: ExecutionRound,
) {
self.executed_canisters.extend(executed_canisters);
self.canisters_with_completed_messages
.extend(canisters_with_completed_messages);

for canister_id in canisters_with_completed_messages {
// If a canister has completed a long execution, reset its long execution mode.
// If a canister has completed a long execution, clear its start round.
//
// A canister may also run out of cycles while in a long execution (e.g. if making
// calls), so the start round of low cycle balance canisters is cleared as well.
for canister_id in canisters_with_completed_messages.union(low_cycle_balance_canisters) {
state
.canister_priority_mut(*canister_id)
.long_execution_mode = LongExecutionMode::Opportunistic;
.long_execution_start_round = None;
}

for canister_id in executed_canisters.union(low_cycle_balance_canisters) {
Expand All @@ -394,9 +420,17 @@ impl RoundSchedule {
.map(|canister| canister.next_execution())
.unwrap_or(NextExecution::None)
{
// Either all messages or a full long execution slice completed.
NextExecution::None | NextExecution::ContinueLong => {
// Completed all messages.
NextExecution::None => {
self.fully_executed_canisters.insert(*canister_id);
}
// Completed a long execution slice.
NextExecution::ContinueLong => {
self.fully_executed_canisters.insert(*canister_id);
state
.canister_priority_mut(*canister_id)
.long_execution_start_round
.get_or_insert(current_round);
}
NextExecution::StartNew => {}
NextExecution::ContinueInstallCode => {
Expand All @@ -420,10 +454,10 @@ impl RoundSchedule {
) {
let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut();

// Charge canisters for full executions in this round.
// Update fully executed canisters' priorities.
for canister_id in self.fully_executed_canisters.iter() {
let canister_priority = subnet_schedule.get_mut(*canister_id);
canister_priority.priority_credit += ONE_HUNDRED_PERCENT;
canister_priority.executed_slices += 1;
canister_priority.last_full_execution_round = current_round;
}

Expand All @@ -438,20 +472,20 @@ impl RoundSchedule {

canister_priority.accumulated_priority += from_ca(canister.compute_allocation());

// Apply the priority credit if not in the same long execution as at the
// beginning of the round.
if canister_priority.priority_credit != ZERO
// On message completion (or short execution), charge for the executed slices.
if canister_priority.executed_slices > 0
&& (!canister.has_long_execution()
|| self
.canisters_with_completed_messages
.contains(&canister.canister_id()))
{
canister_priority.accumulated_priority -=
std::mem::take(&mut canister_priority.priority_credit);
ONE_HUNDRED_PERCENT * canister_priority.executed_slices;
canister_priority.executed_slices = 0;
}

free_allocation -=
canister_priority.accumulated_priority - canister_priority.priority_credit;
free_allocation -= canister_priority.accumulated_priority
- ONE_HUNDRED_PERCENT * canister_priority.executed_slices;
}

self.grant_heap_delta_and_install_code_credits(state, metrics);
Expand Down Expand Up @@ -615,7 +649,6 @@ impl RoundSchedule {
for &canister_id in canister_states.keys() {
let canister_priority = subnet_schedule.get_mut(canister_id);
canister_priority.accumulated_priority = Default::default();
canister_priority.priority_credit = Default::default();
}
}

Expand Down Expand Up @@ -672,7 +705,7 @@ impl RoundSchedule {
// De-facto compute allocation includes bonus allocation
let factual = rs.compute_allocation + free_capacity_per_canister;
// Count long executions and sum up their compute allocation.
if rs.has_long_execution {
if rs.long_execution_start_round.is_some() {
long_executions_compute_allocation += factual;
number_of_long_executions += 1;
}
Expand Down Expand Up @@ -706,7 +739,7 @@ impl RoundSchedule {
);

round_states.sort();
let round_schedule = RoundSchedule::new(
RoundSchedule::new(
scheduler_cores,
heap_delta_rate_limit,
rate_limiting_of_heap_delta,
Expand All @@ -723,19 +756,7 @@ impl RoundSchedule {
.take(number_of_long_executions)
.map(|rs| rs.canister_id)
.collect(),
);

for canister_id in round_schedule
.ordered_long_execution_canister_ids
.iter()
.take(long_execution_cores)
{
state
.canister_priority_mut(*canister_id)
.long_execution_mode = LongExecutionMode::Prioritized;
}

round_schedule
)
}
}

Expand Down
3 changes: 3 additions & 0 deletions rs/execution_environment/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ fn state_sync_clears_paused_execution_registry() {
// state with the clean copy. The registry still holds the orphaned paused
// execution entry.
test.state_mut().put_canister_state(clean_canister);
let canister_priority = test.state_mut().canister_priority_mut(canister);
canister_priority.long_execution_start_round = None;
canister_priority.executed_slices = 0;
assert!(!test.canister_state(canister).has_long_execution());

// Execute another round. The scheduler detects that no canister has a paused
Expand Down
Loading
Loading