Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [MR-617] Enforce subnet-wide best-effort message memory limit #1835

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions rs/config/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ const SUBNET_MEMORY_THRESHOLD: NumBytes = NumBytes::new(450 * GIB);
/// IC protocol requires storing copies of the canister state.
const SUBNET_MEMORY_CAPACITY: NumBytes = NumBytes::new(700 * GIB);

/// This is the upper limit on how much memory can be used by all canister
/// messages on a given subnet.
/// This is the upper limit on how much memory can be used by all guaranteed
/// response canister messages on a given subnet.
///
/// Message memory usage is calculated as the total size of enqueued canister
/// responses; plus the maximum allowed response size per queue reservation.
const SUBNET_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(25 * GIB);
/// Guaranteed response message memory usage is calculated as the total size of
/// enqueued guaranteed responses; plus the maximum allowed response size per
/// reserved guaranteed response slot.
const SUBNET_GUARANTEED_RESPONSE_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(25 * GIB);

/// The limit on how much memory may be used by all best-effort messages
/// on a given subnet at the end of a round.
///
/// During the round, the best-effort message memory usage may exceed the limit,
/// but the constraint is restored at the end of the round by shedding messages.
const SUBNET_BEST_EFFORT_MESSAGE_MEMORY_CAPACITY: NumBytes = NumBytes::new(5 * GIB);

