Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [MR-552] Implement callback expiration #1699

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
75461e9
feat: [MR-523] Prevent enqueueing multiple responses for the same cal…
alin-at-dfinity Sep 7, 2024
9534ca4
feat: [MR-603] Keep track of shed inbound responses
alin-at-dfinity Sep 7, 2024
e65c087
Define two separate CanisterInput variants of SysUnknown responses: D…
alin-at-dfinity Sep 8, 2024
c70e701
Merge branch 'master' into alin/MR-523-response-deduplication
alin-at-dfinity Sep 9, 2024
9fe3fa6
Merge branch 'alin/MR-523-response-deduplication' into alin/MR-603-sh…
alin-at-dfinity Sep 9, 2024
fa0e182
Make clippy happy.
alin-at-dfinity Sep 9, 2024
5c90ada
Address review comments.
alin-at-dfinity Sep 10, 2024
e2a5852
Merge branch 'alin/MR-523-response-deduplication' into alin/MR-603-sh…
alin-at-dfinity Sep 10, 2024
9b8bbf5
Update queues_compatibility_test to use mainnet version. Describe cal…
alin-at-dfinity Sep 12, 2024
9d0e758
Merge branch 'alin/MR-523-response-deduplication' into alin/MR-603-sh…
alin-at-dfinity Sep 12, 2024
54f89c3
Merge branch 'master' into alin/MR-603-shed-inbound-responses
alin-at-dfinity Sep 12, 2024
679d618
Produce SysUnknown reject codes for shed responses.
alin-at-dfinity Sep 12, 2024
e0b1bc3
Merge branch 'master' into alin/MR-603-shed-inbound-responses
alin-at-dfinity Sep 13, 2024
e55ba00
Add test for shedding inbound responses.
alin-at-dfinity Sep 16, 2024
d01c587
Have SystemState::pop_input() always succeed, by returning an arbitra…
alin-at-dfinity Sep 20, 2024
dc80ea3
Merge branch 'master' into alin/MR-603-shed-inbound-responses
alin-at-dfinity Sep 20, 2024
195b5e8
Address review comments: use the anonymous principal instead of IC-00…
alin-at-dfinity Sep 23, 2024
2b6fb3a
refactor: [MR-603] Typed canister queues and references
alin-at-dfinity Sep 26, 2024
efa16a2
Make clippy happy.
alin-at-dfinity Sep 26, 2024
4ede3ff
Make clippy even happier.
alin-at-dfinity Sep 26, 2024
0e7b71c
Address review comments: deduplicate `queue_front_not_stale()` out of…
alin-at-dfinity Sep 26, 2024
e0388bb
Merge branch 'master' into alin/MR-603-typed-queues
alin-at-dfinity Sep 26, 2024
523a1ce
Add tests for Reference<T> conversions. Improve documentations. MNino…
alin-at-dfinity Sep 27, 2024
61a3f17
Fix doc comment.
alin-at-dfinity Sep 28, 2024
46b514a
Merge branch 'master' into alin/MR-603-typed-queues
alin-at-dfinity Sep 28, 2024
913a0af
Make clippy happy.
alin-at-dfinity Sep 28, 2024
2da5b3b
Remove the now unnecessary and potentially conflicting Context argume…
alin-at-dfinity Sep 28, 2024
e9f462b
Have MessageStore::get() return &RequestOrResponse for output queues …
alin-at-dfinity Sep 28, 2024
9728778
Clippy.
alin-at-dfinity Sep 28, 2024
94937cc
Rely on free functions instead of implementing from to convert from I…
alin-at-dfinity Sep 30, 2024
0e185a2
Get rid of asserts about reference contexts altogether. Heve the queu…
alin-at-dfinity Sep 30, 2024
0a5a38e
feat: [MR-552] Implement callback expiration
alin-at-dfinity Sep 26, 2024
a4de1e3
Make clippy extatic.
alin-at-dfinity Sep 26, 2024
d141beb
Minor cleanup.
alin-at-dfinity Sep 29, 2024
d03ccd2
CanisterQueues tests.
alin-at-dfinity Sep 29, 2024
41d6b69
Add test for SystemState::time_out_callbacks().
alin-at-dfinity Sep 30, 2024
a55c769
Merge branch 'master' into alin/MR-552-callback-expiration
alin-at-dfinity Oct 1, 2024
f8e8ffc
Address review comments: avoid also allocating a Vec in InboundMessag…
alin-at-dfinity Oct 1, 2024
f1d4040
Address review comment: restructure a bit MessageStore<CanisterInput>…
alin-at-dfinity Oct 1, 2024
a8330fd
Address review comments: implement Reference::.is_inbound_best_effort…
alin-at-dfinity Oct 2, 2024
5a9dd15
Address review comments.
alin-at-dfinity Oct 2, 2024
4c46593
Merge branch 'master' into alin/MR-552-callback-expiration
alin-at-dfinity Oct 3, 2024
fceea1b
Address review comments.
alin-at-dfinity Oct 3, 2024
c4fb4b5
Add a bad weather test case for SystemState::time_out_callbacks().
alin-at-dfinity Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rs/protobuf/def/state/queues/v1/queues.proto
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ message CanisterQueues {
uint64 id = 1;
uint64 callback_id = 2;
}
repeated CallbackReference expired_callbacks = 13;
derlerd-dfinity marked this conversation as resolved.
Show resolved Hide resolved
repeated CallbackReference shed_responses = 12;

