Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [MR-569] Drop old canister queue implementations #1733

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 3 additions & 25 deletions rs/protobuf/def/state/queues/v1/queues.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,6 @@ message MessageDeadline {
uint64 index = 2;
}

message InputOutputQueue {
repeated RequestOrResponse queue = 1;
uint64 begin = 2;
uint64 capacity = 3;
uint64 num_slots_reserved = 4;
// Ordered ranges of messages having the same request deadline. Each range
// is represented as a deadline and its end index (the `QueueIndex` just
// past the last request where the deadline applies). Both the deadlines and
// queue indices are strictly increasing.
repeated MessageDeadline deadline_range_ends = 5;
// Queue index from which request timing out will resume.
uint64 timeout_index = 6;
}

message QueueEntry {
types.v1.CanisterId canister_id = 1;
InputOutputQueue queue = 2;
}

// A pool holding all of a canister's incoming and outgoing canister messages.
message MessagePool {
// A pool entry: a message keyed by its ID.
Expand Down Expand Up @@ -178,14 +159,8 @@ message CanisterQueue {
}

message CanisterQueues {
reserved 1, 4;
reserved "canister_id", "input_schedule";

repeated ingress.v1.Ingress ingress_queue = 2;

repeated QueueEntry input_queues = 3;
repeated QueueEntry output_queues = 5;

// Input queue from and output queue to `canister_id`.
message CanisterQueuePair {
types.v1.CanisterId canister_id = 1;
Expand Down Expand Up @@ -216,4 +191,7 @@ message CanisterQueues {
repeated types.v1.CanisterId remote_sender_schedule = 8;

uint64 guaranteed_response_memory_reservations = 11;

reserved 1, 3, 4, 5;
reserved "canister_id", "input_queues", "input_schedule", "output_queues";
}
31 changes: 0 additions & 31 deletions rs/protobuf/src/gen/state/state.queues.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,6 @@ pub struct MessageDeadline {
#[prost(uint64, tag = "2")]
pub index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InputOutputQueue {
#[prost(message, repeated, tag = "1")]
pub queue: ::prost::alloc::vec::Vec<RequestOrResponse>,
#[prost(uint64, tag = "2")]
pub begin: u64,
#[prost(uint64, tag = "3")]
pub capacity: u64,
#[prost(uint64, tag = "4")]
pub num_slots_reserved: u64,
/// Ordered ranges of messages having the same request deadline. Each range
/// is represented as a deadline and its end index (the `QueueIndex` just
/// past the last request where the deadline applies). Both the deadlines and
/// queue indices are strictly increasing.
#[prost(message, repeated, tag = "5")]
pub deadline_range_ends: ::prost::alloc::vec::Vec<MessageDeadline>,
/// Queue index from which request timing out will resume.
#[prost(uint64, tag = "6")]
pub timeout_index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueueEntry {
#[prost(message, optional, tag = "1")]
pub canister_id: ::core::option::Option<super::super::super::types::v1::CanisterId>,
#[prost(message, optional, tag = "2")]
pub queue: ::core::option::Option<InputOutputQueue>,
}
/// A pool holding all of a canister's incoming and outgoing canister messages.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessagePool {
Expand Down Expand Up @@ -235,10 +208,6 @@ pub mod canister_queue {
pub struct CanisterQueues {
#[prost(message, repeated, tag = "2")]
pub ingress_queue: ::prost::alloc::vec::Vec<super::super::ingress::v1::Ingress>,
#[prost(message, repeated, tag = "3")]
pub input_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "5")]
pub output_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "9")]
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
Expand Down
31 changes: 0 additions & 31 deletions rs/protobuf/src/gen/types/state.queues.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,6 @@ pub struct MessageDeadline {
#[prost(uint64, tag = "2")]
pub index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InputOutputQueue {
#[prost(message, repeated, tag = "1")]
pub queue: ::prost::alloc::vec::Vec<RequestOrResponse>,
#[prost(uint64, tag = "2")]
pub begin: u64,
#[prost(uint64, tag = "3")]
pub capacity: u64,
#[prost(uint64, tag = "4")]
pub num_slots_reserved: u64,
/// Ordered ranges of messages having the same request deadline. Each range
/// is represented as a deadline and its end index (the `QueueIndex` just
/// past the last request where the deadline applies). Both the deadlines and
/// queue indices are strictly increasing.
#[prost(message, repeated, tag = "5")]
pub deadline_range_ends: ::prost::alloc::vec::Vec<MessageDeadline>,
/// Queue index from which request timing out will resume.
#[prost(uint64, tag = "6")]
pub timeout_index: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueueEntry {
#[prost(message, optional, tag = "1")]
pub canister_id: ::core::option::Option<super::super::super::types::v1::CanisterId>,
#[prost(message, optional, tag = "2")]
pub queue: ::core::option::Option<InputOutputQueue>,
}
/// A pool holding all of a canister's incoming and outgoing canister messages.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MessagePool {
Expand Down Expand Up @@ -235,10 +208,6 @@ pub mod canister_queue {
pub struct CanisterQueues {
#[prost(message, repeated, tag = "2")]
pub ingress_queue: ::prost::alloc::vec::Vec<super::super::ingress::v1::Ingress>,
#[prost(message, repeated, tag = "3")]
pub input_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "5")]
pub output_queues: ::prost::alloc::vec::Vec<QueueEntry>,
#[prost(message, repeated, tag = "9")]
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
Expand Down
28 changes: 14 additions & 14 deletions rs/protobuf/src/state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,25 @@ fn huge_proto_encoding_roundtrip() {
deadline_seconds: 0,
})),
};
// A queue of 2K requests with 2 MB payloads.
let queue = vec![msg; 2 << 10];
let entry = message_pool::Entry {
id: 13,
message: Some(msg.clone()),
};

let q = InputOutputQueue {
queue,
begin: 2197,
capacity: 500,
num_slots_reserved: 14,
deadline_range_ends: Vec::new(),
timeout_index: 15,
// A pool of 2K requests with 2 MB payloads.
let pool = MessagePool {
messages: vec![entry; 2 << 10],
outbound_guaranteed_request_deadlines: vec![],
message_id_generator: 42,
};

let mut buf = vec![];
q.encode(&mut buf).unwrap();
// Expecting the encoded queue to be larger than 4 GB.
pool.encode(&mut buf).unwrap();
// Expecting the encoded pool to be larger than 4 GB.
assert!(buf.len() > 4 << 30);

let decoded_q = InputOutputQueue::decode(buf.as_slice()).unwrap();
let decoded_pool = MessagePool::decode(buf.as_slice()).unwrap();

// Ensure that decoding results in the same queue that we just encoded.
assert_eq!(q, decoded_q);
// Ensure that decoding results in the same pool that we just encoded.
assert_eq!(pool, decoded_pool);
}
160 changes: 54 additions & 106 deletions rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1627,8 +1627,6 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues {

Self {
ingress_queue: (&item.ingress_queue).into(),
input_queues: Default::default(),
output_queues: Default::default(),
canister_queues: item
.canister_queues
.iter()
Expand Down Expand Up @@ -1660,115 +1658,65 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can
fn try_from(
(item, metrics): (pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics),
) -> Result<Self, Self::Error> {
let mut canister_queues = BTreeMap::new();
let mut pool = MessagePool::default();
let mut shed_responses = BTreeMap::new();
let pool = MessagePool::try_from(item.pool.unwrap_or_default())?;

if !item.input_queues.is_empty() || !item.output_queues.is_empty() {
// Backward compatibility: deserialize from `input_queues` and `output_queues`.

if item.pool.is_some() || !item.canister_queues.is_empty() {
return Err(ProxyDecodeError::Other(
"Both `input_queues`/`output_queues` and `pool`/`canister_queues` are populated"
.to_string(),
));
}

if item.input_queues.len() != item.output_queues.len() {
return Err(ProxyDecodeError::Other(format!(
"CanisterQueues: Mismatched input ({}) and output ({}) queue lengths",
item.input_queues.len(),
item.output_queues.len()
)));
}
for (ie, oe) in item
.input_queues
fn callback_references_try_from_proto(
callback_references: Vec<pb_queues::canister_queues::CallbackReference>,
) -> Result<BTreeMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
{
callback_references
.into_iter()
.zip(item.output_queues.into_iter())
{
if ie.canister_id != oe.canister_id {
return Err(ProxyDecodeError::Other(format!(
"CanisterQueues: Mismatched input {:?} and output {:?} queue entries",
ie.canister_id, oe.canister_id
)));
}

let canister_id = try_from_option_field(ie.canister_id, "QueueEntry::canister_id")?;
let original_iq: queue::OldInputQueue =
try_from_option_field(ie.queue, "QueueEntry::queue")?;
let original_oq: queue::OldOutputQueue =
try_from_option_field(oe.queue, "QueueEntry::queue")?;
let iq = (original_iq, &mut pool).try_into()?;
let oq = (original_oq, &mut pool).try_into()?;

if canister_queues.insert(canister_id, (iq, oq)).is_some() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Duplicate queues for canister {}",
canister_id
));
}
}
} else {
pool = item.pool.unwrap_or_default().try_into()?;

fn callback_references_try_from_proto(
callback_references: Vec<pb_queues::canister_queues::CallbackReference>,
) -> Result<BTreeMap<message_pool::InboundReference, CallbackId>, ProxyDecodeError>
{
callback_references
.into_iter()
.map(|cr_proto| {
let cr = message_pool::CallbackReference::try_from(cr_proto)?;
Ok((cr.0, cr.1))
})
.collect()
}
shed_responses = callback_references_try_from_proto(item.shed_responses)?;
.map(|cr_proto| {
let cr = message_pool::CallbackReference::try_from(cr_proto)?;
Ok((cr.0, cr.1))
})
.collect()
}
let shed_responses = callback_references_try_from_proto(item.shed_responses)?;

let mut enqueued_pool_messages = BTreeSet::new();
canister_queues = item
.canister_queues
.into_iter()
.map(|qp| {
let canister_id: CanisterId =
try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?;
let iq: InputQueue =
try_from_option_field(qp.input_queue, "CanisterQueuePair::input_queue")?;
let oq: OutputQueue =
try_from_option_field(qp.output_queue, "CanisterQueuePair::output_queue")?;

iq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Inbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
oq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Outbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
let mut enqueued_pool_messages = BTreeSet::new();
let canister_queues = item
.canister_queues
.into_iter()
.map(|qp| {
let canister_id: CanisterId =
try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?;
let iq: InputQueue =
try_from_option_field(qp.input_queue, "CanisterQueuePair::input_queue")?;
let oq: OutputQueue =
try_from_option_field(qp.output_queue, "CanisterQueuePair::output_queue")?;

iq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Inbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});
oq.iter().for_each(|&reference| {
if pool.get(reference).is_some()
&& !enqueued_pool_messages.insert(SomeReference::Outbound(reference))
{
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: {:?} enqueued more than once",
reference
));
}
});

Ok((canister_id, (iq, oq)))
})
.collect::<Result<_, Self::Error>>()?;
Ok((canister_id, (iq, oq)))
})
.collect::<Result<_, Self::Error>>()?;

if enqueued_pool_messages.len() != pool.len() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Pool holds {} messages, but only {} of them are enqueued",
pool.len(),
enqueued_pool_messages.len()
));
}
if enqueued_pool_messages.len() != pool.len() {
metrics.observe_broken_soft_invariant(format!(
"CanisterQueues: Pool holds {} messages, but only {} of them are enqueued",
pool.len(),
enqueued_pool_messages.len()
));
}

let queue_stats = Self::calculate_queue_stats(
Expand Down
10 changes: 0 additions & 10 deletions rs/replicated_state/src/canister_state/queues/message_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,16 +650,6 @@ impl MessagePool {
self.messages.len()
}

/// Returns the implicitly assigned deadlines of enqueued outbound guaranteed
/// response requests.
pub(super) fn outbound_guaranteed_request_deadline<T>(
&self,
reference: Reference<T>,
) -> Option<&CoarseTime> {
self.outbound_guaranteed_request_deadlines
.get(&reference.into())
}

/// Returns a reference to the pool's message stats.
pub(super) fn message_stats(&self) -> &MessageStats {
&self.message_stats
Expand Down
Loading
Loading