diff --git a/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs b/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs index 5a2603461a9..d73bb1514df 100644 --- a/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs +++ b/rs/replicated_state/src/canister_state/system_state/call_context_manager.rs @@ -827,7 +827,6 @@ impl CallContextManager { /// current time. /// /// Note: A given callback ID will be returned at most once by this function. - #[allow(dead_code)] pub(super) fn expire_callbacks(&mut self, now: CoarseTime) -> impl Iterator { const MIN_CALLBACK_ID: CallbackId = CallbackId::new(0); diff --git a/rs/replicated_state/tests/system_state.rs b/rs/replicated_state/tests/system_state.rs index bc64cadec62..9bda6c00048 100644 --- a/rs/replicated_state/tests/system_state.rs +++ b/rs/replicated_state/tests/system_state.rs @@ -1,20 +1,21 @@ +use assert_matches::assert_matches; use ic_base_types::{NumBytes, NumSeconds}; +use ic_error_types::RejectCode; use ic_registry_subnet_type::SubnetType; -use ic_replicated_state::{ - canister_state::DEFAULT_QUEUE_CAPACITY, - testing::{CanisterQueuesTesting, SystemStateTesting}, - InputQueueType, StateError, SystemState, +use ic_replicated_state::canister_state::system_state::PausedExecutionId; +use ic_replicated_state::canister_state::DEFAULT_QUEUE_CAPACITY; +use ic_replicated_state::testing::{CanisterQueuesTesting, SystemStateTesting}; +use ic_replicated_state::{CallOrigin, ExecutionTask, InputQueueType, StateError, SystemState}; +use ic_test_utilities_types::ids::{canister_test_id, user_test_id}; +use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder}; +use ic_types::messages::{ + CallbackId, CanisterMessage, CanisterMessageOrTask, Payload, RejectContext, Request, + RequestMetadata, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES, }; -use ic_test_utilities_types::{ - ids::{canister_test_id, user_test_id}, - messages::{RequestBuilder, ResponseBuilder}, -}; -use ic_types::{ - messages::{CanisterMessage, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES}, - time::{CoarseTime, UNIX_EPOCH}, - CanisterId, Cycles, -}; -use std::sync::Arc; +use ic_types::methods::{Callback, WasmClosure}; +use ic_types::time::{CoarseTime, UNIX_EPOCH}; +use ic_types::{CanisterId, Cycles, Time}; +use std::{collections::BTreeMap, sync::Arc}; /// Figure out how many cycles a canister should have so that it can support the /// given amount of storage for the given amount of time, given the storage fee. @@ -52,6 +53,25 @@ fn default_output_response() -> Arc { .into() } +fn output_request(deadline: CoarseTime) -> Arc { + RequestBuilder::default() + .sender(CANISTER_ID) + .receiver(OTHER_CANISTER_ID) + .deadline(deadline) + .build() + .into() +} + +fn input_response(callback_id: CallbackId, deadline: CoarseTime) -> RequestOrResponse { + ResponseBuilder::default() + .respondent(OTHER_CANISTER_ID) + .originator(CANISTER_ID) + .originator_reply_callback(callback_id) + .deadline(deadline) + .build() + .into() +} + fn default_request_to_self() -> Arc { RequestBuilder::default() .sender(CANISTER_ID) @@ -141,6 +161,17 @@ impl SystemStateFixture { SubnetType::Application, ); } + + /// Times out all callbacks with deadlines before `curernt_time`. Returns the + /// number of expired callbacks. + fn time_out_callbacks(&mut self, current_time: CoarseTime) -> usize { + let input_responses_before = self.system_state.queues().input_queues_response_count(); + self.system_state + .time_out_callbacks(current_time, &CANISTER_ID, &BTreeMap::new()); + let input_responses_after = self.system_state.queues().input_queues_response_count(); + + input_responses_after - input_responses_before + } } #[test] @@ -365,3 +396,109 @@ fn induct_messages_to_self_full_queue() { assert_eq!(None, fixture.pop_input()); assert_eq!(0, fixture.system_state.queues().output_message_count()); } + +#[test] +fn time_out_callbacks() { + let mut fixture = SystemStateFixture::running(); + + let call_context_manager = fixture.system_state.call_context_manager_mut().unwrap(); + let time = Time::from_nanos_since_unix_epoch(1); + let call_context_id = call_context_manager.new_call_context( + CallOrigin::SystemTask, + Cycles::zero(), + time, + RequestMetadata::new(0, time), + ); + + // Simulates an outbound call with the given deadline, by registering a callback + // and reserving a response slot. + let mut simulate_outbound_call = |deadline| { + // Register a callback. + let callback = Callback::new( + call_context_id, + CANISTER_ID, + OTHER_CANISTER_ID, + Cycles::zero(), + Cycles::new(42), + Cycles::new(84), + WasmClosure::new(0, 2), + WasmClosure::new(0, 2), + None, + deadline, + ); + let call_context_manager = fixture.system_state.call_context_manager_mut().unwrap(); + let callback_id = call_context_manager.register_callback(callback); + + // Reserve a response slot. + fixture + .push_output_request(output_request(deadline)) + .unwrap(); + fixture.pop_output().unwrap(); + + callback_id + }; + + let deadline_expired_reject_payload = Payload::Reject(RejectContext::new( + RejectCode::SysUnknown, + "Call deadline has expired.", + )); + + let d1 = CoarseTime::from_secs_since_unix_epoch(1); + let d2 = CoarseTime::from_secs_since_unix_epoch(2); + let d3 = CoarseTime::from_secs_since_unix_epoch(3); + + let c1 = simulate_outbound_call(d1); + let c2 = simulate_outbound_call(d1); + let c3 = simulate_outbound_call(d1); + let c4 = simulate_outbound_call(d2); + + // Simulate a paused execution for `c1`. + fixture + .push_input(input_response(c1, d1), InputQueueType::RemoteSubnet) + .unwrap(); + let response1 = fixture.pop_input().unwrap(); + fixture + .system_state + .task_queue + .push_front(ExecutionTask::PausedExecution { + id: PausedExecutionId(1), + input: CanisterMessageOrTask::Message(response1), + }); + + // And enqueue a response for `c2`. + fixture + .push_input(input_response(c2, d1), InputQueueType::RemoteSubnet) + .unwrap(); + + // Time out callbacks with deadlines before `d2` (only applicable to `c3` now). + assert_eq!(1, fixture.time_out_callbacks(d2)); + + // Pop the response for `c2`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c2 + ); + + // Pop the reject response for `c3`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c3 && response.response_payload == deadline_expired_reject_payload + ); + assert_eq!(None, fixture.pop_input()); + + // Time out callbacks with deadlines before `d3` (i.e. `c4`). + assert_eq!(1, fixture.time_out_callbacks(d3)); + + // Pop the reject responses for `c4`. + assert_matches!( + fixture.pop_input(), + Some(CanisterMessage::Response(response)) + if response.originator_reply_callback == c4 && response.response_payload == deadline_expired_reject_payload + ); + assert_eq!(None, fixture.pop_input()); + + assert!(!fixture.system_state.has_input()); + assert!(!fixture.system_state.queues().has_output()); +}