enum NextInputQueue {
Expand Down
2 changes: 2 additions & 0 deletions rs/protobuf/src/gen/state/state.queues.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ pub struct CanisterQueues {
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
pub pool: ::core::option::Option<MessagePool>,
#[prost(message, repeated, tag = "13")]
pub expired_callbacks: ::prost::alloc::vec::Vec<canister_queues::CallbackReference>,
#[prost(message, repeated, tag = "12")]
pub shed_responses: ::prost::alloc::vec::Vec<canister_queues::CallbackReference>,
#[prost(enumeration = "canister_queues::NextInputQueue", tag = "6")]
Expand Down
2 changes: 2 additions & 0 deletions rs/protobuf/src/gen/types/state.queues.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ pub struct CanisterQueues {
pub canister_queues: ::prost::alloc::vec::Vec<canister_queues::CanisterQueuePair>,
#[prost(message, optional, tag = "10")]
pub pool: ::core::option::Option<MessagePool>,
#[prost(message, repeated, tag = "13")]
pub expired_callbacks: ::prost::alloc::vec::Vec<canister_queues::CallbackReference>,
#[prost(message, repeated, tag = "12")]
pub shed_responses: ::prost::alloc::vec::Vec<canister_queues::CallbackReference>,
#[prost(enumeration = "canister_queues::NextInputQueue", tag = "6")]
Expand Down
203 changes: 148 additions & 55 deletions rs/replicated_state/src/canister_state/queues.rs

Large diffs are not rendered by default.

44 changes: 33 additions & 11 deletions rs/replicated_state/src/canister_state/queues/message_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ impl Id {
Class::BestEffort
}
}

/// Tests whether this `Id` represents an inbound best-effort response.
fn is_inbound_best_effort_response(&self) -> bool {
derlerd-dfinity marked this conversation as resolved.
Show resolved Hide resolved
self.0 & (Context::BIT | Class::BIT | Kind::BIT)
== (Context::Inbound as u64 | Class::BestEffort as u64 | Kind::Response as u64)
}

/// Tests whether this `Id` represents an outbound guaranteed-response request.
fn is_outbound_guaranteed_request(&self) -> bool {
self.0 & (Context::BIT | Class::BIT | Kind::BIT)
== (Context::Outbound as u64 | Class::GuaranteedResponse as u64 | Kind::Request as u64)
}
}

/// A typed reference -- inbound (`CanisterInput`) or outbound
Expand All @@ -142,13 +154,19 @@ impl<T> Reference<T> {
Id::from(self).kind()
}

pub(super) fn context(&self) -> Context {
fn context(&self) -> Context {
Id::from(self).context()
}

pub(super) fn class(&self) -> Class {
#[allow(dead_code)]
derlerd-dfinity marked this conversation as resolved.
Show resolved Hide resolved
fn class(&self) -> Class {
Id::from(self).class()
}

/// Tests whether this is a reference to an inbound best-effort response.
pub(super) fn is_inbound_best_effort_response(&self) -> bool {
Id::from(self).is_inbound_best_effort_response()
}
}

impl<T> Clone for Reference<T> {
Expand Down Expand Up @@ -324,10 +342,7 @@ impl TryFrom<pb_queues::canister_queues::CallbackReference> for CallbackReferenc
type Error = ProxyDecodeError;
fn try_from(item: pb_queues::canister_queues::CallbackReference) -> Result<Self, Self::Error> {
let reference = Reference(item.id, PhantomData);
if reference.context() == Context::Inbound
&& reference.class() == Class::BestEffort
&& reference.kind() == Kind::Response
{
if reference.is_inbound_best_effort_response() {
Ok(CallbackReference(reference, item.callback_id.into()))
} else {
Err(ProxyDecodeError::Other(
Expand Down Expand Up @@ -411,6 +426,14 @@ impl MessagePool {
self.insert_impl(msg, actual_deadline, Context::Inbound)
}

/// Reserves an `InboundReference` for a timeout reject response for a
/// best-effort callback.
///
/// This is equivalent to inserting and then immediately removing the response.
pub(super) fn make_inbound_timeout_response_reference(&mut self) -> InboundReference {
self.next_reference(Class::BestEffort, Kind::Response)
}

/// Inserts an outbound request (one that is to be enqueued in an output queue)
/// into the pool. Returns the reference assigned to the request.
///
Expand Down Expand Up @@ -640,7 +663,9 @@ impl MessagePool {
.into_iter()
.map(|(_, id)| {
let msg = self.take_impl(id).unwrap();
self.outbound_guaranteed_request_deadlines.remove(&id);
if id.is_outbound_guaranteed_request() {
self.outbound_guaranteed_request_deadlines.remove(&id);
}
self.remove_from_size_queue(id, &msg);
(id.into(), msg)
})
Expand Down Expand Up @@ -736,10 +761,7 @@ impl MessagePool {
// guaranteed response requests (and nothing else).
let mut expected_outbound_guaranteed_request_ids = BTreeSet::new();
self.messages.keys().for_each(|id| {
if id.context() == Context::Outbound
&& id.class() == Class::GuaranteedResponse
&& id.kind() == Kind::Request
{
if id.is_outbound_guaranteed_request() {
expected_outbound_guaranteed_request_ids.insert(id);
}
});
Expand Down
109 changes: 82 additions & 27 deletions rs/replicated_state/src/canister_state/queues/message_pool/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,33 @@ fn test_get() {

#[test]
fn test_take() {
fn test_take_impl<T>(
request_id: Reference<T>,
response_id: Reference<T>,
request: Request,
response: Response,
pool: &mut MessagePool,
) {
let request: RequestOrResponse = request.into();
let response: RequestOrResponse = response.into();
alin-at-dfinity marked this conversation as resolved.
Show resolved Hide resolved

// Ensure that the messages are now in the pool.
assert_eq!(Some(&request), pool.get(request_id));
assert_eq!(Some(&response), pool.get(response_id));

// Actually take the messages.
assert_eq!(Some(request), pool.take(request_id));
assert_eq!(Some(response), pool.take(response_id));

// Messages are gone.
assert_eq!(None, pool.get(request_id));
assert_eq!(None, pool.get(response_id));

// And cannot be taken out again.
assert_eq!(None, pool.take(request_id));
assert_eq!(None, pool.take(response_id));
}

let mut pool = MessagePool::default();

for deadline in [NO_DEADLINE, time(13)] {
Expand All @@ -155,33 +182,6 @@ fn test_take() {
test_take_impl(request_id, response_id, request, response, &mut pool);
}
}

fn test_take_impl<T>(
request_id: Reference<T>,
response_id: Reference<T>,
request: Request,
response: Response,
pool: &mut MessagePool,
) {
let request: RequestOrResponse = request.into();
let response: RequestOrResponse = response.into();

// Ensure that the messages are now in the pool.
assert_eq!(Some(&request), pool.get(request_id));
assert_eq!(Some(&response), pool.get(response_id));

// Actually take the messages.
assert_eq!(Some(request), pool.take(request_id));
assert_eq!(Some(response), pool.take(response_id));

// Messages are gone.
assert_eq!(None, pool.get(request_id));
assert_eq!(None, pool.get(response_id));

// And cannot be taken out again.
assert_eq!(None, pool.take(request_id));
assert_eq!(None, pool.take(response_id));
}
}
}

Expand Down Expand Up @@ -562,13 +562,68 @@ fn test_id_from_reference_roundtrip() {
assert_eq!(reference.0, id.0);
assert_eq!(id, reference.into());
assert_eq!(SomeReference::Inbound(reference), SomeReference::from(id));
assert_eq!(
(id.context(), id.class(), id.kind()),
(reference.context(), reference.class(), reference.kind())
);

// Outbound.
let reference = OutboundReference::new(class, kind, 13);
let id = Id::from(reference);
assert_eq!(reference.0, id.0);
assert_eq!(id, reference.into());
assert_eq!(SomeReference::Outbound(reference), SomeReference::from(id));
assert_eq!(
(id.context(), id.class(), id.kind()),
(reference.context(), reference.class(), reference.kind())
);
}
}
}

#[test]
fn test_is_inbound_best_effort_response() {
use Class::*;
use Kind::*;

for kind in [Request, Response] {
for class in [GuaranteedResponse, BestEffort] {
let reference = InboundReference::new(class, kind, 13);
let id = Id::from(reference);
assert_eq!(
class == BestEffort && kind == Response,
id.is_inbound_best_effort_response()
);
assert_eq!(
class == BestEffort && kind == Response,
reference.is_inbound_best_effort_response()
);

let reference = OutboundReference::new(class, kind, 13);
let id = Id::from(reference);
assert!(!id.is_inbound_best_effort_response());
assert!(!reference.is_inbound_best_effort_response());
}
}
}

#[test]
fn test_is_outbound_guaranteed_request() {
use Class::*;
use Kind::*;

for kind in [Request, Response] {
for class in [GuaranteedResponse, BestEffort] {
let reference = InboundReference::new(class, kind, 13);
let id = Id::from(reference);
assert!(!id.is_outbound_guaranteed_request());

let reference = OutboundReference::new(class, kind, 13);
let id = Id::from(reference);
assert_eq!(
class == GuaranteedResponse && kind == Request,
id.is_outbound_guaranteed_request()
);
}
}
}
Expand Down
Loading
Loading