/// This is the upper limit on how much memory can be used by the ingress
/// history on a given subnet. It is lower than the subnet message memory
Expand Down Expand Up @@ -169,10 +177,14 @@ pub struct Config {
/// the subnet.
pub subnet_memory_capacity: NumBytes,

/// The maximum amount of logical storage available to canister messages
/// across the whole subnet.
/// The maximum amount of logical storage available to guaranteed response
/// canister messages across the whole subnet.
pub subnet_message_memory_capacity: NumBytes,

/// The maximum amount of logical storage available to best-effort canister
/// messages across the whole subnet.
pub best_effort_message_memory_capacity: NumBytes,

/// The maximum amount of logical storage available to the ingress history
/// across the whole subnet.
pub ingress_history_memory_capacity: NumBytes,
Expand Down Expand Up @@ -309,7 +321,8 @@ impl Default for Config {
MAX_INSTRUCTIONS_FOR_MESSAGE_ACCEPTANCE_CALLS,
subnet_memory_threshold: SUBNET_MEMORY_THRESHOLD,
subnet_memory_capacity: SUBNET_MEMORY_CAPACITY,
subnet_message_memory_capacity: SUBNET_MESSAGE_MEMORY_CAPACITY,
subnet_message_memory_capacity: SUBNET_GUARANTEED_RESPONSE_MESSAGE_MEMORY_CAPACITY,
best_effort_message_memory_capacity: SUBNET_BEST_EFFORT_MESSAGE_MEMORY_CAPACITY,
ingress_history_memory_capacity: INGRESS_HISTORY_MEMORY_CAPACITY,
subnet_wasm_custom_sections_memory_capacity:
SUBNET_WASM_CUSTOM_SECTIONS_MEMORY_CAPACITY,
Expand Down
1 change: 1 addition & 0 deletions rs/messaging/src/message_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ impl BatchProcessorImpl {
scheduler,
demux,
stream_builder,
hypervisor_config.clone(),
log.clone(),
metrics.clone(),
));
Expand Down
15 changes: 14 additions & 1 deletion rs/messaging/src/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::message_routing::{ApiBoundaryNodes, MessageRoutingMetrics, NodePublicKeys};
use crate::routing::{demux::Demux, stream_builder::StreamBuilder};
use ic_config::execution_environment::Config as HypervisorConfig;
use ic_interfaces::execution_environment::{
ExecutionRoundSummary, ExecutionRoundType, RegistryExecutionSettings, Scheduler,
};
use ic_logger::{fatal, ReplicaLogger};
use ic_query_stats::deliver_query_stats;
use ic_registry_subnet_features::SubnetFeatures;
use ic_replicated_state::{NetworkTopology, ReplicatedState};
use ic_types::NumBytes;
use ic_types::{batch::Batch, ExecutionRound};
use std::time::Instant;

Expand All @@ -17,6 +19,7 @@ const PHASE_INDUCTION: &str = "induction";
const PHASE_EXECUTION: &str = "execution";
const PHASE_MESSAGE_ROUTING: &str = "message_routing";
const PHASE_TIME_OUT_MESSAGES: &str = "time_out_messages";
const PHASE_SHED_MESSAGES: &str = "shed_messages";

pub(crate) trait StateMachine: Send {
fn execute_round(
Expand All @@ -34,6 +37,7 @@ pub(crate) struct StateMachineImpl {
scheduler: Box<dyn Scheduler<State = ReplicatedState>>,
demux: Box<dyn Demux>,
stream_builder: Box<dyn StreamBuilder>,
best_effort_message_memory_capacity: NumBytes,
log: ReplicaLogger,
metrics: MessageRoutingMetrics,
}
Expand All @@ -43,13 +47,15 @@ impl StateMachineImpl {
scheduler: Box<dyn Scheduler<State = ReplicatedState>>,
demux: Box<dyn Demux>,
stream_builder: Box<dyn StreamBuilder>,
hypervisor_config: HypervisorConfig,
log: ReplicaLogger,
metrics: MessageRoutingMetrics,
) -> Self {
Self {
    scheduler,
    demux,
    stream_builder,
    // Shedding is governed by the dedicated best-effort limit; the
    // `subnet_message_memory_capacity` field is the (much larger) limit for
    // guaranteed response messages and must not be used here.
    best_effort_message_memory_capacity: hypervisor_config.best_effort_message_memory_capacity,
    log,
    metrics,
}
Expand Down Expand Up @@ -162,9 +168,16 @@ impl StateMachine for StateMachineImpl {

let since = Instant::now();
// Postprocess the state and consolidate the Streams.
let state_after_stream_builder = self.stream_builder.build_streams(state_after_execution);
let mut state_after_stream_builder =
self.stream_builder.build_streams(state_after_execution);
self.observe_phase_duration(PHASE_MESSAGE_ROUTING, &since);

let since = Instant::now();
// Shed some messages if above the best-effort message memory limit.
state_after_stream_builder
.enforce_best_effort_message_limit(self.best_effort_message_memory_capacity);
self.observe_phase_duration(PHASE_SHED_MESSAGES, &since);

state_after_stream_builder
}
}
3 changes: 3 additions & 0 deletions rs/messaging/src/state_machine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ fn state_machine_populates_network_topology() {
fixture.scheduler,
fixture.demux,
fixture.stream_builder,
Default::default(),
log,
fixture.metrics,
));
Expand Down Expand Up @@ -190,6 +191,7 @@ fn test_delivered_batch(provided_batch: Batch) -> ReplicatedState {
fixture.scheduler,
fixture.demux,
fixture.stream_builder,
Default::default(),
log,
fixture.metrics,
));
Expand Down Expand Up @@ -290,6 +292,7 @@ fn test_batch_time_impl(
fixture.scheduler,
fixture.demux,
fixture.stream_builder,
Default::default(),
log,
fixture.metrics,
);
Expand Down
4 changes: 3 additions & 1 deletion rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ impl CanisterQueues {
/// Does not account for callback references for expired callbacks or dropped
/// responses, as these are constant size per callback and thus can be included
/// in the cost of a callback.
pub fn best_effort_memory_usage(&self) -> usize {
pub fn best_effort_message_memory_usage(&self) -> usize {
self.message_stats().best_effort_message_bytes
}

Expand Down Expand Up @@ -1308,6 +1308,8 @@ impl CanisterQueues {
/// Updates the stats for the dropped message and (where applicable) the
/// generated response. `own_canister_id` and `local_canisters` are required
/// to determine the correct input queue schedule to update (if applicable).
///
/// Time complexity: `O(log(n))`.
pub fn shed_largest_message(
&mut self,
own_canister_id: &CanisterId,
Expand Down
8 changes: 4 additions & 4 deletions rs/replicated_state/src/canister_state/queues/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,20 +534,20 @@ fn test_shed_inbound_response() {
const NO_LOCAL_CANISTERS: BTreeMap<CanisterId, CanisterState> = BTreeMap::new();

// Shed the largest response (callback ID 3).
let memory_usage3 = queues.best_effort_memory_usage();
let memory_usage3 = queues.best_effort_message_memory_usage();
assert!(queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
let memory_usage2 = queues.best_effort_memory_usage();
let memory_usage2 = queues.best_effort_message_memory_usage();
assert!(memory_usage2 < memory_usage3);

// Shed the next largest response (callback ID 2).
assert!(queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
let memory_usage1 = queues.best_effort_memory_usage();
let memory_usage1 = queues.best_effort_message_memory_usage();
assert!(memory_usage1 < memory_usage2);

// Pop the response for callback ID 1.
assert_matches!(queues.pop_input(), Some(CanisterInput::Response(response)) if response.originator_reply_callback.get() == 1);
assert_eq!(2, queues.input_queues_response_count());
assert_eq!(0, queues.best_effort_memory_usage());
assert_eq!(0, queues.best_effort_message_memory_usage());

// There's nothing else to shed.
assert!(!queues.shed_largest_message(&this, &NO_LOCAL_CANISTERS));
Expand Down
18 changes: 18 additions & 0 deletions rs/replicated_state/src/canister_state/system_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,11 @@ impl SystemState {
(self.queues.guaranteed_response_memory_usage() as u64).into()
}

/// Returns the memory currently used by best-effort canister messages.
pub fn best_effort_message_memory_usage(&self) -> NumBytes {
(self.queues.best_effort_message_memory_usage() as u64).into()
}

/// Returns the memory currently in use by the `SystemState`
/// for canister history.
pub fn canister_history_memory_usage(&self) -> NumBytes {
Expand Down Expand Up @@ -1451,6 +1456,19 @@ impl SystemState {
.time_out_messages(current_time, own_canister_id, local_canisters)
}

/// Sheds the largest best-effort message held by this canister's queues.
///
/// `own_canister_id` and `local_canisters` are forwarded unchanged to the
/// queues. Returns `true` if a message was removed and `false` if there was
/// nothing to shed.
///
/// Time complexity: `O(log(n))`.
pub fn shed_largest_message(
    &mut self,
    own_canister_id: &CanisterId,
    local_canisters: &BTreeMap<CanisterId, CanisterState>,
) -> bool {
    let queues = &mut self.queues;
    queues.shed_largest_message(own_canister_id, local_canisters)
}

/// Re-partitions the local and remote input schedules of `self.queues`
/// following a canister migration, based on the updated set of local canisters.
///
Expand Down
63 changes: 62 additions & 1 deletion rs/replicated_state/src/replicated_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use ic_validate_eq::ValidateEq;
use ic_validate_eq_derive::ValidateEq;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use strum_macros::{EnumCount, EnumIter};

Expand Down Expand Up @@ -929,6 +929,67 @@ impl ReplicatedState {
timed_out_messages_count
}

/// Enforces the best-effort message memory limit by repeatedly shedding the
/// largest best-effort message of the canister with the largest best-effort
/// message memory usage, until the total usage is at or below `limit`.
///
/// Does nothing if the total usage is already within `limit`.
///
/// A canister that fails to shed a message is not revisited, and the loop
/// stops once no candidate canisters remain. Either case indicates
/// inconsistent accounting and trips a `debug_assert!`, but release builds
/// (where `debug_assert!` is compiled out) terminate instead of spinning.
///
/// Time complexity: `O(n * log(n))`.
pub fn enforce_best_effort_message_limit(&mut self, limit: NumBytes) {
    const ZERO: NumBytes = NumBytes::new(0);

    // Check if we need to do anything at all before constructing a priority queue.
    let mut memory_usage = self
        .canister_states
        .values()
        .fold(ZERO, |acc, canister| {
            acc + canister.system_state.best_effort_message_memory_usage()
        });
    if memory_usage <= limit {
        // No need to do anything.
        return;
    }

    // Construct a priority queue of canisters by best-effort message memory usage.
    // Tuples order by usage first, so `pop_last()` yields the heaviest canister.
    let mut priority_queue: BTreeSet<_> = self
        .canister_states
        .iter()
        .filter_map(|(canister_id, canister)| {
            let memory_usage = canister.system_state.best_effort_message_memory_usage();
            if memory_usage > ZERO {
                Some((memory_usage, *canister_id))
            } else {
                None
            }
        })
        .collect();

    // Shed messages from the canisters with the largest memory usage until we are
    // at or below the limit.
    while memory_usage > limit {
        // The queue initially holds every canister with non-zero usage, so it can
        // only run dry if some canister could not shed what it reports using.
        let Some((memory_usage_before, canister_id)) = priority_queue.pop_last() else {
            debug_assert!(false, "no canisters left to shed best-effort messages from");
            break;
        };

        // Remove the canister, shed its largest message, replace it. The canister
        // is taken out of the map so that the remaining map can be passed in as
        // `local_canisters`. Safe to unwrap: the ID came from `canister_states`
        // and every removed canister is reinserted below.
        let mut canister = self.canister_states.remove(&canister_id).unwrap();
        let message_shed = canister
            .system_state
            .shed_largest_message(&canister_id, &self.canister_states);
        debug_assert!(message_shed);
        let memory_usage_after = canister.system_state.best_effort_message_memory_usage();
        self.canister_states.insert(canister_id, canister);

        // Update the priority queue. A canister that shed nothing is dropped from
        // the queue: re-queuing it with unchanged usage would loop forever.
        if message_shed && memory_usage_after > ZERO {
            priority_queue.insert((memory_usage_after, canister_id));
        }

        // Update the total memory usage.
        let memory_usage_delta = memory_usage_before - memory_usage_after;
        debug_assert!(memory_usage_delta > ZERO);
        memory_usage -= memory_usage_delta;
    }
}

/// Splits the replicated state as part of subnet splitting phase 1, retaining
/// only the canisters of `subnet_id` (as determined by the provided routing
/// table).
Expand Down
Loading