From d4cd725e04cdefb1efff12e0dd45d4bc2106a8fd Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Mon, 30 Sep 2024 22:23:27 +0200 Subject: [PATCH] feat: EXC-1735: Move scheduling into the inner round By moving the scheduling strategy into the inner round, we can adjust canister priorities within each round. This allows for greater flexibility and responsiveness to change canister priorities. --- rs/execution_environment/src/scheduler.rs | 68 ++++---- .../src/scheduler/tests.rs | 149 ++++++++++++++---- 2 files changed, 149 insertions(+), 68 deletions(-) diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index ace96c69a1d..094c787fcf0 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -367,16 +367,9 @@ impl SchedulerImpl { let active_cores = scheduler_cores.min(number_of_canisters); for (i, canister_id) in scheduling_order.take(active_cores).enumerate() { let canister_state = canister_states.get_mut(canister_id).unwrap(); - // As top `scheduler_cores` canisters are guaranteed to be scheduled - // this round, their accumulated priorities must be decreased here - // by `capacity * multiplier / scheduler_cores`. But instead this - // value is accumulated in the `priority_credit`, and applied later: - // * For short executions, the `priority_credit` is deducted from - // the `accumulated_priority` at the end of the round. - // * For long executions, the `priority_credit` is accumulated - // for a few rounds, and deducted from the `accumulated_priority` - // at the end of the long execution. - canister_state.scheduler_state.priority_credit += + // Decrease accumulated priorities of the top `scheduler_cores` canisters. + // This is required to respect scheduler invariant after the round is finished. + canister_state.scheduler_state.accumulated_priority -= (compute_capacity_percent * multiplier / active_cores as i64).into(); if i < round_schedule.long_execution_cores { canister_state.scheduler_state.long_execution_mode = @@ -626,9 +619,9 @@ impl SchedulerImpl { #[allow(clippy::too_many_arguments, clippy::type_complexity)] fn inner_round<'a>( &'a self, + round_log: &ReplicaLogger, mut state: ReplicatedState, csprng: &mut Csprng, - round_schedule: &RoundSchedule, current_round: ExecutionRound, root_measurement_scope: &MeasurementScope<'a>, scheduler_round_limits: &mut SchedulerRoundLimits, @@ -700,19 +693,32 @@ impl SchedulerImpl { // Update subnet available memory before taking out the canisters. round_limits.subnet_available_memory = self.exec_env.subnet_available_memory(&state); - let canisters = state.take_canister_states(); - // Obtain the active canisters and update the collection of heap delta rate-limited canisters. - let (active_round_schedule, rate_limited_canister_ids) = round_schedule - .filter_canisters( - &canisters, - self.config.heap_delta_rate_limit, - self.rate_limiting_of_heap_delta, + let mut canisters = state.take_canister_states(); + + // Scheduling. + // TODO(EXC-1617): The scheduling will be optimized in the follow up PRs. + let (mut active_canisters_partitioned_by_cores, inactive_canisters) = { + let _timer = self.metrics.round_scheduling_duration.start_timer(); + let round_schedule = self.apply_scheduling_strategy( + round_log, + self.config.scheduler_cores, + current_round, + self.config.accumulated_priority_reset_interval, + &mut canisters, ); - round_filtered_canisters - .add_canisters(&active_round_schedule, &rate_limited_canister_ids); + // Obtain the active canisters and update the collection + // of heap delta rate-limited canisters. + let (active_round_schedule, rate_limited_canister_ids) = round_schedule + .filter_canisters( + &canisters, + self.config.heap_delta_rate_limit, + self.rate_limiting_of_heap_delta, + ); + round_filtered_canisters + .add_canisters(&active_round_schedule, &rate_limited_canister_ids); - let (mut active_canisters_partitioned_by_cores, inactive_canisters) = - active_round_schedule.partition_canisters_to_cores(canisters); + active_round_schedule.partition_canisters_to_cores(canisters) + }; if is_first_iteration { for partition in active_canisters_partitioned_by_cores.iter_mut() { @@ -1690,27 +1696,11 @@ impl Scheduler for SchedulerImpl { scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits); }; - // Scheduling. - let round_schedule = { - let _timer = self.metrics.round_scheduling_duration.start_timer(); - - let mut canisters = state.take_canister_states(); - let round_schedule_candidate = self.apply_scheduling_strategy( - &round_log, - self.config.scheduler_cores, - current_round, - self.config.accumulated_priority_reset_interval, - &mut canisters, - ); - state.put_canister_states(canisters); - round_schedule_candidate - }; - // Inner round. let (mut state, active_canister_ids) = self.inner_round( + &round_log, state, &mut csprng, - &round_schedule, current_round, &root_measurement_scope, &mut scheduler_round_limits, diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index eb9ddf721da..6d217033a3c 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -65,7 +65,7 @@ fn can_fully_execute_canisters_with_one_input_message_each() { let mut test = SchedulerTestBuilder::new() .with_scheduler_config(SchedulerConfig { scheduler_cores: 2, - max_instructions_per_round: NumInstructions::from(1 << 30), + max_instructions_per_round: NumInstructions::from(10), max_instructions_per_message: NumInstructions::from(5), max_instructions_per_message_without_dts: NumInstructions::from(5), max_instructions_per_slice: NumInstructions::from(5), @@ -1699,21 +1699,15 @@ fn can_fully_execute_multiple_canisters_with_multiple_messages_each() { for canister_state in test.state().canisters_iter() { let system_state = &canister_state.system_state; assert_eq!(system_state.queues().ingress_queue_size(), 0); - assert_eq!( - canister_state.scheduler_state.last_full_execution_round, - ExecutionRound::new(1) - ); - assert_eq!( - system_state - .canister_metrics - .skipped_round_due_to_no_messages, - 0 - ); - assert_eq!(system_state.canister_metrics.executed, 1); - assert_eq!( - system_state.canister_metrics.interrupted_during_execution, - 0 - ); + + let scheduler_state = &canister_state.scheduler_state; + assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); + + let metrics = &system_state.canister_metrics; + // The inner round was skipped once before breaking the round. + assert_eq!(metrics.skipped_round_due_to_no_messages, 1); + assert_eq!(metrics.executed, 1); + assert_eq!(metrics.interrupted_during_execution, 0); } assert_eq!( @@ -4781,28 +4775,31 @@ fn break_after_long_executions(#[strategy(2..10_usize)] scheduler_cores: usize) } /// Scenario: -/// 1. One canister with two long messages `slice + 1` instructions each. +/// 1. Scheduler cores + 1 canisters with two long executions each. /// /// Expectations: /// 1. After the first round the canister should have a paused long execution. -/// 2. After the second round the canister should have no executions, i.e. the +/// 2. After the second round the canister should have no executions, i.e. /// finish the paused execution and should not start any new executions. #[test] fn filter_after_long_executions() { + let scheduler_cores = 2; let max_instructions_per_slice = SchedulerConfig::application_subnet() .max_instructions_per_slice .get(); let mut test = SchedulerTestBuilder::new() .with_scheduler_config(SchedulerConfig { + scheduler_cores, max_instructions_per_round: (max_instructions_per_slice * 2).into(), max_instructions_per_message: (max_instructions_per_slice * 2).into(), max_instructions_per_message_without_dts: max_instructions_per_slice.into(), + max_instructions_per_slice: max_instructions_per_slice.into(), ..SchedulerConfig::application_subnet() }) .build(); - // Create a canister with long messages + // Create scheduler cores + 1 canisters with long executions. let mut long_message_ids = vec![]; let long_canister_id = test.create_canister(); for _ in 0..2 { @@ -4810,21 +4807,32 @@ fn filter_after_long_executions() { test.send_ingress(long_canister_id, ingress(max_instructions_per_slice + 1)); long_message_ids.push(long_message_id); } + for _ in 0..scheduler_cores { + let another_canister_id = test.create_canister(); + for _ in 0..2 { + test.send_ingress(another_canister_id, ingress(max_instructions_per_slice + 1)); + } + } + + // Remember the initial accumulated priority. + let canister = test.state().canister_state(&long_canister_id).unwrap(); + let ap_before_execution = canister.scheduler_state.accumulated_priority; // After the first round the canister should have a paused long execution. test.execute_round(ExecutionRoundType::OrdinaryRound); - for canister in test.state().canisters_iter() { - assert_eq!(canister.system_state.canister_metrics.executed, 1); - assert!(canister.has_paused_execution()); - } + let canister = test.state().canister_state(&long_canister_id).unwrap(); + assert_eq!(canister.system_state.canister_metrics.executed, 1); + assert!(canister.has_paused_execution()); - // After the second round the canister should have no executions, i.e. the - // finish the paused execution and should not start any new executions. + // After the second round the canister should finish the paused execution. test.execute_round(ExecutionRoundType::OrdinaryRound); - for canister in test.state().canisters_iter() { - assert_eq!(canister.system_state.canister_metrics.executed, 2); - assert!(!canister.has_paused_execution()); - } + let canister = test.state().canister_state(&long_canister_id).unwrap(); + assert_eq!(canister.system_state.canister_metrics.executed, 2); + assert!(!canister.has_paused_execution()); + + // The accumulated priority should not increase after the long execution is finished. + let ap_after_execution = canister.scheduler_state.accumulated_priority; + assert!(ap_after_execution <= ap_before_execution); } #[test] @@ -5815,3 +5823,86 @@ fn scheduled_heap_delta_limit_scaling() { assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5)); assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5)); } + +#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })] +fn apply_scheduling_strategy_in_inner_round(#[strategy(2..10_usize)] scheduler_cores: usize) { + fn run_inner_rounds_on_cores(inner_rounds: u64, scheduler_cores: usize) -> SchedulerTest { + let instructions = 20; + let canisters_per_core = 2; + let max_messages_per_round = canisters_per_core * inner_rounds; + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + max_instructions_per_round: (instructions * max_messages_per_round).into(), + max_instructions_per_message: instructions.into(), + max_instructions_per_message_without_dts: instructions.into(), + max_instructions_per_slice: instructions.into(), + instruction_overhead_per_execution: NumInstructions::from(0), + instruction_overhead_per_canister: NumInstructions::from(0), + instruction_overhead_per_canister_for_finalization: NumInstructions::from(0), + ..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + // Create 2 canisters for each scheduler core. + let num_canisters = scheduler_cores as u64 * canisters_per_core; + let mut canister_ids = vec![]; + for _ in 0..num_canisters { + canister_ids.push(test.create_canister()); + } + + // Send a self call message to each canister. Having 2 canisters + // on each scheduler core and `2 * inner_rounds`` messages per round, + // all canisters should be executed in each inner round. + for canister_id in canister_ids.iter() { + let message = ingress(instructions).call( + other_side(*canister_id, instructions - 1), + on_response(instructions - 2), + ); + test.send_ingress(*canister_id, message); + } + + test.execute_round(ExecutionRoundType::OrdinaryRound); + test + } + + let test = run_inner_rounds_on_cores(1, scheduler_cores); + let mut total_accumulated_priority = 0; + for (i, canister) in test.state().canisters_iter().enumerate() { + let system_state = &canister.system_state; + let scheduler_state = &canister.scheduler_state; + // After just one inner round, the first `scheduler_cores` canisters + // should be charged for execution. + if i < scheduler_cores { + prop_assert!(scheduler_state.accumulated_priority < 0.into()); + } else { + prop_assert!(scheduler_state.accumulated_priority > 0.into()); + } + // All ingresses should be executed in the previous round. + prop_assert_eq!(system_state.queues().ingress_queue_size(), 0); + prop_assert_eq!(system_state.canister_metrics.executed, 1); + prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); + total_accumulated_priority += scheduler_state.accumulated_priority.get(); + } + // The accumulated priority invariant should be respected. + prop_assert_eq!(total_accumulated_priority, 0); + + let test = run_inner_rounds_on_cores(2, scheduler_cores); + let mut total_accumulated_priority = 0; + for canister in test.state().canisters_iter() { + let system_state = &canister.system_state; + let scheduler_state = &canister.scheduler_state; + // After two inner rounds, all canisters should be charged for execution. + prop_assert!(scheduler_state.accumulated_priority == 0.into()); + // All ingresses should be executed twice in the previous round. + prop_assert_eq!(system_state.queues().ingress_queue_size(), 0); + prop_assert_eq!(system_state.canister_metrics.executed, 2); + prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); + total_accumulated_priority += scheduler_state.accumulated_priority.get(); + } + // The accumulated priority invariant should be respected. + prop_assert_eq!(total_accumulated_priority, 0); +}