Skip to content

Commit

Permalink
Add test for SystemState::time_out_callbacks().
Browse files Browse the repository at this point in the history
  • Loading branch information
alin-at-dfinity committed Sep 30, 2024
1 parent 9368bbc commit 32c0d32
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,6 @@ impl CallContextManager {
/// current time.
///
/// Note: A given callback ID will be returned at most once by this function.
#[allow(dead_code)]
pub(super) fn expire_callbacks(&mut self, now: CoarseTime) -> impl Iterator<Item = CallbackId> {
const MIN_CALLBACK_ID: CallbackId = CallbackId::new(0);

Expand Down
165 changes: 151 additions & 14 deletions rs/replicated_state/tests/system_state.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use assert_matches::assert_matches;
use ic_base_types::{NumBytes, NumSeconds};
use ic_error_types::RejectCode;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{
canister_state::DEFAULT_QUEUE_CAPACITY,
testing::{CanisterQueuesTesting, SystemStateTesting},
InputQueueType, StateError, SystemState,
use ic_replicated_state::canister_state::system_state::PausedExecutionId;
use ic_replicated_state::canister_state::DEFAULT_QUEUE_CAPACITY;
use ic_replicated_state::testing::{CanisterQueuesTesting, SystemStateTesting};
use ic_replicated_state::{CallOrigin, ExecutionTask, InputQueueType, StateError, SystemState};
use ic_test_utilities_types::ids::{canister_test_id, user_test_id};
use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder};
use ic_types::messages::{
CallbackId, CanisterMessage, CanisterMessageOrTask, Payload, RejectContext, Request,
RequestMetadata, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES,
};
use ic_test_utilities_types::{
ids::{canister_test_id, user_test_id},
messages::{RequestBuilder, ResponseBuilder},
};
use ic_types::{
messages::{CanisterMessage, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES},
time::{CoarseTime, UNIX_EPOCH},
CanisterId, Cycles,
};
use std::sync::Arc;
use ic_types::methods::{Callback, WasmClosure};
use ic_types::time::{CoarseTime, UNIX_EPOCH};
use ic_types::{CanisterId, Cycles, Time};
use std::{collections::BTreeMap, sync::Arc};

/// Figure out how many cycles a canister should have so that it can support the
/// given amount of storage for the given amount of time, given the storage fee.
Expand Down Expand Up @@ -52,6 +53,25 @@ fn default_output_response() -> Arc<Response> {
.into()
}

fn output_request(deadline: CoarseTime) -> Arc<Request> {
RequestBuilder::default()
.sender(CANISTER_ID)
.receiver(OTHER_CANISTER_ID)
.deadline(deadline)
.build()
.into()
}

fn input_response(callback_id: CallbackId, deadline: CoarseTime) -> RequestOrResponse {
ResponseBuilder::default()
.respondent(OTHER_CANISTER_ID)
.originator(CANISTER_ID)
.originator_reply_callback(callback_id)
.deadline(deadline)
.build()
.into()
}

fn default_request_to_self() -> Arc<Request> {
RequestBuilder::default()
.sender(CANISTER_ID)
Expand Down Expand Up @@ -141,6 +161,17 @@ impl SystemStateFixture {
SubnetType::Application,
);
}

/// Times out all callbacks with deadlines before `curernt_time`. Returns the
/// number of expired callbacks.
fn time_out_callbacks(&mut self, current_time: CoarseTime) -> usize {
let input_responses_before = self.system_state.queues().input_queues_response_count();
self.system_state
.time_out_callbacks(current_time, &CANISTER_ID, &BTreeMap::new());
let input_responses_after = self.system_state.queues().input_queues_response_count();

input_responses_after - input_responses_before
}
}

#[test]
Expand Down Expand Up @@ -365,3 +396,109 @@ fn induct_messages_to_self_full_queue() {
assert_eq!(None, fixture.pop_input());
assert_eq!(0, fixture.system_state.queues().output_message_count());
}

#[test]
fn time_out_callbacks() {
let mut fixture = SystemStateFixture::running();

let call_context_manager = fixture.system_state.call_context_manager_mut().unwrap();
let time = Time::from_nanos_since_unix_epoch(1);
let call_context_id = call_context_manager.new_call_context(
CallOrigin::SystemTask,
Cycles::zero(),
time,
RequestMetadata::new(0, time),
);

// Simulates an outbound call with the given deadline, by registering a callback
// and reserving a response slot.
let mut simulate_outbound_call = |deadline| {
// Register a callback.
let callback = Callback::new(
call_context_id,
CANISTER_ID,
OTHER_CANISTER_ID,
Cycles::zero(),
Cycles::new(42),
Cycles::new(84),
WasmClosure::new(0, 2),
WasmClosure::new(0, 2),
None,
deadline,
);
let call_context_manager = fixture.system_state.call_context_manager_mut().unwrap();
let callback_id = call_context_manager.register_callback(callback);

// Reserve a response slot.
fixture
.push_output_request(output_request(deadline))
.unwrap();
fixture.pop_output().unwrap();

callback_id
};

let deadline_expired_reject_payload = Payload::Reject(RejectContext::new(
RejectCode::SysUnknown,
"Call deadline has expired.",
));

let d1 = CoarseTime::from_secs_since_unix_epoch(1);
let d2 = CoarseTime::from_secs_since_unix_epoch(2);
let d3 = CoarseTime::from_secs_since_unix_epoch(3);

let c1 = simulate_outbound_call(d1);
let c2 = simulate_outbound_call(d1);
let c3 = simulate_outbound_call(d1);
let c4 = simulate_outbound_call(d2);

// Simulate a paused execution for `c1`.
fixture
.push_input(input_response(c1, d1), InputQueueType::RemoteSubnet)
.unwrap();
let response1 = fixture.pop_input().unwrap();
fixture
.system_state
.task_queue
.push_front(ExecutionTask::PausedExecution {
id: PausedExecutionId(1),
input: CanisterMessageOrTask::Message(response1),
});

// And enqueue a response for `c2`.
fixture
.push_input(input_response(c2, d1), InputQueueType::RemoteSubnet)
.unwrap();

// Time out callbacks with deadlines before `d2` (only applicable to `c3` now).
assert_eq!(1, fixture.time_out_callbacks(d2));

// Pop the response for `c2`.
assert_matches!(
fixture.pop_input(),
Some(CanisterMessage::Response(response))
if response.originator_reply_callback == c2
);

// Pop the reject response for `c3`.
assert_matches!(
fixture.pop_input(),
Some(CanisterMessage::Response(response))
if response.originator_reply_callback == c3 && response.response_payload == deadline_expired_reject_payload
);
assert_eq!(None, fixture.pop_input());

// Time out callbacks with deadlines before `d3` (i.e. `c4`).
assert_eq!(1, fixture.time_out_callbacks(d3));

// Pop the reject responses for `c4`.
assert_matches!(
fixture.pop_input(),
Some(CanisterMessage::Response(response))
if response.originator_reply_callback == c4 && response.response_payload == deadline_expired_reject_payload
);
assert_eq!(None, fixture.pop_input());

assert!(!fixture.system_state.has_input());
assert!(!fixture.system_state.queues().has_output());
}

0 comments on commit 32c0d32

Please sign in to comment.