Skip to content

Commit

Permalink
fix: EXC-1741: Fix full execution round definition (#1772)
Browse files Browse the repository at this point in the history
The full execution round is:

1. When a canister is scheduled first on a CPU core, during the first
inner round.
2. When a canister has nothing to execute (NextExecution::None) or the
canister has just executed a full slice of instructions.
  • Loading branch information
berestovskyy authored Oct 1, 2024
1 parent 54c3542 commit ba5ffe0
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 1 deletion.
13 changes: 12 additions & 1 deletion rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ impl SchedulerImpl {
&measurement_scope,
&mut round_limits,
registry_settings.subnet_size,
is_first_iteration,
);
let instructions_consumed = instructions_before - round_limits.instructions;
drop(execution_timer);
Expand Down Expand Up @@ -874,6 +875,7 @@ impl SchedulerImpl {
measurement_scope: &MeasurementScope,
round_limits: &mut RoundLimits,
subnet_size: usize,
is_first_iteration: bool,
) -> (
Vec<CanisterState>,
BTreeSet<CanisterId>,
Expand Down Expand Up @@ -943,6 +945,7 @@ impl SchedulerImpl {
deterministic_time_slicing,
round_limits,
subnet_size,
is_first_iteration,
);
});
}
Expand Down Expand Up @@ -1967,6 +1970,7 @@ fn execute_canisters_on_thread(
deterministic_time_slicing: FlagStatus,
mut round_limits: RoundLimits,
subnet_size: usize,
is_first_iteration: bool,
) -> ExecutionThreadResult {
// Since this function runs on a helper thread, we cannot use a nested scope
// here. Instead, we propagate metrics to the outer scope manually via
Expand Down Expand Up @@ -2088,7 +2092,14 @@ fn execute_canisters_on_thread(
if let Some(es) = &mut canister.execution_state {
es.last_executed_round = round_id;
}
if !canister.has_input() || rank == 0 {
let full_message_execution = match canister.next_execution() {
NextExecution::None => true,
NextExecution::StartNew => false,
// We just finished a full slice of executions.
NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
};
let scheduled_first = is_first_iteration && rank == 0;
if full_message_execution || scheduled_first {
// The very first canister is considered to have a full execution round for
// scheduling purposes even if it did not complete within the round.
canister.scheduler_state.last_full_execution_round = round_id;
Expand Down
121 changes: 121 additions & 0 deletions rs/execution_environment/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5815,3 +5815,124 @@ fn scheduled_heap_delta_limit_scaling() {
assert_eq!(10, scheduled_limit(50, 50, 9, 10, 5));
assert_eq!(10, scheduled_limit(55, 50, 9, 10, 5));
}

#[test]
fn inner_round_first_execution_is_not_a_full_execution() {
    // Regression test for the "full execution round" definition: a canister
    // that happens to be executed first during a *later* inner round (because
    // it only received messages mid-round) must NOT be credited with a full
    // execution round. Only being scheduled first on a core during the
    // *first* inner round (or draining all input) counts.
    let scheduler_cores = 2;
    let instructions = 20;
    let max_messages_per_round = 3;
    let mut test = SchedulerTestBuilder::new()
        .with_scheduler_config(SchedulerConfig {
            scheduler_cores,
            // Allow several messages per round, one slice per message, and no
            // per-execution overheads so instruction accounting is exact.
            max_instructions_per_round: (instructions * max_messages_per_round).into(),
            max_instructions_per_message: instructions.into(),
            max_instructions_per_message_without_dts: instructions.into(),
            max_instructions_per_slice: instructions.into(),
            instruction_overhead_per_execution: NumInstructions::from(0),
            instruction_overhead_per_canister: NumInstructions::from(0),
            instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
            ..SchedulerConfig::application_subnet()
        })
        .build();

    // Bump up the round number, so `last_full_execution_round == 0` below
    // unambiguously means "never had a full round".
    test.execute_round(ExecutionRoundType::OrdinaryRound);

    // Create `scheduler_cores * 2` canisters, so target canister is not scheduled first.
    let mut canister_ids = vec![];
    for _ in 0..scheduler_cores * 2 {
        canister_ids.push(test.create_canister());
    }
    // Create target canister after.
    let target_id = test.create_canister();
    // Send messages to the target canister. The calls make the target
    // receive its work only once the senders execute, i.e. in a later
    // inner round of the same outer round.
    for canister_id in &canister_ids {
        let message = ingress(instructions).call(
            other_side(target_id, instructions - 1),
            on_response(instructions - 2),
        );
        test.send_ingress(*canister_id, message);
    }

    test.execute_round(ExecutionRoundType::OrdinaryRound);

    let mut total_accumulated_priority = 0;
    let mut total_priority_credit = 0;
    for canister in test.state().canisters_iter() {
        let system_state = &canister.system_state;
        let scheduler_state = &canister.scheduler_state;
        // All ingresses should be executed in the previous round.
        assert_eq!(system_state.queues().ingress_queue_size(), 0);
        assert_eq!(system_state.canister_metrics.executed, 1);
        if canister.canister_id() == target_id {
            // The target canister, despite being executed first in the second inner round,
            // should not be marked as fully executed.
            assert_ne!(test.last_round(), 0.into());
            assert_eq!(scheduler_state.last_full_execution_round, 0.into());
        } else {
            assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
        }
        total_accumulated_priority += scheduler_state.accumulated_priority.get();
        total_priority_credit += scheduler_state.priority_credit.get();
    }
    // The accumulated priority invariant should be respected.
    assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}

#[test]
fn inner_round_long_execution_is_a_full_execution() {
    // Companion test to the definition fix: a canister that executes a full
    // slice of instructions (a long, DTS-style execution) IS credited with a
    // full execution round, even though it still has messages queued and was
    // not scheduled first on a core.
    let scheduler_cores = 2;
    let slice = 20;
    let mut test = SchedulerTestBuilder::new()
        .with_scheduler_config(SchedulerConfig {
            scheduler_cores,
            // Round fits two slices; a message may span up to ten slices, so
            // long executions are split across rounds (DTS).
            max_instructions_per_round: (slice * 2).into(),
            max_instructions_per_message: (slice * 10).into(),
            max_instructions_per_message_without_dts: slice.into(),
            max_instructions_per_slice: slice.into(),
            instruction_overhead_per_execution: NumInstructions::from(0),
            instruction_overhead_per_canister: NumInstructions::from(0),
            instruction_overhead_per_canister_for_finalization: NumInstructions::from(0),
            ..SchedulerConfig::application_subnet()
        })
        .build();

    // Bump up the round number, so round 0 is never a legitimate
    // `last_full_execution_round` value.
    test.execute_round(ExecutionRoundType::OrdinaryRound);

    // Create `scheduler_cores` canisters, so target canister is not scheduled first.
    let mut canister_ids = vec![];
    for _ in 0..scheduler_cores {
        let canister_id = test.create_canister();
        test.send_ingress(canister_id, ingress(slice));
        canister_ids.push(canister_id);
    }
    // Create a target canister with two long executions. Each message needs
    // more than two slices, so only part of the first message runs this round.
    let target_id = test.create_canister();
    test.send_ingress(target_id, ingress(slice * 2 + 1));
    test.send_ingress(target_id, ingress(slice * 2 + 1));

    test.execute_round(ExecutionRoundType::OrdinaryRound);

    let mut total_accumulated_priority = 0;
    let mut total_priority_credit = 0;
    for canister in test.state().canisters_iter() {
        let system_state = &canister.system_state;
        let scheduler_state = &canister.scheduler_state;
        // All canisters should be executed.
        assert_eq!(system_state.canister_metrics.executed, 1);
        if canister.canister_id() == target_id {
            // The target canister was not executed first, and still has messages.
            assert_eq!(system_state.queues().ingress_queue_size(), 1);
        } else {
            assert_eq!(system_state.queues().ingress_queue_size(), 0);
        }
        // All canisters should be marked as fully executed. The target canister,
        // despite still having messages, executed a full slice of instructions.
        assert_eq!(scheduler_state.last_full_execution_round, test.last_round());
        total_accumulated_priority += scheduler_state.accumulated_priority.get();
        total_priority_credit += scheduler_state.priority_credit.get();
    }
    // The accumulated priority invariant should be respected.
    assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}

0 comments on commit ba5ffe0

Please sign in to comment.