diff --git a/rs/config/src/execution_environment.rs b/rs/config/src/execution_environment.rs
index c9e68687c4d..8fa2edf6788 100644
--- a/rs/config/src/execution_environment.rs
+++ b/rs/config/src/execution_environment.rs
@@ -23,12 +23,20 @@ const SUBNET_MEMORY_THRESHOLD: NumBytes = NumBytes::new(450 * GIB);
 /// IC protocol requires storing copies of the canister state.
 const SUBNET_MEMORY_CAPACITY: NumBytes = NumBytes::new(700 * GIB);
 
-/// This is the upper limit on how much memory can be used by all canister
-/// messages on a given subnet.
+/// This is the upper limit on how much memory can be used by all guaranteed
+/// response canister messages on a given subnet.
 ///
-/// Message memory usage is calculated as the total size of enqueued canister
-/// responses; plus the maximum allowed response size per queue reservation.
-const SUBNET_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(25 * GIB);
+/// Guaranteed response message memory usage is calculated as the total size of
+/// enqueued guaranteed responses; plus the maximum allowed response size per
+/// reserved guaranteed response slot.
+const SUBNET_GUARANTEED_RESPONSE_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(25 * GIB);
+
+/// The limit on how much memory may be used by all best-effort messages on a
+/// given subnet at the end of a round.
+///
+/// During the round, the best-effort message memory usage may exceed the limit,
+/// but the constraint is restored at the end of the round by shedding messages.
+const SUBNET_BEST_EFFORT_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(5 * GIB);
 
 /// This is the upper limit on how much memory can be used by the ingress
 /// history on a given subnet. It is lower than the subnet message memory
@@ -169,10 +177,14 @@ pub struct Config {
     /// the subnet.
     pub subnet_memory_capacity: NumBytes,
 
-    /// The maximum amount of logical storage available to canister messages
-    /// across the whole subnet.
+    /// The maximum amount of logical storage available to guaranteed response
+    /// canister messages across the whole subnet.
     pub subnet_message_memory_capacity: NumBytes,
 
+    /// The maximum amount of logical storage available to best-effort canister
+    /// messages across the whole subnet.
+    pub best_effort_message_memory_capacity: NumBytes,
+
     /// The maximum amount of logical storage available to the ingress history
     /// across the whole subnet.
     pub ingress_history_memory_capacity: NumBytes,
@@ -309,7 +321,8 @@ impl Default for Config {
                 MAX_INSTRUCTIONS_FOR_MESSAGE_ACCEPTANCE_CALLS,
             subnet_memory_threshold: SUBNET_MEMORY_THRESHOLD,
             subnet_memory_capacity: SUBNET_MEMORY_CAPACITY,
-            subnet_message_memory_capacity: SUBNET_MESSAGE_MEMORY_CAPACITY,
+            subnet_message_memory_capacity: SUBNET_GUARANTEED_RESPONSE_MESSAGE_MEMORY_CAPACITY,
+            best_effort_message_memory_capacity: SUBNET_BEST_EFFORT_MESSAGE_MEMORY_CAPACITY,
             ingress_history_memory_capacity: INGRESS_HISTORY_MEMORY_CAPACITY,
             subnet_wasm_custom_sections_memory_capacity:
                 SUBNET_WASM_CUSTOM_SECTIONS_MEMORY_CAPACITY,
diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs
index 6c168b1b462..406fbbced79 100644
--- a/rs/messaging/src/message_routing.rs
+++ b/rs/messaging/src/message_routing.rs
@@ -585,6 +585,7 @@ impl BatchProcessorImpl {
             scheduler,
             demux,
             stream_builder,
+            hypervisor_config.clone(),
             log.clone(),
             metrics.clone(),
         ));
diff --git a/rs/messaging/src/state_machine.rs b/rs/messaging/src/state_machine.rs
index b4c0fe4d8ad..653bda12114 100644
--- a/rs/messaging/src/state_machine.rs
+++ b/rs/messaging/src/state_machine.rs
@@ -1,5 +1,6 @@
 use crate::message_routing::{ApiBoundaryNodes, MessageRoutingMetrics, NodePublicKeys};
 use crate::routing::{demux::Demux, stream_builder::StreamBuilder};
+use ic_config::execution_environment::Config as HypervisorConfig;
 use ic_interfaces::execution_environment::{
     ExecutionRoundSummary, ExecutionRoundType, RegistryExecutionSettings, Scheduler,
 };
@@ -7,6 +8,7 @@ use ic_logger::{fatal, ReplicaLogger};
 use ic_query_stats::deliver_query_stats;
 use ic_registry_subnet_features::SubnetFeatures;
 use ic_replicated_state::{NetworkTopology, ReplicatedState};
+use ic_types::NumBytes;
 use ic_types::{batch::Batch, ExecutionRound};
 use std::time::Instant;
 
@@ -17,6 +19,7 @@ const PHASE_INDUCTION: &str = "induction";
 const PHASE_EXECUTION: &str = "execution";
 const PHASE_MESSAGE_ROUTING: &str = "message_routing";
 const PHASE_TIME_OUT_MESSAGES: &str = "time_out_messages";
+const PHASE_SHED_MESSAGES: &str = "shed_messages";
 
 pub(crate) trait StateMachine: Send {
     fn execute_round(
@@ -34,6 +37,7 @@ pub(crate) struct StateMachineImpl {
     scheduler: Box<dyn Scheduler<State = ReplicatedState>>,
     demux: Box<dyn Demux>,
     stream_builder: Box<dyn StreamBuilder>,
+    best_effort_message_memory_capacity: NumBytes,
     log: ReplicaLogger,
     metrics: MessageRoutingMetrics,
 }
@@ -43,6 +47,7 @@ impl StateMachineImpl {
         scheduler: Box<dyn Scheduler<State = ReplicatedState>>,
         demux: Box<dyn Demux>,
         stream_builder: Box<dyn StreamBuilder>,
+        hypervisor_config: HypervisorConfig,
         log: ReplicaLogger,
         metrics: MessageRoutingMetrics,
     ) -> Self {
@@ -50,6 +55,7 @@ impl StateMachineImpl {
             scheduler,
             demux,
             stream_builder,
+            best_effort_message_memory_capacity: hypervisor_config.best_effort_message_memory_capacity,
             log,
             metrics,
         }
@@ -162,9 +168,16 @@ impl StateMachine for StateMachineImpl {
 
         let since = Instant::now();
         // Postprocess the state and consolidate the Streams.
-        let state_after_stream_builder = self.stream_builder.build_streams(state_after_execution);
+        let mut state_after_stream_builder =
+            self.stream_builder.build_streams(state_after_execution);
         self.observe_phase_duration(PHASE_MESSAGE_ROUTING, &since);
 
+        let since = Instant::now();
+        // Shed some messages if above the best-effort message memory limit.
+        state_after_stream_builder
+            .enforce_best_effort_message_limit(self.best_effort_message_memory_capacity);
+        self.observe_phase_duration(PHASE_SHED_MESSAGES, &since);
+
         state_after_stream_builder
     }
 }
diff --git a/rs/messaging/src/state_machine/tests.rs b/rs/messaging/src/state_machine/tests.rs
index 63fb2dcd4cb..2587b9df0e5 100644
--- a/rs/messaging/src/state_machine/tests.rs
+++ b/rs/messaging/src/state_machine/tests.rs
@@ -156,6 +156,7 @@ fn state_machine_populates_network_topology() {
         fixture.scheduler,
         fixture.demux,
         fixture.stream_builder,
+        Default::default(),
         log,
         fixture.metrics,
     ));
@@ -190,6 +191,7 @@ fn test_delivered_batch(provided_batch: Batch) -> ReplicatedState {
         fixture.scheduler,
         fixture.demux,
         fixture.stream_builder,
+        Default::default(),
         log,
         fixture.metrics,
     ));
@@ -290,6 +292,7 @@ fn test_batch_time_impl(
         fixture.scheduler,
         fixture.demux,
        fixture.stream_builder,
+        Default::default(),
         log,
         fixture.metrics,
     );
diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs
index dfa55a91f8a..07212853b9f 100644
--- a/rs/replicated_state/src/canister_state/queues.rs
+++ b/rs/replicated_state/src/canister_state/queues.rs
@@ -1182,7 +1182,7 @@ impl CanisterQueues {
     /// Does not account for callback references for expired callbacks or dropped
     /// responses, as these are constant size per callback and thus can be included
     /// in the cost of a callback.
-    pub fn best_effort_memory_usage(&self) -> usize {
+    pub fn best_effort_message_memory_usage(&self) -> usize {
         self.message_stats().best_effort_message_bytes
     }
 
@@ -1308,6 +1308,8 @@ impl CanisterQueues {
     /// Updates the stats for the dropped message and (where applicable) the
     /// generated response. `own_canister_id` and `local_canisters` are required
     /// to determine the correct input queue schedule to update (if applicable).
+    ///
+    /// Time complexity: `O(log(n))`.
     pub fn shed_largest_message(
         &mut self,
         own_canister_id: &CanisterId,
diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs
index 60c6e49c3a7..6445670234f 100644
--- a/rs/replicated_state/src/canister_state/queues/tests.rs
+++ b/rs/replicated_state/src/canister_state/queues/tests.rs
@@ -534,20 +534,20 @@ fn test_shed_inbound_response() {
     const NO_LOCAL_CANISTERS: BTreeMap<CanisterId, CanisterState> = BTreeMap::new();
 
     // Shed the largest response (callback ID 3).
-    let memory_usage3 = queues.best_effort_memory_usage();
+    let memory_usage3 = queues.best_effort_message_memory_usage();
     assert!(queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
-    let memory_usage2 = queues.best_effort_memory_usage();
+    let memory_usage2 = queues.best_effort_message_memory_usage();
     assert!(memory_usage2 < memory_usage3);
 
     // Shed the next largest response (callback ID 2).
     assert!(queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
-    let memory_usage1 = queues.best_effort_memory_usage();
+    let memory_usage1 = queues.best_effort_message_memory_usage();
     assert!(memory_usage1 < memory_usage2);
 
     // Pop the response for callback ID 1.
     assert_matches!(queues.pop_input(), Some(CanisterInput::Response(response)) if response.originator_reply_callback.get() == 1);
     assert_eq!(2, queues.input_queues_response_count());
-    assert_eq!(0, queues.best_effort_memory_usage());
+    assert_eq!(0, queues.best_effort_message_memory_usage());
 
     // There's nothing else to shed.
     assert!(!queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
diff --git a/rs/replicated_state/src/canister_state/system_state.rs b/rs/replicated_state/src/canister_state/system_state.rs
index 13c5af7faea..8357c935358 100644
--- a/rs/replicated_state/src/canister_state/system_state.rs
+++ b/rs/replicated_state/src/canister_state/system_state.rs
@@ -1346,6 +1346,11 @@ impl SystemState {
         (self.queues.guaranteed_response_memory_usage() as u64).into()
     }
 
+    /// Returns the memory currently used by best-effort canister messages.
+    pub fn best_effort_message_memory_usage(&self) -> NumBytes {
+        (self.queues.best_effort_message_memory_usage() as u64).into()
+    }
+
     /// Returns the memory currently in use by the `SystemState`
     /// for canister history.
     pub fn canister_history_memory_usage(&self) -> NumBytes {
@@ -1451,6 +1456,19 @@ impl SystemState {
             .time_out_messages(current_time, own_canister_id, local_canisters)
     }
 
+    /// Removes the largest best-effort message in the underlying pool. Returns
+    /// `true` if a message was removed; `false` otherwise.
+    ///
+    /// Time complexity: `O(log(n))`.
+    pub fn shed_largest_message(
+        &mut self,
+        own_canister_id: &CanisterId,
+        local_canisters: &BTreeMap<CanisterId, CanisterState>,
+    ) -> bool {
+        self.queues
+            .shed_largest_message(own_canister_id, local_canisters)
+    }
+
     /// Re-partitions the local and remote input schedules of `self.queues`
     /// following a canister migration, based on the updated set of local canisters.
     ///
diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs
index 111feeb7675..206bfd828d6 100644
--- a/rs/replicated_state/src/replicated_state.rs
+++ b/rs/replicated_state/src/replicated_state.rs
@@ -35,7 +35,7 @@ use ic_validate_eq::ValidateEq;
 use ic_validate_eq_derive::ValidateEq;
 use rand::{Rng, SeedableRng};
 use rand_chacha::ChaChaRng;
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
 use std::sync::Arc;
 use strum_macros::{EnumCount, EnumIter};
 
@@ -929,6 +929,67 @@ impl ReplicatedState {
         timed_out_messages_count
     }
 
+    /// Enforces the best-effort message limit by repeatedly shedding the largest
+    /// best-effort messages of the canisters with the largest best-effort memory
+    /// usage until the memory usage drops below the limit.
+    ///
+    /// Time complexity: `O(n * log(n))`.
+    pub fn enforce_best_effort_message_limit(&mut self, limit: NumBytes) {
+        const ZERO: NumBytes = NumBytes::new(0);
+
+        // Check if we need to do anything at all before constructing a priority queue.
+        let mut memory_usage = self
+            .canister_states
+            .values()
+            .fold(NumBytes::new(0), |acc, canister| {
+                acc + canister.system_state.best_effort_message_memory_usage()
+            });
+        if memory_usage <= limit {
+            // No need to do anything.
+            return;
+        }
+
+        // Construct a priority queue of canisters by best-effort message memory usage.
+        let mut priority_queue: BTreeSet<_> = self
+            .canister_states
+            .iter()
+            .filter_map(|(canister_id, canister)| {
+                let memory_usage = canister.system_state.best_effort_message_memory_usage();
+                if memory_usage > ZERO {
+                    Some((memory_usage, *canister_id))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        // Shed messages from the canisters with the largest memory usage until we are
+        // below the limit.
+        while memory_usage > limit {
+            // Safe to unwrap, the priority queue cannot be empty.
+            let (memory_usage_before, canister_id) = priority_queue.pop_last().unwrap();
+
+            // Remove the canister, shed its largest message, replace it.
+            let mut canister = self.canister_states.remove(&canister_id).unwrap();
+            let message_shed = canister
+                .system_state
+                .shed_largest_message(&canister_id, &self.canister_states);
+            debug_assert!(message_shed);
+            let memory_usage_after = canister.system_state.best_effort_message_memory_usage();
+            self.canister_states.insert(canister_id, canister);
+
+            // Update the priority queue.
+            if memory_usage_after > ZERO {
+                priority_queue.insert((memory_usage_after, canister_id));
+            }
+
+            // Update the total memory usage.
+            let memory_usage_delta = memory_usage_before - memory_usage_after;
+            debug_assert!(memory_usage_delta > ZERO);
+            memory_usage -= memory_usage_delta;
+        }
+    }
+
     /// Splits the replicated state as part of subnet splitting phase 1, retaining
     /// only the canisters of `subnet_id` (as determined by the provided routing
     /// table).
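
For reference, a minimal, self-contained sketch of the end-of-round shedding strategy that `enforce_best_effort_message_limit` implements above: a `BTreeSet` ordered by `(usage, id)` serves as a max-priority queue via `pop_last()`, and the heaviest canister is shed repeatedly until the total drops to the limit. The toy types and the `shed` closure below are hypothetical stand-ins, not IC APIs.

```rust
// Illustrative sketch only: toy stand-ins for the real `ReplicatedState` types.
use std::collections::{BTreeMap, BTreeSet};

type CanisterId = u64;

/// Repeatedly sheds from the canister with the largest usage until the total
/// drops to `limit`. `shed` models dropping one message: it takes the current
/// usage and returns the (strictly smaller) usage after the drop.
fn enforce_limit(usage: &mut BTreeMap<CanisterId, u64>, limit: u64, shed: impl Fn(u64) -> u64) {
    let mut total: u64 = usage.values().sum();
    if total <= limit {
        return; // Nothing to do; avoid building the priority queue.
    }

    // Max-priority queue of canisters, ordered by (usage, canister id).
    let mut queue: BTreeSet<(u64, CanisterId)> = usage
        .iter()
        .filter(|&(_, &u)| u > 0)
        .map(|(&id, &u)| (u, id))
        .collect();

    while total > limit {
        // Non-empty as long as the total usage is above the limit.
        let (before, id) = queue.pop_last().unwrap();
        let after = shed(before);
        debug_assert!(after < before);
        usage.insert(id, after);
        if after > 0 {
            queue.insert((after, id));
        }
        total -= before - after;
    }
}

fn main() {
    let mut usage = BTreeMap::from([(1, 600), (2, 300), (3, 100)]);
    // Toy shedding rule: each shed halves the canister's usage.
    enforce_limit(&mut usage, 500, |u| u / 2);
    assert!(usage.values().sum::<u64>() <= 500);
    println!("{usage:?}");
}
```

Re-inserting a canister only while its usage stays positive keeps the queue small, and tracking the running total lets the loop stop without re-summing all canisters on every iteration, mirroring the `memory_usage_delta` bookkeeping in the patch.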