Skip to content

Commit

Permalink
feat: [MR-607] Ignore non-matching best-effort responses (#1517)
Browse files Browse the repository at this point in the history
A best-effort response may be inducted after the matching call context
has already expired, the reject response has been executed, and the call
context has been dropped. Because of this (and as opposed to guaranteed
responses, where a duplicate or non-matching callback implies malicious
behavior or a bug), best-effort responses that are either duplicates or
don't match an existing callback should be silently dropped.
  • Loading branch information
alin-at-dfinity authored Sep 18, 2024
1 parent fbde007 commit 70dc1a7
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 45 deletions.
66 changes: 41 additions & 25 deletions rs/replicated_state/src/canister_state/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,29 +359,36 @@ impl CanisterQueues {
self.ingress_queue.filter_messages(filter)
}

/// Pushes a canister-to-canister message into the induction pool.
/// Enqueues a canister-to-canister message into the induction pool.
///
/// If the message is a `Request` this will also reserve a slot in the
/// corresponding output queue for the eventual response.
/// If the message is a `Request` and is enqueued successfully, this will also
/// reserve a slot in the corresponding output queue for the eventual response.
///
/// If the message is a `Response` the protocol will have already reserved a
/// slot for it, so the push should not fail due to the input queue being full
/// (although an error may be returned in case of a bug in the upper layers).
/// If the message is a `Response`, `SystemState` will have already checked for
/// a matching callback:
///
/// Adds the sender to the appropriate input schedule (local or remote), if not
/// already there.
/// * If this is a guaranteed `Response`, the protocol should have reserved a
/// slot for it, so the push should not fail for lack of one (although an
/// error may be returned in case of a bug in the upper layers).
/// * If this is a best-effort `Response`, a slot is available and no duplicate
/// (time out) response is already enqueued, it is enqueued.
/// * If this is a best-effort `Response` and a duplicate (time out) response
/// is already enqueued (which is implicitly true when no slot is available),
/// the response is silently dropped and `Ok(())` is returned.
///
/// If the message was enqueued, adds the sender to the appropriate input
/// schedule (local or remote), if not already there.
///
/// # Errors
///
/// If pushing fails, returns the provided message along with a
/// `StateError`:
/// If pushing fails, returns the provided message along with a `StateError`:
///
/// * `QueueFull` if pushing a `Request` and the corresponding input or
/// output queues are full.
/// * `QueueFull` if pushing a `Request` and the corresponding input or output
/// queues are full.
///
/// * `NonMatchingResponse` if pushing a `Response` and the corresponding input
/// queue does not have a reserved slot; or this is a duplicate guaranteed
/// response.
/// * `NonMatchingResponse` if pushing a guaranteed `Response` and the
/// corresponding input queue does not have a reserved slot; or it is a
/// duplicate.
pub(super) fn push_input(
&mut self,
msg: RequestOrResponse,
Expand Down Expand Up @@ -411,8 +418,8 @@ impl CanisterQueues {
.insert(response.originator_reply_callback)
{
debug_assert_eq!(Ok(()), self.test_invariants());
// This is a critical error for guaranteed responses.
if response.deadline == NO_DEADLINE {
// This is a critical error for a guaranteed response.
return Err((
StateError::non_matching_response(
"Duplicate response",
Expand All @@ -421,8 +428,7 @@ impl CanisterQueues {
msg,
));
} else {
// But is OK for best-effort responses (if we already generated a timeout response).
// Silently ignore the response.
// But it's OK for a best-effort response. Silently drop it.
return Ok(());
}
}
Expand All @@ -431,13 +437,23 @@ impl CanisterQueues {

// Queue does not exist or has no reserved slot for this response.
_ => {
return Err((
StateError::non_matching_response(
"No reserved response slot",
response,
),
msg,
));
if response.deadline == NO_DEADLINE {
// Critical error for a guaranteed response.
return Err((
StateError::non_matching_response(
"No reserved response slot",
response,
),
msg,
));
} else {
// This must be a duplicate best-effort response (since `SystemState` has
// already checked for a matching callback). Silently drop it.

This comment has been minimized.

Copy link
@ilbertt

ilbertt Sep 22, 2024

typo: aleady -> already

debug_assert!(self
.callbacks_with_enqueued_response
.contains(&response.originator_reply_callback));
return Ok(());
}
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions rs/replicated_state/src/canister_state/system_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,9 +1148,13 @@ impl SystemState {
},
) => {
if let RequestOrResponse::Response(response) = &msg {
call_context_manager
.validate_response(response)
.map_err(|err| (err, msg.clone()))?;
if !call_context_manager
.should_enqueue(response)
.map_err(|err| (err, msg.clone()))?
{
// Best effort response whose callback is gone. Silently drop it.
return Ok(());
}
}
push_input(
&mut self.queues,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,13 +594,21 @@ impl CallContextManager {
self.callbacks.get(&callback_id).map(AsRef::as_ref)
}

/// Validates the given response before inducting it into the queue.
/// Tests whether the given response should be inducted or silently dropped.
/// Verifies that the stored respondent and originator associated with the
/// `callback_id`, as well as its deadline match those of the response.
///
/// Returns a `StateError::NonMatchingResponse` if the `callback_id` was not found
/// or if the response is not valid.
pub(crate) fn validate_response(&self, response: &Response) -> Result<(), StateError> {
/// Returns:
///
/// * `Ok(true)` if the response can be safely inducted.
/// * `Ok(false)` (drop silently) when a matching `callback_id` was not found
/// for a best-effort response (because the callback might have expired and
/// been closed).
/// * `Err(StateError::NonMatchingResponse)` when a matching `callback_id` was
/// not found for a guaranteed response.
/// * `Err(StateError::NonMatchingResponse)` if the response details do not
/// match those of the callback.
pub(crate) fn should_enqueue(&self, response: &Response) -> Result<bool, StateError> {
match self.callback(response.originator_reply_callback) {
Some(callback) if response.respondent != callback.respondent
|| response.originator != callback.originator
Expand All @@ -610,10 +618,17 @@ impl CallContextManager {
callback.originator, callback.respondent, Time::from(callback.deadline)
), response))
}
Some(_) => Ok(()),
Some(_) => Ok(true),
None => {
// Received an unknown callback ID.
Err(StateError::non_matching_response("unknown callback ID", response))
if response.deadline == NO_DEADLINE {
// This is an error for a guaranteed response.
Err(StateError::non_matching_response("unknown callback ID", response))
} else {
// But should be ignored in the case of a best-effort response (as the callback
// may have expired and been dropped in the meantime).
Ok(false)
}
}
}
}
Expand Down
140 changes: 129 additions & 11 deletions rs/replicated_state/src/canister_state/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::canister_state::system_state::{
use crate::metadata_state::subnet_call_context_manager::InstallCodeCallId;
use crate::CallOrigin;
use crate::Memory;
use assert_matches::assert_matches;
use ic_base_types::NumSeconds;
use ic_logger::replica_logger::no_op_logger;
use ic_management_canister_types::{
Expand Down Expand Up @@ -42,6 +43,7 @@ use strum::IntoEnumIterator;
const CANISTER_ID: CanisterId = CanisterId::from_u64(42);
const OTHER_CANISTER_ID: CanisterId = CanisterId::from_u64(13);
const SUBNET_AVAILABLE_MEMORY: i64 = i64::MAX / 2;
const SOME_DEADLINE: CoarseTime = CoarseTime::from_secs_since_unix_epoch(1);

fn default_input_request(deadline: CoarseTime) -> RequestOrResponse {
RequestBuilder::default()
Expand All @@ -52,11 +54,12 @@ fn default_input_request(deadline: CoarseTime) -> RequestOrResponse {
.into()
}

fn default_input_response(callback_id: CallbackId) -> Response {
fn default_input_response(callback_id: CallbackId, deadline: CoarseTime) -> Response {
ResponseBuilder::default()
.originator(CANISTER_ID)
.respondent(OTHER_CANISTER_ID)
.originator_reply_callback(callback_id)
.deadline(deadline)
.build()
}

Expand Down Expand Up @@ -92,7 +95,7 @@ impl CanisterStateFixture {
}
}

fn make_callback(&mut self) -> CallbackId {
fn make_callback(&mut self, deadline: CoarseTime) -> CallbackId {
let call_context_id = self
.canister_state
.system_state
Expand All @@ -118,7 +121,7 @@ impl CanisterStateFixture {
WasmClosure::new(0, 2),
WasmClosure::new(0, 2),
None,
NO_DEADLINE,
deadline,
))
}

Expand Down Expand Up @@ -159,12 +162,32 @@ fn canister_state_push_input_request_success() {
InputQueueType::RemoteSubnet,
)
.unwrap();
// Request was enqueued.
assert!(fixture.canister_state.has_input());
}

#[test]
fn canister_state_push_input_response_no_reserved_slot() {
fn canister_state_push_input_response_success() {
let mut fixture = CanisterStateFixture::new();
let response = default_input_response(fixture.make_callback());
// Reserve a slot in the input queue.
fixture.with_input_slot_reservation();
// Pushing input response should succeed.
let response = default_input_response(fixture.make_callback(NO_DEADLINE), NO_DEADLINE).into();
fixture
.push_input(
response,
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();
// Response was enqueued.
assert!(fixture.canister_state.has_input());
}

#[test]
fn canister_state_push_input_guaranteed_response_no_reserved_slot() {
let mut fixture = CanisterStateFixture::new();
let response = default_input_response(fixture.make_callback(NO_DEADLINE), NO_DEADLINE);
assert_eq!(
Err((
StateError::non_matching_response("No reserved response slot", &response),
Expand All @@ -176,22 +199,116 @@ fn canister_state_push_input_response_no_reserved_slot() {
InputQueueType::RemoteSubnet
),
);
// Nothing was enqueued.
assert!(!fixture.canister_state.has_input());
}

#[test]
fn canister_state_push_input_response_success() {
fn canister_state_push_input_best_effort_response_no_reserved_slot() {
let mut fixture = CanisterStateFixture::new();
// Reserve a slot in the input queue.
fixture.with_input_slot_reservation();
// Pushing input response should succeed.
let response = default_input_response(fixture.make_callback()).into();
let response = default_input_response(fixture.make_callback(SOME_DEADLINE), SOME_DEADLINE);
// Push a matching response into the slot.
fixture
.push_input(
response.clone().into(),
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();
// Pushing a second best-effort response without a reserved slot should
// succeed (return `Ok`), with the duplicate response silently dropped.
fixture
.push_input(
response.clone().into(),
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();
// Only one response was enqueued.
assert_eq!(
Some(CanisterMessage::Response(response.into())),
fixture.canister_state.pop_input()
);
assert!(!fixture.canister_state.has_input());
}

/// A guaranteed response (`NO_DEADLINE`) that references a callback ID this
/// test never registered must be rejected with `NonMatchingResponse`, even
/// though an input slot has been reserved, and must not be enqueued.
#[test]
fn canister_state_push_input_guaranteed_response_no_matching_callback() {
let mut fixture = CanisterStateFixture::new();
// Reserve a slot in the input queue.
fixture.with_input_slot_reservation();
// Pushing a guaranteed input response with a nonexistent callback (no callback
// was created by this test) should fail.
let response = default_input_response(CallbackId::from(1), NO_DEADLINE).into();
assert_matches!(
fixture.push_input(
response,
SubnetType::Application,
InputQueueType::RemoteSubnet
),
Err((StateError::NonMatchingResponse { .. }, _))
);

// Nothing was enqueued.
assert!(!fixture.canister_state.has_input());
}

/// A best-effort response (`SOME_DEADLINE`) that references a callback ID this
/// test never registered must be silently dropped: `push_input` returns `Ok`
/// but nothing is enqueued.
#[test]
fn canister_state_push_input_best_effort_response_no_matching_callback() {
let mut fixture = CanisterStateFixture::new();
// Reserve a slot in the input queue.
fixture.with_input_slot_reservation();
// Push a best-effort input response with a nonexistent callback.
let response = default_input_response(CallbackId::from(1), SOME_DEADLINE).into();
// The push itself must succeed (no error is reported for the dropped response).
fixture
.push_input(
response,
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();

// Nothing was enqueued.
assert!(!fixture.canister_state.has_input());
}

/// A guaranteed response (`NO_DEADLINE`) whose callback exists but was created
/// with a different deadline (`SOME_DEADLINE`) must be rejected with a
/// `NonMatchingResponse` error mentioning "invalid details", and the original
/// message must be handed back to the caller.
#[test]
fn canister_state_push_input_guaranteed_response_mismatched_callback() {
let mut fixture = CanisterStateFixture::new();
// The callback is created with `SOME_DEADLINE`, but the response carries
// `NO_DEADLINE`, so the response details do not match the callback.
let response = default_input_response(fixture.make_callback(SOME_DEADLINE), NO_DEADLINE);
assert_matches!(
fixture.push_input(
response.clone().into(),
SubnetType::Application,
InputQueueType::RemoteSubnet
),
Err((
StateError::NonMatchingResponse { err_str, .. },
r,
)) if err_str.contains("invalid details") && r == response.into()
);
// Nothing was enqueued.
assert!(!fixture.canister_state.has_input());
}

/// A best-effort response (`SOME_DEADLINE`) whose callback exists but was
/// created with a different deadline (`NO_DEADLINE`) is an error rather than a
/// silent drop: `push_input` must return `NonMatchingResponse` mentioning
/// "invalid details", together with the original message.
#[test]
fn canister_state_push_input_best_effort_response_mismatched_callback() {
let mut fixture = CanisterStateFixture::new();
// The callback is created with `NO_DEADLINE`, but the response carries
// `SOME_DEADLINE`, so the response details do not match the callback.
let response = default_input_response(fixture.make_callback(NO_DEADLINE), SOME_DEADLINE);
assert_matches!(
fixture.push_input(
response.clone().into(),
SubnetType::Application,
InputQueueType::RemoteSubnet
),
Err((
StateError::NonMatchingResponse { err_str, .. },
r,
)) if err_str.contains("invalid details") && r == response.into()
);
// Nothing was enqueued.
assert!(!fixture.canister_state.has_input());
}

#[test]
Expand Down Expand Up @@ -275,7 +392,7 @@ fn system_subnet_local_push_input_request_ignores_subnet_memory() {
#[test]
fn application_subnet_push_input_best_effort_request_ignores_subnet_memory() {
canister_state_push_input_request_memory_limit_test_impl(
CoarseTime::from_secs_since_unix_epoch(1),
SOME_DEADLINE,
-13,
SubnetType::Application,
InputQueueType::RemoteSubnet,
Expand All @@ -286,7 +403,7 @@ fn application_subnet_push_input_best_effort_request_ignores_subnet_memory() {
#[test]
fn system_subnet_push_input_best_effort_request_ignores_subnet_memory() {
canister_state_push_input_request_memory_limit_test_impl(
CoarseTime::from_secs_since_unix_epoch(1),
SOME_DEADLINE,
-13,
SubnetType::System,
InputQueueType::RemoteSubnet,
Expand Down Expand Up @@ -444,7 +561,8 @@ fn canister_state_push_input_response_memory_limit_test_impl(

// Reserve a slot in the input queue.
fixture.with_input_slot_reservation();
let response: RequestOrResponse = default_input_response(fixture.make_callback()).into();
let response: RequestOrResponse =
default_input_response(fixture.make_callback(NO_DEADLINE), NO_DEADLINE).into();

let mut subnet_available_memory = -13;
fixture
Expand Down

0 comments on commit 70dc1a7

Please sign in to comment.