From ba5ffe01aea846ec7bfffc67334ba5c386138c88 Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Tue, 1 Oct 2024 22:06:59 +0200 Subject: [PATCH] fix: EXC-1741: Fix full execution round definition (#1772) The full execution round is: 1. When a canister is scheduled first on a CPU core, during the first inner round. 2. When a canister has nothing to execute (NextExecution::None) or canister just executed a full slice of instructions. --- rs/execution_environment/src/scheduler.rs | 13 +- .../src/scheduler/tests.rs | 121 ++++++++++++++++++ 2 files changed, 133 insertions(+), 1 deletion(-) diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 2f61d703f4a..c3e6a72cbac 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -735,6 +735,7 @@ impl SchedulerImpl { &measurement_scope, &mut round_limits, registry_settings.subnet_size, + is_first_iteration, ); let instructions_consumed = instructions_before - round_limits.instructions; drop(execution_timer); @@ -874,6 +875,7 @@ impl SchedulerImpl { measurement_scope: &MeasurementScope, round_limits: &mut RoundLimits, subnet_size: usize, + is_first_iteration: bool, ) -> ( Vec, BTreeSet, @@ -943,6 +945,7 @@ impl SchedulerImpl { deterministic_time_slicing, round_limits, subnet_size, + is_first_iteration, ); }); } @@ -1967,6 +1970,7 @@ fn execute_canisters_on_thread( deterministic_time_slicing: FlagStatus, mut round_limits: RoundLimits, subnet_size: usize, + is_first_iteration: bool, ) -> ExecutionThreadResult { // Since this function runs on a helper thread, we cannot use a nested scope // here. 
Instead, we propagate metrics to the outer scope manually via @@ -2088,7 +2092,14 @@ fn execute_canisters_on_thread( if let Some(es) = &mut canister.execution_state { es.last_executed_round = round_id; } - if !canister.has_input() || rank == 0 { + let full_message_execution = match canister.next_execution() { + NextExecution::None => true, + NextExecution::StartNew => false, + // We just finished a full slice of executions. + NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true, + }; + let scheduled_first = is_first_iteration && rank == 0; + if full_message_execution || scheduled_first { // The very first canister is considered to have a full execution round for // scheduling purposes even if it did not complete within the round. canister.scheduler_state.last_full_execution_round = round_id; diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index eb9ddf721da..3af9951d31b 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -5815,3 +5815,124 @@ fn scheduled_heap_delta_limit_scaling() { assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5)); assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5)); } + +#[test] +fn inner_round_first_execution_is_not_a_full_execution() { + let scheduler_cores = 2; + let instructions = 20; + let max_messages_per_round = 3; + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + max_instructions_per_round: (instructions * max_messages_per_round).into(), + max_instructions_per_message: instructions.into(), + max_instructions_per_message_without_dts: instructions.into(), + max_instructions_per_slice: instructions.into(), + instruction_overhead_per_execution: NumInstructions::from(0), + instruction_overhead_per_canister: NumInstructions::from(0), + instruction_overhead_per_canister_for_finalization: NumInstructions::from(0), + 
..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + // Create `scheduler_cores * 2` canisters, so target canister is not scheduled first. + let mut canister_ids = vec![]; + for _ in 0..scheduler_cores * 2 { + canister_ids.push(test.create_canister()); + } + // Create target canister after. + let target_id = test.create_canister(); + // Send messages to the target canister. + for canister_id in &canister_ids { + let message = ingress(instructions).call( + other_side(target_id, instructions - 1), + on_response(instructions - 2), + ); + test.send_ingress(*canister_id, message); + } + + test.execute_round(ExecutionRoundType::OrdinaryRound); + + let mut total_accumulated_priority = 0; + let mut total_priority_credit = 0; + for canister in test.state().canisters_iter() { + let system_state = &canister.system_state; + let scheduler_state = &canister.scheduler_state; + // All ingresses should be executed in the previous round. + assert_eq!(system_state.queues().ingress_queue_size(), 0); + assert_eq!(system_state.canister_metrics.executed, 1); + if canister.canister_id() == target_id { + // The target canister, despite being executed first in the second inner round, + // should not be marked as fully executed. + assert_ne!(test.last_round(), 0.into()); + assert_eq!(scheduler_state.last_full_execution_round, 0.into()); + } else { + assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); + } + total_accumulated_priority += scheduler_state.accumulated_priority.get(); + total_priority_credit += scheduler_state.priority_credit.get(); + } + // The accumulated priority invariant should be respected. 
+ assert_eq!(total_accumulated_priority - total_priority_credit, 0); +} + +#[test] +fn inner_round_long_execution_is_a_full_execution() { + let scheduler_cores = 2; + let slice = 20; + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + max_instructions_per_round: (slice * 2).into(), + max_instructions_per_message: (slice * 10).into(), + max_instructions_per_message_without_dts: slice.into(), + max_instructions_per_slice: slice.into(), + instruction_overhead_per_execution: NumInstructions::from(0), + instruction_overhead_per_canister: NumInstructions::from(0), + instruction_overhead_per_canister_for_finalization: NumInstructions::from(0), + ..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + // Create `scheduler_cores` canisters, so target canister is not scheduled first. + let mut canister_ids = vec![]; + for _ in 0..scheduler_cores { + let canister_id = test.create_canister(); + test.send_ingress(canister_id, ingress(slice)); + canister_ids.push(canister_id); + } + // Create a target canister with two long executions. + let target_id = test.create_canister(); + test.send_ingress(target_id, ingress(slice * 2 + 1)); + test.send_ingress(target_id, ingress(slice * 2 + 1)); + + test.execute_round(ExecutionRoundType::OrdinaryRound); + + let mut total_accumulated_priority = 0; + let mut total_priority_credit = 0; + for canister in test.state().canisters_iter() { + let system_state = &canister.system_state; + let scheduler_state = &canister.scheduler_state; + // All canisters should be executed. + assert_eq!(system_state.canister_metrics.executed, 1); + if canister.canister_id() == target_id { + // The target canister was not executed first, and still has messages. 
+ assert_eq!(system_state.queues().ingress_queue_size(), 1); + } else { + assert_eq!(system_state.queues().ingress_queue_size(), 0); + } + // All canisters should be marked as fully executed. The target canister, + // despite still having messages, executed a full slice of instructions. + assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); + total_accumulated_priority += scheduler_state.accumulated_priority.get(); + total_priority_credit += scheduler_state.priority_credit.get(); + } + // The accumulated priority invariant should be respected. + assert_eq!(total_accumulated_priority - total_priority_credit, 0); +}