feat: EXC-1735: Charge canisters for full execution (#1782)
This change applies a charge to each fully executed canister. The total
amount of points charged is distributed evenly across all canisters, but it
is not included in the compute capacity used to calculate the long/new
execution cores (an accounting summary follows the changed-file list below).
berestovskyy authored Oct 2, 2024
1 parent 60f1d55 commit fcb7192
Showing 4 changed files with 196 additions and 20 deletions.
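In outline, the new accounting (implemented in `RoundSchedule::finish_round` in round_schedule.rs below) works as follows, where 100 is the percent of compute capacity of one scheduler core:

    multiplier                 = scheduler_cores * number_of_canisters
    charge per fully executed  = 100 * multiplier   (added to that canister's priority_credit)
    total_charged              = 100 * multiplier * number_of_fully_executed_canisters
    free_capacity_per_canister = (total_charged - total_compute_allocation_percent * multiplier) / number_of_canisters
    accumulated_priority      += compute_allocation_percent * multiplier + free_capacity_per_canister   (for every canister)

Up to integer rounding, the total priority credit charged equals the total accumulated priority handed back out, so the scheduler's priority invariant (accumulated priority minus priority credit sums to zero) is preserved; the new test below checks exactly this.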
1 change: 1 addition & 0 deletions rs/execution_environment/benches/scheduler.rs
@@ -37,6 +37,7 @@ fn main() {
let round_schedule = RoundSchedule::new(
scheduler_cores,
long_execution_cores,
0,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
);
53 changes: 35 additions & 18 deletions rs/execution_environment/src/scheduler.rs
@@ -271,8 +271,8 @@ impl SchedulerImpl {
// `(compute_capacity - total_compute_allocation) * multiplier / number_of_canisters`
// can be simplified to just
// `(compute_capacity - total_compute_allocation) * scheduler_cores`
- let free_capacity_per_canister = (compute_capacity_percent
-     .saturating_sub(total_compute_allocation_percent))
+ let free_capacity_per_canister = compute_capacity_percent
+     .saturating_sub(total_compute_allocation_percent)
* scheduler_cores as i64;
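// As a quick sanity check with hypothetical numbers: for scheduler_cores = 4
// and number_of_canisters = 10, the multiplier is 4 * 10 = 40, so
// `(compute_capacity - total_compute_allocation) * 40 / 10` is indeed the
// same as `(compute_capacity - total_compute_allocation) * 4`.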

// Fully divide the free allocation across all canisters.
@@ -344,6 +344,7 @@ impl SchedulerImpl {
let round_schedule = RoundSchedule::new(
scheduler_cores,
long_execution_cores,
total_compute_allocation_percent,
round_states
.iter()
.skip(number_of_long_executions)
@@ -643,7 +644,7 @@ impl SchedulerImpl {
scheduler_round_limits: &mut SchedulerRoundLimits,
registry_settings: &RegistryExecutionSettings,
idkg_subnet_public_keys: &BTreeMap<MasterPublicKeyId, MasterPublicKey>,
- ) -> (ReplicatedState, BTreeSet<CanisterId>) {
+ ) -> (ReplicatedState, BTreeSet<CanisterId>, BTreeSet<CanisterId>) {
let measurement_scope =
MeasurementScope::nested(&self.metrics.round_inner, root_measurement_scope);
let mut ingress_execution_results = Vec::new();
@@ -654,6 +655,9 @@

let mut heartbeat_and_timer_canister_ids = BTreeSet::new();
let mut round_executed_canister_ids = BTreeSet::new();
// The set of canisters marked as fully executed: have no messages to execute
// or were scheduled first on a core.
let mut round_fully_executed_canister_ids = BTreeSet::new();

// Start iteration loop:
// - Execute subnet messages.
@@ -725,6 +729,7 @@ impl SchedulerImpl {
let (
active_canisters,
executed_canister_ids,
fully_executed_canister_ids,
mut loop_ingress_execution_results,
heap_delta,
) = self.execute_canisters_in_inner_round(
@@ -739,9 +744,10 @@
);
let instructions_consumed = instructions_before - round_limits.instructions;
drop(execution_timer);
- round_executed_canister_ids.extend(executed_canister_ids);

let finalization_timer = self.metrics.round_inner_iteration_fin.start_timer();
+ round_executed_canister_ids.extend(executed_canister_ids);
+ round_fully_executed_canister_ids.extend(fully_executed_canister_ids);
total_heap_delta += heap_delta;
state.metadata.heap_delta_estimate += heap_delta;

@@ -852,7 +858,11 @@ impl SchedulerImpl {
.heap_delta_rate_limited_canisters_per_round
.observe(round_filtered_canisters.rate_limited_canister_ids.len() as f64);

- (state, round_filtered_canisters.active_canister_ids)
+ (
+     state,
+     round_filtered_canisters.active_canister_ids,
+     round_fully_executed_canister_ids,
+ )
}

/// Executes canisters in parallel using the thread pool.
@@ -879,6 +889,7 @@
) -> (
Vec<CanisterState>,
BTreeSet<CanisterId>,
Vec<CanisterId>,
Vec<(MessageId, IngressStatus)>,
NumBytes,
) {
@@ -892,6 +903,7 @@
canisters_by_thread.into_iter().flatten().collect(),
BTreeSet::new(),
vec![],
vec![],
NumBytes::from(0),
);
}
@@ -955,13 +967,15 @@
// Aggregate `results_by_thread` to get the result of this function.
let mut canisters = Vec::new();
let mut executed_canister_ids = BTreeSet::new();
let mut fully_executed_canister_ids = vec![];
let mut ingress_results = Vec::new();
let mut total_instructions_executed = NumInstructions::from(0);
let mut max_instructions_executed_per_thread = NumInstructions::from(0);
let mut heap_delta = NumBytes::from(0);
for mut result in results_by_thread.into_iter() {
canisters.append(&mut result.canisters);
executed_canister_ids.extend(result.executed_canister_ids);
fully_executed_canister_ids.extend(result.fully_executed_canister_ids);
ingress_results.append(&mut result.ingress_results);
let instructions_executed = as_num_instructions(
round_limits_per_thread.instructions - result.round_limits.instructions,
@@ -995,6 +1009,7 @@
(
canisters,
executed_canister_ids,
fully_executed_canister_ids,
ingress_results,
heap_delta,
)
@@ -1707,7 +1722,7 @@ impl Scheduler for SchedulerImpl {
};

// Inner round.
- let (mut state, active_canister_ids) = self.inner_round(
+ let (mut state, active_canister_ids, fully_executed_canister_ids) = self.inner_round(
state,
&mut csprng,
&round_schedule,
Expand Down Expand Up @@ -1868,6 +1883,10 @@ impl Scheduler for SchedulerImpl {
.num_canister_snapshots
.set(final_state.canister_snapshots.count() as i64);
}
round_schedule.finish_round(
&mut final_state.canister_states,
fully_executed_canister_ids,
);
self.finish_round(&mut final_state, current_round_type);
final_state
.metadata
@@ -1943,6 +1962,7 @@ fn observe_instructions_consumed_per_message(
struct ExecutionThreadResult {
canisters: Vec<CanisterState>,
executed_canister_ids: BTreeSet<CanisterId>,
fully_executed_canister_ids: Vec<CanisterId>,
ingress_results: Vec<(MessageId, IngressStatus)>,
slices_executed: NumSlices,
messages_executed: NumMessages,
@@ -1980,6 +2000,7 @@ fn execute_canisters_on_thread(
// These variables accumulate the results and will be returned at the end.
let mut canisters = vec![];
let mut executed_canister_ids = BTreeSet::new();
let mut fully_executed_canister_ids = vec![];
let mut ingress_results = vec![];
let mut total_slices_executed = NumSlices::from(0);
let mut total_messages_executed = NumMessages::from(0);
@@ -2092,18 +2113,13 @@ fn execute_canisters_on_thread(
if let Some(es) = &mut canister.execution_state {
es.last_executed_round = round_id;
}
- let full_message_execution = match canister.next_execution() {
-     NextExecution::None => true,
-     NextExecution::StartNew => false,
-     // We just finished a full slice of executions.
-     NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
- };
- let scheduled_first = is_first_iteration && rank == 0;
- if full_message_execution || scheduled_first {
-     // The very first canister is considered to have a full execution round for
-     // scheduling purposes even if it did not complete within the round.
-     canister.scheduler_state.last_full_execution_round = round_id;
- }
+ RoundSchedule::finish_canister_execution(
+     &mut canister,
+     &mut fully_executed_canister_ids,
+     round_id,
+     is_first_iteration,
+     rank,
+ );
canister.system_state.canister_metrics.executed += 1;
canisters.push(canister);
round_limits.instructions -=
@@ -2113,6 +2129,7 @@
ExecutionThreadResult {
canisters,
executed_canister_ids,
fully_executed_canister_ids,
ingress_results,
slices_executed: total_slices_executed,
messages_executed: total_messages_executed,
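Taken together, the scheduler.rs changes thread the new bookkeeping through the existing layers; roughly (simplified, with unrelated arguments omitted):

// On each worker thread (execute_canisters_on_thread): record canisters that
// completed all their executions, or that were scheduled first on a core.
RoundSchedule::finish_canister_execution(
    &mut canister,
    &mut fully_executed_canister_ids,
    round_id,
    is_first_iteration,
    rank,
);

// In inner_round: merge the per-thread results into the round-level set.
round_fully_executed_canister_ids.extend(fully_executed_canister_ids);

// At the end of the round: charge the collected canisters.
round_schedule.finish_round(&mut final_state.canister_states, fully_executed_canister_ids);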
69 changes: 67 additions & 2 deletions rs/execution_environment/src/scheduler/round_schedule.rs
@@ -1,9 +1,9 @@
- use std::collections::{BTreeMap, HashMap};
+ use std::collections::{BTreeMap, BTreeSet, HashMap};

use ic_base_types::{CanisterId, NumBytes};
use ic_config::flag_status::FlagStatus;
use ic_replicated_state::{canister_state::NextExecution, CanisterState};
- use ic_types::{AccumulatedPriority, ComputeAllocation, LongExecutionMode};
+ use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode};

/// Round metrics required to prioritize a canister.
#[derive(Clone, Debug)]
@@ -42,6 +42,8 @@ pub struct RoundSchedule {
pub scheduler_cores: usize,
/// Number of cores dedicated for long executions.
pub long_execution_cores: usize,
/// Sum of all canisters' compute allocation, in percent.
pub total_compute_allocation_percent: i64,
/// Ordered Canister IDs with new executions.
pub ordered_new_execution_canister_ids: Vec<CanisterId>,
/// Ordered Canister IDs with long executions.
@@ -52,13 +54,15 @@ impl RoundSchedule {
pub fn new(
scheduler_cores: usize,
long_execution_cores: usize,
total_compute_allocation_percent: i64,
ordered_new_execution_canister_ids: Vec<CanisterId>,
ordered_long_execution_canister_ids: Vec<CanisterId>,
) -> Self {
RoundSchedule {
scheduler_cores,
long_execution_cores: long_execution_cores
.min(ordered_long_execution_canister_ids.len()),
total_compute_allocation_percent,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
}
@@ -174,6 +178,7 @@ impl RoundSchedule {
RoundSchedule::new(
self.scheduler_cores,
self.long_execution_cores,
self.total_compute_allocation_percent,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
),
@@ -229,4 +234,64 @@ impl RoundSchedule {

(canisters_partitioned_by_cores, canisters)
}

pub fn finish_canister_execution(
canister: &mut CanisterState,
fully_executed_canister_ids: &mut Vec<CanisterId>,
round_id: ExecutionRound,
is_first_iteration: bool,
rank: usize,
) {
let full_message_execution = match canister.next_execution() {
NextExecution::None => true,
NextExecution::StartNew => false,
// We just finished a full slice of executions.
NextExecution::ContinueLong => true,
NextExecution::ContinueInstallCode => false,
};
let scheduled_first = is_first_iteration && rank == 0;

// The very first canister is considered to have a full execution round for
// scheduling purposes even if it did not complete within the round.
if full_message_execution || scheduled_first {
canister.scheduler_state.last_full_execution_round = round_id;

// We schedule canisters (as opposed to individual messages),
// and we charge for every full execution round.
fully_executed_canister_ids.push(canister.canister_id());
}
}

pub(crate) fn finish_round(
&self,
canister_states: &mut BTreeMap<CanisterId, CanisterState>,
fully_executed_canister_ids: BTreeSet<CanisterId>,
) {
let scheduler_cores = self.scheduler_cores;
let number_of_canisters = canister_states.len();
let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;

// Charge canisters for full executions in this round.
let mut total_charged_priority = 0;
for canister_id in fully_executed_canister_ids {
if let Some(canister) = canister_states.get_mut(&canister_id) {
total_charged_priority += 100 * multiplier;
canister.scheduler_state.priority_credit += (100 * multiplier).into();
}
}

let total_allocated = self.total_compute_allocation_percent * multiplier;
// Free capacity per canister in multiplied percent.
let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
/ number_of_canisters.max(1) as i64;
// Fully divide the free allocation across all canisters.
for canister in canister_states.values_mut() {
// De-facto compute allocation includes bonus allocation
let factual = canister.scheduler_state.compute_allocation.as_percent() as i64
* multiplier
+ free_capacity_per_canister;
// Increase accumulated priority by de-facto compute allocation.
canister.scheduler_state.accumulated_priority += factual.into();
}
}
}
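A self-contained sketch of the accounting in `finish_round`, using a hypothetical plain struct and plain integers instead of the real `CanisterState` and priority types (illustration only; the real code also uses a saturating subtraction): each fully executed canister is charged 100 * multiplier points of priority credit, and the same total is handed back out as accumulated priority, so the books balance.

// Hypothetical, simplified model of RoundSchedule::finish_round.
struct Canister {
    compute_allocation_percent: i64,
    accumulated_priority: i64,
    priority_credit: i64,
    fully_executed: bool,
}

fn finish_round_sketch(canisters: &mut [Canister], scheduler_cores: usize) {
    let n = canisters.len();
    let multiplier = (scheduler_cores * n).max(1) as i64;
    let total_allocated: i64 = canisters
        .iter()
        .map(|c| c.compute_allocation_percent)
        .sum::<i64>()
        * multiplier;

    // Charge every fully executed canister one full round (100%) per core slot.
    let mut total_charged = 0;
    for c in canisters.iter_mut().filter(|c| c.fully_executed) {
        total_charged += 100 * multiplier;
        c.priority_credit += 100 * multiplier;
    }

    // Redistribute the charged amount, minus the explicitly allocated part,
    // evenly across all canisters.
    let free_per_canister = (total_charged - total_allocated) / n.max(1) as i64;
    for c in canisters.iter_mut() {
        c.accumulated_priority += c.compute_allocation_percent * multiplier + free_per_canister;
    }
}

fn main() {
    // 2 scheduler cores, 4 canisters with no compute allocation, 2 fully executed.
    let mut canisters: Vec<Canister> = (0..4)
        .map(|i| Canister {
            compute_allocation_percent: 0,
            accumulated_priority: 0,
            priority_credit: 0,
            fully_executed: i < 2,
        })
        .collect();
    finish_round_sketch(&mut canisters, 2);
    // multiplier = 8; each executed canister is charged 800 credit;
    // free capacity = (1600 - 0) / 4 = 400 is added to everyone's priority.
    let acc: i64 = canisters.iter().map(|c| c.accumulated_priority).sum();
    let credit: i64 = canisters.iter().map(|c| c.priority_credit).sum();
    assert_eq!(acc, credit); // 1600 == 1600: the books balance.
}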
93 changes: 93 additions & 0 deletions rs/execution_environment/src/scheduler/tests.rs
@@ -5936,3 +5936,96 @@ fn inner_round_long_execution_is_a_full_execution() {
// The accumulated priority invariant should be respected.
assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}

#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: usize) {
let instructions = 20;
let messages_per_round = 2;
let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores,
max_instructions_per_round: (instructions * messages_per_round).into(),
max_instructions_per_message: instructions.into(),
max_instructions_per_message_without_dts: instructions.into(),
max_instructions_per_slice: instructions.into(),
instruction_overhead_per_execution: 0.into(),
instruction_overhead_per_canister: 0.into(),
instruction_overhead_per_canister_for_finalization: 0.into(),
..SchedulerConfig::application_subnet()
})
.build();

// Bump up the round number.
test.execute_round(ExecutionRoundType::OrdinaryRound);

// Create `messages_per_round * 2` canisters for each scheduler core.
let num_canisters = scheduler_cores as u64 * messages_per_round * 2;
let mut canister_ids = vec![];
for _ in 0..num_canisters {
let canister_id = test.create_canister();
// Send one message per canister. With `messages_per_round * 2` canisters
// per scheduler core, only half of them will finish in one round.
test.send_ingress(canister_id, ingress(instructions));
canister_ids.push(canister_id);
}

test.execute_round(ExecutionRoundType::OrdinaryRound);

let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
for (i, canister) in test.state().canisters_iter().enumerate() {
if i < num_canisters as usize / 2 {
// The first half of the canisters should finish their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round,
test.last_round()
);
} else {
// The second half of the canisters should still have their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
prop_assert_eq!(canister.system_state.canister_metrics.executed, 0);
prop_assert_eq!(canister.scheduler_state.last_full_execution_round, 0.into());
}
total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
total_priority_credit += canister.scheduler_state.priority_credit.get();
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);

// Send one more message for first half of the canisters.
for (i, canister) in canister_ids.iter().enumerate() {
if i < num_canisters as usize / 2 {
test.send_ingress(*canister, ingress(instructions));
}
}

test.execute_round(ExecutionRoundType::OrdinaryRound);

let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
for (i, canister) in test.state().canisters_iter().enumerate() {
// Now all the canisters should have been executed once.
prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
if i < num_canisters as usize / 2 {
// The first half of the canisters should have messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
// The first half of the canisters should have been executed two rounds ago.
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round.get(),
test.last_round().get() - 1
);
} else {
// The second half of the canisters should finish their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
// The second half of the canisters should have been executed in the last round.
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round,
test.last_round()
);
}
total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
total_priority_credit += canister.scheduler_state.priority_credit.get();
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}
