diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs
index ace96c69a1d..4f1efdd8c3d 100644
--- a/rs/execution_environment/src/scheduler.rs
+++ b/rs/execution_environment/src/scheduler.rs
@@ -626,9 +626,9 @@ impl SchedulerImpl {
     #[allow(clippy::too_many_arguments, clippy::type_complexity)]
     fn inner_round<'a>(
         &'a self,
+        round_log: &ReplicaLogger,
         mut state: ReplicatedState,
         csprng: &mut Csprng,
-        round_schedule: &RoundSchedule,
         current_round: ExecutionRound,
         root_measurement_scope: &MeasurementScope<'a>,
         scheduler_round_limits: &mut SchedulerRoundLimits,
@@ -700,19 +700,32 @@ impl SchedulerImpl {
             // Update subnet available memory before taking out the canisters.
             round_limits.subnet_available_memory = self.exec_env.subnet_available_memory(&state);
-            let canisters = state.take_canister_states();
-            // Obtain the active canisters and update the collection of heap delta rate-limited canisters.
-            let (active_round_schedule, rate_limited_canister_ids) = round_schedule
-                .filter_canisters(
-                    &canisters,
-                    self.config.heap_delta_rate_limit,
-                    self.rate_limiting_of_heap_delta,
+            let mut canisters = state.take_canister_states();
+
+            // Scheduling.
+            // TODO(EXC-1617): The scheduling will be optimized in the follow-up PRs.
+            let (mut active_canisters_partitioned_by_cores, inactive_canisters) = {
+                let _timer = self.metrics.round_scheduling_duration.start_timer();
+                let round_schedule = self.apply_scheduling_strategy(
+                    round_log,
+                    self.config.scheduler_cores,
+                    current_round,
+                    self.config.accumulated_priority_reset_interval,
+                    &mut canisters,
                 );
-            round_filtered_canisters
-                .add_canisters(&active_round_schedule, &rate_limited_canister_ids);
+                // Obtain the active canisters and update the collection
+                // of heap delta rate-limited canisters.
+                let (active_round_schedule, rate_limited_canister_ids) = round_schedule
+                    .filter_canisters(
+                        &canisters,
+                        self.config.heap_delta_rate_limit,
+                        self.rate_limiting_of_heap_delta,
+                    );
+                round_filtered_canisters
+                    .add_canisters(&active_round_schedule, &rate_limited_canister_ids);
 
-            let (mut active_canisters_partitioned_by_cores, inactive_canisters) =
-                active_round_schedule.partition_canisters_to_cores(canisters);
+                active_round_schedule.partition_canisters_to_cores(canisters)
+            };
 
             if is_first_iteration {
                 for partition in active_canisters_partitioned_by_cores.iter_mut() {
@@ -1690,27 +1703,11 @@ impl Scheduler for SchedulerImpl {
             scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits);
         };
 
-        // Scheduling.
-        let round_schedule = {
-            let _timer = self.metrics.round_scheduling_duration.start_timer();
-
-            let mut canisters = state.take_canister_states();
-            let round_schedule_candidate = self.apply_scheduling_strategy(
-                &round_log,
-                self.config.scheduler_cores,
-                current_round,
-                self.config.accumulated_priority_reset_interval,
-                &mut canisters,
-            );
-            state.put_canister_states(canisters);
-            round_schedule_candidate
-        };
-
         // Inner round.
         let (mut state, active_canister_ids) = self.inner_round(
+            &round_log,
             state,
             &mut csprng,
-            &round_schedule,
             current_round,
             &root_measurement_scope,
             &mut scheduler_round_limits,
diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs
index 13e21c48848..70f5a4ee1b6 100644
--- a/rs/execution_environment/src/scheduler/round_schedule.rs
+++ b/rs/execution_environment/src/scheduler/round_schedule.rs
@@ -143,39 +143,12 @@ impl RoundSchedule {
             .cloned()
             .collect();
 
-        let ordered_long_execution_canister_ids = self
-            .ordered_long_execution_canister_ids
-            .iter()
-            .filter(
-                |canister_id| match canister_next_executions.get(canister_id) {
-                    Some(NextExecution::ContinueLong) => true,
-
-                    // We expect long execution, but there is none,
-                    // so the long execution was finished in the
-                    // previous inner round.
-                    //
-                    // We should avoid scheduling this canister to:
-                    // 1. Avoid the canister to bypass the logic in
-                    //    `apply_scheduling_strategy()`.
-                    // 2. Charge canister for resources at the end
-                    //    of the round.
-                    Some(NextExecution::StartNew) => false,
-
-                    None // No such canister. Should not happen.
-                    | Some(NextExecution::None) // Idle canister.
-                    | Some(NextExecution::ContinueInstallCode) // Subnet message.
-                    => false,
-                },
-            )
-            .cloned()
-            .collect();
-
         (
             RoundSchedule::new(
                 self.scheduler_cores,
                 self.long_execution_cores,
                 ordered_new_execution_canister_ids,
-                ordered_long_execution_canister_ids,
+                self.ordered_long_execution_canister_ids.clone(),
             ),
             rate_limited_canister_ids,
         )
diff --git a/rs/execution_environment/src/scheduler/scheduler_metrics.rs b/rs/execution_environment/src/scheduler/scheduler_metrics.rs
index 2d08e6703f4..89be50231a0 100644
--- a/rs/execution_environment/src/scheduler/scheduler_metrics.rs
+++ b/rs/execution_environment/src/scheduler/scheduler_metrics.rs
@@ -474,7 +474,7 @@ impl SchedulerMetrics {
             },
             round_scheduling_duration: duration_histogram(
                 "execution_round_scheduling_duration_seconds",
-                "The duration of execution round scheduling in seconds.",
+                "The duration of execution inner round scheduling in seconds.",
                 metrics_registry,
             ),
             round_update_signature_request_contexts_duration: duration_histogram(
diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs
index eb9ddf721da..78af02171d9 100644
--- a/rs/execution_environment/src/scheduler/tests.rs
+++ b/rs/execution_environment/src/scheduler/tests.rs
@@ -65,7 +65,7 @@ fn can_fully_execute_canisters_with_one_input_message_each() {
     let mut test = SchedulerTestBuilder::new()
         .with_scheduler_config(SchedulerConfig {
             scheduler_cores: 2,
-            max_instructions_per_round: NumInstructions::from(1 << 30),
+            max_instructions_per_round: NumInstructions::from(10),
             max_instructions_per_message: NumInstructions::from(5),
             max_instructions_per_message_without_dts: NumInstructions::from(5),
             max_instructions_per_slice: NumInstructions::from(5),
@@ -1699,21 +1699,15 @@ fn can_fully_execute_multiple_canisters_with_multiple_messages_each() {
     for canister_state in test.state().canisters_iter() {
         let system_state = &canister_state.system_state;
         assert_eq!(system_state.queues().ingress_queue_size(), 0);
-        assert_eq!(
-            canister_state.scheduler_state.last_full_execution_round,
-            ExecutionRound::new(1)
-        );
-        assert_eq!(
-            system_state
-                .canister_metrics
-                .skipped_round_due_to_no_messages,
-            0
-        );
-        assert_eq!(system_state.canister_metrics.executed, 1);
-        assert_eq!(
-            system_state.canister_metrics.interrupted_during_execution,
-            0
-        );
+
+        let scheduler_state = &canister_state.scheduler_state;
+        assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+
+        let metrics = &system_state.canister_metrics;
+        // The inner round was skipped once before breaking the round.
+        assert_eq!(metrics.skipped_round_due_to_no_messages, 1);
+        assert_eq!(metrics.executed, 1);
+        assert_eq!(metrics.interrupted_during_execution, 0);
     }
 
     assert_eq!(
@@ -4781,28 +4775,31 @@ fn break_after_long_executions(#[strategy(2..10_usize)] scheduler_cores: usize)
 }
 
 /// Scenario:
-/// 1. One canister with two long messages `slice + 1` instructions each.
+/// 1. `scheduler_cores + 1` canisters with two long executions each.
 ///
 /// Expectations:
 /// 1. After the first round the canister should have a paused long execution.
-/// 2. After the second round the canister should have no executions, i.e. the
+/// 2. After the second round the canister should have no executions, i.e. it should
 /// finish the paused execution and should not start any new executions.
 #[test]
 fn filter_after_long_executions() {
+    let scheduler_cores = 2;
     let max_instructions_per_slice = SchedulerConfig::application_subnet()
         .max_instructions_per_slice
        .get();
     let mut test = SchedulerTestBuilder::new()
         .with_scheduler_config(SchedulerConfig {
+            scheduler_cores,
             max_instructions_per_round: (max_instructions_per_slice * 2).into(),
             max_instructions_per_message: (max_instructions_per_slice * 2).into(),
             max_instructions_per_message_without_dts: max_instructions_per_slice.into(),
+            max_instructions_per_slice: max_instructions_per_slice.into(),
             ..SchedulerConfig::application_subnet()
         })
         .build();
 
-    // Create a canister with long messages
+    // Create `scheduler_cores + 1` canisters with long executions.
     let mut long_message_ids = vec![];
     let long_canister_id = test.create_canister();
     for _ in 0..2 {
         let long_message_id =
             test.send_ingress(long_canister_id, ingress(max_instructions_per_slice + 1));
         long_message_ids.push(long_message_id);
     }
+    for _ in 0..scheduler_cores {
+        let another_canister_id = test.create_canister();
+        for _ in 0..2 {
+            test.send_ingress(another_canister_id, ingress(max_instructions_per_slice + 1));
+        }
+    }
+
+    // Remember the initial accumulated priority.
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    let ap_before_execution =
+        canister.scheduler_state.accumulated_priority - canister.scheduler_state.priority_credit;
 
     // After the first round the canister should have a paused long execution.
     test.execute_round(ExecutionRoundType::OrdinaryRound);
-    for canister in test.state().canisters_iter() {
-        assert_eq!(canister.system_state.canister_metrics.executed, 1);
-        assert!(canister.has_paused_execution());
-    }
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    assert_eq!(canister.system_state.canister_metrics.executed, 1);
+    assert!(canister.has_paused_execution());
 
-    // After the second round the canister should have no executions, i.e. the
-    // finish the paused execution and should not start any new executions.
+    // After the second round the canister should finish the paused execution.
     test.execute_round(ExecutionRoundType::OrdinaryRound);
-    for canister in test.state().canisters_iter() {
-        assert_eq!(canister.system_state.canister_metrics.executed, 2);
-        assert!(!canister.has_paused_execution());
-    }
+    let canister = test.state().canister_state(&long_canister_id).unwrap();
+    assert_eq!(canister.system_state.canister_metrics.executed, 2);
+    assert!(!canister.has_paused_execution());
+
+    // The accumulated priority should not increase after the long execution is finished.
+    let ap_after_execution =
+        canister.scheduler_state.accumulated_priority - canister.scheduler_state.priority_credit;
+    assert!(ap_after_execution <= ap_before_execution);
 }
 
 #[test]
@@ -5815,3 +5825,96 @@ fn scheduled_heap_delta_limit_scaling() {
     assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5));
     assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5));
 }
+
+#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
+fn apply_scheduling_strategy_in_inner_round(#[strategy(2..10_usize)] scheduler_cores: usize) {
+    fn run_inner_rounds_on_cores(inner_rounds: u64, scheduler_cores: usize) -> SchedulerTest {
+        let instructions = 20;
+        let canisters_per_core = 2;
+        let max_messages_per_round = canisters_per_core * inner_rounds;
+        let mut test = SchedulerTestBuilder::new()
+            .with_scheduler_config(SchedulerConfig {
+                scheduler_cores,
+                max_instructions_per_round: (instructions * max_messages_per_round).into(),
+                max_instructions_per_message: instructions.into(),
+                max_instructions_per_message_without_dts: instructions.into(),
+                max_instructions_per_slice: instructions.into(),
+                instruction_overhead_per_execution: NumInstructions::from(0),
+                instruction_overhead_per_canister: NumInstructions::from(0),
+                instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
+                ..SchedulerConfig::application_subnet()
+            })
+            .build();
+
+        // Bump up the round number.
+        test.execute_round(ExecutionRoundType::OrdinaryRound);
+
+        // Create 2 canisters for each scheduler core.
+        let num_canisters = scheduler_cores as u64 * canisters_per_core;
+        let mut canister_ids = vec![];
+        for _ in 0..num_canisters {
+            canister_ids.push(test.create_canister());
+        }
+
+        // Send a self-call message to each canister. Having 2 canisters
+        // on each scheduler core and `2 * inner_rounds` messages per round,
+        // all canisters should be executed in each inner round.
+        for canister_id in canister_ids.iter() {
+            let message = ingress(instructions).call(
+                other_side(*canister_id, instructions - 1),
+                on_response(instructions - 2),
+            );
+            test.send_ingress(*canister_id, message);
+        }
+
+        test.execute_round(ExecutionRoundType::OrdinaryRound);
+        test
+    }
+
+    let test = run_inner_rounds_on_cores(1, scheduler_cores);
+    let mut total_accumulated_priority = 0;
+    let mut total_priority_credit = 0;
+    for (i, canister) in test.state().canisters_iter().enumerate() {
+        let system_state = &canister.system_state;
+        let scheduler_state = &canister.scheduler_state;
+        // After just one inner round, the first `scheduler_cores` canisters
+        // should be charged for execution.
+        if i < scheduler_cores {
+            prop_assert!(
+                scheduler_state.accumulated_priority - scheduler_state.priority_credit < 0.into()
+            );
+        } else {
+            prop_assert!(
+                scheduler_state.accumulated_priority + scheduler_state.priority_credit > 0.into()
+            );
+        }
+        // All ingresses should be executed in the previous round.
+        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
+        prop_assert_eq!(system_state.canister_metrics.executed, 1);
+        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+        total_accumulated_priority += scheduler_state.accumulated_priority.get();
+        total_priority_credit += scheduler_state.priority_credit.get();
+    }
+    // The accumulated priority invariant should be respected.
+    prop_assert_eq!(total_accumulated_priority, total_priority_credit);
+
+    let test = run_inner_rounds_on_cores(2, scheduler_cores);
+    let mut total_accumulated_priority = 0;
+    let mut total_priority_credit = 0;
+    for canister in test.state().canisters_iter() {
+        let system_state = &canister.system_state;
+        let scheduler_state = &canister.scheduler_state;
+        // After two inner rounds, all canisters should be charged for execution.
+        prop_assert!(
+            scheduler_state.accumulated_priority - scheduler_state.priority_credit == 0.into()
+        );
+        // All ingresses should be executed in the previous round,
+        // with each canister having executed twice.
+        prop_assert_eq!(system_state.queues().ingress_queue_size(), 0);
+        prop_assert_eq!(system_state.canister_metrics.executed, 2);
+        prop_assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
+        total_accumulated_priority += scheduler_state.accumulated_priority.get();
+        total_priority_credit += scheduler_state.priority_credit.get();
+    }
+    // The accumulated priority invariant should be respected.
+    prop_assert_eq!(total_accumulated_priority, total_priority_credit);
+}
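
Note on the control-flow change in scheduler.rs: `apply_scheduling_strategy` used to run once per round, before `inner_round`; after this patch it runs at the top of every inner iteration, which is presumably also why `filter_canisters` no longer needs to drop long executions that finished in an earlier inner round. The sketch below is a minimal toy model of that loop structure only; `ToyScheduler`, the `u32` work counters, and the "most remaining work first" ordering are illustrative assumptions, not the replica's types or its real priority logic.

// Toy model of per-inner-round scheduling (assumed names, not the real API).
struct ToyScheduler {
    cores: usize,
}

impl ToyScheduler {
    // Placeholder ordering: most remaining work first. The production
    // `apply_scheduling_strategy` orders by accumulated priority instead.
    fn apply_scheduling_strategy(&self, work: &[u32]) -> Vec<usize> {
        let mut order: Vec<usize> = (0..work.len()).collect();
        order.sort_by_key(|&i| std::cmp::Reverse(work[i]));
        order
    }

    fn inner_round(&self, work: &mut [u32]) {
        // After this patch the schedule is recomputed here, once per inner
        // iteration, instead of once per round outside this function.
        let order = self.apply_scheduling_strategy(work);
        for &i in order.iter().take(self.cores) {
            work[i] = work[i].saturating_sub(1); // "execute" one slice
        }
    }
}

fn main() {
    let scheduler = ToyScheduler { cores: 2 };
    let mut work = vec![3, 1, 2];
    while work.iter().any(|&w| w > 0) {
        scheduler.inner_round(&mut work);
    }
    assert!(work.iter().all(|&w| w == 0));
}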
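The new proptest asserts an accounting invariant: each inner round distributes priority across all canisters and charges the canisters that actually executed, so the totals must balance (`total_accumulated_priority == total_priority_credit`). Below is a minimal, self-contained sketch of that invariant, assuming an equal-share accrual and a flat per-core charge; the production algorithm additionally weighs compute allocations and long executions, which this toy omits.

// Toy accumulated-priority bookkeeping (assumed rules, not the production ones).
#[derive(Clone, Debug, Default)]
struct ToyCanister {
    accumulated_priority: i64,
    priority_credit: i64,
}

// One scheduling pass: distribute `100 * cores` of priority across all
// canisters, then charge the `cores` highest-priority canisters for a full
// execution. Both sides move by the same total, which is the invariant the
// proptest checks.
fn inner_round_schedule(canisters: &mut [ToyCanister], cores: usize) {
    let capacity = 100 * cores as i64;
    let share = capacity / canisters.len() as i64;
    for c in canisters.iter_mut() {
        c.accumulated_priority += share;
    }
    let mut order: Vec<usize> = (0..canisters.len()).collect();
    order.sort_by_key(|&i| {
        std::cmp::Reverse(canisters[i].accumulated_priority - canisters[i].priority_credit)
    });
    for &i in order.iter().take(cores) {
        canisters[i].priority_credit += capacity / cores as i64;
    }
}

fn main() {
    let cores = 2;
    // 2 canisters per core, matching the test setup above.
    let mut canisters = vec![ToyCanister::default(); 2 * cores];
    for _ in 0..3 {
        inner_round_schedule(&mut canisters, cores);
        let total_ap: i64 = canisters.iter().map(|c| c.accumulated_priority).sum();
        let total_pc: i64 = canisters.iter().map(|c| c.priority_credit).sum();
        // Accruals and charges always cancel out in aggregate.
        assert_eq!(total_ap, total_pc);
    }
}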