Skip to content

Commit

Permalink
feat: [MR-551] Support for best-effort messages in CanisterQueues (#…
Browse files Browse the repository at this point in the history
…879)

[MR-551] Reimplement `CanisterQueues` on top of the new `MessagePool`
and
`CanisterQueue`, to allow for expiring and shedding best-effort
messages.

* We now explicitly differentiate between queue slot reservations (as
the
mechanism that provides backpressure between pairs of canisters) and
memory
reservations for guaranteed responses (used for computing message memory
  usage).
* All enqueued messages are held within `MessagePool`.
* `CanisterQueue` now holds references to messages held in
`MessagePool`.
* Timing out (of best-effort messages and guaranteed response requests
in
output queues) and shedding is achieved by dropping the respective
message
  from `MessagePool`.
* Expiring or shedding a message may result in a "dangling reference" or
"stale
  message" being left in a `CanisterQueue`.
* Each `CanisterQueue` is guaranteed to have a non-stale entry at its
head.
* Message-related stats (message counts, memory size) are maintained by
`MessagePool`. Queue related stats (slot reservations and memory
reservations)
  are maintained by `CanisterQueues`.

[MR-551]:
https://dfinity.atlassian.net/browse/MR-551?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
  • Loading branch information
alin-at-dfinity authored Sep 7, 2024
1 parent 6e34aed commit 20321d1
Show file tree
Hide file tree
Showing 13 changed files with 1,889 additions and 1,575 deletions.
4 changes: 2 additions & 2 deletions rs/messaging/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ impl StateMachine for StateMachineImpl {
}

// Time out expired messages.
let timed_out_messages = state.time_out_requests();
let timed_out_messages = state.time_out_messages();
self.metrics
.timed_out_messages_total
.inc_by(timed_out_messages);
.inc_by(timed_out_messages as u64);
self.observe_phase_duration(PHASE_TIME_OUT_MESSAGES, &since);

// Preprocess messages and add messages to the induction pool through the Demux.
Expand Down
1,549 changes: 588 additions & 961 deletions rs/replicated_state/src/canister_state/queues.rs

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions rs/replicated_state/src/canister_state/queues/input_schedule.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(dead_code)]

use super::queue::CanisterQueue;
use super::CanisterQueues;
use crate::{InputQueueType, InputSource};
Expand Down Expand Up @@ -305,11 +303,11 @@ pub struct CanisterQueuesLoopDetector {
impl CanisterQueuesLoopDetector {
/// Detects a loop in `CanisterQueues`.
pub fn detected_loop(&self, canister_queues: &CanisterQueues) -> bool {
let skipped_all_remote =
self.remote_queue_skip_count >= canister_queues.remote_subnet_input_schedule.len();
let skipped_all_remote = self.remote_queue_skip_count
>= canister_queues.input_schedule.remote_sender_schedule.len();

let skipped_all_local =
self.local_queue_skip_count >= canister_queues.local_subnet_input_schedule.len();
let skipped_all_local = self.local_queue_skip_count
>= canister_queues.input_schedule.local_sender_schedule.len();

let skipped_all_ingress =
self.ingress_queue_skip_count >= canister_queues.ingress_queue.ingress_schedule_size();
Expand Down
3 changes: 0 additions & 3 deletions rs/replicated_state/src/canister_state/queues/message_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
// TODO(MR-569) Remove when `CanisterQueues` has been updated to use this.
#![allow(dead_code)]

use ic_protobuf::proxy::{try_from_option_field, ProxyDecodeError};
use ic_protobuf::state::queues::v1 as pb_queues;
use ic_types::messages::{
Expand Down
23 changes: 22 additions & 1 deletion rs/replicated_state/src/canister_state/queues/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod tests;
///
/// May be a weak reference into the message pool; or identify a reject response to
/// a specific callback.
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub(super) enum CanisterQueueItem {
/// Weak reference to a `Request` or `Response` held in the message pool.
///
Expand Down Expand Up @@ -188,6 +188,16 @@ impl CanisterQueue {
Ok(())
}

/// Releases a reserved response slot.
///
/// This is used when a request in the reverse queue is dropped before having
/// had a chance to be popped.
pub(super) fn release_reserved_response_slot(&mut self) {
debug_assert!(self.response_slots > 0);

self.response_slots = self.response_slots.saturating_sub(1);
}

/// Returns the number of reserved response slots.
pub(super) fn reserved_slots(&self) -> usize {
debug_assert!(self.request_slots + self.response_slots >= self.queue.len());
Expand Down Expand Up @@ -252,6 +262,17 @@ impl CanisterQueue {
self.queue.len()
}

/// Discards all items at the front of the queue for which the predicate holds.
/// Stops when it encounters the first item for which the predicate is false.
pub(super) fn pop_while(&mut self, predicate: impl Fn(&CanisterQueueItem) -> bool) {
while let Some(item) = self.peek() {
if !predicate(item) {
break;
}
self.pop();
}
}

/// Queue invariant check that panics if any invariant does not hold. Intended
/// to be called from within a `debug_assert!()` in production code.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use ic_test_utilities_types::messages::{IngressBuilder, RequestBuilder, Response
use ic_types::messages::{CallbackId, RequestOrResponse};
use ic_types::time::{CoarseTime, UNIX_EPOCH};
use ic_types::Time;
use message_pool::REQUEST_LIFETIME;
use proptest::prelude::*;
use std::time::Duration;

Expand Down
Loading

0 comments on commit 20321d1

Please sign in to comment.