From 4dce50f1c8a3294320265246ee07494f4d61d4fa Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Sat, 28 Sep 2024 09:00:23 +0000 Subject: [PATCH 1/2] chore: [MR-560] Drop old canister queue implementations Both the proto and Rust definitions were kept around for backward compatibility (decoding canister queues encoded by old replica versions). Now that all of mainnet is running a replica version using and encoding the new CanisterQueues, it is safe to remove the old ones. --- rs/protobuf/def/state/queues/v1/queues.proto | 26 +- rs/protobuf/src/gen/state/state.queues.v1.rs | 31 - rs/protobuf/src/gen/types/state.queues.v1.rs | 31 - rs/protobuf/src/state/tests.rs | 28 +- .../src/canister_state/queues.rs | 138 +-- .../src/canister_state/queues/message_pool.rs | 6 - .../src/canister_state/queues/queue.rs | 978 +----------------- .../src/canister_state/queues/queue/tests.rs | 863 +--------------- .../src/canister_state/queues/tests.rs | 142 +-- 9 files changed, 63 insertions(+), 2180 deletions(-) diff --git a/rs/protobuf/def/state/queues/v1/queues.proto b/rs/protobuf/def/state/queues/v1/queues.proto index ab40f9b8965..f44dfe632cf 100644 --- a/rs/protobuf/def/state/queues/v1/queues.proto +++ b/rs/protobuf/def/state/queues/v1/queues.proto @@ -112,25 +112,6 @@ message MessageDeadline { uint64 index = 2; } -message InputOutputQueue { - repeated RequestOrResponse queue = 1; - uint64 begin = 2; - uint64 capacity = 3; - uint64 num_slots_reserved = 4; - // Ordered ranges of messages having the same request deadline. Each range - // is represented as a deadline and its end index (the `QueueIndex` just - // past the last request where the deadline applies). Both the deadlines and - // queue indices are strictly increasing. - repeated MessageDeadline deadline_range_ends = 5; - // Queue index from which request timing out will resume. - uint64 timeout_index = 6; -} - -message QueueEntry { - types.v1.CanisterId canister_id = 1; - InputOutputQueue queue = 2; -} - // A pool holding all of a canister's incoming and outgoing canister messages. message MessagePool { // A pool entry: a message keyed by its ID. @@ -178,14 +159,11 @@ message CanisterQueue { } message CanisterQueues { - reserved 1, 4; - reserved "canister_id", "input_schedule"; + reserved 1, 3, 4, 5; + reserved "canister_id", "input_queues", "input_schedule", "output_queues"; repeated ingress.v1.Ingress ingress_queue = 2; - repeated QueueEntry input_queues = 3; - repeated QueueEntry output_queues = 5; - // Input queue from and output queue to `canister_id`. message CanisterQueuePair { types.v1.CanisterId canister_id = 1; diff --git a/rs/protobuf/src/gen/state/state.queues.v1.rs b/rs/protobuf/src/gen/state/state.queues.v1.rs index 445952415ad..17862d66b9d 100644 --- a/rs/protobuf/src/gen/state/state.queues.v1.rs +++ b/rs/protobuf/src/gen/state/state.queues.v1.rs @@ -138,33 +138,6 @@ pub struct MessageDeadline { #[prost(uint64, tag = "2")] pub index: u64, } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InputOutputQueue { - #[prost(message, repeated, tag = "1")] - pub queue: ::prost::alloc::vec::Vec, - #[prost(uint64, tag = "2")] - pub begin: u64, - #[prost(uint64, tag = "3")] - pub capacity: u64, - #[prost(uint64, tag = "4")] - pub num_slots_reserved: u64, - /// Ordered ranges of messages having the same request deadline. Each range - /// is represented as a deadline and its end index (the `QueueIndex` just - /// past the last request where the deadline applies). Both the deadlines and - /// queue indices are strictly increasing. - #[prost(message, repeated, tag = "5")] - pub deadline_range_ends: ::prost::alloc::vec::Vec, - /// Queue index from which request timing out will resume. - #[prost(uint64, tag = "6")] - pub timeout_index: u64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct QueueEntry { - #[prost(message, optional, tag = "1")] - pub canister_id: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub queue: ::core::option::Option, -} /// A pool holding all of a canister's incoming and outgoing canister messages. #[derive(Clone, PartialEq, ::prost::Message)] pub struct MessagePool { @@ -235,10 +208,6 @@ pub mod canister_queue { pub struct CanisterQueues { #[prost(message, repeated, tag = "2")] pub ingress_queue: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "3")] - pub input_queues: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "5")] - pub output_queues: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "9")] pub canister_queues: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "10")] diff --git a/rs/protobuf/src/gen/types/state.queues.v1.rs b/rs/protobuf/src/gen/types/state.queues.v1.rs index 6b6e6e40ba2..5e6b2b21783 100644 --- a/rs/protobuf/src/gen/types/state.queues.v1.rs +++ b/rs/protobuf/src/gen/types/state.queues.v1.rs @@ -138,33 +138,6 @@ pub struct MessageDeadline { #[prost(uint64, tag = "2")] pub index: u64, } -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct InputOutputQueue { - #[prost(message, repeated, tag = "1")] - pub queue: ::prost::alloc::vec::Vec, - #[prost(uint64, tag = "2")] - pub begin: u64, - #[prost(uint64, tag = "3")] - pub capacity: u64, - #[prost(uint64, tag = "4")] - pub num_slots_reserved: u64, - /// Ordered ranges of messages having the same request deadline. Each range - /// is represented as a deadline and its end index (the `QueueIndex` just - /// past the last request where the deadline applies). Both the deadlines and - /// queue indices are strictly increasing. - #[prost(message, repeated, tag = "5")] - pub deadline_range_ends: ::prost::alloc::vec::Vec, - /// Queue index from which request timing out will resume. - #[prost(uint64, tag = "6")] - pub timeout_index: u64, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct QueueEntry { - #[prost(message, optional, tag = "1")] - pub canister_id: ::core::option::Option, - #[prost(message, optional, tag = "2")] - pub queue: ::core::option::Option, -} /// A pool holding all of a canister's incoming and outgoing canister messages. #[derive(Clone, PartialEq, ::prost::Message)] pub struct MessagePool { @@ -235,10 +208,6 @@ pub mod canister_queue { pub struct CanisterQueues { #[prost(message, repeated, tag = "2")] pub ingress_queue: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "3")] - pub input_queues: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "5")] - pub output_queues: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "9")] pub canister_queues: ::prost::alloc::vec::Vec, #[prost(message, optional, tag = "10")] diff --git a/rs/protobuf/src/state/tests.rs b/rs/protobuf/src/state/tests.rs index 2b5698e2cb5..905ade94148 100644 --- a/rs/protobuf/src/state/tests.rs +++ b/rs/protobuf/src/state/tests.rs @@ -32,25 +32,25 @@ fn huge_proto_encoding_roundtrip() { deadline_seconds: 0, })), }; - // A queue of 2K requests with 2 MB payloads. - let queue = vec![msg; 2 << 10]; + let entry = message_pool::Entry { + id: 13, + message: Some(msg.clone()), + }; - let q = InputOutputQueue { - queue, - begin: 2197, - capacity: 500, - num_slots_reserved: 14, - deadline_range_ends: Vec::new(), - timeout_index: 15, + // A pool of 2K requests with 2 MB payloads. + let pool = MessagePool { + messages: vec![entry; 2 << 10], + outbound_guaranteed_request_deadlines: vec![], + message_id_generator: 42, }; let mut buf = vec![]; - q.encode(&mut buf).unwrap(); - // Expecting the encoded queue to be larger than 4 GB. + pool.encode(&mut buf).unwrap(); + // Expecting the encoded pool to be larger than 4 GB. assert!(buf.len() > 4 << 30); - let decoded_q = InputOutputQueue::decode(buf.as_slice()).unwrap(); + let decoded_pool = MessagePool::decode(buf.as_slice()).unwrap(); - // Ensure that decoding results in the same queue that we just encoded. - assert_eq!(q, decoded_q); + // Ensure that decoding results in the same pool that we just encoded. + assert_eq!(pool, decoded_pool); } diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs index 07f9c305111..e7fe4b36c68 100644 --- a/rs/replicated_state/src/canister_state/queues.rs +++ b/rs/replicated_state/src/canister_state/queues.rs @@ -1494,8 +1494,6 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues { Self { ingress_queue: (&item.ingress_queue).into(), - input_queues: Default::default(), - output_queues: Default::default(), canister_queues: item .canister_queues .iter() @@ -1531,103 +1529,51 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can fn try_from( (item, metrics): (pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics), ) -> Result { - let mut canister_queues = BTreeMap::new(); - let mut pool = MessagePool::default(); - let mut shed_responses = BTreeMap::new(); - - if !item.input_queues.is_empty() || !item.output_queues.is_empty() { - // Backward compatibility: deserialize from `input_queues` and `output_queues`. - - if item.pool.is_some() || !item.canister_queues.is_empty() { - return Err(ProxyDecodeError::Other( - "Both `input_queues`/`output_queues` and `pool`/`canister_queues` are populated" - .to_string(), - )); - } - - if item.input_queues.len() != item.output_queues.len() { - return Err(ProxyDecodeError::Other(format!( - "CanisterQueues: Mismatched input ({}) and output ({}) queue lengths", - item.input_queues.len(), - item.output_queues.len() - ))); - } - for (ie, oe) in item - .input_queues - .into_iter() - .zip(item.output_queues.into_iter()) - { - if ie.canister_id != oe.canister_id { - return Err(ProxyDecodeError::Other(format!( - "CanisterQueues: Mismatched input {:?} and output {:?} queue entries", - ie.canister_id, oe.canister_id - ))); - } + let pool = MessagePool::try_from(item.pool.unwrap_or_default())?; + let shed_responses = item + .shed_responses + .into_iter() + .map(|sr| { + let sr = message_pool::CallbackReference::try_from(sr)?; + Ok((sr.0, sr.1)) + }) + .collect::>()?; - let canister_id = try_from_option_field(ie.canister_id, "QueueEntry::canister_id")?; - let original_iq: queue::InputQueue = - try_from_option_field(ie.queue, "QueueEntry::queue")?; - let original_oq: queue::OutputQueue = - try_from_option_field(oe.queue, "QueueEntry::queue")?; - let iq = (original_iq, &mut pool).try_into()?; - let oq = (original_oq, &mut pool).try_into()?; - - if canister_queues.insert(canister_id, (iq, oq)).is_some() { - metrics.observe_broken_soft_invariant(format!( - "CanisterQueues: Duplicate queues for canister {}", - canister_id - )); - } - } - } else { - pool = item.pool.unwrap_or_default().try_into()?; - shed_responses = item - .shed_responses - .into_iter() - .map(|sr| { - let sr = message_pool::CallbackReference::try_from(sr)?; - Ok((sr.0, sr.1)) - }) - .collect::>()?; + let mut enqueued_pool_messages = BTreeSet::new(); + let canister_queues = item + .canister_queues + .into_iter() + .map(|qp| { + let canister_id: CanisterId = + try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?; + let iq: CanisterQueue = try_from_option_field( + qp.input_queue.map(|q| (q, Context::Inbound)), + "CanisterQueuePair::input_queue", + )?; + let oq: CanisterQueue = try_from_option_field( + qp.output_queue.map(|q| (q, Context::Outbound)), + "CanisterQueuePair::output_queue", + )?; + + iq.iter().chain(oq.iter()).for_each(|&reference| { + if pool.get(reference).is_some() && !enqueued_pool_messages.insert(reference) { + metrics.observe_broken_soft_invariant(format!( + "CanisterQueues: {:?} enqueued more than once", + reference + )); + } + }); - let mut enqueued_pool_messages = BTreeSet::new(); - canister_queues = item - .canister_queues - .into_iter() - .map(|qp| { - let canister_id: CanisterId = - try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?; - let iq: CanisterQueue = try_from_option_field( - qp.input_queue.map(|q| (q, Context::Inbound)), - "CanisterQueuePair::input_queue", - )?; - let oq: CanisterQueue = try_from_option_field( - qp.output_queue.map(|q| (q, Context::Outbound)), - "CanisterQueuePair::output_queue", - )?; - - iq.iter().chain(oq.iter()).for_each(|&reference| { - if pool.get(reference).is_some() - && !enqueued_pool_messages.insert(reference) - { - metrics.observe_broken_soft_invariant(format!( - "CanisterQueues: {:?} enqueued more than once", - reference - )); - } - }); + Ok((canister_id, (iq, oq))) + }) + .collect::>()?; - Ok((canister_id, (iq, oq))) - }) - .collect::>()?; - - if enqueued_pool_messages.len() != pool.len() { - metrics.observe_broken_soft_invariant(format!( - "CanisterQueues: Pool holds {} messages, but only {} of them are enqueued", - pool.len(), - enqueued_pool_messages.len() - )); - } + if enqueued_pool_messages.len() != pool.len() { + metrics.observe_broken_soft_invariant(format!( + "CanisterQueues: Pool holds {} messages, but only {} of them are enqueued", + pool.len(), + enqueued_pool_messages.len() + )); } let queue_stats = Self::calculate_queue_stats( diff --git a/rs/replicated_state/src/canister_state/queues/message_pool.rs b/rs/replicated_state/src/canister_state/queues/message_pool.rs index e7d31f0fa57..80b0e3bbec3 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool.rs @@ -488,12 +488,6 @@ impl MessagePool { self.messages.len() } - /// Returns the implicitly assigned deadlines of enqueued outbound guaranteed - /// response requests. - pub(super) fn outbound_guaranteed_request_deadlines(&self) -> &BTreeMap { - &self.outbound_guaranteed_request_deadlines - } - /// Returns a reference to the pool's message stats. pub(super) fn message_stats(&self) -> &MessageStats { &self.message_stats diff --git a/rs/replicated_state/src/canister_state/queues/queue.rs b/rs/replicated_state/src/canister_state/queues/queue.rs index 6e874ada432..0c797a7fe7d 100644 --- a/rs/replicated_state/src/canister_state/queues/queue.rs +++ b/rs/replicated_state/src/canister_state/queues/queue.rs @@ -1,14 +1,10 @@ -// TODO(MR-569) Remove when `CanisterQueues` has been updated to use this. -#![allow(dead_code)] - -use super::message_pool::{self, Context, Kind, MessagePool, REQUEST_LIFETIME}; +use super::message_pool::{self, Context, Kind}; use crate::StateError; use ic_base_types::CanisterId; use ic_protobuf::proxy::ProxyDecodeError; use ic_protobuf::state::{ingress::v1 as pb_ingress, queues::v1 as pb_queues}; -use ic_types::messages::{Ingress, Request, RequestOrResponse, Response, NO_DEADLINE}; -use ic_types::time::UNIX_EPOCH; -use ic_types::{CountBytes, Cycles, Time}; +use ic_types::messages::Ingress; +use ic_types::CountBytes; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; use std::collections::{BTreeMap, VecDeque}; @@ -331,974 +327,6 @@ impl TryFrom<(pb_queues::CanisterQueue, Context)> for CanisterQueue { } } -impl TryFrom<(InputQueue, &mut MessagePool)> for CanisterQueue { - type Error = ProxyDecodeError; - - fn try_from((iq, pool): (InputQueue, &mut MessagePool)) -> Result { - let mut queue = VecDeque::with_capacity(iq.len()); - for msg in iq.queue.queue.into_iter() { - let reference = pool.insert_inbound(msg); - queue.push_back(reference); - } - - let queue = CanisterQueue { - queue, - capacity: iq.queue.capacity, - request_slots: iq.queue.num_request_slots, - response_slots: iq.queue.num_response_slots, - }; - queue - .check_invariants() - .map(|_| queue) - .map_err(ProxyDecodeError::Other) - } -} - -impl TryFrom<(OutputQueue, &mut MessagePool)> for CanisterQueue { - type Error = ProxyDecodeError; - - fn try_from((oq, pool): (OutputQueue, &mut MessagePool)) -> Result { - let mut deadline_range_ends = oq.deadline_range_ends.iter(); - let mut deadline_range_end = deadline_range_ends.next(); - - let mut queue = VecDeque::with_capacity(oq.num_messages); - let mut none_entries = 0; - for (i, msg) in oq.queue.queue.into_iter().enumerate() { - let msg = match msg { - Some(msg) => msg, - None => { - none_entries += 1; - continue; - } - }; - let reference = match msg { - RequestOrResponse::Request(req) => { - let enqueuing_time = if req.deadline == NO_DEADLINE { - // Safe to unwrap because `OutputQueue` ensures that every request is covered by - // a deadline range. - while deadline_range_end.unwrap().1 <= i + oq.begin { - deadline_range_end = deadline_range_ends.next(); - } - // Reconstruct the time when the request was enqueued. - deadline_range_end - .unwrap() - .0 - .checked_sub(REQUEST_LIFETIME) - .unwrap() - } else { - // Irrelevant for best-effort messages, they have explicit deadlines. - UNIX_EPOCH - }; - pool.insert_outbound_request(req, enqueuing_time) - } - - RequestOrResponse::Response(rep) => pool.insert_outbound_response(rep), - }; - queue.push_back(reference); - } - - let queue = CanisterQueue { - queue, - capacity: oq.queue.capacity, - request_slots: oq.queue.num_request_slots - none_entries, - response_slots: oq.queue.num_response_slots, - }; - queue - .check_invariants() - .map(|_| queue) - .map_err(ProxyDecodeError::Other) - } -} - -impl TryFrom<(&CanisterQueue, &MessagePool)> for InputQueue { - type Error = ProxyDecodeError; - - fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { - let mut input_queue = InputQueue::new(q.capacity); - for reference in q.iter() { - let msg = pool.get(*reference).ok_or_else(|| { - ProxyDecodeError::Other(format!( - "InputQueue: unexpected stale reference ({:?})", - reference - )) - })?; - // Safe to unwrap because we cannot exceed the queue capacity. - if let RequestOrResponse::Response(_) = msg { - input_queue.reserve_slot().unwrap(); - } - input_queue.push(msg.clone()).unwrap(); - } - input_queue.queue.num_response_slots = q.response_slots; - - if !input_queue.queue.check_invariants() { - return Err(ProxyDecodeError::Other(format!( - "Invalid InputQueue: {:?}", - input_queue - ))); - } - - Ok(input_queue) - } -} - -impl TryFrom<(&CanisterQueue, &MessagePool)> for OutputQueue { - type Error = ProxyDecodeError; - - fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { - let mut output_queue = OutputQueue::new(q.capacity); - let mut request_slots = 0; - let mut response_slots = 0; - for reference in q.iter() { - let msg = match pool.get(*reference) { - Some(msg) => msg.clone(), - - // Stale request, skip it. - None if reference.kind() == Kind::Request => { - continue; - } - - None => { - return Err(ProxyDecodeError::Other(format!( - "InputQueue: unexpected stale response reference ({:?})", - reference - ))) - } - }; - match msg { - RequestOrResponse::Request(req) => { - let deadline = pool - .outbound_guaranteed_request_deadlines() - .get(reference) - .cloned() - .unwrap_or(req.deadline); - // Safe to unwrap because we cannot exceed the queue capacity. - output_queue.push_request(req, deadline.into()).unwrap(); - request_slots += 1; - } - RequestOrResponse::Response(rep) => { - // Safe to unwrap because we cannot exceed the queue capacity. - output_queue.reserve_slot().unwrap(); - output_queue.push_response(rep); - response_slots += 1; - } - } - } - output_queue.queue.num_request_slots = request_slots; - output_queue.queue.num_response_slots = response_slots + q.reserved_slots(); - output_queue.num_messages = request_slots + response_slots; - - if !output_queue.queue.check_invariants() { - return Err(ProxyDecodeError::Other(format!( - "Invalid OutputQueue: {:?}", - output_queue.queue - ))); - } - - Ok(output_queue) - } -} - -/// Trait for queue items in `InputQueue` and `OutputQueue`. Such items must -/// either be a response or a request (including timed out requests). -/// Since an item is either a request or a response, implementing -/// `is_response()` is sufficient. -trait QueueItem { - /// Returns true if the queue item is a response. - fn is_response(&self) -> bool; - - /// Converts a request into a queue item. - fn from_request(request: Arc) -> T; - - /// Converts a response into a queue item. - fn from_response(response: Arc) -> T; -} - -impl QueueItem for RequestOrResponse { - fn is_response(&self) -> bool { - matches!(*self, RequestOrResponse::Response(_)) - } - - fn from_request(request: Arc) -> RequestOrResponse { - RequestOrResponse::Request(request) - } - fn from_response(response: Arc) -> RequestOrResponse { - RequestOrResponse::Response(response) - } -} - -impl QueueItem> for Option { - fn is_response(&self) -> bool { - matches!(*self, Some(RequestOrResponse::Response(_))) - } - - fn from_request(request: Arc) -> Option { - Some(RequestOrResponse::Request(request)) - } - fn from_response(response: Arc) -> Option { - Some(RequestOrResponse::Response(response)) - } -} - -/// A FIFO queue with equal but separate capacities for requests and responses, -/// ensuring full-duplex communication up to the capacity; and providing a -/// backpressure mechanism in either direction, once the limit is reached. This -/// is the basis for both `InputQueue` and `OutputQueue`. -/// -/// Requests are handled in a straightforward manner: pushing a request onto the -/// queue succeeds if there are available request slots, fails if there aren't. -/// -/// Response slots are either used by responses or reserved for expected -/// responses. Since an (incoming or outgoing) response always results from an -/// (outgoing or, respectively, incoming) request, it is required to first -/// reserve a slot for a response; and later push the response into the reserved -/// slot, consuming the slot reservation. Attempting to push a response with no -/// reserved slot available will produce an error. -#[derive(Clone, Eq, PartialEq, Hash, Debug, ValidateEq)] -struct QueueWithReservation + std::clone::Clone + ValidateEq> { - /// A FIFO queue of all requests and responses. Since responses may be enqueued - /// at arbitrary points in time, response reservations cannot be explicitly - /// represented in `queue`. They only exist as the difference between - /// `num_responses + num_requests` and `queue.len()`. - #[validate_eq(CompareWithValidateEq)] - queue: VecDeque, - /// Maximum number of requests; or responses + reservations; allowed by the - /// queue at any one time. - capacity: usize, - /// Number of slots used by requests. - num_request_slots: usize, - /// Number of slots used by responses and response reservations. - num_response_slots: usize, -} - -impl + std::clone::Clone + ValidateEq> QueueWithReservation { - fn new(capacity: usize) -> Self { - let queue = VecDeque::new(); - - Self { - queue, - capacity, - num_request_slots: 0, - num_response_slots: 0, - } - } - - /// Returns the number of slots available in the queue for reservations. - fn available_response_slots(&self) -> usize { - self.capacity.checked_sub(self.num_response_slots).unwrap() - } - - /// Returns the number slots available for requests. - fn available_request_slots(&self) -> usize { - self.capacity.checked_sub(self.num_request_slots).unwrap() - } - - /// Returns `Ok(())` if there exists at least one available request slot, - /// `Err(StateError::QueueFull)` otherwise. - fn check_has_request_slot(&self) -> Result<(), StateError> { - if self.num_request_slots >= self.capacity { - return Err(StateError::QueueFull { - capacity: self.capacity, - }); - } - Ok(()) - } - - /// Reserves a slot for a response, if available; else returns `Err(StateError::QueueFull)`. - fn reserve_slot(&mut self) -> Result<(), StateError> { - if self.available_response_slots() == 0 { - return Err(StateError::QueueFull { - capacity: self.capacity, - }); - } - self.num_response_slots += 1; - debug_assert!(self.check_invariants()); - Ok(()) - } - - /// Pushes a request into the queue or returns an error if the capacity - /// for requests is exhausted. - fn push_request(&mut self, request: Arc) -> Result<(), (StateError, Arc)> { - if self.num_request_slots < self.capacity { - self.num_request_slots += 1; - self.queue - .push_back(>::from_request(request)); - debug_assert!(self.check_invariants()); - Ok(()) - } else { - Err(( - StateError::QueueFull { - capacity: self.capacity, - }, - request, - )) - } - } - - /// Pushes a response into a reserved slot, consuming the reservation or - /// returns an error if there is no reservation available. - fn push_response( - &mut self, - response: Arc, - ) -> Result<(), (StateError, Arc)> { - if self.reserved_slots() > 0 { - self.queue - .push_back(>::from_response(response)); - debug_assert!(self.check_invariants()); - Ok(()) - } else { - Err(( - StateError::non_matching_response("No reserved response slot", &response), - response, - )) - } - } - - /// Pops an item from the queue. Returns `None` if the queue is empty. - fn pop(&mut self) -> Option { - let msg = self.queue.pop_front(); - if let Some(msg) = &msg { - if msg.is_response() { - self.num_response_slots = self.num_response_slots.checked_sub(1).unwrap(); - } else { - self.num_request_slots = self.num_request_slots.checked_sub(1).unwrap(); - } - } - debug_assert!(self.check_invariants()); - msg - } - - /// Returns a reference to the next item in the queue; or `None` if - /// the queue is empty. - fn peek(&self) -> Option<&T> { - self.queue.front() - } - - /// Returns the number of reserved slots in the queue. - pub(super) fn reserved_slots(&self) -> usize { - (self.num_request_slots + self.num_response_slots) - .checked_sub(self.queue.len()) - .unwrap() - } - - /// Returns `true` if the queue has one or more used slots. - pub(super) fn has_used_slots(&self) -> bool { - !self.queue.is_empty() || self.num_response_slots > 0 - } - - /// Calculates the sum of the given stat across all enqueued messages. - /// - /// Time complexity: O(num_messages). - fn calculate_stat_sum(&self, stat: impl Fn(&T) -> usize) -> usize { - self.queue.iter().map(stat).sum::() - } - - /// Queue invariant check that panics if any invariant does not hold. Intended - /// to be called from within a `debug_assert!()` in production code. - fn check_invariants(&self) -> bool { - assert!(self.num_request_slots <= self.capacity); - assert!(self.num_response_slots <= self.capacity); - - let num_responses = self.queue.iter().filter(|msg| msg.is_response()).count(); - assert!(num_responses <= self.num_response_slots); - assert_eq!(self.num_request_slots, self.queue.len() - num_responses); - - true - } -} - -impl From<&QueueWithReservation> for Vec { - fn from(q: &QueueWithReservation) -> Self { - q.queue.iter().map(|rr| rr.into()).collect() - } -} - -impl From<&QueueWithReservation>> for Vec { - fn from(q: &QueueWithReservation>) -> Self { - q.queue - .iter() - .map(|opt| match opt { - Some(rr) => rr.into(), - None => pb_queues::RequestOrResponse { r: None }, - }) - .collect() - } -} - -/// Computes `num_request_slots` and `num_response_slots`. -/// Also performs sanity checks for `capacity` and the above. -fn get_num_slots(q: &pb_queues::InputOutputQueue) -> Result<(usize, usize), ProxyDecodeError> { - let mut num_request_slots: u64 = 0; - let mut num_response_slots: u64 = 0; - for msg in q.queue.iter() { - if let pb_queues::RequestOrResponse { - r: Some(pb_queues::request_or_response::R::Response(_)), - } = msg - { - num_response_slots += 1; - } else { - num_request_slots += 1; - } - } - num_response_slots = num_response_slots.saturating_add(q.num_slots_reserved); - - if q.capacity != super::DEFAULT_QUEUE_CAPACITY as u64 { - return Err(ProxyDecodeError::Other(format!( - "QueueWithReservation: capacity {}, expecting {}", - q.capacity, - super::DEFAULT_QUEUE_CAPACITY - ))); - } - if num_request_slots > q.capacity { - return Err(ProxyDecodeError::Other(format!( - "QueueWithReservation: request slot count ({}) > capacity ({})", - num_request_slots, q.capacity, - ))); - } - if num_response_slots > q.capacity { - return Err(ProxyDecodeError::Other(format!( - "QueueWithReservation: response slot count ({}) > capacity ({})", - num_response_slots, q.capacity, - ))); - } - - Ok((num_request_slots as usize, num_response_slots as usize)) -} - -impl TryFrom for QueueWithReservation { - type Error = ProxyDecodeError; - - fn try_from(q: pb_queues::InputOutputQueue) -> Result { - let (num_request_slots, num_response_slots) = get_num_slots(&q)?; - - let queue = q - .queue - .into_iter() - .map(|rr| rr.try_into()) - .collect::, _>>()?; - Ok(QueueWithReservation { - queue, - capacity: super::DEFAULT_QUEUE_CAPACITY, - num_request_slots, - num_response_slots, - }) - } -} - -impl TryFrom for QueueWithReservation> { - type Error = ProxyDecodeError; - - fn try_from(q: pb_queues::InputOutputQueue) -> Result { - let (num_request_slots, num_response_slots) = get_num_slots(&q)?; - - let queue = q - .queue - .into_iter() - .map(|rr| match rr.r { - None => Ok(None), - Some(_) => rr.try_into().map(Some), - }) - .collect::, _>>()?; - Ok(QueueWithReservation { - queue, - capacity: super::DEFAULT_QUEUE_CAPACITY, - num_request_slots, - num_response_slots, - }) - } -} - -/// Representation of a single canister input queue. There is an upper bound on -/// the number of messages it can store. -#[derive(Clone, Eq, PartialEq, Hash, Debug, ValidateEq)] -pub(super) struct InputQueue { - #[validate_eq(CompareWithValidateEq)] - queue: QueueWithReservation, -} - -impl InputQueue { - pub(super) fn new(capacity: usize) -> Self { - Self { - queue: QueueWithReservation::new(capacity), - } - } - - pub(super) fn available_response_slots(&self) -> usize { - self.queue.available_response_slots() - } - - pub(super) fn check_has_request_slot(&self) -> Result<(), StateError> { - self.queue.check_has_request_slot() - } - - pub(super) fn push( - &mut self, - msg: RequestOrResponse, - ) -> Result<(), (StateError, RequestOrResponse)> { - match msg { - RequestOrResponse::Request(request) => self - .queue - .push_request(request) - .map_err(|(err, request)| (err, RequestOrResponse::Request(request))), - RequestOrResponse::Response(response) => self - .queue - .push_response(response) - .map_err(|(err, response)| (err, RequestOrResponse::Response(response))), - } - } - - pub(super) fn peek(&self) -> Option<&RequestOrResponse> { - self.queue.peek() - } - - pub(super) fn reserve_slot(&mut self) -> Result<(), StateError> { - self.queue.reserve_slot() - } - - pub(super) fn pop(&mut self) -> Option { - self.queue.pop() - } - - /// Returns the number of messages in the queue. - pub(super) fn len(&self) -> usize { - self.queue.queue.len() - } - - /// Returns the number of reserved slots in the queue. - pub(super) fn reserved_slots(&self) -> usize { - self.queue.reserved_slots() - } - - /// Returns `true` if the queue has one or more used slots. - pub(super) fn has_used_slots(&self) -> bool { - self.queue.has_used_slots() - } - - /// Returns the amount of cycles contained in the queue. - pub(super) fn cycles_in_queue(&self) -> Cycles { - let mut total_cycles = Cycles::zero(); - for msg in self.queue.queue.iter() { - total_cycles += msg.cycles(); - } - total_cycles - } - - /// Calculates the size in bytes, including struct and messages. - /// - /// Time complexity: O(num_messages). - pub(super) fn calculate_size_bytes(&self) -> usize { - size_of::() + self.queue.calculate_stat_sum(|msg| msg.count_bytes()) - } - - /// Calculates the sum of the given stat across all enqueued messages. - /// - /// Time complexity: O(num_messages). - pub(super) fn calculate_stat_sum(&self, stat: fn(&RequestOrResponse) -> usize) -> usize { - self.queue.calculate_stat_sum(stat) - } -} - -impl From<&InputQueue> for pb_queues::InputOutputQueue { - fn from(q: &InputQueue) -> Self { - Self { - queue: (&q.queue).into(), - begin: 0, - capacity: q.queue.capacity as u64, - num_slots_reserved: q.queue.reserved_slots() as u64, - deadline_range_ends: Vec::new(), - timeout_index: 0, - } - } -} - -impl TryFrom for InputQueue { - type Error = ProxyDecodeError; - - fn try_from(q: pb_queues::InputOutputQueue) -> Result { - if !q.deadline_range_ends.is_empty() || q.timeout_index != 0 { - return Err(Self::Error::Other( - "Found deadlines on decoding InputQueue".to_string(), - )); - } - Ok(Self { - queue: q.try_into()?, - }) - } -} - -/// Representation of a single Canister output queue. There is an upper bound -/// on the number of messages it can store. There is also a begin index which -/// can be used effectively as a sequence number for the next message popped out -/// of the queue. -/// -/// Uses `Option<_>` items so that requests can be dropped from anywhere in -/// the queue, i.e. replaced with `None`. They will keep their place in the queue -/// until they reach the beginning, where they will be discarded. -/// -/// Additionally, an invariant is imposed such that there is always `Some` at the -/// front. This is ensured when a message is popped off the queue by also popping -/// any subsequent `None` items. -#[derive(Clone, Eq, PartialEq, Hash, Debug, ValidateEq)] -pub(crate) struct OutputQueue { - #[validate_eq(CompareWithValidateEq)] - queue: QueueWithReservation>, - /// Queue begin index. - /// - /// This provides consistent indices that identify each queue item, even as - /// items are being pushed and popped, for use e.g. in `deadline_range_ends`. - begin: usize, - /// Ordered ranges of messages having the same request deadline. Each range - /// is represented as a deadline and its end index (the index just past - /// the last request where the deadline applies). Both the deadlines and queue - /// indices are strictly increasing. - deadline_range_ends: VecDeque<(Time, usize)>, - /// Index from which request timing out will resume. - /// - /// Used to ensure amortized constant time for timing out requests. - /// May point before the beginning of the queue if messages have been popped - /// since the last `time_out_requests()` call. - timeout_index: usize, - /// The number of actual messages in the queue. - num_messages: usize, -} - -impl OutputQueue { - pub(super) fn new(capacity: usize) -> Self { - Self { - queue: QueueWithReservation::new(capacity), - begin: 0, - deadline_range_ends: VecDeque::new(), - timeout_index: 0, - num_messages: 0, - } - } - - pub(super) fn available_request_slots(&self) -> usize { - self.queue.available_request_slots() - } - - pub(super) fn check_has_request_slot(&self) -> Result<(), StateError> { - self.queue.check_has_request_slot() - } - - pub(super) fn push_request( - &mut self, - request: Arc, - deadline: Time, - ) -> Result<(), (StateError, Arc)> { - self.queue.push_request(request)?; - - // Update the deadline queue. - // - // If the deadline is less than or equal the one at the end of the deadline queue, - // update the `end` of the last deadline range. - // - // If the new deadline is greater than the one of the previous request or there is - // no previous request in the queue. push a new tuple. - let end = self.begin + self.queue.queue.len(); - match self.deadline_range_ends.back_mut() { - Some((back_deadline, deadline_range_end)) if *back_deadline >= deadline => { - *deadline_range_end = end; - } - _ => { - self.deadline_range_ends.push_back((deadline, end)); - } - } - - self.num_messages += 1; - debug_assert!(self.check_invariants()); - - Ok(()) - } - - pub(super) fn push_response(&mut self, response: Arc) { - self.queue.push_response(response).unwrap(); - self.num_messages += 1; - debug_assert!(self.check_invariants()); - } - - pub(super) fn reserve_slot(&mut self) -> Result<(), StateError> { - self.queue.reserve_slot() - } - - /// Pops a message off the queue and returns it. - /// - /// Ensures there is always a 'Some' at the beginning. - pub(crate) fn pop(&mut self) -> Option { - match self.queue.pop() { - None => None, - Some(None) => { - unreachable!("OutputQueue invariant violated: Found `None` at the front."); - } - Some(Some(msg)) => { - self.begin += 1; - self.advance_to_next_message(); - - self.num_messages -= 1; - debug_assert!(self.check_invariants()); - - Some(msg) - } - } - } - - /// Consumes any empty slots at the beginning of the queue and discards consumed deadline ranges. - fn advance_to_next_message(&mut self) { - // Remove `None` in the beginning. - while let Some(None) = self.queue.peek() { - self.queue.pop(); - self.begin += 1; - } - - // Remove deadlines that are no longer relevant. - while let Some((_, deadline_range_end)) = self.deadline_range_ends.front() { - if *deadline_range_end <= self.begin || *deadline_range_end <= self.timeout_index { - self.deadline_range_ends.pop_front(); - } else { - break; - } - } - } - - /// Queue invariant check that panics if any invariant does not hold. Intended - /// to be called from within a `debug_assert!()` in production code. - /// - /// This is (and must remain) strictly a wrapper around `test_invariants()`, as - /// we should be enforcing the exact same invariants after deserialization as - /// after mutations. - /// - /// # Panics - /// - /// If an invariant is violated. - fn check_invariants(&self) -> bool { - if let Err(err) = self.test_invariants() { - panic!("{}", err); - } - true - } - - /// Queue invariant check that produces an error if any invariant does not hold. - fn test_invariants(&self) -> Result<(), &str> { - if let Some(None) = self.queue.queue.front() { - return Err("`None` at the beginning of the queue."); - } - - if !self - .deadline_range_ends - .iter() - .zip(self.deadline_range_ends.iter().skip(1)) - .all(|(a, b)| a.0 < b.0 && a.1 < b.1) - { - return Err("Deadline ranges not sorted."); - } - - // Deadline indices must be in the - // `(self.begin, self.begin + self.queue.queue.len()]` interval. - if let Some((_, first_deadline_range_end)) = self.deadline_range_ends.front() { - if *first_deadline_range_end <= self.begin { - return Err("Deadline range end before queue begin."); - } - if *first_deadline_range_end <= self.timeout_index { - return Err("Deadline range end before `timeout_index`."); - } - } - if let Some((_, last_deadline_range_end)) = self.deadline_range_ends.back() { - if *last_deadline_range_end > self.begin + self.queue.queue.len() { - return Err("Deadline range end after queue end."); - } - } - - if self - .queue - .queue - .iter() - .take(self.timeout_index.saturating_sub(self.begin)) - .any(|rr| matches!(rr, Some(RequestOrResponse::Request(_)))) - { - return Err("Request(s) before `timeout_index`."); - } - - if self.num_messages != self.queue.queue.iter().filter(|rr| rr.is_some()).count() { - return Err("`num_messages` does is not equal to the number of messages."); - } - - Ok(()) - } - - /// Returns the message that `pop` would have returned, without removing it - /// from the queue. - pub(crate) fn peek(&self) -> Option<&RequestOrResponse> { - self.queue.peek().map(|msg| msg.as_ref().unwrap()) - } - - /// Number of actual messages in the queue (`None` are ignored). - pub(super) fn num_messages(&self) -> usize { - self.num_messages - } - - /// Returns the number of reserved slots in the queue. - pub(super) fn reserved_slots(&self) -> usize { - self.queue.reserved_slots() - } - - /// Returns `true` if the queue has one or more used slots. - pub(super) fn has_used_slots(&self) -> bool { - self.queue.has_used_slots() - } - - /// Returns the amount of cycles contained in the queue. - pub(super) fn cycles_in_queue(&self) -> Cycles { - let mut total_cycles = Cycles::zero(); - for msg in self.queue.queue.iter().flatten() { - total_cycles += msg.cycles(); - } - total_cycles - } - - /// Calculates the sum of the given stat across all enqueued messages. - /// - /// Time complexity: O(num_messages). - pub(super) fn calculate_stat_sum(&self, stat: fn(&RequestOrResponse) -> usize) -> usize { - let stat = - |item: &Option| if let Some(item) = item { stat(item) } else { 0 }; - self.queue.calculate_stat_sum(stat) - } - - /// Returns true if there are any expired deadlines at `current_time`, false otherwise. - pub(super) fn has_expired_deadlines(&self, current_time: Time) -> bool { - match self.deadline_range_ends.front() { - Some((deadline, _)) => *deadline <= current_time, - None => false, - } - } - - /// Purges timed out requests. Returns an iterator over the timed out requests. - /// Only consumed items are purged. - pub(super) fn time_out_requests(&mut self, current_time: Time) -> TimedOutRequestsIter { - TimedOutRequestsIter { - q: self, - current_time, - } - } - - /// Returns an iterator over the underlying messages. - /// - /// For testing purposes only. - pub(super) fn iter_for_testing(&self) -> impl Iterator + '_ { - self.queue.queue.iter().filter_map(|item| item.clone()) - } -} - -/// Iterator over timed out requests in an OutputQueue. -/// -/// This extracts timed out requests by removing them from the queue, -/// leaving `None` in their place and returning them one by one. -pub(super) struct TimedOutRequestsIter<'a> { - /// A mutable reference to the queue whose requests are to be timed out and returned. - q: &'a mut OutputQueue, - /// The time used to determine which requests should be considered timed out. - /// This is compared to deadlines in q.deadline_range_ends. - current_time: Time, -} - -impl<'a> Iterator for TimedOutRequestsIter<'a> { - type Item = Arc; - - fn next(&mut self) -> Option { - use RequestOrResponse::Request; - - while let Some(&(deadline, deadline_range_end)) = self.q.deadline_range_ends.front() { - if deadline > self.current_time { - return None; - } - - self.q.timeout_index = self.q.timeout_index.max(self.q.begin); - - debug_assert!(deadline_range_end <= self.q.begin + self.q.queue.queue.len()); - while self.q.timeout_index < deadline_range_end { - let i = self.q.timeout_index - self.q.begin; - self.q.timeout_index += 1; - - if let Some(Request(request)) = match self.q.queue.queue.get_mut(i) { - Some(item @ Some(Request(_))) => item.take(), - _ => continue, - } { - self.q.num_messages -= 1; - self.q.advance_to_next_message(); - debug_assert!(self.q.check_invariants()); - - return Some(request); - } - } - self.q.deadline_range_ends.pop_front(); - } - None - } -} - -impl std::iter::Iterator for OutputQueue { - type Item = RequestOrResponse; - - fn next(&mut self) -> Option { - self.pop() - } -} - -impl From<&OutputQueue> for pb_queues::InputOutputQueue { - fn from(q: &OutputQueue) -> Self { - Self { - queue: (&q.queue).into(), - begin: q.begin as u64, - capacity: q.queue.capacity as u64, - num_slots_reserved: q.queue.reserved_slots() as u64, - deadline_range_ends: q - .deadline_range_ends - .iter() - .map( - |(deadline, deadline_range_end)| pb_queues::MessageDeadline { - deadline: deadline.as_nanos_since_unix_epoch(), - index: *deadline_range_end as u64, - }, - ) - .collect(), - timeout_index: q.timeout_index as u64, - } - } -} - -impl TryFrom for OutputQueue { - type Error = ProxyDecodeError; - - fn try_from(q: pb_queues::InputOutputQueue) -> Result { - let begin = q.begin as usize; - let timeout_index = q.timeout_index as usize; - let deadline_range_ends: VecDeque<(Time, usize)> = q - .deadline_range_ends - .iter() - .map(|di| { - ( - Time::from_nanos_since_unix_epoch(di.deadline), - di.index as usize, - ) - }) - .collect(); - let queue: QueueWithReservation> = q.try_into()?; - let num_messages = queue.queue.iter().filter(|rr| rr.is_some()).count(); - - let res = Self { - begin, - queue, - deadline_range_ends, - timeout_index, - num_messages, - }; - - if let Err(err) = res.test_invariants() { - return Err(Self::Error::Other(format!("Invalid OutputQueue: {}", err))); - } - Ok(res) - } -} - /// Representation of the Ingress queue. There is no upper bound on /// the number of messages it can store. /// diff --git a/rs/replicated_state/src/canister_state/queues/queue/tests.rs b/rs/replicated_state/src/canister_state/queues/queue/tests.rs index 85f0bec5bd4..95abe8696bd 100644 --- a/rs/replicated_state/src/canister_state/queues/queue/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/queue/tests.rs @@ -4,14 +4,9 @@ use super::super::message_pool::tests::*; use super::super::message_pool::Class; use super::*; use assert_matches::assert_matches; -use ic_test_utilities_types::arbitrary; use ic_test_utilities_types::ids::{canister_test_id, message_test_id, user_test_id}; -use ic_test_utilities_types::messages::{IngressBuilder, RequestBuilder, ResponseBuilder}; -use ic_types::messages::{CallbackId, RequestOrResponse}; -use ic_types::time::{CoarseTime, UNIX_EPOCH}; -use ic_types::Time; +use ic_test_utilities_types::messages::IngressBuilder; use proptest::prelude::*; -use std::time::Duration; #[test] fn canister_queue_constructor_test() { @@ -345,437 +340,6 @@ fn decode_with_invalid_response_slots_fails() { ); } -fn make_request(callback_id: u64, deadline_seconds: u32) -> Request { - RequestBuilder::default() - .sender_reply_callback(CallbackId::from(callback_id)) - .deadline(CoarseTime::from_secs_since_unix_epoch(deadline_seconds)) - .build() -} - -#[test] -fn canister_queue_try_from_input_queue() { - let req1 = make_request(1, 0); - let req2 = make_request(2, 0); - let req3 = make_request(3, 13); - let rep = ResponseBuilder::default() - .originator_reply_callback(CallbackId::new(4)) - .build(); - - // An `InputQueue` with a non-zero `begin`, a couple of requests, a response and - // a reserved slot. - let mut input_queue = InputQueue::new(10); - input_queue.push(req1.clone().into()).unwrap(); - input_queue.pop().unwrap(); - input_queue.push(req2.clone().into()).unwrap(); - input_queue.push(req3.clone().into()).unwrap(); - input_queue.reserve_slot().unwrap(); - input_queue.push(rep.clone().into()).unwrap(); - input_queue.reserve_slot().unwrap(); - - // Expected `MessagePool` and `CanisterQueue`. - let mut expected_pool = MessagePool::default(); - let mut expected_queue = CanisterQueue::new(10); - expected_queue.push_request(expected_pool.insert_inbound(req2.into())); - expected_queue.push_request(expected_pool.insert_inbound(req3.into())); - expected_queue.try_reserve_response_slot().unwrap(); - expected_queue.push_response(expected_pool.insert_inbound(rep.into())); - expected_queue.try_reserve_response_slot().unwrap(); - - let mut pool = MessagePool::default(); - let queue: CanisterQueue = (input_queue, &mut pool).try_into().unwrap(); - - assert_eq!((expected_pool, expected_queue), (pool, queue)); -} - -#[test] -fn canister_queue_try_from_output_queue() { - let t0 = Time::from_secs_since_unix_epoch(10).unwrap(); - let t3 = t0 + Duration::from_secs(3); - let t4 = t0 + Duration::from_secs(4); - let d0 = t0 + REQUEST_LIFETIME; - let d3 = t3 + REQUEST_LIFETIME; - let d4 = t4 + REQUEST_LIFETIME; - let req1 = make_request(1, 0); - let req2 = make_request(2, 13); - let req3 = make_request(3, 0); - let req4 = make_request(4, 14); - let rep = ResponseBuilder::default() - .originator_reply_callback(CallbackId::new(5)) - .build(); - - // An `OutputQueue` with a non-zero `begin`, a response, a timed out request - // (`None`), a couple of requests and a reserved slot. - let mut output_queue = OutputQueue::new(10); - // Advance `begin`. - output_queue.push_request(req1.clone().into(), d0).unwrap(); - output_queue.pop().unwrap(); - // Enqueue a response to induce head-of-line blocking. - output_queue.reserve_slot().unwrap(); - output_queue.push_response(rep.clone().into()); - // Enqueue and time out a request. - output_queue.push_request(req2.clone().into(), d0).unwrap(); - output_queue.time_out_requests(d3).next(); - // Enqueue a couple more requests. - output_queue.push_request(req3.clone().into(), d3).unwrap(); - output_queue.push_request(req4.clone().into(), d4).unwrap(); - // Make an extra response reservation. - output_queue.reserve_slot().unwrap(); - - // Expected `MessagePool` and `CanisterQueue`. - let mut expected_pool = MessagePool::default(); - let mut expected_queue = CanisterQueue::new(10); - expected_queue.try_reserve_response_slot().unwrap(); - expected_queue.push_response(expected_pool.insert_outbound_response(rep.into())); - expected_queue.push_request(expected_pool.insert_outbound_request(req3.into(), t3)); - expected_queue.push_request(expected_pool.insert_outbound_request(req4.into(), t4)); - expected_queue.try_reserve_response_slot().unwrap(); - - let mut pool = MessagePool::default(); - let queue: CanisterQueue = (output_queue, &mut pool).try_into().unwrap(); - - assert_eq!((expected_pool, expected_queue), (pool, queue)); -} - -#[test] -fn input_queue_try_from_canister_queue() { - let req1 = make_request(1, 0); - let req2 = make_request(2, 14); - let rep = ResponseBuilder::default() - .originator_reply_callback(CallbackId::new(4)) - .build(); - - // A `CanisterQueue` with a couple of requests and a response and a reserved - // slot. - let mut pool = MessagePool::default(); - let mut queue = CanisterQueue::new(10); - // Enqueue a couple of requests and a response. - queue.push_request(pool.insert_inbound(req1.clone().into())); - queue.push_request(pool.insert_inbound(req2.clone().into())); - queue.try_reserve_response_slot().unwrap(); - queue.push_response(pool.insert_inbound(rep.clone().into())); - // Make an extra response reservation. - queue.try_reserve_response_slot().unwrap(); - - // Expected `InputQueue`. - let mut expected_input_queue = InputQueue::new(10); - expected_input_queue.push(req1.into()).unwrap(); - expected_input_queue.push(req2.into()).unwrap(); - expected_input_queue.reserve_slot().unwrap(); - expected_input_queue.push(rep.into()).unwrap(); - expected_input_queue.reserve_slot().unwrap(); - - let input_queue: InputQueue = (&queue, &pool).try_into().unwrap(); - - assert_eq!(expected_input_queue, input_queue); -} - -#[test] -fn output_queue_try_from_canister_queue() { - let t0 = Time::from_secs_since_unix_epoch(10).unwrap(); - let t2 = t0 + Duration::from_secs(2); - let t3 = t0 + Duration::from_secs(3); - let d2 = t2 + REQUEST_LIFETIME; - let req1 = make_request(1, 11); - let req2 = make_request(2, 0); - let req3 = make_request(3, 13); - let rep = ResponseBuilder::default() - .originator_reply_callback(CallbackId::new(4)) - .build(); - - // A `CanisterQueue` with a couple of requests and a response, a stale request - // and a reserved slot. - let mut pool = MessagePool::default(); - let mut queue = CanisterQueue::new(10); - // Enqueue and then shed a request. - queue.push_request(pool.insert_outbound_request(req1.clone().into(), t0)); - pool.shed_largest_message().unwrap(); - assert_eq!(1, queue.len()); - // Enqueue a couple of requests and a response. - queue.push_request(pool.insert_outbound_request(req2.clone().into(), t2)); - queue.push_request(pool.insert_outbound_request(req3.clone().into(), t3)); - queue.try_reserve_response_slot().unwrap(); - queue.push_response(pool.insert_outbound_response(rep.clone().into())); - // Make an extra response reservation. - queue.try_reserve_response_slot().unwrap(); - - // Expected `OutputQueue`. The stale request and response are not preserved. - let mut expected_output_queue = OutputQueue::new(10); - expected_output_queue.push_request(req2.into(), d2).unwrap(); - // `CanisterQueue` does not record when `req3` was pushed (at `t3`), so it - // inherits `req2`'s deadline. - expected_output_queue.push_request(req3.into(), d2).unwrap(); - expected_output_queue.reserve_slot().unwrap(); - expected_output_queue.push_response(rep.into()); - expected_output_queue.reserve_slot().unwrap(); - - let output_queue: OutputQueue = (&queue, &pool).try_into().unwrap(); - - assert_eq!(expected_output_queue, output_queue); -} - -#[test] -fn input_queue_constructor_test() { - let capacity: usize = 14; - let mut queue = InputQueue::new(capacity); - assert_eq!(queue.len(), 0); - assert!(!queue.has_used_slots()); - assert_eq!(queue.pop(), None); -} - -#[test] -fn input_queue_with_message_is_not_empty() { - let mut input_queue = InputQueue::new(1); - - input_queue - .push(RequestBuilder::default().build().into()) - .expect("could push"); - assert_ne!(input_queue.len(), 0); - assert!(input_queue.has_used_slots()); -} - -#[test] -fn input_queue_with_reservation_is_not_empty() { - let mut input_queue = InputQueue::new(1); - input_queue.reserve_slot().unwrap(); - - assert_eq!(input_queue.len(), 0); - assert!(input_queue.has_used_slots()); -} - -/// Test affirming success on popping pushed messages. -#[test] -fn input_queue_pushed_messages_get_popped() { - let capacity: usize = 4; - let mut input_queue = InputQueue::new(capacity); - let mut msg_queue = VecDeque::new(); - for _ in 0..capacity { - let req: RequestOrResponse = RequestBuilder::default().build().into(); - msg_queue.push_back(req.clone()); - assert_eq!(Ok(()), input_queue.push(req)); - } - while !msg_queue.is_empty() { - assert_eq!(input_queue.pop(), msg_queue.pop_front()); - } - assert_eq!(None, msg_queue.pop_front()); - assert_eq!(None, input_queue.pop()); -} - -/// Pushing a request succeeds if there is space. -/// Reserving a slot, then pushing a response succeeds if there is space. -#[test] -fn input_queue_push_succeeds() { - let capacity: usize = 1; - let mut input_queue = InputQueue::new(capacity); - - // Push request. - assert_eq!(input_queue.queue.available_request_slots(), 1); - input_queue - .push(RequestBuilder::default().build().into()) - .unwrap(); - assert_eq!(input_queue.queue.available_request_slots(), 0); - assert!(input_queue.check_has_request_slot().is_err()); - - // Push response. - assert_eq!(input_queue.queue.available_response_slots(), 1); - input_queue.reserve_slot().unwrap(); - assert_eq!(input_queue.queue.available_response_slots(), 0); - input_queue - .push(ResponseBuilder::default().build().into()) - .unwrap(); - assert_eq!(input_queue.queue.available_response_slots(), 0); - - assert_eq!(2, input_queue.len()); -} - -/// Test that overfilling an input queue with messages and reservations -/// results in failed pushes and reservations; also verifies that -/// pushes and reservations below capacity succeed. -#[test] -fn input_queue_push_to_full_queue_fails() { - // First fill up the queue. - let capacity: usize = 2; - let mut input_queue = InputQueue::new(capacity); - for _ in 0..capacity { - input_queue - .push(RequestBuilder::default().build().into()) - .unwrap(); - } - for _index in 0..capacity { - input_queue.reserve_slot().unwrap(); - } - assert_eq!(input_queue.len(), capacity); - - // Now push an extraneous message in. - assert_eq!( - input_queue - .push(RequestBuilder::default().build().into(),) - .map_err(|(err, _)| err), - Err(StateError::QueueFull { capacity }) - ); - // Or try to reserve a slot. - assert_eq!( - input_queue.reserve_slot(), - Err(StateError::QueueFull { capacity }) - ); -} - -#[test] -fn input_queue_push_response_without_reservation_fails() { - let mut queue = InputQueue::new(10); - queue - .push(ResponseBuilder::default().build().into()) - .unwrap_err(); -} - -#[test] -fn input_queue_decode_with_non_empty_deadlines_fails() { - let mut q = InputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); - for _ in 0..2 { - let _ = q.push(RequestOrResponse::Request( - RequestBuilder::default().build().into(), - )); - } - let mut proto_queue: pb_queues::InputOutputQueue = (&q).into(); - proto_queue - .deadline_range_ends - .push(pb_queues::MessageDeadline { - deadline: 0, - index: 0, - }); - assert!(TryInto::::try_into(proto_queue).is_err()); -} - -#[test] -fn output_queue_constructor_test() { - let mut queue = OutputQueue::new(14); - assert_eq!(queue.num_messages(), 0); - assert_eq!(queue.pop(), None); -} - -#[test] -fn output_queue_with_message_is_not_empty() { - let mut queue = OutputQueue::new(14); - - queue - .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) - .expect("could push"); - assert_eq!(queue.num_messages(), 1); - assert!(queue.has_used_slots()); -} - -#[test] -fn output_queue_with_reservation_is_not_empty() { - let mut queue = OutputQueue::new(14); - queue.reserve_slot().unwrap(); - - assert_eq!(queue.num_messages(), 0); - assert!(queue.has_used_slots()); -} - -// Pushing a request succeeds if there is space. -#[test] -fn output_queue_push_request_succeeds() { - let capacity: usize = 1; - let mut output_queue = OutputQueue::new(capacity); - - assert_eq!(output_queue.queue.available_request_slots(), 1); - output_queue - .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) - .unwrap(); - assert_eq!(output_queue.queue.available_request_slots(), 0); - assert!(output_queue.check_has_request_slot().is_err()); - - assert_eq!(1, output_queue.num_messages()); -} - -// Reserving a slot, then pushing a response succeeds if there is space. -#[test] -fn output_queue_push_response_succeeds() { - let capacity: usize = 1; - let mut output_queue = OutputQueue::new(capacity); - - assert_eq!(output_queue.queue.available_response_slots(), 1); - output_queue.reserve_slot().unwrap(); - assert_eq!(output_queue.queue.available_response_slots(), 0); - output_queue.push_response(ResponseBuilder::default().build().into()); - assert_eq!(output_queue.queue.available_response_slots(), 0); - - assert_eq!(1, output_queue.num_messages()); -} - -/// Test that overfilling an output queue with messages and reservations -/// results in failed pushes and reservations; also verifies that -/// pushes and reservations below capacity succeeds. -#[test] -fn output_queue_push_to_full_queue_fails() { - // First fill up the queue. - let capacity: usize = 2; - let mut output_queue = OutputQueue::new(capacity); - for _index in 0..capacity { - output_queue - .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) - .unwrap(); - } - for _index in 0..capacity { - output_queue.reserve_slot().unwrap(); - } - assert_eq!(output_queue.num_messages(), capacity); - - // Now push an extraneous message in - assert_eq!( - output_queue - .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH,) - .map_err(|(err, _)| err), - Err(StateError::QueueFull { capacity }) - ); - // Or try to reserve a slot. - assert_eq!( - output_queue.reserve_slot(), - Err(StateError::QueueFull { capacity }) - ); -} - -#[test] -#[should_panic(expected = "called `Result::unwrap()` on an `Err` value")] -fn output_push_without_reserved_slot_fails() { - let mut queue = OutputQueue::new(10); - queue.push_response(ResponseBuilder::default().build().into()); -} - -/// An explicit example of deadlines in OutputQueue, where we manually fill -/// and empty the queue, while checking whether we find what we'd expect. -/// This test also ensures `push_request` and `push_response` don't increment -/// the queue begin index, but `pop` does (by 1). -#[test] -fn output_queue_explicit_push_and_pop_test() { - let mut q = OutputQueue::new(100); - assert_eq!(0, q.num_messages()); - - let test_request = Arc::::from(RequestBuilder::default().build()); - let test_response = Arc::::from(ResponseBuilder::default().build()); - let deadline1 = Time::from_nanos_since_unix_epoch(3); - let deadline2 = Time::from_nanos_since_unix_epoch(7); - - q.push_request(test_request.clone(), deadline1).unwrap(); - q.reserve_slot().unwrap(); - q.push_response(test_response); - q.push_request(test_request.clone(), deadline1).unwrap(); - q.push_request(test_request, deadline2).unwrap(); - - assert_eq!(4, q.num_messages()); - assert_eq!(0, q.begin); - assert_eq!( - VecDeque::from(vec![(deadline1, 3), (deadline2, 4)]), - q.deadline_range_ends - ); - - let timeout_index = q.timeout_index; - assert!(matches!(q.pop().unwrap(), RequestOrResponse::Request(_))); - assert_eq!(3, q.num_messages()); - assert_eq!(1, q.begin); - assert_eq!(timeout_index, q.timeout_index); -} - /// This ensures `debug_asserts` are enabled, because we are passively testing invariants /// through the function `OutputQueue::check_invariants()`, which is only called when /// `debug_asserts` are enabled. @@ -785,431 +349,6 @@ fn ensure_debug_asserts_enabled() { debug_assert!(false); } -proptest! { - /// Checks `push_request` enforces `OutputQueue` invariants. This is implicitly checked at - /// the bottom of `OutputQueue::push_request()`. There is another explicit check here after - /// the fact to ensure this test doesn't just silently pass if the implicit check is removed. - /// Additionally, an expected `deadline_range_ends` is reconstructed and then compared to - /// the actual version at at the end. - #[test] - fn output_queue_push_request_enforces_invariants( - (requests, deadlines) in (2..=10_usize) - .prop_flat_map(|num_requests| { - ( - proptest::collection::vec( - arbitrary::request().prop_map(Arc::from), - num_requests, - ), - proptest::collection::vec( - (0..=1000_u64).prop_map(Time::from_nanos_since_unix_epoch), - num_requests, - ), - ) - }) - ) { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); - let mut expected_deadline_range_ends = VecDeque::<(Time, usize)>::new(); - - let mut index = 1; - for (request, deadline) in requests - .into_iter() - .zip(deadlines.into_iter()) - { - q.push_request(request, deadline).unwrap(); - - match expected_deadline_range_ends.back_mut() { - Some((back_deadline, end)) if *back_deadline >= deadline => { - *end = index; - } - _ => { - expected_deadline_range_ends.push_back((deadline, index)); - } - } - - index += 1; - } - - prop_assert!(q.check_invariants()); - prop_assert_eq!(expected_deadline_range_ends, q.deadline_range_ends); - } -} - -prop_compose! { - /// Generator for an arbitrary `OutputQueue` where nothing is timed out. An arbitrary number - /// of execution rounds is simulated by starting with round boundaries (in terms of messages - /// pushed onto the queue by the end of the respective round) [1, 2, ... num_msgs]; and removing - /// a random subset thereof. - fn arb_output_queue_no_timeout(num_msgs: std::ops::RangeInclusive) ( - begin in 0..10_usize, - (msgs, indices_to_remove) in num_msgs - .prop_flat_map(|num_msgs| { - ( - proptest::collection::vec(arbitrary::request_or_response(), num_msgs), - proptest::collection::vec(any::(), 0..=num_msgs), - ) - }) - ) -> OutputQueue { - - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); - q.begin = begin; - - // Boundaries of execution rounds. - let mut range_ends = (0..=msgs.len()).collect::>(); - for i in indices_to_remove { - range_ends.remove(i % range_ends.len()); - } - range_ends.push(usize::MAX); - - // Push messages on the queue. The deadlines start at 10 so that - // there is some room to pick a random time from that won't time out - // anything. - let mut round = 0; - for (i, msg) in msgs.into_iter().enumerate() { - if range_ends[round] == i { - round += 1; - } - match msg { - RequestOrResponse::Request(request) => { - q.push_request( - request, - Time::from_nanos_since_unix_epoch((10 + round) as u64), - ).unwrap(); - } - RequestOrResponse::Response(response) => { - q.reserve_slot().unwrap(); - q.push_response(response); - } - } - } - q.check_invariants(); - - q - } -} - -prop_compose! { - /// Generator for an arbitrary time in the interval [min_deadline - 5, max_deadline + 5] - /// for an arbitrary `OutputQueue`. Returns 0 if there are no deadlines in the queue. - fn arb_time_for_output_queue_timeouts(q: &OutputQueue) ( - time in { - // Find time for timing out in [min_deadline-5, max_deadline+5]. - if let (Some((min_deadline, _)), Some((max_deadline, _))) = - (q.deadline_range_ends.front(), q.deadline_range_ends.back()) - { - let min_deadline = min_deadline.as_nanos_since_unix_epoch(); - let max_deadline = max_deadline.as_nanos_since_unix_epoch(); - min_deadline - 5 ..= max_deadline + 5 - } else { - 0..=0_u64 - } - }, - ) -> Time { - Time::from_nanos_since_unix_epoch(time) - } -} - -prop_compose! { - /// Generator for an arbitrary `OutputQueue` where requests are (partially) timed out. - /// The time for timing out is chosen in the interval [min_deadline - 5, max_deadline+ 5] - /// such that it encompasses edge cases. - fn arb_output_queue() ( - (time, num_pop, mut q) in arb_output_queue_no_timeout(5..=20) - .prop_flat_map(|q| (arb_time_for_output_queue_timeouts(&q), 0..3_usize, Just(q))) - ) -> OutputQueue { - q.time_out_requests(time).count(); - q.check_invariants(); - - // Pop a few messages to somewhat randomize `timeout_index`. - for _ in 0..num_pop { - q.pop(); - } - - q - } -} - -proptest! { - /// Check timing out requests using an iterator. This uses a random time and then - /// checks that requests whose deadline has expired are extracted from the queue, - /// but the corresponding responses remain. - #[test] - fn output_queue_test_time_out_requests( - (time, mut q) in arb_output_queue() - .prop_flat_map(|q| (arb_time_for_output_queue_timeouts(&q), Just(q))) - ) { - let mut ref_q = q.clone(); - - let mut timed_out_requests = q - .time_out_requests(time) - .map(RequestOrResponse::Request) - .collect::>(); - - q.check_invariants(); - - // Check there are no `None` at or after `timeout_index`. - if !timed_out_requests.is_empty() { - prop_assert!(q - .queue - .queue - .iter() - .skip(q.timeout_index - q.begin) - .all(|msg| msg.is_some())); - } - - while let Some((deadline, _)) = ref_q.deadline_range_ends.front() { - if *deadline > time { - break; - } - match ref_q.peek() { - Some(RequestOrResponse::Response(_)) => { - prop_assert_eq!(ref_q.pop(), q.pop()); - } - Some(RequestOrResponse::Request(_)) => { - prop_assert_eq!(ref_q.pop(), timed_out_requests.pop_front()); - } - None => unreachable!(), - } - } - - // `ref_q` and `q` must be the same after popping all messages from the - // time outed section, except `timeout_index` was not updated since we - // never timed out anything for `ref_q`. - ref_q.timeout_index = q.timeout_index; - prop_assert_eq!(ref_q, q); - } -} - -/// Check whether a timed out request produces back pressure. -#[test] -fn output_queue_check_back_pressure_with_timed_out_requests() { - let mut q = OutputQueue::new(1); - - q.reserve_slot().unwrap(); - q.push_response(Arc::new(ResponseBuilder::default().build())); - - q.push_request( - Arc::new(RequestBuilder::default().build()), - Time::from_nanos_since_unix_epoch(1), - ) - .unwrap(); - q.time_out_requests(Time::from_nanos_since_unix_epoch(u64::MAX)) - .count(); - - assert!(q - .push_request( - Arc::new(RequestBuilder::default().build()), - Time::from_nanos_since_unix_epoch(1), - ) - .is_err()); -} - -proptest! { - /// Check whether the conversion to and from the protobuf version - /// works for arbitrary output queues. - #[test] - fn output_queue_roundtrip_conversions( - q in arb_output_queue() - ) { - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - let deserialized_q: OutputQueue = proto_queue.try_into().expect("bad conversion"); - - prop_assert_eq!(q, deserialized_q); - } -} - -/// Generates a simple `OutputQueue` holding a specified number of requests, -/// each with a unique deadline. -fn generate_test_queue(num_requests: usize) -> OutputQueue { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); - for t in 1..=num_requests { - q.push_request( - RequestBuilder::default().build().into(), - Time::from_nanos_since_unix_epoch(t as u64), - ) - .unwrap(); - } - q -} - -#[test] -fn output_queue_test_has_expired_deadlines() { - let end_of_time = Time::from_nanos_since_unix_epoch(u64::MAX); - - let q = generate_test_queue(0); - assert!(!q.has_expired_deadlines(end_of_time)); - - let q = generate_test_queue(1); - assert!(!q.has_expired_deadlines(Time::from_nanos_since_unix_epoch(0))); - assert!(q.has_expired_deadlines(end_of_time)); -} - -#[test] -fn output_queue_decode_with_deadlines_not_strictly_sorted_fails() { - let mut q = generate_test_queue(2); - - // Check duplicate deadline range end causes error. - let deadline = q.deadline_range_ends[0].0; - q.deadline_range_ends[0].0 = q.deadline_range_ends[1].0; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); - - // Check swapped deadline range ends cause error. - q.deadline_range_ends[1].0 = deadline; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); -} - -#[test] -fn output_queue_decode_with_deadline_indices_not_strictly_sorted_fails() { - let mut q = generate_test_queue(2); - - // Check duplicate deadline range end causes error. - let index = q.deadline_range_ends[0].1; - q.deadline_range_ends[0].1 = q.deadline_range_ends[1].1; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); - - // Check swapped deadline range ends cause error. - q.deadline_range_ends[1].1 = index; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); -} - -#[test] -fn output_queue_decode_with_deadlines_index_out_of_bounds_fails() { - let mut q = generate_test_queue(1); - q.begin = 1; - - // Check deadline index before the queue causes error. - q.deadline_range_ends[0].1 = 0; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); - - // Check deadline index after the queue causes error. - q.deadline_range_ends[0].1 = 3; - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); -} - -#[test] -fn output_queue_decode_with_none_head_fails() { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); - for _ in 0..2 { - q.push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) - .unwrap(); - } - q.queue.queue.front_mut().unwrap().take(); - - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(matches!( - OutputQueue::try_from(proto_queue).err(), - Some(ProxyDecodeError::Other(_)) - )); -} - -// Creates an `OutputQueue` from a VecDeque directly to allow for roundtrips with -// `queue.len() > capacity`. -fn output_queue_roundtrip_from_vec_deque( - queue: VecDeque>, - num_request_slots: usize, - num_response_slots: usize, - num_messages: usize, -) -> Result { - let q = OutputQueue { - queue: QueueWithReservation::> { - queue, - capacity: super::super::DEFAULT_QUEUE_CAPACITY, - num_response_slots, - num_request_slots, - }, - begin: 0, - deadline_range_ends: VecDeque::new(), - timeout_index: 0, - num_messages, - }; - - let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - TryInto::::try_into(proto_queue) -} - -#[test] -fn output_queue_decode_check_num_requests_and_responses() { - let capacity = super::super::DEFAULT_QUEUE_CAPACITY; - let half_capacity = capacity / 2; - - let mut queue = VecDeque::new(); - - for _ in 0..half_capacity { - queue.push_back(Some(RequestOrResponse::Request( - RequestBuilder::default().build().into(), - ))); - } - for _ in half_capacity..capacity { - queue.push_back(None); - } - for _ in 0..half_capacity { - queue.push_back(Some(RequestOrResponse::Response( - ResponseBuilder::default().build().into(), - ))); - } - let num_request_slots = capacity; - let num_response_slots = capacity; - let num_messages = 2 * half_capacity; - assert!(output_queue_roundtrip_from_vec_deque( - queue.clone(), - num_request_slots, - num_response_slots, - num_messages - ) - .is_ok()); - - // Check that we get an error with one more request. - queue.push_back(Some(RequestOrResponse::Request( - RequestBuilder::default().build().into(), - ))); - assert!(output_queue_roundtrip_from_vec_deque( - queue.clone(), - num_request_slots + 1, - num_response_slots, - num_messages + 1, - ) - .is_err()); - queue.pop_back(); - - // Check that we get an error with one more `None`. - queue.push_back(None); - assert!(output_queue_roundtrip_from_vec_deque( - queue.clone(), - num_request_slots + 1, - num_response_slots, - num_messages - ) - .is_err()); - queue.pop_back(); - - // Check that we get an error with one more response. - queue.push_back(Some(RequestOrResponse::Response( - ResponseBuilder::default().build().into(), - ))); - assert!(output_queue_roundtrip_from_vec_deque( - queue.clone(), - num_request_slots, - num_response_slots + 1, - num_messages + 1, - ) - .is_err()); - queue.pop_back(); - - // Check that we get an error with one more slot reservation. - assert!(output_queue_roundtrip_from_vec_deque( - queue.clone(), - num_request_slots, - num_response_slots + 1, - num_messages, - ) - .is_err()); -} - #[test] fn ingress_queue_constructor_test() { let mut queue = IngressQueue::default(); diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs index 51ef6af58c7..6768bcd3ae5 100644 --- a/rs/replicated_state/src/canister_state/queues/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/tests.rs @@ -1,6 +1,5 @@ use super::input_schedule::testing::InputScheduleTesting; use super::message_pool::{MessageStats, REQUEST_LIFETIME}; -use super::queue::{InputQueue, OutputQueue}; use super::testing::{new_canister_output_queues_for_test, CanisterQueuesTesting}; use super::*; use crate::{CanisterState, InputQueueType::*, SchedulerState, SystemState}; @@ -13,7 +12,7 @@ use ic_test_utilities_types::messages::{IngressBuilder, RequestBuilder, Response use ic_types::messages::{CallbackId, MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64, NO_DEADLINE}; use ic_types::time::{expiry_time_from_now, CoarseTime, UNIX_EPOCH}; use ic_types::{Cycles, UserId}; -use maplit::{btreemap, btreeset}; +use maplit::btreemap; use proptest::prelude::*; use std::cell::RefCell; use std::convert::TryInto; @@ -1775,145 +1774,6 @@ fn encode_non_default_pool() { assert_eq!(queues, decoded); } -/// Tests decoding `CanisterQueues` from `input_queues` + `output_queues` -/// (instead of `canister_queues` + `pool`). -#[test] -fn decode_backward_compatibility() { - let local_canister = canister_test_id(13); - let remote_canister = canister_test_id(14); - - let mut queues_proto = pb_queues::CanisterQueues::default(); - let mut expected_queues = CanisterQueues::default(); - - let response_callback = CallbackId::from(42); - let req = RequestBuilder::default() - .sender(local_canister) - .receiver(local_canister) - .sender_reply_callback(response_callback) - .build(); - let rep = ResponseBuilder::default() - .originator(local_canister) - .respondent(local_canister) - .originator_reply_callback(response_callback) - .build(); - let t1 = Time::from_secs_since_unix_epoch(12345).unwrap(); - let t2 = t1 + Duration::from_secs(1); - let d1 = t1 + REQUEST_LIFETIME; - let d2 = t2 + REQUEST_LIFETIME; - - // - // `local_canister`'s queues. - // - - // An `InputQueue` with a request, a response and a reserved slot. - let mut iq1 = InputQueue::new(DEFAULT_QUEUE_CAPACITY); - iq1.push(req.clone().into()).unwrap(); - iq1.reserve_slot().unwrap(); - iq1.push(rep.clone().into()).unwrap(); - iq1.reserve_slot().unwrap(); - - // Expected input queue. - let mut expected_iq1 = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); - // Enqueue a request and a response. - expected_iq1.push_request(expected_queues.pool.insert_inbound(req.clone().into())); - expected_iq1.try_reserve_response_slot().unwrap(); - expected_iq1.push_response(expected_queues.pool.insert_inbound(rep.clone().into())); - // Make an extra response reservation. - expected_iq1.try_reserve_response_slot().unwrap(); - - // An output queue with a response, a timed out request, a non-timed out request - // and a reserved slot. - let mut oq1 = OutputQueue::new(DEFAULT_QUEUE_CAPACITY); - oq1.reserve_slot().unwrap(); - oq1.push_response(rep.clone().into()); - oq1.push_request(req.clone().into(), d1).unwrap(); - oq1.time_out_requests(d2).count(); - oq1.push_request(req.clone().into(), d2).unwrap(); - oq1.reserve_slot().unwrap(); - - // Expected output queue. The timed out request is gone. - let mut expected_oq1 = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); - expected_oq1.try_reserve_response_slot().unwrap(); - expected_oq1.push_response( - expected_queues - .pool - .insert_outbound_response(rep.clone().into()), - ); - expected_oq1.push_request( - expected_queues - .pool - .insert_outbound_request(req.clone().into(), t2), - ); - expected_oq1.try_reserve_response_slot().unwrap(); - - queues_proto.input_queues.push(pb_queues::QueueEntry { - canister_id: Some(local_canister.into()), - queue: Some((&iq1).into()), - }); - queues_proto.output_queues.push(pb_queues::QueueEntry { - canister_id: Some(local_canister.into()), - queue: Some((&oq1).into()), - }); - queues_proto - .local_sender_schedule - .push(local_canister.into()); - queues_proto.guaranteed_response_memory_reservations += 2; - expected_queues - .canister_queues - .insert(local_canister, (expected_iq1, expected_oq1)); - expected_queues - .input_schedule - .schedule(local_canister, LocalSubnet); - - // - // `remote_canister`'s queues. - // - - // Input queue with a reserved slot. - let mut iq2 = InputQueue::new(DEFAULT_QUEUE_CAPACITY); - iq2.reserve_slot().unwrap(); - - // Expected input queue. - let mut expected_iq2 = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); - expected_iq2.try_reserve_response_slot().unwrap(); - - // Empty output queue. - let oq2 = OutputQueue::new(DEFAULT_QUEUE_CAPACITY); - - queues_proto.input_queues.push(pb_queues::QueueEntry { - canister_id: Some(remote_canister.into()), - queue: Some((&iq2).into()), - }); - queues_proto.output_queues.push(pb_queues::QueueEntry { - canister_id: Some(remote_canister.into()), - queue: Some((&oq2).into()), - }); - queues_proto.guaranteed_response_memory_reservations += 1; - expected_queues.canister_queues.insert( - remote_canister, - (expected_iq2, CanisterQueue::new(DEFAULT_QUEUE_CAPACITY)), - ); - - // - // Adjust stats. - // - - expected_queues.queue_stats = CanisterQueues::calculate_queue_stats( - &expected_queues.canister_queues, - queues_proto.guaranteed_response_memory_reservations as usize, - 0, - ); - expected_queues.callbacks_with_enqueued_response = btreeset! {CallbackId::from(42)}; - - let queues = ( - queues_proto, - &StrictMetrics as &dyn CheckpointLoadingMetrics, - ) - .try_into() - .unwrap(); - assert_eq!(expected_queues, queues); -} - /// Constructs an encoded `CanisterQueues` with 2 inbound responses (callbacks 1 /// and 2) and one shed inbound response (callback 3). fn canister_queues_proto_with_inbound_responses() -> pb_queues::CanisterQueues { From 20ce3057d9fed27bd5e4f1aa4937a33b32e78a34 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Mon, 30 Sep 2024 14:40:44 +0000 Subject: [PATCH 2/2] Move reserved tags and names to the bottom of CanisterQueues message definition. --- rs/protobuf/def/state/queues/v1/queues.proto | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rs/protobuf/def/state/queues/v1/queues.proto b/rs/protobuf/def/state/queues/v1/queues.proto index f44dfe632cf..f4dfe69b3f2 100644 --- a/rs/protobuf/def/state/queues/v1/queues.proto +++ b/rs/protobuf/def/state/queues/v1/queues.proto @@ -159,9 +159,6 @@ message CanisterQueue { } message CanisterQueues { - reserved 1, 3, 4, 5; - reserved "canister_id", "input_queues", "input_schedule", "output_queues"; - repeated ingress.v1.Ingress ingress_queue = 2; // Input queue from and output queue to `canister_id`. @@ -194,4 +191,7 @@ message CanisterQueues { repeated types.v1.CanisterId remote_sender_schedule = 8; uint64 guaranteed_response_memory_reservations = 11; + + reserved 1, 3, 4, 5; + reserved "canister_id", "input_queues", "input_schedule", "output_queues"; }