
Commit f8f2d84
chore: [MR-569] Drop old canister queue implementations (#1733)
[MR-569]: Both the proto and Rust definitions were kept around for
backward compatibility (decoding canister queues encoded by old replica
versions). Now that all of mainnet is running a replica version that
uses and encodes the new `CanisterQueues`, it is safe to remove the old
ones.

[MR-569]:
https://dfinity.atlassian.net/browse/MR-569?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
alin-at-dfinity authored Oct 2, 2024
1 parent 1d41511 commit f8f2d84
Showing 9 changed files with 78 additions and 2,199 deletions.
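
For context on the compatibility argument in the commit message: protobuf decoders skip field tags they do not recognize, so once every replica encodes only the new `CanisterQueues` fields, the old fields can be dropped from the schema and their tags reserved (as done in `queues.proto` below). The following is a minimal, self-contained sketch of that behavior using made-up toy types, not code from this commit or repository:

```rust
use prost::Message;

/// Toy stand-in for the old wire format, with a repeated field that is
/// being dropped (hypothetical names, not the repository's types).
#[derive(Clone, PartialEq, Message)]
struct QueuesOld {
    #[prost(uint64, tag = "2")]
    ingress_count: u64,
    /// The field being removed; in the new .proto its tag and name would be
    /// marked `reserved` so they can never be reused for a different field.
    #[prost(uint64, repeated, tag = "3")]
    input_queues: Vec<u64>,
}

/// Toy stand-in for the new wire format, where tag 3 no longer exists.
#[derive(Clone, PartialEq, Message)]
struct QueuesNew {
    #[prost(uint64, tag = "2")]
    ingress_count: u64,
}

fn main() {
    let old = QueuesOld {
        ingress_count: 7,
        input_queues: vec![1, 2, 3],
    };
    let mut bytes = vec![];
    old.encode(&mut bytes).unwrap();

    // Decoding old bytes with the new definition succeeds because unknown tags
    // are skipped, but the data stored under tag 3 is silently discarded. This
    // is why the old fields could only be removed once no replica was still
    // encoding them.
    let new = QueuesNew::decode(bytes.as_slice()).unwrap();
    assert_eq!(new.ingress_count, 7);
}
```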
28 changes: 3 additions & 25 deletions rs/protobuf/def/state/queues/v1/queues.proto
@@ -112,25 +112,6 @@ message MessageDeadline {
uint64 index = 2;
}

message InputOutputQueue {
repeated RequestOrResponse queue = 1;
uint64 begin = 2;
uint64 capacity = 3;
uint64 num_slots_reserved = 4;
// Ordered ranges of messages having the same request deadline. Each range
// is represented as a deadline and its end index (the `QueueIndex` just
// past the last request where the deadline applies). Both the deadlines and
// queue indices are strictly increasing.
repeated MessageDeadline deadline_range_ends = 5;
// Queue index from which request timing out will resume.
uint64 timeout_index = 6;
}

message QueueEntry {
types.v1.CanisterId canister_id = 1;
InputOutputQueue queue = 2;
}

// A pool holding all of a canister's incoming and outgoing canister messages.
message MessagePool {
// A pool entry: a message keyed by its ID.
@@ -178,14 +159,8 @@ message CanisterQueue {
}

message CanisterQueues {
reserved 1, 4;
reserved "canister_id", "input_schedule";

repeated ingress.v1.Ingress ingress_queue = 2;

repeated QueueEntry input_queues = 3;
repeated QueueEntry output_queues = 5;

// Input queue from and output queue to `canister_id`.
message CanisterQueuePair {
types.v1.CanisterId canister_id = 1;
@@ -216,4 +191,7 @@ message CanisterQueues {
repeated types.v1.CanisterId remote_sender_schedule = 8;

uint64 guaranteed_response_memory_reservations = 11;

reserved 1, 3, 4, 5;
reserved "canister_id", "input_queues", "input_schedule", "output_queues";
}
31 changes: 0 additions & 31 deletions rs/protobuf/src/gen/state/state.queues.v1.rs
@@ -138,33 +138,6 @@ pub struct MessageDeadline {
#[prost(uint64, tag = "2")]
pub index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InputOutputQueue {
#[prost(message, repeated, tag = "1")]
pub queue: ::prost::alloc::vec::Vec<RequestOrResponse>,
#[prost(uint64, tag = "2")]
pub begin: u64,
#[prost(uint64, tag = "3")]
pub capacity: u64,
#[prost(uint64, tag = "4")]
pub num_slots_reserved: u64,
/// Ordered ranges of messages having the same request deadline. Each range
/// is represented as a deadline and its end index (the `QueueIndex` just
/// past the last request where the deadline applies). Both the deadlines and
/// queue indices are strictly increasing.
#[prost(message, repeated, tag = "5")]
pub deadline_range_ends: ::prost::alloc::vec::Vec<MessageDeadline>,
/// Queue index from which request timing out will resume.
#[prost(uint64, tag = "6")]
pub timeout_index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueueEntry {
#[prost(message, optional, tag = "1")]
pub canister_id: ::core::option::Option<super::super::super::types::v1::CanisterId>,
#[prost(message, optional, tag = "2")]
pub queue: ::core::option::Option<InputOutputQueue>,
}
/// A pool holding all of a canister's incoming and outgoing canister messages.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessagePool {
@@ -235,10 +208,6 @@ pub mod canister_queue {
pub struct CanisterQueues {
#[prost(message, repeated, tag = "2")]
pub ingress_queue: ::prost::alloc::vec::Vec<super::super::ingress::v1::Ingress>,
#[prost(message, repeated, tag = "3")]
pub input_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "5")]
pub output_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "9")]
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
31 changes: 0 additions & 31 deletions rs/protobuf/src/gen/types/state.queues.v1.rs
@@ -138,33 +138,6 @@ pub struct MessageDeadline {
#[prost(uint64, tag = "2")]
pub index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InputOutputQueue {
#[prost(message, repeated, tag = "1")]
pub queue: ::prost::alloc::vec::Vec<RequestOrResponse>,
#[prost(uint64, tag = "2")]
pub begin: u64,
#[prost(uint64, tag = "3")]
pub capacity: u64,
#[prost(uint64, tag = "4")]
pub num_slots_reserved: u64,
/// Ordered ranges of messages having the same request deadline. Each range
/// is represented as a deadline and its end index (the `QueueIndex` just
/// past the last request where the deadline applies). Both the deadlines and
/// queue indices are strictly increasing.
#[prost(message, repeated, tag = "5")]
pub deadline_range_ends: ::prost::alloc::vec::Vec<MessageDeadline>,
/// Queue index from which request timing out will resume.
#[prost(uint64, tag = "6")]
pub timeout_index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueueEntry {
#[prost(message, optional, tag = "1")]
pub canister_id: ::core::option::Option<super::super::super::types::v1::CanisterId>,
#[prost(message, optional, tag = "2")]
pub queue: ::core::option::Option<InputOutputQueue>,
}
/// A pool holding all of a canister's incoming and outgoing canister messages.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessagePool {
@@ -235,10 +208,6 @@ pub mod canister_queue {
pub struct CanisterQueues {
#[prost(message, repeated, tag = "2")]
pub ingress_queue: ::prost::alloc::vec::Vec<super::super::ingress::v1::Ingress>,
#[prost(message, repeated, tag = "3")]
pub input_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "5")]
pub output_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "9")]
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
28 changes: 14 additions & 14 deletions rs/protobuf/src/state/tests.rs
@@ -32,25 +32,25 @@ fn huge_proto_encoding_roundtrip() {
deadline_seconds: 0,
})),
};
// A queue of 2K requests with 2 MB payloads.
let queue = vec![msg; 2 << 10];
let entry = message_pool::Entry {
id: 13,
message: Some(msg.clone()),
};

let q = InputOutputQueue {
queue,
begin: 2197,
capacity: 500,
num_slots_reserved: 14,
deadline_range_ends: Vec::new(),
timeout_index: 15,
// A pool of 2K requests with 2 MB payloads.
let pool = MessagePool {
messages: vec![entry; 2 << 10],
outbound_guaranteed_request_deadlines: vec![],
message_id_generator: 42,
};

let mut buf = vec![];
q.encode(&mut buf).unwrap();
// Expecting the encoded queue to be larger than 4 GB.
pool.encode(&mut buf).unwrap();
// Expecting the encoded pool to be larger than 4 GB.
assert!(buf.len() > 4 << 30);

let decoded_q = InputOutputQueue::decode(buf.as_slice()).unwrap();
let decoded_pool = MessagePool::decode(buf.as_slice()).unwrap();

// Ensure that decoding results in the same queue that we just encoded.
assert_eq!(q, decoded_q);
// Ensure that decoding results in the same pool that we just encoded.
assert_eq!(pool, decoded_pool);
}
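
A quick sanity check of the size threshold asserted in the updated test (a standalone sketch, not part of the commit): with `2 << 10` pool entries, the payloads must be roughly 2 MiB each, since 2,048 × 2 MiB is exactly 4 GiB, i.e. `4 << 30` bytes, and the protobuf framing overhead is what pushes the encoded pool strictly past the threshold.

```rust
fn main() {
    // 2 << 10 = 2_048 pool entries, each carrying roughly a 2 MiB payload:
    // 2_048 * 2 MiB = 4 GiB exactly, which equals the `4 << 30` threshold, so
    // any tag/length framing overhead makes the encoded pool strictly larger.
    assert_eq!((2u64 << 10) * 2 * 1024 * 1024, 4u64 << 30);
}
```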
160 changes: 54 additions & 106 deletions rs/replicated_state/src/canister_state/queues.rs
@@ -1627,8 +1627,6 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues {

Self {
ingress_queue: (&item.ingress_queue).into(),
input_queues: Default::default(),
output_queues: Default::default(),
canister_queues: item
.canister_queues
.iter()
@@ -1660,115 +1658,65 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can
fn try_from(
(item, metrics): (pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics),
) -> Result<Self, Self::Error> {
let mut canister_queues = BTreeMap::new();
let mut pool = MessagePool::default();
let mut shed_responses = BTreeMap::new();
let pool = MessagePool::try_from(item.pool.unwrap_or_default())?;

if !item.input_queues.is_empty() || !item.output_queues.is_empty() {
// Backward compatibility: deserialize from `input_queues` and `output_queues`.

if item.pool.is_some() || !item.canister_queues.is_empty() {
return Err(ProxyDecodeError::Other(
"Both `input_queues`/`output_queues` and `pool`/`canister_queues` are populated"
.to_string(),
));
}

if item.input_queues.len() != item.output_queues.len() {
return Err(ProxyDecodeError::Other(format!(
"CanisterQueues: Mismatched input ({}) and output ({}) queue lengths",
item.input_queues.len(),
item.output_queues.len()
)));
}
for (ie, oe) in item
.input_queues
fn callback_references_try_from_proto(
callback_references: Vec<pb_queues::canister_queues::CallbackReference>,
) -> Result<BTreeMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
{
callback_references
.into_iter()
.zip(item.output_queues.into_iter())
{
if ie.canister_id != oe.canister_id {
return Err(ProxyDecodeError::Other(format!(
"CanisterQueues: Mismatched input {:?} and output {:?} queue entries",
ie.canister_id, oe.canister_id
)));
}

let canister_id = try_from_option_field(ie.canister_id, "QueueEntry::canister_id")?;
let original_iq: queue::OldInputQueue =
try_from_option_field(ie.queue, "QueueEntry::queue")?;
let original_oq: queue::OldOutputQueue =
try_from_option_field(oe.queue, "QueueEntry::queue")?;
let iq = (original_iq, &mut pool).try_into()?;
let oq = (original_oq, &mut pool).try_into()?;

if canister_queues.insert(canister_id, (iq, oq)).is_some() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Duplicate queues for canister {}",
canister_id
));
}
}
} else {
pool = item.pool.unwrap_or_default().try_into()?;

fn callback_references_try_from_proto(
callback_references: Vec<pb_queues::canister_queues::CallbackReference>,
) -> Result<BTreeMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
{
callback_references
.into_iter()
.map(|cr_proto| {
let cr = message_pool::CallbackReference::try_from(cr_proto)?;
Ok((cr.0, cr.1))
})
.collect()
}
shed_responses = callback_references_try_from_proto(item.shed_responses)?;
.map(|cr_proto| {
let cr = message_pool::CallbackReference::try_from(cr_proto)?;
Ok((cr.0, cr.1))
})
.collect()
}
let shed_responses = callback_references_try_from_proto(item.shed_responses)?;

let mut enqueued_pool_messages = BTreeSet::new();
canister_queues = item
.canister_queues
.into_iter()
.map(|qp| {
let canister_id: CanisterId =
try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?;
let iq: InputQueue =
try_from_option_field(qp.input_queue, "CanisterQueuePair::input_queue")?;
let oq: OutputQueue =
try_from_option_field(qp.output_queue, "CanisterQueuePair::output_queue")?;

iq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Inbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
oq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Outbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
let mut enqueued_pool_messages = BTreeSet::new();
let canister_queues = item
.canister_queues
.into_iter()
.map(|qp| {
let canister_id: CanisterId =
try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?;
let iq: InputQueue =
try_from_option_field(qp.input_queue, "CanisterQueuePair::input_queue")?;
let oq: OutputQueue =
try_from_option_field(qp.output_queue, "CanisterQueuePair::output_queue")?;

iq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Inbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
oq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Outbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});

Ok((canister_id, (iq, oq)))
})
.collect::<Result<_, Self::Error>>()?;
Ok((canister_id, (iq, oq)))
})
.collect::<Result<_, Self::Error>>()?;

if enqueued_pool_messages.len() != pool.len() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Pool holds {} messages, but only {} of them are enqueued",
pool.len(),
enqueued_pool_messages.len()
));
}
if enqueued_pool_messages.len() != pool.len() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Pool holds {} messages, but only {} of them are enqueued",
pool.len(),
enqueued_pool_messages.len()
));
}

let queue_stats = Self::calculate_queue_stats(
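
The new decode path above rebuilds the message pool first and then checks, as a soft invariant, that every pooled message is referenced by exactly one queue. Below is a simplified sketch of that check with toy types (it ignores the real code's handling of references whose messages are no longer in the pool, and reports violations through a metrics callback instead of failing decoding):

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for the decode-time soft-invariant check: every pooled
/// message must be referenced exactly once across all queues.
fn check_references_enqueued_once(
    queues: &[Vec<u64>], // toy stand-in for per-canister queues of references
    pool_len: usize,     // number of messages actually held in the pool
    observe_broken_soft_invariant: &mut dyn FnMut(String),
) {
    let mut enqueued = BTreeSet::new();
    for queue in queues {
        for &reference in queue {
            if !enqueued.insert(reference) {
                observe_broken_soft_invariant(format!(
                    "CanisterQueues: {reference:?} enqueued more than once"
                ));
            }
        }
    }
    if enqueued.len() != pool_len {
        observe_broken_soft_invariant(format!(
            "CanisterQueues: Pool holds {pool_len} messages, but only {} of them are enqueued",
            enqueued.len()
        ));
    }
}

fn main() {
    let mut reports = Vec::new();
    // Reference 2 appears in two queues and the pool holds a third message that
    // no queue references, so both soft-invariant violations are reported.
    check_references_enqueued_once(&[vec![1, 2], vec![2]], 3, &mut |msg| reports.push(msg));
    assert_eq!(reports.len(), 2);
}
```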
10 changes: 0 additions & 10 deletions rs/replicated_state/src/canister_state/queues/message_pool.rs
@@ -650,16 +650,6 @@ impl MessagePool {
self.messages.len()
}

/// Returns the implicitly assigned deadlines of enqueued outbound guaranteed
/// response requests.
pub(super) fn outbound_guaranteed_request_deadline<T>(
&self,
reference: Reference<T>,
) -> Option<&CoarseTime> {
self.outbound_guaranteed_request_deadlines
.get(&reference.into())
}

/// Returns a reference to the pool's message stats.
pub(super) fn message_stats(&self) -> &MessageStats {
&self.message_stats