diff --git a/rs/protobuf/def/state/queues/v1/queues.proto b/rs/protobuf/def/state/queues/v1/queues.proto index 6128648b011..69f403abd27 100644 --- a/rs/protobuf/def/state/queues/v1/queues.proto +++ b/rs/protobuf/def/state/queues/v1/queues.proto @@ -180,6 +180,7 @@ message CanisterQueues { uint64 id = 1; uint64 callback_id = 2; } + repeated CallbackReference expired_callbacks = 13; repeated CallbackReference shed_responses = 12; enum NextInputQueue { diff --git a/rs/protobuf/src/gen/state/state.queues.v1.rs b/rs/protobuf/src/gen/state/state.queues.v1.rs index b6653b3b307..a868ca1fffa 100644 --- a/rs/protobuf/src/gen/state/state.queues.v1.rs +++ b/rs/protobuf/src/gen/state/state.queues.v1.rs @@ -216,6 +216,8 @@ pub struct CanisterQueues { pub canister_queues: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "10")] pub pool: ::core::option::Option, + #[prost(message, repeated, tag = "13")] + pub expired_callbacks: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "12")] pub shed_responses: ::prost::alloc::vec::Vec, #[prost(enumeration = "canister_queues::NextInputQueue", tag = "6")] diff --git a/rs/protobuf/src/gen/types/state.queues.v1.rs b/rs/protobuf/src/gen/types/state.queues.v1.rs index cbb5633472d..86f1b11c7b5 100644 --- a/rs/protobuf/src/gen/types/state.queues.v1.rs +++ b/rs/protobuf/src/gen/types/state.queues.v1.rs @@ -216,6 +216,8 @@ pub struct CanisterQueues { pub canister_queues: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "10")] pub pool: ::core::option::Option, + #[prost(message, repeated, tag = "13")] + pub expired_callbacks: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "12")] pub shed_responses: ::prost::alloc::vec::Vec, #[prost(enumeration = "canister_queues::NextInputQueue", tag = "6")] diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs index dfa55a91f8a..88bad05e0ad 100644 --- a/rs/replicated_state/src/canister_state/queues.rs +++ 
b/rs/replicated_state/src/canister_state/queues.rs @@ -7,7 +7,7 @@ mod tests; pub use self::input_schedule::CanisterQueuesLoopDetector; use self::input_schedule::InputSchedule; use self::message_pool::{ - Class, Context, InboundReference, Kind, MessagePool, OutboundReference, SomeReference, + Context, InboundReference, Kind, MessagePool, OutboundReference, SomeReference, }; use self::queue::{CanisterQueue, IngressQueue, InputQueue, OutputQueue}; use crate::replicated_state::MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN; @@ -26,6 +26,7 @@ use ic_types::messages::{ use ic_types::{CanisterId, CountBytes, Time}; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; +use message_pool::ToContext; use prost::Message; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::convert::{From, TryFrom}; @@ -313,7 +314,6 @@ pub(crate) enum CanisterInput { Request(Arc), Response(Arc), /// A concise reject response meaning "call deadine has expired". - #[allow(dead_code)] DeadlineExpired(CallbackId), /// A concise reject response meaning "call response was dropped". ResponseDropped(CallbackId), @@ -347,7 +347,7 @@ impl From for CanisterInput { /// /// * a `MessagePool`, holding messages and providing message stats (count, /// size) and support for time-based expiration and load shedding; and -/// * maps of compact resopnses (`CallbackIds` that have either expired or +/// * maps of compact responses (`CallbackIds` that have either expired or /// whose responses have been shed. /// /// Implements the `MessageStore` trait for both inbound messages @@ -360,6 +360,12 @@ struct MessageStoreImpl { #[validate_eq(CompareWithValidateEq)] pool: MessagePool, + /// "Deadline expired" compact reject responses (`CallbackIds`), returned as + /// `CanisterInput::DeadlineExpired` by `peek_input()` / `pop_input()` (and + /// "inflated" by `SystemState` into `SysUnknown` reject responses based on the + /// callback). 
+ expired_callbacks: BTreeMap, + /// Compact reject responses (`CallbackIds`) replacing best-effort responses /// that were shed. These are returned as `CanisterInput::ResponseDropped` by /// `peek_input()` / `pop_input()` (and "inflated" by `SystemState` into @@ -400,7 +406,9 @@ impl MessageStoreImpl { /// Returns `true` if `ingress_queue` or at least one of the canister input /// queues is not empty; `false` otherwise. pub fn has_input(&self) -> bool { - self.pool.message_stats().inbound_message_count > 0 || !self.shed_responses.is_empty() + self.pool.message_stats().inbound_message_count > 0 + || !self.expired_callbacks.is_empty() + || !self.shed_responses.is_empty() } /// Returns `true` if at least one output queue is not empty; false otherwise. @@ -411,7 +419,7 @@ impl MessageStoreImpl { /// Tests whether the message store contains neither pooled messages nor compact /// responses. fn is_empty(&self) -> bool { - self.pool.len() == 0 && self.shed_responses.is_empty() + self.pool.len() == 0 && self.expired_callbacks.is_empty() && self.shed_responses.is_empty() } /// Helper function for concisely validating the hard invariant that a canister @@ -426,12 +434,13 @@ impl MessageStoreImpl { ) -> Result<(), String> where MessageStoreImpl: MessageStore, + T: ToContext, { if let Some(reference) = queue.peek() { if self.is_stale(reference) { return Err(format!( "Stale reference at the front of {:?} queue to/from {}", - reference.context(), + T::context(), canister_id )); } @@ -465,13 +474,15 @@ impl MessageStore for MessageStoreImpl { type TRef<'a> = CanisterInput; fn get(&self, reference: InboundReference) -> CanisterInput { - assert_eq!(Context::Inbound, reference.context()); - if let Some(msg) = self.pool.get(reference) { + debug_assert!(!self.expired_callbacks.contains_key(&reference)); debug_assert!(!self.shed_responses.contains_key(&reference)); return msg.clone().into(); - } else if reference.class() == Class::BestEffort && reference.kind() == Kind::Response { - 
if let Some(callback_id) = self.shed_responses.get(&reference) { + } else if reference.is_inbound_best_effort_response() { + if let Some(callback_id) = self.expired_callbacks.get(&reference) { + debug_assert!(!self.shed_responses.contains_key(&reference)); + return CanisterInput::DeadlineExpired(*callback_id); + } else if let Some(callback_id) = self.shed_responses.get(&reference) { return CanisterInput::ResponseDropped(*callback_id); } } @@ -480,13 +491,15 @@ impl MessageStore for MessageStoreImpl { } fn take(&mut self, reference: InboundReference) -> CanisterInput { - assert_eq!(Context::Inbound, reference.context()); - if let Some(msg) = self.pool.take(reference) { + debug_assert!(!self.expired_callbacks.contains_key(&reference)); debug_assert!(!self.shed_responses.contains_key(&reference)); return msg.into(); - } else if reference.class() == Class::BestEffort && reference.kind() == Kind::Response { - if let Some(callback_id) = self.shed_responses.remove(&reference) { + } else if reference.is_inbound_best_effort_response() { + if let Some(callback_id) = self.expired_callbacks.remove(&reference) { + debug_assert!(!self.shed_responses.contains_key(&reference)); + return CanisterInput::DeadlineExpired(callback_id); + } else if let Some(callback_id) = self.shed_responses.remove(&reference) { return CanisterInput::ResponseDropped(callback_id); } } @@ -495,12 +508,10 @@ impl MessageStore for MessageStoreImpl { } fn is_stale(&self, reference: InboundReference) -> bool { - assert_eq!(Context::Inbound, reference.context()); - self.pool.get(reference).is_none() - && (reference.class() != Class::BestEffort - || reference.kind() != Kind::Response - || !self.shed_responses.contains_key(&reference)) + && !(reference.is_inbound_best_effort_response() + && (self.expired_callbacks.contains_key(&reference) + || self.shed_responses.contains_key(&reference))) } } @@ -508,28 +519,26 @@ impl MessageStore for MessageStoreImpl { type TRef<'a> = &'a RequestOrResponse; fn get(&self, 
reference: OutboundReference) -> &RequestOrResponse { - assert_eq!(Context::Outbound, reference.context()); - self.pool .get(reference) .expect("stale reference at the front of output queue") } fn take(&mut self, reference: OutboundReference) -> RequestOrResponse { - assert_eq!(Context::Outbound, reference.context()); - self.pool .take(reference) .expect("stale reference at the front of output queue") } fn is_stale(&self, reference: OutboundReference) -> bool { - assert_eq!(Context::Outbound, reference.context()); self.pool.get(reference).is_none() } } trait InboundMessageStore: MessageStore { + /// Enqueues a "deadline expired" compact response for the given callback. + fn push_inbound_timeout_response(&mut self, callback_id: CallbackId) -> InboundReference; + /// Collects the `CallbackIds` of all responses and compact responses enqueued /// in input queues. /// @@ -544,50 +553,65 @@ trait InboundMessageStore: MessageStore { } impl InboundMessageStore for MessageStoreImpl { + fn push_inbound_timeout_response(&mut self, callback_id: CallbackId) -> InboundReference { + let reference = self.pool.make_inbound_timeout_response_reference(); + self.expired_callbacks.insert(reference, callback_id); + reference + } + fn callbacks_with_enqueued_response( &self, canister_queues: &BTreeMap, ) -> Result, String> { - let callbacks_vec = canister_queues + let mut callbacks = BTreeSet::new(); + canister_queues .values() .flat_map(|(input_queue, _)| input_queue.iter()) - .filter_map(|reference| { - let (a, b) = ( + .try_for_each(|reference| { + let (a, b, c) = ( self.pool.get(*reference), + self.expired_callbacks.get(reference), self.shed_responses.get(reference), ); - match (a, b) { + let callback_id = match (a, b, c) { // Pooled response. - (Some(RequestOrResponse::Response(rep)), None) => { - Some(Ok(rep.originator_reply_callback)) + (Some(RequestOrResponse::Response(rep)), None, None) => { + rep.originator_reply_callback } // Compact response. 
- (None, Some(callback_id)) => Some(Ok(*callback_id)), + (None, Some(callback_id), None) | (None, None, Some(callback_id)) => { + *callback_id + } // Request or stale reference. - (Some(RequestOrResponse::Request(_)), None) | (None, None) => None, + (Some(RequestOrResponse::Request(_)), None, None) | (None, None, None) => { + return Ok(()) + } // Two or more of the above. This should never happen. - _ => Some(Err(format!( - "CanisterQueues: Multiple responses for {:?}", - reference - ))), - } - }) - .collect::, String>>()?; + _ => { + return Err(format!( + "CanisterQueues: Multiple responses for {:?}", + reference + )) + } + }; - let callbacks: BTreeSet<_> = callbacks_vec.iter().cloned().collect(); - if callbacks.len() != callbacks_vec.len() { - return Err(format!( - "CanisterQueues: Duplicate inbound response callback(s): {:?}", - callbacks_vec - )); - } + if callbacks.insert(callback_id) { + Ok(()) + } else { + Err(format!( + "CanisterQueues: Duplicate inbound response callback: {:?}", + callback_id + )) + } + })?; - let response_count = - self.pool.message_stats().inbound_response_count + self.shed_responses.len(); - if callbacks_vec.len() != response_count { + let response_count = self.pool.message_stats().inbound_response_count + + self.expired_callbacks.len() + + self.shed_responses.len(); + if callbacks.len() != response_count { return Err(format!( "CanisterQueues: Have {} inbound responses, but only {} are enqueued", response_count, @@ -786,6 +810,64 @@ impl CanisterQueues { Ok(()) } + /// Enqueues a "deadline expired" compact response for the given callback, iff a + /// response for the callback is not already enqueued. + /// + /// Must only be called for not-yet-executing callbacks (i.e. not for a paused + /// or aborted callback). + /// + /// Returns `Err` iff a compact response should have been enqueued, but wasn't + /// because no reserved slot was available. 
+ pub(super) fn try_push_deadline_expired_input( + &mut self, + callback_id: CallbackId, + respondent: &CanisterId, + own_canister_id: &CanisterId, + local_canisters: &BTreeMap, + ) -> Result<(), String> { + // For a not yet executed callback, there must be a queue with either a reserved + // slot or an enqueued response. + let Some((input_queue, _)) = self.canister_queues.get_mut(respondent) else { + return Err(format!( + "No input queue for expired callback: {}", + callback_id + )); + }; + + // Check against duplicate responses. + if !self.callbacks_with_enqueued_response.insert(callback_id) { + // There is already a response enqueued for the callback. + return Ok(()); + } + + if input_queue.check_has_reserved_response_slot().is_err() { + // No response enqueued for `callback_id`, but no reserved slot either. This + // should never happen. + self.callbacks_with_enqueued_response.remove(&callback_id); + return Err(format!( + "No reserved response slot for expired callback: {}", + callback_id + )); + } + + let reference = self.store.push_inbound_timeout_response(callback_id); + input_queue.push_response(reference); + self.queue_stats.on_push_timeout_response(); + + // Add sender canister ID to the appropriate input schedule queue if it is not + // already scheduled. + if input_queue.len() == 1 { + let input_queue_type = + input_queue_type_fn(own_canister_id, local_canisters)(respondent); + self.input_schedule.schedule(*respondent, input_queue_type); + } + + debug_assert_eq!(Ok(()), self.test_invariants()); + debug_assert_eq!(Ok(()), self.schedules_ok(&|_| InputQueueType::RemoteSubnet)); + + Ok(()) + } + /// Pops the next canister input queue message. /// /// Note: We pop senders from the front of `input_schedule` and insert them @@ -1133,7 +1215,9 @@ impl CanisterQueues { /// Returns the number of non-stale canister messages enqueued in input queues. 
pub fn input_queues_message_count(&self) -> usize { - self.message_stats().inbound_message_count + self.store.shed_responses.len() + self.message_stats().inbound_message_count + + self.store.expired_callbacks.len() + + self.store.shed_responses.len() } /// Returns the number of reserved slots across all input queues. @@ -1161,7 +1245,9 @@ impl CanisterQueues { /// Returns the number of non-stale responses enqueued in input queues. pub fn input_queues_response_count(&self) -> usize { - self.message_stats().inbound_response_count + self.store.shed_responses.len() + self.message_stats().inbound_response_count + + self.store.expired_callbacks.len() + + self.store.shed_responses.len() } /// Returns the number of actual (non-stale) messages in output queues. @@ -1354,8 +1440,6 @@ impl CanisterQueues { /// Releases the outbound slot reservation of a shed or expired inbound request. /// Updates the stats for the dropped message. fn on_inbound_message_dropped(&mut self, reference: InboundReference, msg: RequestOrResponse) { - assert_eq!(Context::Inbound, reference.context()); - match msg { RequestOrResponse::Response(response) => { // This is an inbound response, remember its `originator_reply_callback`, so @@ -1401,8 +1485,6 @@ impl CanisterQueues { msg: RequestOrResponse, input_queue_type_fn: impl Fn(&CanisterId) -> InputQueueType, ) { - assert_eq!(Context::Outbound, reference.context()); - let remote = msg.receiver(); let (input_queue, output_queue) = self .canister_queues @@ -1641,6 +1723,7 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues { } else { None }, + expired_callbacks: callback_references_to_proto(&item.store.expired_callbacks), shed_responses: callback_references_to_proto(&item.store.shed_responses), next_input_source, local_sender_schedule, @@ -1672,6 +1755,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can }) .collect() } + let expired_callbacks = callback_references_try_from_proto(item.expired_callbacks)?; let 
shed_responses = callback_references_try_from_proto(item.shed_responses)?; let mut enqueued_pool_messages = BTreeSet::new(); @@ -1733,6 +1817,7 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can let store = MessageStoreImpl { pool, + expired_callbacks, shed_responses, }; let callbacks_with_enqueued_response = store @@ -1855,6 +1940,14 @@ impl QueueStats { } } + /// Updates the stats to reflect the enqueueing of a "deadline expired" + /// reference into an input queue. + fn on_push_timeout_response(&mut self) { + // Pushing a response into an input queue consumes an input queue slot. + debug_assert!(self.input_queues_reserved_slots > 0); + self.input_queues_reserved_slots = self.input_queues_reserved_slots.saturating_sub(1); + } + /// Updates the stats to reflect the dropping of the given request from an input /// queue. fn on_drop_input_request(&mut self, request: &Request) { diff --git a/rs/replicated_state/src/canister_state/queues/message_pool.rs b/rs/replicated_state/src/canister_state/queues/message_pool.rs index 759eb6cd7f1..e1e1b2ace19 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool.rs @@ -117,6 +117,18 @@ impl Id { Class::BestEffort } } + + /// Tests whether this `Id` represents an inbound best-effort response. + fn is_inbound_best_effort_response(&self) -> bool { + self.0 & (Context::BIT | Class::BIT | Kind::BIT) + == (Context::Inbound as u64 | Class::BestEffort as u64 | Kind::Response as u64) + } + + /// Tests whether this `Id` represents an outbound guaranteed-response request. 
+ fn is_outbound_guaranteed_request(&self) -> bool { + self.0 & (Context::BIT | Class::BIT | Kind::BIT) + == (Context::Outbound as u64 | Class::GuaranteedResponse as u64 | Kind::Request as u64) + } } /// A typed reference -- inbound (`CanisterInput`) or outbound @@ -142,13 +154,19 @@ impl Reference { Id::from(self).kind() } - pub(super) fn context(&self) -> Context { + fn context(&self) -> Context { Id::from(self).context() } - pub(super) fn class(&self) -> Class { + #[allow(dead_code)] + fn class(&self) -> Class { Id::from(self).class() } + + /// Tests whether this is a reference to an inbound best-effort response. + pub(super) fn is_inbound_best_effort_response(&self) -> bool { + Id::from(self).is_inbound_best_effort_response() + } } impl Clone for Reference { @@ -324,10 +342,7 @@ impl TryFrom for CallbackReferenc type Error = ProxyDecodeError; fn try_from(item: pb_queues::canister_queues::CallbackReference) -> Result { let reference = Reference(item.id, PhantomData); - if reference.context() == Context::Inbound - && reference.class() == Class::BestEffort - && reference.kind() == Kind::Response - { + if reference.is_inbound_best_effort_response() { Ok(CallbackReference(reference, item.callback_id.into())) } else { Err(ProxyDecodeError::Other( @@ -411,6 +426,14 @@ impl MessagePool { self.insert_impl(msg, actual_deadline, Context::Inbound) } + /// Reserves an `InboundReference` for a timeout reject response for a + /// best-effort callback. + /// + /// This is equivalent to inserting and then immediately removing the response. + pub(super) fn make_inbound_timeout_response_reference(&mut self) -> InboundReference { + self.next_reference(Class::BestEffort, Kind::Response) + } + /// Inserts an outbound request (one that is to be enqueued in an output queue) /// into the pool. Returns the reference assigned to the request. 
/// @@ -640,7 +663,9 @@ impl MessagePool { .into_iter() .map(|(_, id)| { let msg = self.take_impl(id).unwrap(); - self.outbound_guaranteed_request_deadlines.remove(&id); + if id.is_outbound_guaranteed_request() { + self.outbound_guaranteed_request_deadlines.remove(&id); + } self.remove_from_size_queue(id, &msg); (id.into(), msg) }) @@ -736,10 +761,7 @@ impl MessagePool { // guaranteed response requests (and nothing else). let mut expected_outbound_guaranteed_request_ids = BTreeSet::new(); self.messages.keys().for_each(|id| { - if id.context() == Context::Outbound - && id.class() == Class::GuaranteedResponse - && id.kind() == Kind::Request - { + if id.is_outbound_guaranteed_request() { expected_outbound_guaranteed_request_ids.insert(id); } }); diff --git a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs index 4efd4d07428..9c17224d0e1 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs @@ -135,6 +135,33 @@ fn test_get() { #[test] fn test_take() { + fn test_take_impl( + request_id: Reference, + response_id: Reference, + request: Request, + response: Response, + pool: &mut MessagePool, + ) { + let request: RequestOrResponse = request.into(); + let response: RequestOrResponse = response.into(); + + // Ensure that the messages are now in the pool. + assert_eq!(Some(&request), pool.get(request_id)); + assert_eq!(Some(&response), pool.get(response_id)); + + // Actually take the messages. + assert_eq!(Some(request), pool.take(request_id)); + assert_eq!(Some(response), pool.take(response_id)); + + // Messages are gone. + assert_eq!(None, pool.get(request_id)); + assert_eq!(None, pool.get(response_id)); + + // And cannot be taken out again. 
+ assert_eq!(None, pool.take(request_id)); + assert_eq!(None, pool.take(response_id)); + } + let mut pool = MessagePool::default(); for deadline in [NO_DEADLINE, time(13)] { @@ -155,33 +182,6 @@ fn test_take() { test_take_impl(request_id, response_id, request, response, &mut pool); } } - - fn test_take_impl( - request_id: Reference, - response_id: Reference, - request: Request, - response: Response, - pool: &mut MessagePool, - ) { - let request: RequestOrResponse = request.into(); - let response: RequestOrResponse = response.into(); - - // Ensure that the messages are now in the pool. - assert_eq!(Some(&request), pool.get(request_id)); - assert_eq!(Some(&response), pool.get(response_id)); - - // Actually take the messages. - assert_eq!(Some(request), pool.take(request_id)); - assert_eq!(Some(response), pool.take(response_id)); - - // Messages are gone. - assert_eq!(None, pool.get(request_id)); - assert_eq!(None, pool.get(response_id)); - - // And cannot be taken out again. - assert_eq!(None, pool.take(request_id)); - assert_eq!(None, pool.take(response_id)); - } } } @@ -562,6 +562,10 @@ fn test_id_from_reference_roundtrip() { assert_eq!(reference.0, id.0); assert_eq!(id, reference.into()); assert_eq!(SomeReference::Inbound(reference), SomeReference::from(id)); + assert_eq!( + (id.context(), id.class(), id.kind()), + (reference.context(), reference.class(), reference.kind()) + ); // Outbound. 
let reference = OutboundReference::new(class, kind, 13); @@ -569,6 +573,57 @@ fn test_id_from_reference_roundtrip() { assert_eq!(reference.0, id.0); assert_eq!(id, reference.into()); assert_eq!(SomeReference::Outbound(reference), SomeReference::from(id)); + assert_eq!( + (id.context(), id.class(), id.kind()), + (reference.context(), reference.class(), reference.kind()) + ); + } + } +} + +#[test] +fn test_is_inbound_best_effort_response() { + use Class::*; + use Kind::*; + + for kind in [Request, Response] { + for class in [GuaranteedResponse, BestEffort] { + let reference = InboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert_eq!( + class == BestEffort && kind == Response, + id.is_inbound_best_effort_response() + ); + assert_eq!( + class == BestEffort && kind == Response, + reference.is_inbound_best_effort_response() + ); + + let reference = OutboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert!(!id.is_inbound_best_effort_response()); + assert!(!reference.is_inbound_best_effort_response()); + } + } +} + +#[test] +fn test_is_outbound_guaranteed_request() { + use Class::*; + use Kind::*; + + for kind in [Request, Response] { + for class in [GuaranteedResponse, BestEffort] { + let reference = InboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert!(!id.is_outbound_guaranteed_request()); + + let reference = OutboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert_eq!( + class == GuaranteedResponse && kind == Request, + id.is_outbound_guaranteed_request() + ); } } } diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs index 60c6e49c3a7..b456adfd257 100644 --- a/rs/replicated_state/src/canister_state/queues/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/tests.rs @@ -73,6 +73,20 @@ impl CanisterQueuesFixture { ) } + fn try_push_deadline_expired_input(&mut self) -> Result<(), String> 
{ + self.last_callback_id += 1; + self.queues.try_push_deadline_expired_input( + CallbackId::from(self.last_callback_id), + &self.other, + &self.this, + &BTreeMap::new(), + ) + } + + fn peek_input(&mut self) -> Option { + self.queues.peek_input() + } + fn pop_input(&mut self) -> Option { self.queues.pop_input() } @@ -456,6 +470,115 @@ fn test_available_output_request_slots() { ); } +#[test] +fn test_deadline_expired_input() { + let mut fixture = CanisterQueuesFixture::new(); + + // Enqueue a "deadline expired" compact reject response. + fixture.push_output_request().unwrap(); + fixture.pop_output().unwrap(); + fixture.try_push_deadline_expired_input().unwrap(); + + // We have one input (compact) response. + assert_eq!(1, fixture.queues.input_queues_message_count()); + assert_eq!(1, fixture.queues.input_queues_response_count()); + assert_eq!(0, fixture.queues.input_queues_reserved_slots()); + assert!(fixture.queues.has_input()); + assert!(!fixture.queues.has_output()); + assert!(!fixture.queues.store.is_empty()); + + // Peek, then pop the "deadline expired" compact reject response. This also + // implicitly checks that the input schedule was correctly updated. + let expected_callback_id = CallbackId::from(fixture.last_callback_id); + assert_eq!( + Some(CanisterInput::DeadlineExpired(expected_callback_id)), + fixture.peek_input() + ); + assert_eq!( + Some(CanisterInput::DeadlineExpired(expected_callback_id)), + fixture.pop_input() + ); + + // No inputs and no outputs left. 
+ assert_eq!(0, fixture.queues.input_queues_message_count()); + assert_eq!(0, fixture.queues.input_queues_response_count()); + assert_eq!(0, fixture.queues.input_queues_reserved_slots()); + assert!(!fixture.queues.has_input()); + assert!(!fixture.queues.has_output()); + assert!(fixture.queues.store.is_empty()); +} + +#[test] +fn test_try_push_deadline_expired_input_no_queue() { + let mut fixture = CanisterQueuesFixture::new(); + + // Pushing a deadline expired input into a non-existent queue signals a bug. + assert_eq!( + "No input queue for expired callback: 1", + fixture.try_push_deadline_expired_input().unwrap_err() + ); +} + +#[test] +fn test_try_push_deadline_expired_input_no_reserved_slot() { + let mut fixture = CanisterQueuesFixture::new(); + + // Enqueue an input request, to create the input queue. + fixture.push_input_request().unwrap(); + + // Pushing a deadline expired input without a reserved slot signals a bug. + assert_eq!( + "No reserved response slot for expired callback: 1", + fixture.try_push_deadline_expired_input().unwrap_err() + ); +} + +#[test] +fn test_try_push_deadline_expired_input_with_same_callback_id() { + let mut fixture = CanisterQueuesFixture::new(); + + // Push an input response. + fixture.push_output_request().unwrap(); + fixture.pop_output().unwrap(); + fixture.push_input_response().unwrap(); + + // Sanity check. + assert_eq!(1, fixture.queues.input_queues_message_count()); + assert_eq!(1, fixture.queues.input_queues_response_count()); + assert_eq!(0, fixture.queues.input_queues_reserved_slots()); + assert!(fixture.queues.has_input()); + assert!(!fixture.queues.store.is_empty()); + + // Pushing a deadline expired input with the same callback ID is a no-op. + let callback_id = fixture.last_callback_id.into(); + fixture + .queues + .try_push_deadline_expired_input( + callback_id, + &fixture.other, + &fixture.this, + &BTreeMap::new(), + ) + .unwrap(); + + // Nothing has changed. 
+ assert_eq!(1, fixture.queues.input_queues_message_count()); + assert_eq!(1, fixture.queues.input_queues_response_count()); + assert_eq!(0, fixture.queues.input_queues_reserved_slots()); + assert!(fixture.queues.has_input()); + assert!(!fixture.queues.store.is_empty()); + + // Pop the response. + assert_matches!(fixture.pop_input(), Some(CanisterInput::Response(_))); + + // Nothing left. + assert_eq!(0, fixture.queues.input_queues_message_count()); + assert_eq!(0, fixture.queues.input_queues_response_count()); + assert_eq!(0, fixture.queues.input_queues_reserved_slots()); + assert!(!fixture.queues.has_input()); + assert!(fixture.queues.store.is_empty()); +} + #[test] fn test_shed_largest_message() { let this = canister_test_id(13); @@ -1775,11 +1898,14 @@ fn encode_non_default_pool() { } /// Constructs an encoded `CanisterQueues` with 2 inbound responses (callbacks 1 -/// and 2) and one shed inbound response (callback 3). +/// and 2), one shed inbound response (callback 3) and one expired callback +/// response (4). fn canister_queues_proto_with_inbound_responses() -> pb_queues::CanisterQueues { let mut queues = CanisterQueues::default(); - // Make 3 input queue reservations. + let canister_id = canister_test_id(13); + + // Make 4 input queue reservations. let deadline = coarse_time(1); queues .push_output_request(request(1, NO_DEADLINE).into(), UNIX_EPOCH) @@ -1790,7 +1916,10 @@ fn canister_queues_proto_with_inbound_responses() -> pb_queues::CanisterQueues { queues .push_output_request(request(3, deadline).into(), UNIX_EPOCH) .unwrap(); - assert_eq!(3, queues.output_into_iter().count()); + queues + .push_output_request(request(4, deadline).into(), UNIX_EPOCH) + .unwrap(); + assert_eq!(4, queues.output_into_iter().count()); // Enqueue 3 inbound responses. 
queues @@ -1802,9 +1931,12 @@ fn canister_queues_proto_with_inbound_responses() -> pb_queues::CanisterQueues { queues .push_input(response(3, deadline).into(), LocalSubnet) .unwrap(); + queues + .try_push_deadline_expired_input(4.into(), &canister_id, &canister_id, &BTreeMap::new()) + .unwrap(); // Shed the response for callback 3. - assert!(queues.shed_largest_message(&canister_test_id(13), &BTreeMap::new())); + assert!(queues.shed_largest_message(&canister_id, &BTreeMap::new())); assert_eq!( Some(&CallbackId::from(3)), queues.store.shed_responses.values().next() @@ -1838,7 +1970,7 @@ fn decode_with_duplicate_response_callback_in_pool() { assert_matches!( CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback(s): [1, 1, 3]" + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback: 1" ); } @@ -1853,7 +1985,23 @@ fn decode_with_duplicate_response_callback_in_shed_responses() { assert_matches!( CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback(s): [1, 2, 1]" + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback: 1" + ); +} + +#[test] +fn decode_with_duplicate_response_callback_in_expired_callbacks() { + let mut encoded = canister_queues_proto_with_inbound_responses(); + + // Have the callback ID of the expired callback match that of one of the + // responses. 
+ for expired_callback in &mut encoded.expired_callbacks { + expired_callback.callback_id = 1; + } + + assert_matches!( + CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback: 1" + ); } @@ -1870,7 +2018,7 @@ fn decode_with_duplicate_reference() { let metrics = CountingMetrics(RefCell::new(0)); assert_matches!( CanisterQueues::try_from((encoded, &metrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback(s): [1, 3, 3]" + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Duplicate inbound response callback: 3" ); // A critical error should also have been observed. assert_eq!(1, *metrics.0.borrow()); @@ -1892,6 +2040,39 @@ fn decode_with_both_response_and_shed_response_for_reference() { ); } +#[test] +fn decode_with_both_response_and_expired_callback_for_reference() { + let mut encoded = canister_queues_proto_with_inbound_responses(); + + // Make the expired callback have the same reference as one of the responses. + let input_queue = encoded.canister_queues[0].input_queue.as_ref().unwrap(); + let response_id = input_queue.queue[1]; + for expired_callback in &mut encoded.expired_callbacks { + expired_callback.id = response_id; + } + + assert_matches!( + CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), + Err(ProxyDecodeError::Other(msg)) if msg.contains("CanisterQueues: Multiple responses for Reference(") + ); +} + +#[test] +fn decode_with_both_shed_response_and_expired_callback_for_reference() { + let mut encoded = canister_queues_proto_with_inbound_responses(); + + // Make the expired callback have the same reference as the shed response. 
+ let response_id = encoded.shed_responses[0].id; + for expired_callback in &mut encoded.expired_callbacks { + expired_callback.id = response_id; + } + + assert_matches!( + CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), + Err(ProxyDecodeError::Other(msg)) if msg.contains("CanisterQueues: Multiple responses for Reference(") + ); +} + #[test] fn decode_with_unreferenced_inbound_response() { let mut encoded = canister_queues_proto_with_inbound_responses(); @@ -1904,7 +2085,7 @@ fn decode_with_unreferenced_inbound_response() { let metrics = CountingMetrics(RefCell::new(0)); assert_matches!( CanisterQueues::try_from((encoded, &metrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Have 3 inbound responses, but only 2 are enqueued" + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Have 4 inbound responses, but only 3 are enqueued" ); // A critical error should also have been observed. assert_eq!(1, *metrics.0.borrow()); @@ -1921,58 +2102,26 @@ fn decode_with_unreferenced_shed_response() { assert_matches!( CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Have 3 inbound responses, but only 2 are enqueued" + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Have 4 inbound responses, but only 3 are enqueued" ); } #[test] -fn decode_with_duplicate_inbound_response() { - let mut queues = CanisterQueues::default(); - - // Make 2 input queue reservations. - queues - .push_output_request(request(1, NO_DEADLINE).into(), UNIX_EPOCH) - .unwrap(); - queues - .push_output_request(request(2, SOME_DEADLINE).into(), UNIX_EPOCH) - .unwrap(); - assert_eq!(2, queues.output_into_iter().count()); - - // Enqueue 2 inbound responses. 
- queues - .push_input(response(1, NO_DEADLINE).into(), LocalSubnet) - .unwrap(); - queues - .push_input(response(2, SOME_DEADLINE).into(), LocalSubnet) - .unwrap(); - - // Sanity check: roundtrip encode succeeds. - let mut encoded: pb_queues::CanisterQueues = (&queues).into(); - let decoded = ( - encoded.clone(), - &StrictMetrics as &dyn CheckpointLoadingMetrics, - ) - .try_into() - .unwrap(); - assert_eq!(queues, decoded); +fn decode_with_unreferenced_expired_callback() { + let mut encoded = canister_queues_proto_with_inbound_responses(); - // Tweak the encoded queues so both responses have the same `CallbackId`. - for entry in &mut encoded.pool.as_mut().unwrap().messages { - let message = entry.message.as_mut().unwrap().r.as_mut().unwrap(); - let pb_queues::request_or_response::R::Response(ref mut response) = message else { - panic!("Expected only responses"); - }; - response.originator_reply_callback = 1; - } + // Remove the reference to the fourth (expired callback) response. + let input_queue = encoded.canister_queues[0].input_queue.as_mut().unwrap(); + input_queue.queue.remove(3); - // Decoding should now fail because of the duplicate `CallbackId`. - let err = CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)) - .unwrap_err(); - assert_matches!(err, ProxyDecodeError::Other(msg) if &msg == "CanisterQueues: Duplicate inbound response callback(s): [1, 1]"); + assert_matches!( + CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), + Err(ProxyDecodeError::Other(msg)) if &msg == "CanisterQueues: Have 4 inbound responses, but only 3 are enqueued" + ); } #[test] -fn decode_duplicate_inbound_response() { +fn decode_with_duplicate_inbound_response() { let mut queues = CanisterQueues::default(); // Make 2 input queue reservations. @@ -2014,7 +2163,7 @@ fn decode_duplicate_inbound_response() { // Decoding should now fail because of the duplicate `CallbackId`. 
let err = CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)) .unwrap_err(); - assert_matches!(err, ProxyDecodeError::Other(msg) if &msg == "CanisterQueues: Duplicate inbound response callback(s): [1, 1]"); + assert_matches!(err, ProxyDecodeError::Other(msg) if &msg == "CanisterQueues: Duplicate inbound response callback: 1"); } #[test] diff --git a/rs/replicated_state/src/canister_state/system_state.rs b/rs/replicated_state/src/canister_state/system_state.rs index 13c5af7faea..9e00a435735 100644 --- a/rs/replicated_state/src/canister_state/system_state.rs +++ b/rs/replicated_state/src/canister_state/system_state.rs @@ -278,7 +278,7 @@ pub struct SystemState { pub controllers: BTreeSet, pub canister_id: CanisterId, // This must remain private, in order to properly enforce system states (running, stopping, - // stopped) when enqueuing inputs; and to ensure message memory reservations are accurate. + // stopped) when enqueueing inputs; and to ensure message memory reservations are accurate. #[validate_eq(CompareWithValidateEq)] queues: CanisterQueues, /// The canister's memory allocation. @@ -1451,6 +1451,67 @@ impl SystemState { .time_out_messages(current_time, own_canister_id, local_canisters) } + /// Enqueues "deadline expired" references for all expired best-effort callbacks + /// without a response. + /// + /// Returns `Err` if a `SystemState` internal inconsistency prevented one or + /// more "deadline expired" references from being enqueued. + pub fn time_out_callbacks( + &mut self, + current_time: CoarseTime, + own_canister_id: &CanisterId, + local_canisters: &BTreeMap, + ) -> Result<(), Vec> { + if self.status == CanisterStatus::Stopped { + // Stopped canisters have no call context manager, so no callbacks. 
+ return Ok(()); + } + + let aborted_or_paused_callback_id = self + .aborted_or_paused_response() + .map(|response| response.originator_reply_callback); + + // Safe to unwrap because we just checked that the status is not `Stopped`. + let call_context_manager = call_context_manager_mut(&mut self.status).unwrap(); + + let mut errors = Vec::new(); + let expired_callbacks = call_context_manager + .expire_callbacks(current_time) + .collect::<Vec<_>>(); + for callback_id in expired_callbacks { + if Some(callback_id) == aborted_or_paused_callback_id { + // This callback is already executing, don't produce a second response for it. + continue; + } + + // Safe to unwrap because this is a callback ID we just got from the + // `CallContextManager`. + let callback = call_context_manager.callbacks().get(&callback_id).unwrap(); + self.queues + .try_push_deadline_expired_input( + callback_id, + &callback.respondent, + own_canister_id, + local_canisters, + ) + .unwrap_or_else(|err_str| { + errors.push(StateError::NonMatchingResponse { + err_str, + originator: callback.originator, + callback_id, + respondent: callback.respondent, + deadline: callback.deadline, + }) + }); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } + + /// Re-partitions the local and remote input schedules of `self.queues` /// following a canister migration, based on the updated set of local canisters. /// @@ -1718,6 +1779,20 @@ pub(crate) fn push_input( res } +fn call_context_manager_mut(status: &mut CanisterStatus) -> Option<&mut CallContextManager> { + match status { + CanisterStatus::Running { + call_context_manager, + } + | CanisterStatus::Stopping { + call_context_manager, + ..
+ } => Some(call_context_manager), + + CanisterStatus::Stopped => None, + } +} + pub mod testing { use super::*; diff --git a/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs b/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs index 3eed309a8bc..d73bb1514df 100644 --- a/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs +++ b/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs @@ -276,7 +276,7 @@ impl CallContextManagerStats { /// Calculates the stats for the given call contexts and callbacks. /// - /// Time complexity: `O(|call_contexts| + |callbacks|)`. + /// Time complexity: `O(n)`. pub(crate) fn calculate_stats( call_contexts: &BTreeMap, callbacks: &BTreeMap>, @@ -827,7 +827,6 @@ impl CallContextManager { /// current time. /// /// Note: A given callback ID will be returned at most once by this function. - #[allow(dead_code)] pub(super) fn expire_callbacks(&mut self, now: CoarseTime) -> impl Iterator { const MIN_CALLBACK_ID: CallbackId = CallbackId::new(0); diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index 111feeb7675..cd72abb449a 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -284,7 +284,7 @@ impl StateError { } /// Creates a `StateError::CanisterNotFound` variant with the given error - /// mnessage for the given `Response`. + /// message for the given `Response`. 
pub fn non_matching_response(err_str: impl ToString, response: &Response) -> Self { Self::NonMatchingResponse { err_str: err_str.to_string(), diff --git a/rs/replicated_state/tests/system_state.rs b/rs/replicated_state/tests/system_state.rs index bc64cadec62..cb1577e7e80 100644 --- a/rs/replicated_state/tests/system_state.rs +++ b/rs/replicated_state/tests/system_state.rs @@ -1,20 +1,21 @@ +use assert_matches::assert_matches; use ic_base_types::{NumBytes, NumSeconds}; +use ic_error_types::RejectCode; use ic_registry_subnet_type::SubnetType; -use ic_replicated_state::{ - canister_state::DEFAULT_QUEUE_CAPACITY, - testing::{CanisterQueuesTesting, SystemStateTesting}, - InputQueueType, StateError, SystemState, +use ic_replicated_state::canister_state::system_state::PausedExecutionId; +use ic_replicated_state::canister_state::DEFAULT_QUEUE_CAPACITY; +use ic_replicated_state::testing::{CanisterQueuesTesting, SystemStateTesting}; +use ic_replicated_state::{CallOrigin, ExecutionTask, InputQueueType, StateError, SystemState}; +use ic_test_utilities_types::ids::{canister_test_id, user_test_id}; +use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder}; +use ic_types::messages::{ + CallbackId, CanisterMessage, CanisterMessageOrTask, Payload, RejectContext, Request, + RequestMetadata, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES, }; -use ic_test_utilities_types::{ - ids::{canister_test_id, user_test_id}, - messages::{RequestBuilder, ResponseBuilder}, -}; -use ic_types::{ - messages::{CanisterMessage, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES}, - time::{CoarseTime, UNIX_EPOCH}, - CanisterId, Cycles, -}; -use std::sync::Arc; +use ic_types::methods::{Callback, WasmClosure}; +use ic_types::time::{CoarseTime, UNIX_EPOCH}; +use ic_types::{CanisterId, Cycles, Time}; +use std::{collections::BTreeMap, sync::Arc}; /// Figure out how many cycles a canister should have so that it can support the /// given amount of storage for the given amount 
of time, given the storage fee. @@ -52,6 +53,25 @@ fn default_output_response() -> Arc { .into() } +fn output_request(deadline: CoarseTime) -> Arc { + RequestBuilder::default() + .sender(CANISTER_ID) + .receiver(OTHER_CANISTER_ID) + .deadline(deadline) + .build() + .into() +} + +fn input_response(callback_id: CallbackId, deadline: CoarseTime) -> RequestOrResponse { + ResponseBuilder::default() + .respondent(OTHER_CANISTER_ID) + .originator(CANISTER_ID) + .originator_reply_callback(callback_id) + .deadline(deadline) + .build() + .into() +} + fn default_request_to_self() -> Arc { RequestBuilder::default() .sender(CANISTER_ID) @@ -141,6 +161,22 @@ impl SystemStateFixture { SubnetType::Application, ); } + + /// Times out all callbacks with deadlines before `current_time`. Returns the + /// number of expired callbacks. + fn time_out_callbacks( + &mut self, + current_time: CoarseTime, + ) -> (usize, Result<(), Vec>) { + let input_responses_before = self.system_state.queues().input_queues_response_count(); + // TODO: Add test for `time_out_callbacks()` returning an error. + let result = + self.system_state + .time_out_callbacks(current_time, &CANISTER_ID, &BTreeMap::new()); + let input_responses_after = self.system_state.queues().input_queues_response_count(); + + (input_responses_after - input_responses_before, result) + } } #[test] @@ -365,3 +401,150 @@ fn induct_messages_to_self_full_queue() { assert_eq!(None, fixture.pop_input()); assert_eq!(0, fixture.system_state.queues().output_message_count()); } + +/// Registers a callback with the given deadline. 
+fn register_callback(fixture: &mut SystemStateFixture, deadline: CoarseTime) -> CallbackId { + let call_context_manager = fixture.system_state.call_context_manager_mut().unwrap(); + let time = Time::from_nanos_since_unix_epoch(1); + let call_context_id = call_context_manager.new_call_context( + CallOrigin::SystemTask, + Cycles::zero(), + time, + RequestMetadata::new(0, time), + ); + + call_context_manager.register_callback(Callback::new( + call_context_id, + CANISTER_ID, + OTHER_CANISTER_ID, + Cycles::zero(), + Cycles::new(42), + Cycles::new(84), + WasmClosure::new(0, 2), + WasmClosure::new(0, 2), + None, + deadline, + )) +} + +/// Simulates an outbound call with the given deadline, by registering a +/// callback and reserving a response slot. +fn simulate_outbound_call(fixture: &mut SystemStateFixture, deadline: CoarseTime) -> CallbackId { + // Reserve a response slot. + fixture + .push_output_request(output_request(deadline)) + .unwrap(); + fixture.pop_output().unwrap(); + + // Register a callback. + register_callback(fixture, deadline) +} + +#[test] +fn time_out_callbacks() { + let mut fixture = SystemStateFixture::running(); + + let deadline_expired_reject_payload = Payload::Reject(RejectContext::new( + RejectCode::SysUnknown, + "Call deadline has expired.", + )); + + let d1 = CoarseTime::from_secs_since_unix_epoch(1); + let d2 = CoarseTime::from_secs_since_unix_epoch(2); + let d3 = CoarseTime::from_secs_since_unix_epoch(3); + + let c1 = simulate_outbound_call(&mut fixture, d1); + let c2 = simulate_outbound_call(&mut fixture, d1); + let c3 = simulate_outbound_call(&mut fixture, d1); + let c4 = simulate_outbound_call(&mut fixture, d2); + + // Simulate a paused execution for `c1`. 
+ fixture + .push_input(input_response(c1, d1), InputQueueType::RemoteSubnet) + .unwrap(); + let response1 = fixture.pop_input().unwrap(); + fixture + .system_state + .task_queue + .push_front(ExecutionTask::PausedExecution { + id: PausedExecutionId(1), + input: CanisterMessageOrTask::Message(response1), + }); + + // And enqueue a response for `c2`. + fixture + .push_input(input_response(c2, d1), InputQueueType::RemoteSubnet) + .unwrap(); + + // Time out callbacks with deadlines before `d2` (only applicable to `c3` now). + assert_eq!((1, Ok(())), fixture.time_out_callbacks(d2)); + + // Pop the response for `c2`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c2 + ); + + // Pop the reject response for `c3`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c3 && response.response_payload == deadline_expired_reject_payload + ); + assert_eq!(None, fixture.pop_input()); + + // Time out callbacks with deadlines before `d3` (i.e. `c4`). + assert_eq!((1, Ok(())), fixture.time_out_callbacks(d3)); + + // Pop the reject responses for `c4`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c4 && response.response_payload == deadline_expired_reject_payload + ); + assert_eq!(None, fixture.pop_input()); + + assert!(!fixture.system_state.has_input()); + assert!(!fixture.system_state.queues().has_output()); +} + +#[test] +fn time_out_callbacks_no_reserved_slot() { + let mut fixture = SystemStateFixture::running(); + + let deadline_expired_reject_payload = Payload::Reject(RejectContext::new( + RejectCode::SysUnknown, + "Call deadline has expired.", + )); + + let d1 = CoarseTime::from_secs_since_unix_epoch(1); + let d2 = CoarseTime::from_secs_since_unix_epoch(2); + + // Register 3 callbacks, but only make one slot reservation. 
+ let c1 = simulate_outbound_call(&mut fixture, d1); + let c2 = register_callback(&mut fixture, d1); + let c3 = register_callback(&mut fixture, d1); + + // Time out callbacks with deadlines before `d2`. + let (expired_callbacks, result) = fixture.time_out_callbacks(d2); + + // Only one timeout reject for `c1` was enqueued before we ran out of slots. + assert_eq!(1, expired_callbacks); + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c1 && response.response_payload == deadline_expired_reject_payload + ); + assert_eq!(None, fixture.pop_input()); + + // The result should contain two errors: one for `c2` and one for `c3`. + assert_matches!(result, Err(_)); + let errors = result.unwrap_err(); + assert_eq!(2, errors.len()); + assert_matches!(errors[0], StateError::NonMatchingResponse { callback_id, deadline, .. } if callback_id == c2 && deadline == d1); + assert_matches!(errors[1], StateError::NonMatchingResponse { callback_id, deadline, .. } if callback_id == c3 && deadline == d1); + + assert!(!fixture.system_state.has_input()); + assert!(!fixture.system_state.queues().has_output()); +}