From 3221c593640b6514d7a7babb13306ffd19675dc4 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean <58422065+alin-at-dfinity@users.noreply.github.com> Date: Tue, 1 Oct 2024 09:46:42 +0200 Subject: [PATCH] refactor: [MR-603] Typed canister queues and references (#1697) [MR-603]: Assign type parameters to canister queues and references, to designate them as either input/inbound or output/outbound. Ensures that input queues can only hold inbound references; and output queues can only hold outbound references. Implements separate logic (for inbound and outbound references) for determining staleness, lookup and removal. [MR-603]: https://dfinity.atlassian.net/browse/MR-603?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ --- rs/messaging/tests/queue_tests.rs | 6 +- .../src/canister_state/queues.rs | 847 ++++++++++-------- .../canister_state/queues/input_schedule.rs | 4 +- .../queues/input_schedule/tests.rs | 6 +- .../src/canister_state/queues/message_pool.rs | 262 +++++- .../queues/message_pool/tests.rs | 339 +++++-- .../src/canister_state/queues/queue.rs | 129 +-- .../src/canister_state/queues/queue/tests.rs | 158 ++-- .../src/canister_state/queues/tests.rs | 112 +-- 9 files changed, 1169 insertions(+), 694 deletions(-) diff --git a/rs/messaging/tests/queue_tests.rs b/rs/messaging/tests/queue_tests.rs index 0ab428ca8b1..756130c24a7 100644 --- a/rs/messaging/tests/queue_tests.rs +++ b/rs/messaging/tests/queue_tests.rs @@ -178,7 +178,7 @@ impl SubnetPairProxy { &self.local_canister_id, &self.remote_canister_id, ) - .map(|iter| iter.collect::>()) + .map(|iter| iter.cloned().collect::>()) } /// Generates a snapshot of the output queue on the remote canister and @@ -189,7 +189,7 @@ impl SubnetPairProxy { &self.remote_canister_id, &self.local_canister_id, ) - .map(|iter| iter.collect::>()) + .map(|iter| iter.cloned().collect::>()) } /// Build backpressure on `local_env` until a minimum number of requests are found in the @@ -272,7 +272,7 @@ fn get_output_queue_iter<'a>( state: &'a ReplicatedState, local_canister_id: &CanisterId, remote_canister_id: &'a CanisterId, -) -> Option + 'a> { +) -> Option> { state .canister_states .get(local_canister_id) diff --git a/rs/replicated_state/src/canister_state/queues.rs b/rs/replicated_state/src/canister_state/queues.rs index 07f9c305111..c65f7932192 100644 --- a/rs/replicated_state/src/canister_state/queues.rs +++ b/rs/replicated_state/src/canister_state/queues.rs @@ -6,8 +6,10 @@ mod tests; pub use self::input_schedule::CanisterQueuesLoopDetector; use self::input_schedule::InputSchedule; -use self::message_pool::{Context, Kind, MessagePool}; -use self::queue::{CanisterQueue, IngressQueue}; +use self::message_pool::{ + Class, Context, InboundReference, Kind, MessagePool, OutboundReference, SomeReference, +}; +use self::queue::{CanisterQueue, IngressQueue, InputQueue, OutputQueue}; use crate::replicated_state::MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN; use crate::{CanisterState, CheckpointLoadingMetrics, InputQueueType, InputSource, StateError}; use ic_base_types::PrincipalId; @@ -24,7 +26,6 @@ use ic_types::messages::{ use ic_types::{CanisterId, CountBytes, Time}; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; -use message_pool::Class; use prost::Message; use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::convert::{From, TryFrom}; @@ -50,28 +51,28 @@ pub const DEFAULT_QUEUE_CAPACITY: usize = 500; /// 2. Canister input and output queues: a map of pairs of canister input and /// output queues; one pair per canister (including ourselves). Canister /// queues come in pairs in order to reliably implement backpressure, by -/// reserving queue slots for responses: before a message can be enqueued +/// reserving queue slots for responses: before a request can be enqueued /// into an input / output queue, a response slot must have been reserved in /// the reverse output / input queue. /// -/// Canister queues hold references (of type `message_pool::Id`) into the -/// message pool (see below) or into maps of expired callbacks or shed -/// responses. Some references may be *stale* due to expiration or load -/// shedding. +/// Canister queues hold references (of type `message_pool::Reference`) +/// into the message pool (see below) or into maps of expired callbacks or +/// shed responses ("compact responses", represented as `CallbackIds`). Some +/// references may be *stale* due to expiration or load shedding. /// /// 3. Message pool (for the purpose of this breakdown, also includes the maps -/// of expired callbacks and shed responses): backing storage for canister -/// input and output queues. +/// of compact responses): backing storage for canister input and output +/// queues. /// /// The message pool holds the messages referenced from `canister_queues`, /// with support for time-based expiration and load shedding. Also maintains /// message count and size stats, broken down along several dimensions. /// /// In order to handle shedding of inbound responses; as well as for compact -/// representation of timeout responses; shed and expired `CallbackIds`` are -/// maintained in separate maps. When it peeks or pops such a `CallbackId`, -/// `SystemState` retrieves the `Callback` and synthesizes a reject response -/// based on it. +/// representation of timeout responses; shed and expired `CallbackIds` +/// ("compact responses") are maintained in separate maps. When it peeks or +/// pops such a `CallbackId`, `SystemState` retrieves the `Callback` and +/// synthesizes a reject response based on it. /// /// 4. Queue stats: slot and memory reservation stats, for efficient validation /// checks and memory usage calculations. @@ -104,15 +105,15 @@ pub const DEFAULT_QUEUE_CAPACITY: usize = 500; /// attempted implementation. /// /// * `callbacks_with_enqueued_response` contains the precise set of -/// `CallbackIds` of all inbound responses. +/// `CallbackIds` of all inbound responses and compact responses. /// /// # Soft invariants /// /// * `QueueStats`' input / output queue slot reservation stats are consistent /// with the actual number of reserved slots across input / output queues. /// -/// * All keys (references) in the pool and in the shed / expired callback maps -/// are enqueued in the canister queues exactly once. +/// * All keys (references) in the pool and in the compact response maps are +/// enqueued in the canister queues exactly once. /// /// * `InputSchedule` invariants: all non-empty input queues are scheduled; /// input schedules are internally consistent; local canisters are scheduled @@ -136,20 +137,16 @@ pub struct CanisterQueues { /// non-stale. A reference in an output queue is stale if there exists no /// corresponding message in the message pool. This can happen if the message /// was expired or shed. A reference in an input queue is stale if there exists - /// no corresponding message in the message pool; or entry in the - /// `shed_responses` map (which records the `CallbackIds` of shed inbound + /// no corresponding message in the message pool; or entry in the compact + /// response maps (which record the `CallbackIds` of expired / shed inbound /// best-effort responses). - canister_queues: BTreeMap, + canister_queues: BTreeMap, - /// Pool holding the messages referenced by `canister_queues`, providing message - /// stats (count, size) and support for time-based expiration and load shedding. + /// Backing store for `canister_queues` references, combining a `MessagePool` + /// and maps of compact responses (`CallbackIds` of expired / shed responses), + /// with specific behavior for inbound vs outbound messages. #[validate_eq(CompareWithValidateEq)] - pool: MessagePool, - - /// The `CallbackIds` of shed inbound best-effort responses, to be returned as - /// `UnknownResponse` by `peek_input()` / `pop_input()` (and then be "inflated" - /// by `SystemState` into a reject response based on the actual callback). - shed_responses: BTreeMap, + store: MessageStoreImpl, /// Slot and memory reservation stats. Message count and size stats are /// maintained separately in the `MessagePool`. @@ -161,7 +158,8 @@ pub struct CanisterQueues { input_schedule: InputSchedule, /// The `CallbackIds` of all responses enqueued in input queues, whether an - /// actual `Response` in the message pool or a `CallbackId` in `shed_responses`. + /// actual `Response` in the message pool or a compact response (`CallbackId`) + /// in `expired_callbacks` or `shed_responses`. /// /// Used for response deduplication (whether due to a locally generated reject /// response to a best-effort call; or due to a malicious / buggy subnet). @@ -181,10 +179,10 @@ pub struct CanisterQueues { pub struct CanisterOutputQueuesIterator<'a> { /// Priority queue of non-empty output queues. The next message to be popped /// / peeked is the one at the front of the first queue. - queues: VecDeque<(&'a CanisterId, &'a mut CanisterQueue)>, + queues: VecDeque<(&'a CanisterId, &'a mut OutputQueue)>, - /// Mutable pool holding the messages referenced by `queues`. - pool: &'a mut MessagePool, + /// Mutable store holding the messages referenced by `queues`. + store: &'a mut MessageStoreImpl, /// Number of (potentially stale) messages left in the iterator. size: usize, @@ -195,8 +193,8 @@ impl<'a> CanisterOutputQueuesIterator<'a> { /// `CanisterQueues::canister_queues` (a map of `CanisterId` to an input queue, /// output queue pair) and `MessagePool`. fn new( - queues: &'a mut BTreeMap, - pool: &'a mut MessagePool, + queues: &'a mut BTreeMap, + store: &'a mut MessageStoreImpl, ) -> Self { let queues: VecDeque<_> = queues .iter_mut() @@ -205,7 +203,11 @@ impl<'a> CanisterOutputQueuesIterator<'a> { .collect(); let size = Self::compute_size(&queues); - CanisterOutputQueuesIterator { queues, pool, size } + CanisterOutputQueuesIterator { + queues, + store, + size, + } } /// Returns the first message from the next queue. @@ -213,9 +215,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> { let queue = &self.queues.front()?.1; let reference = queue.peek().expect("Empty queue in iterator."); - let msg = self.pool.get(reference); - assert!(msg.is_some(), "stale reference at front of output queue"); - msg + Some(self.store.get(reference)) } /// Pops the first message from the next queue. @@ -228,12 +228,11 @@ impl<'a> CanisterOutputQueuesIterator<'a> { self.size -= queue.len(); // Queue must be non-empty and message at the front of queue non-stale. - let msg = output_queue_pop_and_advance(queue, self.pool) + let msg = self + .store + .queue_pop_and_advance(queue) .expect("Empty queue in output iterator."); - debug_assert_eq!( - Ok(()), - queue_front_not_stale(queue, self.pool, &NO_SHED_RESPONSES, receiver) - ); + debug_assert_eq!(Ok(()), self.store.queue_front_not_stale(queue, receiver)); if queue.len() > 0 { self.size += queue.len(); @@ -281,7 +280,7 @@ impl<'a> CanisterOutputQueuesIterator<'a> { /// Computes the number of (potentially stale) messages left in `queues`. /// /// Time complexity: `O(n)`. - fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut CanisterQueue)>) -> usize { + fn compute_size(queues: &VecDeque<(&'a CanisterId, &'a mut OutputQueue)>) -> usize { queues.iter().map(|(_, q)| q.len()).sum() } } @@ -308,7 +307,7 @@ impl Iterator for CanisterOutputQueuesIterator<'_> { /// `CanisterQueues::peek_input()`: in addition to the regular ingress messages /// and canister requests / responses, `pop_input()` / `peek_input()` may also /// return concise "reject response for callback ID" messages. -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq, Debug)] pub(crate) enum CanisterInput { Ingress(Arc), Request(Arc), @@ -344,6 +343,262 @@ impl From for CanisterInput { } } +/// A backing store for canister input and output queues, consisting of: +/// +/// * a `MessagePool`, holding messages and providing message stats (count, +/// size) and support for time-based expiration and load shedding; and +/// * maps of compact resopnses (`CallbackIds` that have either expired or +/// whose responses have been shed. +/// +/// Implements the `MessageStore` trait for both inbound messages +/// (`T = CanisterInput` items that are eiter pooled messages or compact +/// responses) and outbound messages (pooled `RequestOrResponse items`). +#[derive(Clone, Eq, PartialEq, Debug, Default, ValidateEq)] +struct MessageStoreImpl { + /// Pool holding the messages referenced by `canister_queues`, providing message + /// stats (count, size) and support for time-based expiration and load shedding. + #[validate_eq(CompareWithValidateEq)] + pool: MessagePool, + + /// Compact reject responses (`CallbackIds`) replacing best-effort responses + /// that were shed. These are returned as `CanisterInput::ResponseDropped` by + /// `peek_input()` / `pop_input()` (and "inflated" by `SystemState` into + /// `SysUnknown` reject responses based on the callback). + shed_responses: BTreeMap, +} + +impl MessageStoreImpl { + /// Inserts an inbound message into the pool. + fn insert_inbound(&mut self, msg: RequestOrResponse) -> InboundReference { + self.pool.insert_inbound(msg) + } + + /// Pops and returns the item at the front of the given queue, advancing to the + /// next non-stale reference. + /// + /// Panics if the reference at the front of the queue is stale. + fn queue_pop_and_advance(&mut self, queue: &mut CanisterQueue) -> Option + where + MessageStoreImpl: MessageStore, + { + let reference = queue.pop()?; + + // Advance to the next non-stale reference. + self.queue_advance(queue); + + Some(self.take(reference)) + } + + /// Advances the queue to the next non-stale reference. + fn queue_advance(&mut self, queue: &mut CanisterQueue) + where + MessageStoreImpl: MessageStore, + { + queue.pop_while(|reference| self.is_stale(reference)); + } + + /// Returns `true` if `ingress_queue` or at least one of the canister input + /// queues is not empty; `false` otherwise. + pub fn has_input(&self) -> bool { + self.pool.message_stats().inbound_message_count > 0 || !self.shed_responses.is_empty() + } + + /// Returns `true` if at least one output queue is not empty; false otherwise. + pub fn has_output(&self) -> bool { + self.pool.message_stats().outbound_message_count > 0 + } + + /// Tests whether the message store contains neither pooled messages nor compact + /// responses. + fn is_empty(&self) -> bool { + self.pool.len() == 0 && self.shed_responses.is_empty() + } + + /// Helper function for concisely validating the hard invariant that a canister + /// queue is either empty or has a non-stale reference at the front, by writing + /// `debug_assert_eq!(Ok(()), store.queue_front_not_stale(...)`. + /// + /// Time complexity: `O(log(n))`. + fn queue_front_not_stale( + &self, + queue: &CanisterQueue, + canister_id: &CanisterId, + ) -> Result<(), String> + where + MessageStoreImpl: MessageStore, + { + if let Some(reference) = queue.peek() { + if self.is_stale(reference) { + return Err(format!( + "Stale reference at the front of {:?} queue to/from {}", + reference.context(), + canister_id + )); + } + } + + Ok(()) + } +} + +/// Defines context-specific (inbound / outbound) message store operations +/// (lookup, removal, staleness check) for `MessageStoreImpl`. +trait MessageStore { + /// The type returned by `get()`: `&T` if the implementation actually holds + /// items of type `T`; or the type `T` if it has to be built on demand. + type TRef<'a> + where + Self: 'a; + + /// Looks up the referenced item. Panics if the reference is stale. + fn get(&self, reference: message_pool::Reference) -> Self::TRef<'_>; + + /// Removes the referenced item. Panics if the reference is stale. + fn take(&mut self, reference: message_pool::Reference) -> T; + + /// Checks whether the given reference is stale (i.e. neither in the pool, nor + /// in one of the compact response maps iff inbound). + fn is_stale(&self, reference: message_pool::Reference) -> bool; +} + +impl MessageStore for MessageStoreImpl { + type TRef<'a> = CanisterInput; + + fn get(&self, reference: InboundReference) -> CanisterInput { + assert_eq!(Context::Inbound, reference.context()); + + if let Some(msg) = self.pool.get(reference) { + debug_assert!(!self.shed_responses.contains_key(&reference)); + return msg.clone().into(); + } else if reference.class() == Class::BestEffort && reference.kind() == Kind::Response { + if let Some(callback_id) = self.shed_responses.get(&reference) { + return CanisterInput::ResponseDropped(*callback_id); + } + } + + panic!("stale reference at the front of input queue"); + } + + fn take(&mut self, reference: InboundReference) -> CanisterInput { + assert_eq!(Context::Inbound, reference.context()); + + if let Some(msg) = self.pool.take(reference) { + debug_assert!(!self.shed_responses.contains_key(&reference)); + return msg.into(); + } else if reference.class() == Class::BestEffort && reference.kind() == Kind::Response { + if let Some(callback_id) = self.shed_responses.remove(&reference) { + return CanisterInput::ResponseDropped(callback_id); + } + } + + panic!("stale reference at the front of input queue"); + } + + fn is_stale(&self, reference: InboundReference) -> bool { + assert_eq!(Context::Inbound, reference.context()); + + self.pool.get(reference).is_none() + && (reference.class() != Class::BestEffort + || reference.kind() != Kind::Response + || !self.shed_responses.contains_key(&reference)) + } +} + +impl MessageStore for MessageStoreImpl { + type TRef<'a> = &'a RequestOrResponse; + + fn get(&self, reference: OutboundReference) -> &RequestOrResponse { + assert_eq!(Context::Outbound, reference.context()); + + self.pool + .get(reference) + .expect("stale reference at the front of output queue") + } + + fn take(&mut self, reference: OutboundReference) -> RequestOrResponse { + assert_eq!(Context::Outbound, reference.context()); + + self.pool + .take(reference) + .expect("stale reference at the front of output queue") + } + + fn is_stale(&self, reference: OutboundReference) -> bool { + assert_eq!(Context::Outbound, reference.context()); + self.pool.get(reference).is_none() + } +} + +trait InboundMessageStore: MessageStore { + /// Collects the `CallbackIds` of all responses and compact responses enqueued + /// in input queues. + /// + /// Returns an error if there are duplicate `CallbackIds` among the responses; + /// or if not all inbound responses or compact responses are enqueued. + /// + /// Time complexity: `O(n * log(n))`. + fn callbacks_with_enqueued_response( + &self, + canister_queues: &BTreeMap, + ) -> Result, String>; +} + +impl InboundMessageStore for MessageStoreImpl { + fn callbacks_with_enqueued_response( + &self, + canister_queues: &BTreeMap, + ) -> Result, String> { + let callbacks_vec = canister_queues + .values() + .flat_map(|(input_queue, _)| input_queue.iter()) + .filter_map(|reference| { + let (a, b) = ( + self.pool.get(*reference), + self.shed_responses.get(reference), + ); + match (a, b) { + // Pooled response. + (Some(RequestOrResponse::Response(rep)), None) => { + Some(Ok(rep.originator_reply_callback)) + } + + // Compact response. + (None, Some(callback_id)) => Some(Ok(*callback_id)), + + // Request or stale reference. + (Some(RequestOrResponse::Request(_)), None) | (None, None) => None, + + // Two or more of the above. This should never happen. + _ => Some(Err(format!( + "CanisterQueues: Multiple responses for {:?}", + reference + ))), + } + }) + .collect::, String>>()?; + + let callbacks: BTreeSet<_> = callbacks_vec.iter().cloned().collect(); + if callbacks.len() != callbacks_vec.len() { + return Err(format!( + "CanisterQueues: Duplicate inbound response callback(s): {:?}", + callbacks_vec + )); + } + + let response_count = + self.pool.message_stats().inbound_response_count + self.shed_responses.len(); + if callbacks_vec.len() != response_count { + return Err(format!( + "CanisterQueues: Have {} inbound responses, but only {} are enqueued", + response_count, + callbacks.len() + )); + } + + Ok(callbacks) + } +} + impl CanisterQueues { /// Pushes an ingress message into the induction pool. pub fn push_ingress(&mut self, msg: Ingress) { @@ -376,7 +631,7 @@ impl CanisterQueues { { for (canister_id, (_, queue)) in self.canister_queues.iter_mut() { while let Some(reference) = queue.peek() { - let Some(msg) = self.pool.get(reference) else { + let Some(msg) = self.store.pool.get(reference) else { // Expired / dropped message. Pop it and advance. assert_eq!(Some(reference), queue.pop()); continue; @@ -388,9 +643,7 @@ impl CanisterQueues { // Message was consumed, pop it. Ok(_) => { - self.pool - .take(reference) - .expect("get() returned a message, take() should not fail"); + self.store.take(reference); assert_eq!(Some(reference), queue.pop()); } } @@ -404,7 +657,7 @@ impl CanisterQueues { /// at a time from each in a round robin fashion. The iterator consumes all /// popped messages. pub(crate) fn output_into_iter(&mut self) -> CanisterOutputQueuesIterator { - CanisterOutputQueuesIterator::new(&mut self.canister_queues, &mut self.pool) + CanisterOutputQueuesIterator::new(&mut self.canister_queues, &mut self.store) } /// See `IngressQueue::filter_messages()` for documentation. @@ -516,7 +769,7 @@ impl CanisterQueues { }; self.queue_stats.on_push(&msg, Context::Inbound); - let reference = self.pool.insert_inbound(msg); + let reference = self.store.insert_inbound(msg); match reference.kind() { Kind::Request => input_queue.push_request(reference), Kind::Response => input_queue.push_response(reference), @@ -552,8 +805,7 @@ impl CanisterQueues { continue; }; - let msg = - input_queue_pop_and_advance(input_queue, &mut self.pool, &mut self.shed_responses); + let msg = self.store.queue_pop_and_advance(input_queue); // Update the input schedule. if input_queue.len() != 0 { @@ -594,9 +846,7 @@ impl CanisterQueues { .get(sender) .and_then(|(input_queue, _)| input_queue.peek()) { - let msg = self - .get_canister_input(reference) - .expect("stale reference at the front of input queue"); + let msg = self.store.get(reference); debug_assert_eq!(Ok(()), self.test_invariants()); debug_assert_eq!(Ok(()), self.schedules_ok(&|_| InputQueueType::RemoteSubnet)); return Some(msg); @@ -613,24 +863,6 @@ impl CanisterQueues { None } - /// Returns the `CanisterInput` corresponding to the given reference, by looking - /// it up in the message pool or in the shed inbound responses map. - fn get_canister_input(&self, reference: message_pool::Id) -> Option { - assert_eq!(Context::Inbound, reference.context()); - - if let Some(msg) = self.pool.get(reference) { - debug_assert!(!self.shed_responses.contains_key(&reference)); - Some(msg.clone().into()) - } else if reference.kind() == Kind::Response && reference.class() == Class::BestEffort { - self.shed_responses - .get(&reference) - .map(|callback_id| CanisterInput::ResponseDropped(*callback_id)) - } else { - debug_assert!(!self.shed_responses.contains_key(&reference)); - None - } - } - /// Skips the next sender canister from the given input schedule (local or /// remote). fn skip_canister_input(&mut self, input_queue_type: InputQueueType) { @@ -659,14 +891,12 @@ impl CanisterQueues { /// Returns `true` if `ingress_queue` or at least one of the canister input /// queues is not empty; `false` otherwise. pub fn has_input(&self) -> bool { - !self.ingress_queue.is_empty() - || self.pool.message_stats().inbound_message_count > 0 - || !self.shed_responses.is_empty() + !self.ingress_queue.is_empty() || self.store.has_input() } /// Returns `true` if at least one output queue is not empty; false otherwise. pub fn has_output(&self) -> bool { - self.pool.message_stats().outbound_message_count > 0 + self.store.has_output() } /// Peeks the ingress or inter-canister input message that would be returned by @@ -762,7 +992,7 @@ impl CanisterQueues { self.queue_stats .on_push_request(&request, Context::Outbound); - let reference = self.pool.insert_outbound_request(request, time); + let reference = self.store.pool.insert_outbound_request(request, time); output_queue.push_request(reference); debug_assert_eq!(Ok(()), self.test_invariants()); @@ -849,7 +1079,7 @@ impl CanisterQueues { .get_mut(&response.originator) .expect("pushing response into inexistent output queue") .1; - let reference = self.pool.insert_outbound_response(response); + let reference = self.store.pool.insert_outbound_response(response); output_queue.push_response(reference); debug_assert_eq!(Ok(()), self.test_invariants()); @@ -860,9 +1090,7 @@ impl CanisterQueues { pub(super) fn peek_output(&self, canister_id: &CanisterId) -> Option<&RequestOrResponse> { let output_queue = &self.canister_queues.get(canister_id)?.1; - let msg = self.pool.get(output_queue.peek()?); - assert!(msg.is_some(), "stale reference at front of output queue"); - msg + Some(self.store.get(output_queue.peek()?)) } /// Tries to induct a message from the output queue to `own_canister_id` @@ -879,7 +1107,8 @@ impl CanisterQueues { .get_mut(&own_canister_id) .expect("Output queue existed above so lookup should not fail.") .1; - output_queue_pop_and_advance(queue, &mut self.pool) + self.store + .queue_pop_and_advance(queue) .expect("Message peeked above so pop should not fail."); debug_assert_eq!(Ok(()), self.test_invariants()); @@ -887,6 +1116,11 @@ impl CanisterQueues { Ok(()) } + /// Returns a reference to the pool's message stats. + fn message_stats(&self) -> &message_pool::MessageStats { + self.store.pool.message_stats() + } + /// Returns the number of enqueued ingress messages. pub fn ingress_queue_message_count(&self) -> usize { self.ingress_queue.size() @@ -899,7 +1133,7 @@ impl CanisterQueues { /// Returns the number of non-stale canister messages enqueued in input queues. pub fn input_queues_message_count(&self) -> usize { - self.pool.message_stats().inbound_message_count + self.shed_responses.len() + self.message_stats().inbound_message_count + self.store.shed_responses.len() } /// Returns the number of reserved slots across all input queues. @@ -916,24 +1150,23 @@ impl CanisterQueues { /// responses, as these are constant size per callback and thus can be included /// in the cost of a callback. pub fn input_queues_size_bytes(&self) -> usize { - self.pool.message_stats().inbound_size_bytes - + self.canister_queues.len() * size_of::() + self.message_stats().inbound_size_bytes + + self.canister_queues.len() * size_of::() } /// Returns the number of non-stale requests enqueued in input queues. pub fn input_queues_request_count(&self) -> usize { - self.pool.message_stats().inbound_message_count - - self.pool.message_stats().inbound_response_count + self.message_stats().inbound_message_count - self.message_stats().inbound_response_count } /// Returns the number of non-stale responses enqueued in input queues. pub fn input_queues_response_count(&self) -> usize { - self.pool.message_stats().inbound_response_count + self.shed_responses.len() + self.message_stats().inbound_response_count + self.store.shed_responses.len() } /// Returns the number of actual (non-stale) messages in output queues. pub fn output_queues_message_count(&self) -> usize { - self.pool.message_stats().outbound_message_count + self.message_stats().outbound_message_count } /// Returns the number of reserved slots across all output queues. @@ -950,19 +1183,19 @@ impl CanisterQueues { /// responses, as these are constant size per callback and thus can be included /// in the cost of a callback. pub fn best_effort_memory_usage(&self) -> usize { - self.pool.message_stats().best_effort_message_bytes + self.message_stats().best_effort_message_bytes } /// Returns the memory usage of all guaranteed response messages. pub fn guaranteed_response_memory_usage(&self) -> usize { self.queue_stats.guaranteed_response_memory_usage() - + self.pool.message_stats().guaranteed_response_memory_usage() + + self.message_stats().guaranteed_response_memory_usage() } /// Returns the total byte size of guaranteed responses across input and /// output queues. pub fn guaranteed_responses_size_bytes(&self) -> usize { - self.pool.message_stats().guaranteed_responses_size_bytes + self.message_stats().guaranteed_responses_size_bytes } /// Returns the total memory reservations for guaranteed responses across input @@ -977,8 +1210,7 @@ impl CanisterQueues { /// Returns the sum total of bytes above `MAX_RESPONSE_COUNT_BYTES` per /// oversized guaranteed response call request. pub fn oversized_guaranteed_requests_extra_bytes(&self) -> usize { - self.pool - .message_stats() + self.message_stats() .oversized_guaranteed_requests_extra_bytes } @@ -1007,10 +1239,10 @@ impl CanisterQueues { // `CanisterQueues` serializes as an empty byte array (and there is no need to // persist it explicitly). if self.canister_queues.is_empty() && self.ingress_queue.is_empty() { - // The schedules and stats will already have default (zero) values, only `pool` + // The schedules and stats will already have default (zero) values, only `store` // and `input_schedule` must be reset explicitly. - debug_assert!(self.pool.len() == 0); - self.pool = MessagePool::default(); + debug_assert!(self.store.is_empty()); + self.store = MessageStoreImpl::default(); self.input_schedule = InputSchedule::default(); // Trust but verify. Ensure that the `CanisterQueues` now encodes to zero bytes. @@ -1040,13 +1272,13 @@ impl CanisterQueues { /// /// Time complexity: `O(1)`. pub fn has_expired_deadlines(&self, current_time: Time) -> bool { - self.pool.has_expired_deadlines(current_time) + self.store.pool.has_expired_deadlines(current_time) } - /// Drops expired messages given a current time, enqueuing a reject response for - /// own requests into the matching reverse queue (input or output). + /// Drops expired messages given a current time, enqueueing a reject response + /// for own requests into the matching reverse queue (input or output). /// - /// Updating the correct input queues schedule after enqueuing a reject response + /// Updating the correct input queues schedule after enqueueing a reject response /// into a previously empty input queue also requires the set of local canisters /// to decide whether the destination canister was local or remote. /// @@ -1057,16 +1289,17 @@ impl CanisterQueues { own_canister_id: &CanisterId, local_canisters: &BTreeMap, ) -> usize { - let expired_messages = self.pool.expire_messages(current_time); + let expired_messages = self.store.pool.expire_messages(current_time); + let expired_message_count = expired_messages.len(); let input_queue_type_fn = input_queue_type_fn(own_canister_id, local_canisters); - for (reference, msg) in expired_messages.iter() { - self.on_message_dropped(*reference, msg, &input_queue_type_fn); + for (reference, msg) in expired_messages.into_iter() { + self.on_message_dropped(reference, msg, &input_queue_type_fn); } debug_assert_eq!(Ok(()), self.test_invariants()); debug_assert_eq!(Ok(()), self.schedules_ok(&input_queue_type_fn)); - expired_messages.len() + expired_message_count } /// Removes the largest best-effort message in the underlying pool. Returns @@ -1080,9 +1313,9 @@ impl CanisterQueues { own_canister_id: &CanisterId, local_canisters: &BTreeMap, ) -> bool { - if let Some((reference, msg)) = self.pool.shed_largest_message() { + if let Some((reference, msg)) = self.store.pool.shed_largest_message() { let input_queue_type_fn = input_queue_type_fn(own_canister_id, local_canisters); - self.on_message_dropped(reference, &msg, &input_queue_type_fn); + self.on_message_dropped(reference, msg, &input_queue_type_fn); debug_assert_eq!(Ok(()), self.test_invariants()); debug_assert_eq!(Ok(()), self.schedules_ok(&input_queue_type_fn)); @@ -1094,47 +1327,87 @@ impl CanisterQueues { /// Handles the timing out or shedding of a message from the pool. /// - /// Records the callback of a shed inbound best-effort response. Releases the - /// outbound slot reservation of a shed inbound request. Generates and enqueues - /// a reject response if the message was an outbound request. Updates the stats - /// for the dropped message and (where applicable) the generated response. + /// Updates the stats, replaces shed inbound responses with compact reject + /// responses, generates reject responses for expired outbound requests, etc. /// /// `input_queue_type_fn` is required to determine the appropriate sender /// schedule to update when generating a reject response. fn on_message_dropped( &mut self, - reference: message_pool::Id, - msg: &RequestOrResponse, + reference: SomeReference, + msg: RequestOrResponse, input_queue_type_fn: impl Fn(&CanisterId) -> InputQueueType, ) { - use Context::*; - - // If this is an inbound response, remember its `originator_reply_callback`, so - // we can later produce an `UnknownResponse` for it, when popped. - let context = reference.context(); - if let (Inbound, RequestOrResponse::Response(response)) = (context, msg) { - assert_eq!( - None, - self.shed_responses - .insert(reference, response.originator_reply_callback) - ); + match reference { + SomeReference::Inbound(reference) => { + self.on_inbound_message_dropped(reference, msg); + } + SomeReference::Outbound(reference) => { + self.on_outbound_message_dropped(reference, msg, input_queue_type_fn); + } + } + } - // Leave the input queue unchanged, as "shed responses" are non-stale. - return; + /// Handles the timing out or shedding of an inbound message from the pool. + /// + /// Replaces a shed inbound best-effort response with a compact reject response. + /// Releases the outbound slot reservation of a shed or expired inbound request. + /// Updates the stats for the dropped message. + fn on_inbound_message_dropped(&mut self, reference: InboundReference, msg: RequestOrResponse) { + assert_eq!(Context::Inbound, reference.context()); + + match msg { + RequestOrResponse::Response(response) => { + // This is an inbound response, remember its `originator_reply_callback`, so + // we can later produce a `ResponseDropped` for it, when popped. + assert_eq!( + None, + self.store + .shed_responses + .insert(reference, response.originator_reply_callback) + ); + } + + RequestOrResponse::Request(request) => { + let remote = request.sender; + let (input_queue, output_queue) = self + .canister_queues + .get_mut(&remote) + .expect("No matching queue for dropped message."); + + if input_queue.peek() == Some(reference) { + input_queue.pop(); + self.store.queue_advance(input_queue); + } + + // Release the outbound response slot. + output_queue.release_reserved_response_slot(); + self.queue_stats.on_drop_input_request(&request); + } } + } - let remote = match context { - Inbound => msg.sender(), - Outbound => msg.receiver(), - }; + /// Handles the timing out or shedding of an outbound message from the pool. + /// + /// Generates and enqueues a reject response if the message was an outbound + /// request. Updates the stats for the dropped message and the generated + /// response. + /// + /// `input_queue_type_fn` is required to determine the appropriate sender + /// schedule to update when generating a reject response. + fn on_outbound_message_dropped( + &mut self, + reference: OutboundReference, + msg: RequestOrResponse, + input_queue_type_fn: impl Fn(&CanisterId) -> InputQueueType, + ) { + assert_eq!(Context::Outbound, reference.context()); + + let remote = msg.receiver(); let (input_queue, output_queue) = self .canister_queues .get_mut(&remote) .expect("No matching queue for dropped message."); - let (queue, reverse_queue) = match context { - Inbound => (input_queue, output_queue), - Outbound => (output_queue, input_queue), - }; // Ensure that the first reference in a queue is never stale: if we drop the // message at the front of a queue, advance to the first non-stale reference. @@ -1143,46 +1416,35 @@ impl CanisterQueues { // `on_message_dropped()` call if multiple messages expired at once (e.g. given // a queue containing references `[1, 2]`; `1` and `2` expire as part of the // same `time_out_messages()` call; `on_message_dropped(1)` will also pop `2`). - if queue.peek() == Some(reference) { - queue.pop(); - queue.pop_while(|reference| is_stale(reference, &self.pool, &self.shed_responses)); + if output_queue.peek() == Some(reference) { + output_queue.pop(); + self.store.queue_advance(output_queue); } - // Release the response slot, generate reject responses or remember shed inbound - // responses, as necessary. - match (context, msg) { - // Inbound request: release the outbound response slot. - (Inbound, RequestOrResponse::Request(request)) => { - reverse_queue.release_reserved_response_slot(); - self.queue_stats.on_drop_input_request(request); - } - - // Outbound request: enqueue a `SYS_TRANSIENT` timeout reject response. - (Outbound, RequestOrResponse::Request(request)) => { - let response = generate_timeout_response(request); + match msg { + RequestOrResponse::Request(request) => { + let response = generate_timeout_response(&request); // Update stats for the generated response. - self.queue_stats.on_push_response(&response, Inbound); + self.queue_stats + .on_push_response(&response, Context::Inbound); assert!(self .callbacks_with_enqueued_response .insert(response.originator_reply_callback)); - let reference = self.pool.insert_inbound(response.into()); - reverse_queue.push_response(reference); + let reference = self.store.insert_inbound(response.into()); + input_queue.push_response(reference); // If the input queue is not already in a sender schedule, add it. - if reverse_queue.len() == 1 { + if input_queue.len() == 1 { let input_queue_type = input_queue_type_fn(&remote); self.input_schedule.schedule(remote, input_queue_type); } } - (Inbound, RequestOrResponse::Response(_)) => { - unreachable!("This case is handled above"); + RequestOrResponse::Response(_) => { + // Outbound (best-effort) responses can be dropped with impunity. } - - // Outbound (best-effort) responses can be dropped with impunity. - (Outbound, RequestOrResponse::Response(_)) => {} } } @@ -1239,8 +1501,9 @@ impl CanisterQueues { // Invariant: all canister queues (input or output) are either empty or start // with a non-stale reference. for (canister_id, (input_queue, output_queue)) in self.canister_queues.iter() { - queue_front_not_stale(input_queue, &self.pool, &self.shed_responses, canister_id)?; - queue_front_not_stale(output_queue, &self.pool, &NO_SHED_RESPONSES, canister_id)?; + self.store.queue_front_not_stale(input_queue, canister_id)?; + self.store + .queue_front_not_stale(output_queue, canister_id)?; } // Reserved slot stats match the actual number of reserved slots. @@ -1259,11 +1522,9 @@ impl CanisterQueues { // `callbacks_with_enqueued_response` contains the precise set of `CallbackIds` // of all inbound responses. - let enqueued_response_callbacks = callbacks_with_enqueued_response( - &self.canister_queues, - &self.pool, - &self.shed_responses, - )?; + let enqueued_response_callbacks = self + .store + .callbacks_with_enqueued_response(&self.canister_queues)?; if self.callbacks_with_enqueued_response != enqueued_response_callbacks { return Err(format!( "Inconsistent `callbacks_with_enqueued_response`:\n expected: {:?}\n actual: {:?}", @@ -1281,7 +1542,7 @@ impl CanisterQueues { /// /// Time complexity: `O(canister_queues.len())`. fn calculate_queue_stats( - canister_queues: &BTreeMap, + canister_queues: &BTreeMap, guaranteed_response_memory_reservations: usize, transient_stream_guaranteed_responses_size_bytes: usize, ) -> QueueStats { @@ -1306,9 +1567,9 @@ impl CanisterQueues { /// Written as a free function in order to avoid borrowing the full /// `CanisterQueues`, which then requires looking up the queues again. fn get_or_insert_queues<'a>( - canister_queues: &'a mut BTreeMap, + canister_queues: &'a mut BTreeMap, canister_id: &CanisterId, -) -> (&'a mut CanisterQueue, &'a mut CanisterQueue) { +) -> (&'a mut InputQueue, &'a mut OutputQueue) { let (input_queue, output_queue) = canister_queues.entry(*canister_id).or_insert_with(|| { let input_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); let output_queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); @@ -1317,143 +1578,6 @@ fn get_or_insert_queues<'a>( (input_queue, output_queue) } -/// Checks whether the given reference is stale (i.e. neither in the pool, nor -/// a shed inbound response). -fn is_stale( - reference: message_pool::Id, - pool: &MessagePool, - shed_responses: &BTreeMap, -) -> bool { - pool.get(reference).is_none() - && (reference.context() != Context::Inbound - || reference.kind() != Kind::Response - || !shed_responses.contains_key(&reference)) -} - -/// An emoty map of shed responses, to be used for staleness checks in output -/// queues (that cannot contain any inbound shed responses). -const NO_SHED_RESPONSES: BTreeMap = BTreeMap::new(); - -/// Pops and returns the reference at the front of the given input queue and -/// advances the queue to the next non-stale reference. -fn input_queue_pop_and_advance( - queue: &mut CanisterQueue, - pool: &mut MessagePool, - shed_responses: &mut BTreeMap, -) -> Option { - let reference = queue.pop()?; - assert_eq!(Context::Inbound, reference.context()); - - // Advance to the next non-stale reference. - queue.pop_while(|reference| is_stale(reference, pool, shed_responses)); - - // Message must be either pooled; or a previously shed inbound response. - let msg = pool - .take(reference) - .map(|msg| msg.into()) - .unwrap_or_else(|| { - debug_assert_eq!(Kind::Response, reference.kind()); - debug_assert_eq!(Class::BestEffort, reference.class()); - - CanisterInput::ResponseDropped( - shed_responses - .remove(&reference) - .expect("stale reference at the front of input queue"), - ) - }); - Some(msg) -} - -/// Pops and returns the reference at the front of the given output queue and -/// advances the queue to the next non-stale reference. -fn output_queue_pop_and_advance( - queue: &mut CanisterQueue, - pool: &mut MessagePool, -) -> Option { - let reference = queue.pop()?; - assert!(reference.context() == Context::Outbound); - - queue.pop_while(|reference| is_stale(reference, pool, &NO_SHED_RESPONSES)); - - let msg = pool.take(reference); - assert!(msg.is_some(), "stale reference at the front of queue"); - msg -} - -/// Helper function for concisely validating the hard invariant that a canister -/// queue is either empty of has a non-stale reference at the front, by writing -/// `debug_assert_eq!(Ok(()), queue_front_not_stale(...)`. -/// -/// Time complexity: `O(log(n))`. -fn queue_front_not_stale( - queue: &CanisterQueue, - pool: &MessagePool, - shed_responses: &BTreeMap, - canister_id: &CanisterId, -) -> Result<(), String> { - if let Some(reference) = queue.peek() { - if is_stale(reference, pool, shed_responses) { - return Err(format!( - "Stale reference at the front of {:?} queue to/from {}", - reference.context(), - canister_id - )); - } - } - - Ok(()) -} - -/// Collects the `CallbackIds` of all responses and shed responses enqueued in -/// input queues. -/// -/// Returns an error if there are duplicate `CallbackIds` among the responses; -/// or if not all inbound responses or shed responses are enqueued. -/// -/// Time complexity: `O(n * log(n))`. -fn callbacks_with_enqueued_response( - canister_queues: &BTreeMap, - pool: &MessagePool, - shed_responses: &BTreeMap, -) -> Result, String> { - let callbacks_vec = canister_queues - .values() - .flat_map(|(input_queue, _)| input_queue.iter()) - .filter_map( - |reference| match (pool.get(*reference), shed_responses.get(reference)) { - (Some(RequestOrResponse::Response(rep)), None) => { - Some(Ok(rep.originator_reply_callback)) - } - (None, Some(callback_id)) => Some(Ok(*callback_id)), - (Some(_), Some(_)) => Some(Err(format!( - "CanisterQueues: Both response and shed response for reference {:?}", - reference - ))), - _ => None, - }, - ) - .collect::, String>>()?; - - let callbacks: BTreeSet<_> = callbacks_vec.iter().cloned().collect(); - if callbacks.len() != callbacks_vec.len() { - return Err(format!( - "CanisterQueues: Duplicate inbound response callback(s): {:?}", - callbacks_vec - )); - } - - let response_count = pool.message_stats().inbound_response_count + shed_responses.len(); - if callbacks_vec.len() != response_count { - return Err(format!( - "CanisterQueues: Have {} inbound responses, but only {} are enqueued", - response_count, - callbacks.len() - )); - } - - Ok(callbacks) -} - /// Generates a timeout reject response from a request, refunding its payment. fn generate_timeout_response(request: &Request) -> Response { Response { @@ -1489,6 +1613,15 @@ fn input_queue_type_fn<'a>( impl From<&CanisterQueues> for pb_queues::CanisterQueues { fn from(item: &CanisterQueues) -> Self { + fn callback_references_to_proto( + callback_references: &BTreeMap, + ) -> Vec { + callback_references + .iter() + .map(|(&id, &callback_id)| message_pool::CallbackReference(id, callback_id).into()) + .collect() + } + let (next_input_source, local_sender_schedule, remote_sender_schedule) = (&item.input_schedule).into(); @@ -1505,16 +1638,12 @@ impl From<&CanisterQueues> for pb_queues::CanisterQueues { output_queue: Some(oq.into()), }) .collect(), - pool: if item.pool != MessagePool::default() { - Some((&item.pool).into()) + pool: if item.store.pool != MessagePool::default() { + Some((&item.store.pool).into()) } else { None }, - shed_responses: item - .shed_responses - .iter() - .map(|(&id, &callback_id)| message_pool::CallbackReference(id, callback_id).into()) - .collect(), + shed_responses: callback_references_to_proto(&item.store.shed_responses), next_input_source, local_sender_schedule, remote_sender_schedule, @@ -1565,9 +1694,9 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can } let canister_id = try_from_option_field(ie.canister_id, "QueueEntry::canister_id")?; - let original_iq: queue::InputQueue = + let original_iq: queue::OldInputQueue = try_from_option_field(ie.queue, "QueueEntry::queue")?; - let original_oq: queue::OutputQueue = + let original_oq: queue::OldOutputQueue = try_from_option_field(oe.queue, "QueueEntry::queue")?; let iq = (original_iq, &mut pool).try_into()?; let oq = (original_oq, &mut pool).try_into()?; @@ -1581,14 +1710,20 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can } } else { pool = item.pool.unwrap_or_default().try_into()?; - shed_responses = item - .shed_responses - .into_iter() - .map(|sr| { - let sr = message_pool::CallbackReference::try_from(sr)?; - Ok((sr.0, sr.1)) - }) - .collect::>()?; + + fn callback_references_try_from_proto( + callback_references: Vec, + ) -> Result, ProxyDecodeError> + { + callback_references + .into_iter() + .map(|cr_proto| { + let cr = message_pool::CallbackReference::try_from(cr_proto)?; + Ok((cr.0, cr.1)) + }) + .collect() + } + shed_responses = callback_references_try_from_proto(item.shed_responses)?; let mut enqueued_pool_messages = BTreeSet::new(); canister_queues = item @@ -1597,18 +1732,24 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can .map(|qp| { let canister_id: CanisterId = try_from_option_field(qp.canister_id, "CanisterQueuePair::canister_id")?; - let iq: CanisterQueue = try_from_option_field( - qp.input_queue.map(|q| (q, Context::Inbound)), - "CanisterQueuePair::input_queue", - )?; - let oq: CanisterQueue = try_from_option_field( - qp.output_queue.map(|q| (q, Context::Outbound)), - "CanisterQueuePair::output_queue", - )?; - - iq.iter().chain(oq.iter()).for_each(|&reference| { + let iq: InputQueue = + try_from_option_field(qp.input_queue, "CanisterQueuePair::input_queue")?; + let oq: OutputQueue = + try_from_option_field(qp.output_queue, "CanisterQueuePair::output_queue")?; + + iq.iter().for_each(|&reference| { + if pool.get(reference).is_some() + && !enqueued_pool_messages.insert(SomeReference::Inbound(reference)) + { + metrics.observe_broken_soft_invariant(format!( + "CanisterQueues: {:?} enqueued more than once", + reference + )); + } + }); + oq.iter().for_each(|&reference| { if pool.get(reference).is_some() - && !enqueued_pool_messages.insert(reference) + && !enqueued_pool_messages.insert(SomeReference::Outbound(reference)) { metrics.observe_broken_soft_invariant(format!( "CanisterQueues: {:?} enqueued more than once", @@ -1642,15 +1783,18 @@ impl TryFrom<(pb_queues::CanisterQueues, &dyn CheckpointLoadingMetrics)> for Can item.remote_sender_schedule, ))?; - let callbacks_with_enqueued_response = - callbacks_with_enqueued_response(&canister_queues, &pool, &shed_responses) - .map_err(ProxyDecodeError::Other)?; + let store = MessageStoreImpl { + pool, + shed_responses, + }; + let callbacks_with_enqueued_response = store + .callbacks_with_enqueued_response(&canister_queues) + .map_err(ProxyDecodeError::Other)?; let queues = Self { ingress_queue: IngressQueue::try_from(item.ingress_queue)?, canister_queues, - pool, - shed_responses, + store, queue_stats, input_schedule, callbacks_with_enqueued_response, @@ -1715,7 +1859,7 @@ impl QueueStats { + self.transient_stream_guaranteed_responses_size_bytes } - /// Updates the stats to reflect the enqueuing of the given message in the given + /// Updates the stats to reflect the enqueueing of the given message in the given /// context. fn on_push(&mut self, msg: &RequestOrResponse, context: Context) { match msg { @@ -1724,8 +1868,8 @@ impl QueueStats { } } - /// Updates the stats to reflect the enqueuing of the given request in the given - /// context. + /// Updates the stats to reflect the enqueueing of the given request in the + /// given context. fn on_push_request(&mut self, request: &Request, context: Context) { // If pushing a guaranteed response request, make a memory reservation. if request.deadline == NO_DEADLINE { @@ -1741,7 +1885,7 @@ impl QueueStats { } } - /// Updates the stats to reflect the enqueuing of the given response in the + /// Updates the stats to reflect the enqueueing of the given response in the /// given context. fn on_push_response(&mut self, response: &Response, context: Context) { // If pushing a guaranteed response, consume a memory reservation. @@ -1775,7 +1919,7 @@ impl QueueStats { } /// Checks whether `available_memory` for guaranteed response messages is -/// sufficient to allow enqueuing `msg` into an input or output queue. +/// sufficient to allow enqueueing `msg` into an input or output queue. /// /// Returns: /// * `Ok(())` if `msg` is a best-effort message, as best-effort messages don't @@ -1816,7 +1960,7 @@ pub fn memory_required_to_push_request(req: &Request) -> usize { pub mod testing { use super::input_schedule::testing::InputScheduleTesting; - use super::CanisterQueues; + use super::{CanisterQueues, MessageStore}; use crate::{InputQueueType, StateError}; use ic_types::messages::{Request, RequestOrResponse}; use ic_types::{CanisterId, Time}; @@ -1857,7 +2001,7 @@ pub mod testing { fn output_queue_iter_for_testing( &self, canister_id: &CanisterId, - ) -> Option>; + ) -> Option>; } impl CanisterQueuesTesting for CanisterQueues { @@ -1867,7 +2011,7 @@ pub mod testing { fn pop_canister_output(&mut self, dst_canister: &CanisterId) -> Option { let queue = &mut self.canister_queues.get_mut(dst_canister).unwrap().1; - super::output_queue_pop_and_advance(queue, &mut self.pool) + self.store.queue_pop_and_advance(queue) } fn output_queues_len(&self) -> usize { @@ -1875,7 +2019,7 @@ pub mod testing { } fn output_message_count(&self) -> usize { - self.pool.message_stats().outbound_message_count + self.message_stats().outbound_message_count } fn push_input( @@ -1897,13 +2041,14 @@ pub mod testing { fn output_queue_iter_for_testing( &self, canister_id: &CanisterId, - ) -> Option> { + ) -> Option> { self.canister_queues .get(canister_id) .map(|(_, output_queue)| { output_queue .iter() - .filter_map(|&reference| self.pool.get(reference).cloned()) + .filter(|&reference| !self.store.is_stale(*reference)) + .map(|&reference| self.store.get(reference)) }) } } diff --git a/rs/replicated_state/src/canister_state/queues/input_schedule.rs b/rs/replicated_state/src/canister_state/queues/input_schedule.rs index babb3c9f04b..97ad10fd93d 100644 --- a/rs/replicated_state/src/canister_state/queues/input_schedule.rs +++ b/rs/replicated_state/src/canister_state/queues/input_schedule.rs @@ -1,4 +1,4 @@ -use super::queue::CanisterQueue; +use super::queue::InputQueue; use super::CanisterQueues; use crate::{InputQueueType, InputSource}; use ic_protobuf::proxy::ProxyDecodeError; @@ -174,7 +174,7 @@ impl InputSchedule { /// Time complexity: `O(n * log(n))`. pub(super) fn test_invariants<'a>( &self, - input_queues: impl Iterator, + input_queues: impl Iterator, input_queue_type_fn: &dyn Fn(&CanisterId) -> InputQueueType, ) -> Result<(), String> { let mut local_schedule: BTreeSet<_> = self.local_sender_schedule.iter().collect(); diff --git a/rs/replicated_state/src/canister_state/queues/input_schedule/tests.rs b/rs/replicated_state/src/canister_state/queues/input_schedule/tests.rs index 14b03142f99..b1fc269ea0b 100644 --- a/rs/replicated_state/src/canister_state/queues/input_schedule/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/input_schedule/tests.rs @@ -192,14 +192,12 @@ fn test_reschedule_in_local_while_in_remote() { #[test] fn test_invariants() { // Generates input queues with the given sizes for the given canisters. - fn input_queues_for_test( - queue_sizes: Vec<(CanisterId, u8)>, - ) -> Vec<(CanisterId, CanisterQueue)> { + fn input_queues_for_test(queue_sizes: Vec<(CanisterId, u8)>) -> Vec<(CanisterId, InputQueue)> { let mut pool = MessagePool::default(); queue_sizes .into_iter() .map(|(canister_id, size)| { - let mut queue = CanisterQueue::new(500); + let mut queue = InputQueue::new(500); for _ in 0..size { let id = pool.insert_inbound(RequestBuilder::default().build().into()); queue.push_request(id); diff --git a/rs/replicated_state/src/canister_state/queues/message_pool.rs b/rs/replicated_state/src/canister_state/queues/message_pool.rs index e7d31f0fa57..1f3962d260d 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool.rs @@ -1,3 +1,4 @@ +use super::CanisterInput; use ic_protobuf::proxy::{try_from_option_field, ProxyDecodeError}; use ic_protobuf::state::queues::v1 as pb_queues; use ic_types::messages::{ @@ -8,6 +9,7 @@ use ic_types::{CountBytes, Time}; use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; use std::collections::{BTreeMap, BTreeSet}; +use std::marker::PhantomData; use std::ops::{AddAssign, SubAssign}; use std::sync::Arc; use std::time::Duration; @@ -80,8 +82,10 @@ impl From<&RequestOrResponse> for Class { /// A generated identifier for a message held in a `MessagePool` that also /// encodes the message kind (request or response), context (incoming or /// outgoing) and class (guaranteed response or best-effort). +/// +/// This is the key used internally by `MessagePool` to identify messages. #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)] -pub(super) struct Id(u64); +struct Id(u64); impl Id { /// Number of `Id` bits used as flags. @@ -90,11 +94,7 @@ impl Id { /// The minimum `Id` value, for use in e.g. `BTreeSet::split_off()` calls. const MIN: Self = Self(0); - fn new(kind: Kind, context: Context, class: Class, generator: u64) -> Self { - Self(kind as u64 | context as u64 | class as u64 | generator << Id::BITMASK_LEN) - } - - pub(super) fn kind(&self) -> Kind { + fn kind(&self) -> Kind { if self.0 & Kind::BIT == Kind::Request as u64 { Kind::Request } else { @@ -102,7 +102,7 @@ impl Id { } } - pub(super) fn context(&self) -> Context { + fn context(&self) -> Context { if self.0 & Context::BIT == Context::Inbound as u64 { Context::Inbound } else { @@ -110,7 +110,7 @@ impl Id { } } - pub(super) fn class(&self) -> Class { + fn class(&self) -> Class { if self.0 & Class::BIT == Class::GuaranteedResponse as u64 { Class::GuaranteedResponse } else { @@ -119,8 +119,124 @@ impl Id { } } -impl From<&Id> for pb_queues::canister_queue::QueueItem { - fn from(item: &Id) -> Self { +/// A typed reference -- inbound (`CanisterInput`) or outbound +/// (`RequestOrResponse`) -- to a message in the `MessagePool`. +#[derive(Debug)] +pub(super) struct Reference(u64, PhantomData); + +impl Reference +where + T: ToContext, +{ + /// Constructs a new `Reference` of the given `class` and `kind`. + fn new(class: Class, kind: Kind, generator: u64) -> Self { + Self( + T::context() as u64 | class as u64 | kind as u64 | generator << Id::BITMASK_LEN, + PhantomData, + ) + } +} + +impl Reference { + pub(super) fn kind(&self) -> Kind { + Id::from(self).kind() + } + + pub(super) fn context(&self) -> Context { + Id::from(self).context() + } + + pub(super) fn class(&self) -> Class { + Id::from(self).class() + } +} + +impl Clone for Reference { + fn clone(&self) -> Self { + *self + } +} + +// This and other traits must be explicitly implemented because +// `#[derive(Copy)]` generates something like `impl Copy for Reference +// where T: Copy`. And because neither `CanisterInput` nor `RequestOrResponse` +// are `Copy`, the attribute does nothing. +impl Copy for Reference {} + +impl PartialEq for Reference { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +impl Eq for Reference {} + +impl PartialOrd for Reference { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Reference { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0) + } +} + +impl From<&Reference> for Id { + fn from(reference: &Reference) -> Id { + Id(reference.0) + } +} + +impl From> for Id { + fn from(reference: Reference) -> Id { + Id(reference.0) + } +} + +/// A reference to an inbound message (returned as a `CanisterInput`). +pub(super) type InboundReference = Reference; + +/// A reference to an outbound message (returned as a `RequestOrResponse`). +pub(super) type OutboundReference = Reference; + +/// A means for queue item types to declare whether they're inbound or outbound. +pub(super) trait ToContext { + /// The context (inbound or outbound) of this queue item type. + fn context() -> Context; +} + +impl ToContext for CanisterInput { + fn context() -> Context { + Context::Inbound + } +} + +impl ToContext for RequestOrResponse { + fn context() -> Context { + Context::Outbound + } +} + +/// An enum that can hold either an inbound or an outbound reference. +#[derive(Eq, PartialEq, Ord, PartialOrd, Debug)] +pub(super) enum SomeReference { + Inbound(InboundReference), + Outbound(OutboundReference), +} + +impl From for SomeReference { + fn from(id: Id) -> SomeReference { + match id.context() { + Context::Inbound => SomeReference::Inbound(Reference(id.0, PhantomData)), + Context::Outbound => SomeReference::Outbound(Reference(id.0, PhantomData)), + } + } +} + +impl From<&Reference> for pb_queues::canister_queue::QueueItem { + fn from(item: &Reference) -> Self { use pb_queues::canister_queue::queue_item::R; Self { @@ -129,18 +245,47 @@ impl From<&Id> for pb_queues::canister_queue::QueueItem { } } -impl TryFrom for Id { +impl TryFrom for InboundReference { type Error = ProxyDecodeError; fn try_from(item: pb_queues::canister_queue::QueueItem) -> Result { match item.r { - Some(pb_queues::canister_queue::queue_item::R::Reference(id)) => Ok(Id(id)), + Some(pb_queues::canister_queue::queue_item::R::Reference(id)) => { + let reference = Reference(id, PhantomData); + if reference.context() == Context::Inbound { + Ok(reference) + } else { + Err(ProxyDecodeError::Other( + "Not an inbound reference".to_string(), + )) + } + } + None => Err(ProxyDecodeError::MissingField("QueueItem::r")), + } + } +} + +impl TryFrom for OutboundReference { + type Error = ProxyDecodeError; + fn try_from(item: pb_queues::canister_queue::QueueItem) -> Result { + match item.r { + Some(pb_queues::canister_queue::queue_item::R::Reference(id)) => { + let reference = Reference(id, PhantomData); + if reference.context() == Context::Outbound { + Ok(reference) + } else { + Err(ProxyDecodeError::Other( + "Not an outbound reference".to_string(), + )) + } + } None => Err(ProxyDecodeError::MissingField("QueueItem::r")), } } } /// Helper for encoding / decoding `pb_queues::canister_queues::CallbackReference`. -pub(super) struct CallbackReference(pub(super) Id, pub(super) CallbackId); +#[derive(Clone, Eq, PartialEq, Debug)] +pub(super) struct CallbackReference(pub(super) InboundReference, pub(super) CallbackId); impl From for pb_queues::canister_queues::CallbackReference { fn from(item: CallbackReference) -> Self { @@ -154,12 +299,12 @@ impl From for pb_queues::canister_queues::CallbackReference { impl TryFrom for CallbackReference { type Error = ProxyDecodeError; fn try_from(item: pb_queues::canister_queues::CallbackReference) -> Result { - let id = Id(item.id); - if id.context() == Context::Inbound - && id.class() == Class::BestEffort - && id.kind() == Kind::Response + let reference = Reference(item.id, PhantomData); + if reference.context() == Context::Inbound + && reference.class() == Class::BestEffort + && reference.kind() == Kind::Response { - Ok(CallbackReference(id, CallbackId::from(item.callback_id))) + Ok(CallbackReference(reference, item.callback_id.into())) } else { Err(ProxyDecodeError::Other( "Not an inbound best-effort response".to_string(), @@ -171,9 +316,11 @@ impl TryFrom for CallbackReferenc /// A pool of canister messages, guaranteed response and best effort, with /// built-in support for time-based expiration and load shedding. /// -/// Messages in the pool are identified by an `Id` generated by the pool. -/// The `Id` also encodes the message kind (request or response); and -/// context (inbound or outbound). +/// Messages in the pool are identified by a key (`Id`) generated by the pool. +/// The key also encodes the message kind (request or response); and context +/// (inbound or outbound). The public API however, uses exclusively typed +/// references (`Reference` for inbound references and +/// `Reference` for outbound references). /// /// Messages are added to the deadline queue based on their class (best-effort /// vs guaranteed response) and context: i.e. all best-effort messages except @@ -229,7 +376,7 @@ impl MessagePool { /// (best effort responses that already made it into an input queue should not /// expire). It is added to the load shedding queue if it is a best-effort /// message. - pub(super) fn insert_inbound(&mut self, msg: RequestOrResponse) -> Id { + pub(super) fn insert_inbound(&mut self, msg: RequestOrResponse) -> InboundReference { let actual_deadline = match &msg { RequestOrResponse::Request(request) => request.deadline, @@ -241,13 +388,17 @@ impl MessagePool { } /// Inserts an outbound request (one that is to be enqueued in an output queue) - /// into the pool. Returns the ID assigned to the request. + /// into the pool. Returns the reference assigned to the request. /// /// The request is always added to the deadline queue: if it is a best-effort /// request, with its explicit deadline; if it is a guaranteed response call /// request, with a deadline of `now + REQUEST_LIFETIME`. It is added to the /// load shedding queue iff it is a best-effort request. - pub(super) fn insert_outbound_request(&mut self, request: Arc, now: Time) -> Id { + pub(super) fn insert_outbound_request( + &mut self, + request: Arc, + now: Time, + ) -> OutboundReference { let actual_deadline = if request.deadline == NO_DEADLINE { // Guaranteed response call requests in canister output queues expire after // `REQUEST_LIFETIME`. @@ -265,11 +416,14 @@ impl MessagePool { } /// Inserts an outbound response (one that is to be enqueued in an output queue) - /// into the pool. Returns the ID assigned to the response. + /// into the pool. Returns the reference assigned to the response. /// /// The response is added to both the deadline queue and the load shedding queue /// iff it is a best-effort response. - pub(super) fn insert_outbound_response(&mut self, response: Arc) -> Id { + pub(super) fn insert_outbound_response( + &mut self, + response: Arc, + ) -> OutboundReference { let actual_deadline = response.deadline; self.insert_impl( RequestOrResponse::Response(response), @@ -278,23 +432,27 @@ impl MessagePool { ) } - /// Inserts the given message into the pool. Returns the ID assigned to the - /// message. + /// Inserts the given message into the pool. Returns the reference assigned to + /// the message. /// /// The message is recorded into the deadline queue with the provided /// `actual_deadline` iff it is non-zero (as opposed to the message's nominal /// deadline; this is so we can expire outgoing guaranteed response requests; /// and not expire incoming best-effort responses). It is recorded in the load /// shedding priority queue iff it is a best-effort message. - fn insert_impl( + fn insert_impl( &mut self, msg: RequestOrResponse, actual_deadline: CoarseTime, context: Context, - ) -> Id { + ) -> Reference + where + T: ToContext, + { let kind = Kind::from(&msg); let class = Class::from(&msg); - let id = self.next_message_id(kind, context, class); + let reference = self.next_reference(class, kind); + let id = reference.into(); let size_bytes = msg.count_bytes(); @@ -328,25 +486,29 @@ impl MessagePool { self.size_queue.insert((size_bytes, id)); } - id + reference } - /// Reserves and returns a new message ID. - fn next_message_id(&mut self, kind: Kind, context: Context, class: Class) -> Id { - let id = Id::new(kind, context, class, self.message_id_generator); + /// Reserves and returns a new message reference. + fn next_reference(&mut self, class: Class, kind: Kind) -> Reference + where + T: ToContext, + { + let reference = Reference::new(class, kind, self.message_id_generator); self.message_id_generator += 1; - id + reference } - /// Retrieves the message with the given `Id`. - pub(super) fn get(&self, id: Id) -> Option<&RequestOrResponse> { - self.messages.get(&id) + /// Retrieves the message with the given `Reference`. + pub(super) fn get(&self, reference: Reference) -> Option<&RequestOrResponse> { + self.messages.get(&reference.into()) } - /// Removes the message with the given `Id` from the pool. + /// Removes the message with the given `Reference` from the pool. /// /// Updates the stats; and the priority queues, where applicable. - pub(super) fn take(&mut self, id: Id) -> Option { + pub(super) fn take(&mut self, reference: Reference) -> Option { + let id = reference.into(); let msg = self.take_impl(id)?; self.remove_from_deadline_queue(id, &msg); @@ -356,7 +518,7 @@ impl MessagePool { Some(msg) } - /// Removes the message with the given `Id` from the pool. + /// Removes the message with the given `Reference` from the pool. /// /// Updates the stats, but not the priority queues. fn take_impl(&mut self, id: Id) -> Option { @@ -433,7 +595,7 @@ impl MessagePool { /// now`). Updates the stats; and the priority queues, where applicable. /// /// Time complexity per expired message: `O(log(self.len()))`. - pub(super) fn expire_messages(&mut self, now: Time) -> Vec<(Id, RequestOrResponse)> { + pub(super) fn expire_messages(&mut self, now: Time) -> Vec<(SomeReference, RequestOrResponse)> { if self.deadline_queue.is_empty() { // No messages with deadlines, bail out. return Vec::new(); @@ -456,7 +618,7 @@ impl MessagePool { let msg = self.take_impl(id).unwrap(); self.outbound_guaranteed_request_deadlines.remove(&id); self.remove_from_size_queue(id, &msg); - (id, msg) + (id.into(), msg) }) .collect(); @@ -468,7 +630,7 @@ impl MessagePool { /// Updates the stats; and the priority queues, where applicable. /// /// Time complexity: `O(log(self.len()))`. - pub(super) fn shed_largest_message(&mut self) -> Option<(Id, RequestOrResponse)> { + pub(super) fn shed_largest_message(&mut self) -> Option<(SomeReference, RequestOrResponse)> { if let Some((_, id)) = self.size_queue.pop_last() { debug_assert_eq!(Class::BestEffort, id.class()); @@ -476,7 +638,7 @@ impl MessagePool { self.remove_from_deadline_queue(id, &msg); debug_assert_eq!(Ok(()), self.check_invariants()); - return Some((id, msg)); + return Some((id.into(), msg)); } // Nothing to shed. @@ -490,8 +652,12 @@ impl MessagePool { /// Returns the implicitly assigned deadlines of enqueued outbound guaranteed /// response requests. - pub(super) fn outbound_guaranteed_request_deadlines(&self) -> &BTreeMap { - &self.outbound_guaranteed_request_deadlines + pub(super) fn outbound_guaranteed_request_deadline( + &self, + reference: Reference, + ) -> Option<&CoarseTime> { + self.outbound_guaranteed_request_deadlines + .get(&reference.into()) } /// Returns a reference to the pool's message stats. diff --git a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs index 990529fdf9c..4efd4d07428 100644 --- a/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/message_pool/tests.rs @@ -1,4 +1,5 @@ use super::*; +use assert_matches::assert_matches; use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder}; use ic_types::messages::{Payload, MAX_INTER_CANISTER_PAYLOAD_IN_BYTES_U64}; use ic_types::time::UNIX_EPOCH; @@ -15,35 +16,43 @@ fn test_insert() { let mut pool = MessagePool::default(); // Insert one message of each kind / class / context. - let id1 = pool.insert_inbound(request(NO_DEADLINE).into()); + let id1: Id = pool.insert_inbound(request(NO_DEADLINE).into()).into(); assert_eq!(Request, id1.kind()); assert_eq!(Inbound, id1.context()); assert_eq!(GuaranteedResponse, id1.class()); - let id2 = pool.insert_inbound(request(time(20)).into()); + let id2: Id = pool.insert_inbound(request(time(20)).into()).into(); assert_eq!(Request, id2.kind()); assert_eq!(Inbound, id2.context()); assert_eq!(BestEffort, id2.class()); - let id3 = pool.insert_inbound(response(NO_DEADLINE).into()); + let id3: Id = pool.insert_inbound(response(NO_DEADLINE).into()).into(); assert_eq!(Response, id3.kind()); assert_eq!(Inbound, id3.context()); assert_eq!(GuaranteedResponse, id3.class()); - let id4 = pool.insert_inbound(response(time(40)).into()); + let id4: Id = pool.insert_inbound(response(time(40)).into()).into(); assert_eq!(Response, id4.kind()); assert_eq!(Inbound, id4.context()); assert_eq!(BestEffort, id4.class()); - let id5 = pool.insert_outbound_request(request(NO_DEADLINE).into(), time(50).into()); + let id5: Id = pool + .insert_outbound_request(request(NO_DEADLINE).into(), time(50).into()) + .into(); assert_eq!(Request, id5.kind()); assert_eq!(Outbound, id5.context()); assert_eq!(GuaranteedResponse, id5.class()); - let id6 = pool.insert_outbound_request(request(time(60)).into(), time(65).into()); + let id6: Id = pool + .insert_outbound_request(request(time(60)).into(), time(65).into()) + .into(); assert_eq!(Request, id6.kind()); assert_eq!(Outbound, id6.context()); assert_eq!(BestEffort, id6.class()); - let id7 = pool.insert_outbound_response(response(NO_DEADLINE).into()); + let id7: Id = pool + .insert_outbound_response(response(NO_DEADLINE).into()) + .into(); assert_eq!(Response, id7.kind()); assert_eq!(Outbound, id7.context()); assert_eq!(GuaranteedResponse, id7.class()); - let id8 = pool.insert_outbound_response(response(time(80)).into()); + let id8: Id = pool + .insert_outbound_response(response(time(80)).into()) + .into(); assert_eq!(Response, id8.kind()); assert_eq!(Outbound, id8.context()); assert_eq!(BestEffort, id8.class()); @@ -119,8 +128,9 @@ fn test_get() { } // Also do a negative test. - let nonexistent_id = pool.next_message_id(Kind::Request, Context::Inbound, Class::BestEffort); - assert_eq!(None, pool.get(nonexistent_id)); + let nonexistent_reference: InboundReference = + pool.next_reference(Class::BestEffort, Kind::Request); + assert_eq!(None, pool.get(nonexistent_reference)); } #[test] @@ -132,36 +142,46 @@ fn test_take() { let request = request(deadline); let response = response(deadline); - // Insert the two messages. - let (request_id, response_id) = match context { - Context::Inbound => ( - pool.insert_inbound(request.clone().into()), - pool.insert_inbound(response.clone().into()), - ), - Context::Outbound => ( - pool.insert_outbound_request(request.clone().into(), time(14).into()), - pool.insert_outbound_response(response.clone().into()), - ), - }; - - let request: RequestOrResponse = request.into(); - let response: RequestOrResponse = response.into(); - - // Ensure that the messages are now in the pool. - assert_eq!(Some(&request), pool.get(request_id)); - assert_eq!(Some(&response), pool.get(response_id)); - - // Actually take the messages. - assert_eq!(Some(request), pool.take(request_id)); - assert_eq!(Some(response), pool.take(response_id)); - - // Messages are gone. - assert_eq!(None, pool.get(request_id)); - assert_eq!(None, pool.get(response_id)); - - // And cannot be taken out again. - assert_eq!(None, pool.take(request_id)); - assert_eq!(None, pool.take(response_id)); + match context { + Context::Inbound => { + let request_id = pool.insert_inbound(request.clone().into()); + let response_id = pool.insert_inbound(response.clone().into()); + test_take_impl(request_id, response_id, request, response, &mut pool); + } + Context::Outbound => { + let request_id = + pool.insert_outbound_request(request.clone().into(), time(14).into()); + let response_id = pool.insert_outbound_response(response.clone().into()); + test_take_impl(request_id, response_id, request, response, &mut pool); + } + } + + fn test_take_impl( + request_id: Reference, + response_id: Reference, + request: Request, + response: Response, + pool: &mut MessagePool, + ) { + let request: RequestOrResponse = request.into(); + let response: RequestOrResponse = response.into(); + + // Ensure that the messages are now in the pool. + assert_eq!(Some(&request), pool.get(request_id)); + assert_eq!(Some(&response), pool.get(response_id)); + + // Actually take the messages. + assert_eq!(Some(request), pool.take(request_id)); + assert_eq!(Some(response), pool.take(response_id)); + + // Messages are gone. + assert_eq!(None, pool.get(request_id)); + assert_eq!(None, pool.get(response_id)); + + // And cannot be taken out again. + assert_eq!(None, pool.take(request_id)); + assert_eq!(None, pool.take(response_id)); + } } } @@ -181,7 +201,7 @@ fn test_expiration() { let t41_plus_lifetime = Time::from(time(41)) + REQUEST_LIFETIME; let t_max = Time::from_nanos_since_unix_epoch(u64::MAX); let half_second = Duration::from_nanos(500_000_000); - let empty_vec = Vec::<(Id, RequestOrResponse)>::new(); + let empty_vec = Vec::<(SomeReference, RequestOrResponse)>::new(); let mut pool = MessagePool::default(); @@ -191,13 +211,17 @@ fn test_expiration() { // Insert one of each kind / class of message that expires. let msg1 = request(time(10)); - let id1 = pool.insert_inbound(msg1.clone().into()); + let ref1 = pool.insert_inbound(msg1.clone().into()); + let id1 = ref1.into(); let msg2 = request(time(20)); - let id2 = pool.insert_outbound_request(msg2.clone().into(), time(25).into()); + let ref2 = pool.insert_outbound_request(msg2.clone().into(), time(25).into()); + let id2 = ref2.into(); let msg3 = response(time(30)); - let id3 = pool.insert_outbound_response(msg3.clone().into()); + let ref3 = pool.insert_outbound_response(msg3.clone().into()); + let id3 = ref3.into(); let msg4 = request(NO_DEADLINE); - let id4 = pool.insert_outbound_request(msg4.clone().into(), time(40).into()); + let ref4 = pool.insert_outbound_request(msg4.clone().into(), time(40).into()); + let id4 = ref4.into(); // Sanity check. assert_eq!(4, pool.len()); @@ -227,10 +251,10 @@ fn test_expiration() { assert_eq!(empty_vec, pool.expire_messages(t10)); assert_eq!(empty_vec, pool.expire_messages(t10 + half_second)); // But (only) `msg1` expires at 11 seconds. - assert_eq!(vec![(id1, msg1.into())], pool.expire_messages(t11)); + assert_eq!(vec![(id1.into(), msg1.into())], pool.expire_messages(t11)); // Sanity check: `msg1` is now gone. - assert_eq!(None, pool.get(id1)); + assert_eq!(None, pool.get(ref1)); assert_eq!(3, pool.len()); // And there is nothing expiring at 11 seconds anymore. @@ -248,7 +272,7 @@ fn test_expiration() { assert!(pool.has_expired_deadlines(t21)); // Now pop it. - assert_eq!(Some(msg2.into()), pool.take(id2)); + assert_eq!(Some(msg2.into()), pool.take(ref2)); assert_eq!(2, pool.len()); // There is now no longer a message expiring at 21 seconds. @@ -268,7 +292,7 @@ fn test_expiration() { assert_eq!(empty_vec, pool.expire_messages(t30)); // But both remaining messages expire at `t41_plus_lifetime`. assert_eq!( - vec![(id3, msg3.into()), (id4, msg4.into())], + vec![(id3.into(), msg3.into()), (id4.into(), msg4.into())], pool.expire_messages(t41_plus_lifetime) ); @@ -311,30 +335,32 @@ fn test_shed_message() { // Insert one best-effort message of each kind / context. let msg1 = request_with_payload(1000, time(10)); - let id1 = pool.insert_inbound(msg1.clone().into()); + let ref1 = pool.insert_inbound(msg1.clone().into()); let msg2 = response_with_payload(4000, time(20)); - let id2 = pool.insert_inbound(msg2.clone().into()); + let ref2 = pool.insert_inbound(msg2.clone().into()); + let id2: Id = ref2.into(); let msg3 = request_with_payload(3000, time(30)); - let id3 = pool.insert_outbound_request(msg3.clone().into(), time(35).into()); + let ref3 = pool.insert_outbound_request(msg3.clone().into(), time(35).into()); let msg4 = response_with_payload(2000, time(40)); - let id4 = pool.insert_outbound_response(msg4.clone().into()); + let ref4 = pool.insert_outbound_response(msg4.clone().into()); + let id4: Id = ref4.into(); // Sanity check. assert_eq!(4, pool.len()); // Shed the largest message (`msg2`). - assert_eq!(Some((id2, msg2.into())), pool.shed_largest_message()); + assert_eq!(Some((id2.into(), msg2.into())), pool.shed_largest_message()); assert_eq!(3, pool.len()); // Pop the next largest message ('msg3`). - assert_eq!(Some(msg3.into()), pool.take(id3)); + assert_eq!(Some(msg3.into()), pool.take(ref3)); // Shedding will now produce `msg4`. - assert_eq!(Some((id4, msg4.into())), pool.shed_largest_message()); + assert_eq!(Some((id4.into(), msg4.into())), pool.shed_largest_message()); assert_eq!(1, pool.len()); // Pop the remaining message ('msg1`). - assert_eq!(Some(msg1.into()), pool.take(id1)); + assert_eq!(Some(msg1.into()), pool.take(ref1)); // Nothing left to shed. assert_eq!(None, pool.shed_largest_message()); @@ -364,14 +390,14 @@ fn test_equality() { let mut pool = MessagePool::default(); // Insert one message of each kind / class / context. - let id1 = pool.insert_inbound(request(NO_DEADLINE).into()); - let id2 = pool.insert_inbound(request_with_payload(2000, time(20)).into()); - let _id3 = pool.insert_inbound(response(NO_DEADLINE).into()); - let _id4 = pool.insert_inbound(response(time(40)).into()); - let _id5 = pool.insert_outbound_request(request(NO_DEADLINE).into(), time(50).into()); - let _id6 = pool.insert_outbound_request(request(time(60)).into(), time(65).into()); - let _id7 = pool.insert_outbound_response(response(NO_DEADLINE).into()); - let id8 = pool.insert_outbound_response(response(time(80)).into()); + let ref1 = pool.insert_inbound(request(NO_DEADLINE).into()); + let ref2 = pool.insert_inbound(request_with_payload(2000, time(20)).into()); + let _ref3 = pool.insert_inbound(response(NO_DEADLINE).into()); + let _ref4 = pool.insert_inbound(response(time(40)).into()); + let _ref5 = pool.insert_outbound_request(request(NO_DEADLINE).into(), time(50).into()); + let _ref6 = pool.insert_outbound_request(request(time(60)).into(), time(65).into()); + let _ref7 = pool.insert_outbound_response(response(NO_DEADLINE).into()); + let ref8 = pool.insert_outbound_response(response(time(80)).into()); // Make a clone. let mut other_pool = pool.clone(); @@ -380,14 +406,20 @@ fn test_equality() { assert_eq!(pool, other_pool); // Pop the same message from either pool. - assert!(pool.take(id1).is_some()); - assert!(other_pool.take(id1).is_some()); + assert!(pool.take(ref1).is_some()); + assert!(other_pool.take(ref1).is_some()); // The two pools should still be equal. assert_eq!(pool, other_pool); // Shed a message from either pool. - assert_eq!(id2, pool.shed_largest_message().unwrap().0); - assert_eq!(id2, other_pool.shed_largest_message().unwrap().0); + assert_eq!( + SomeReference::Inbound(ref2), + pool.shed_largest_message().unwrap().0 + ); + assert_eq!( + SomeReference::Inbound(ref2), + other_pool.shed_largest_message().unwrap().0 + ); // The two pools should still be equal. assert_eq!(pool, other_pool); @@ -399,13 +431,20 @@ fn test_equality() { // Expire a message from one pool (id8), take it from the other. assert_eq!(1, pool.expire_messages(time(81).into()).len()); - assert!(other_pool.take(id8).is_some()); + assert!(other_pool.take(ref8).is_some()); // The two pools should still be equal. assert_eq!(pool, other_pool); // Shed a message from one pool, take it from the other. - let id = pool.shed_largest_message().unwrap().0; - assert!(other_pool.take(id).is_some()); + let some_ref = pool.shed_largest_message().unwrap().0; + match some_ref { + SomeReference::Inbound(reference) => { + assert!(other_pool.take(reference).is_some()); + } + SomeReference::Outbound(reference) => { + assert!(other_pool.take(reference).is_some()); + } + } // The two pools should still be equal. assert_eq!(pool, other_pool); } @@ -456,19 +495,22 @@ fn test_message_id_sanity() { #[test] fn test_message_id_flags() { // Guaranteed inbound request. - let giq_id = Id::new( - Kind::Request, - Context::Inbound, + let giq_id = Id::from(InboundReference::new( Class::GuaranteedResponse, + Kind::Request, 13, - ); + )); assert_eq!(Kind::Request, giq_id.kind()); assert_eq!(Context::Inbound, giq_id.context()); assert_eq!(Class::GuaranteedResponse, giq_id.class()); assert_eq!(13, giq_id.0 >> Id::BITMASK_LEN); // Best-effort outbound response, same generator. - let bop_id = Id::new(Kind::Response, Context::Outbound, Class::BestEffort, 13); + let bop_id = Id::from(OutboundReference::new( + Class::BestEffort, + Kind::Response, + 13, + )); assert_eq!(Kind::Response, bop_id.kind()); assert_eq!(Context::Outbound, bop_id.context()); assert_eq!(Class::BestEffort, bop_id.class()); @@ -483,24 +525,139 @@ fn test_message_id_flags() { #[test] fn test_message_id_range() { - const REQUEST: Kind = Kind::Request; - const INBOUND: Context = Context::Inbound; - const GUARANTEED: Class = Class::GuaranteedResponse; + use Class::GuaranteedResponse; + use Kind::Request; - let id1 = Id::new(REQUEST, INBOUND, GUARANTEED, 0); + let id1 = Id::from(InboundReference::new(GuaranteedResponse, Request, 0)); assert_eq!(0, id1.0 >> Id::BITMASK_LEN); - let id2 = Id::new(REQUEST, INBOUND, GUARANTEED, 13); + let id2 = Id::from(InboundReference::new(GuaranteedResponse, Request, 13)); assert_eq!(13, id2.0 >> Id::BITMASK_LEN); // Maximum generator value that will be preserved const GENERATOR_MAX: u64 = u64::MAX >> Id::BITMASK_LEN; - let id3 = Id::new(REQUEST, INBOUND, GUARANTEED, GENERATOR_MAX); + let id3 = Id::from(InboundReference::new( + GuaranteedResponse, + Request, + GENERATOR_MAX, + )); assert_eq!(GENERATOR_MAX, id3.0 >> Id::BITMASK_LEN); // Larger generator values still work, their high bits are just ignored. - let id4 = Id::new(REQUEST, INBOUND, GUARANTEED, u64::MAX); - assert_eq!(GENERATOR_MAX, id4.0 >> Id::BITMASK_LEN); + let id4 = Id::from(InboundReference::new( + GuaranteedResponse, + Request, + GENERATOR_MAX + 3, + )); + assert_eq!(2, id4.0 >> Id::BITMASK_LEN); +} + +#[test] +fn test_id_from_reference_roundtrip() { + for kind in [Kind::Request, Kind::Response] { + for class in [Class::GuaranteedResponse, Class::BestEffort] { + // Inbound. + let reference = InboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert_eq!(reference.0, id.0); + assert_eq!(id, reference.into()); + assert_eq!(SomeReference::Inbound(reference), SomeReference::from(id)); + + // Outbound. + let reference = OutboundReference::new(class, kind, 13); + let id = Id::from(reference); + assert_eq!(reference.0, id.0); + assert_eq!(id, reference.into()); + assert_eq!(SomeReference::Outbound(reference), SomeReference::from(id)); + } + } +} + +#[test] +fn test_reference_roundtrip_encode() { + fn queue_item(id: Id) -> pb_queues::canister_queue::QueueItem { + pb_queues::canister_queue::QueueItem { + r: Some(pb_queues::canister_queue::queue_item::R::Reference(id.0)), + } + } + + for kind in [Kind::Request, Kind::Response] { + for class in [Class::GuaranteedResponse, Class::BestEffort] { + // Inbound. + let id = Id::from(InboundReference::new(class, kind, 13)); + let item = queue_item(id); + // Can be converted to an `InboundReference`. + let reference = InboundReference::try_from(item).unwrap(); + assert_eq!(reference.0, id.0); + assert_eq!(id, reference.into()); + // Fails to convert to an `OutboundReference`. + assert_matches!( + OutboundReference::try_from(item), + Err(ProxyDecodeError::Other(msg)) if msg == "Not an outbound reference" + ); + // Roundtrip encode produces the same item. + assert_eq!(item, (&reference).into()); + + // Outbound. + let id = Id::from(OutboundReference::new(class, kind, 13)); + let item = queue_item(id); + // Fails to convert to an `InboundReference`. + assert_matches!( + InboundReference::try_from(item), + Err(ProxyDecodeError::Other(msg)) if msg == "Not an inbound reference" + ); + // Can be converted to an `OutboundReference`. + let reference = OutboundReference::try_from(item).unwrap(); + assert_eq!(reference.0, id.0); + assert_eq!(id, reference.into()); + // Roundtrip encode produces the same item. + assert_eq!(item, (&reference).into()); + } + } +} + +#[test] +fn test_callback_reference_roundtip_encode() { + let callback_reference = CallbackReference( + Reference::new(Class::BestEffort, Kind::Response, 13), + 42.into(), + ); + let encoded = pb_queues::canister_queues::CallbackReference::from(callback_reference.clone()); + + assert_eq!( + callback_reference, + CallbackReference::try_from(encoded).unwrap() + ); +} + +#[test] +fn test_decode_invalid_callback_reference() { + for kind in [Kind::Request, Kind::Response] { + for context in [Context::Inbound, Context::Outbound] { + for class in [Class::GuaranteedResponse, Class::BestEffort] { + if kind == Kind::Response + && context == Context::Inbound + && class == Class::BestEffort + { + // This would be a valid `CallbackReference`, skip it. + continue; + } + let id: Id = match context { + Context::Inbound => InboundReference::new(class, kind, 13).into(), + Context::Outbound => OutboundReference::new(class, kind, 13).into(), + }; + let invalid = pb_queues::canister_queues::CallbackReference { + id: id.0, + callback_id: 42, + }; + + assert_matches!( + CallbackReference::try_from(invalid), + Err(ProxyDecodeError::Other(msg)) if msg == "Not an inbound best-effort response" + ); + } + } + } } #[test] @@ -833,14 +990,14 @@ fn assert_exact_messages_in_queue(messages: BTreeSet, queue: &BTreeSet<(T assert_eq!(messages, queue.iter().map(|(_, id)| *id).collect()) } -/// Generates an `Id` for a best-effort inbound request. -pub(crate) fn new_request_message_id(generator: u64, class: Class) -> Id { - Id::new(Kind::Request, Context::Inbound, class, generator) +/// Generates an `InboundReference` for a request of the given class. +pub(crate) fn new_request_reference(generator: u64, class: Class) -> InboundReference { + Reference::new(class, Kind::Request, generator) } -/// Generates an `Id` for an inbound response. -pub(crate) fn new_response_message_id(generator: u64, class: Class) -> Id { - Id::new(Kind::Response, Context::Inbound, class, generator) +/// Generates an `InboundReference` for a response of the given class. +pub(crate) fn new_response_reference(generator: u64, class: Class) -> InboundReference { + Reference::new(class, Kind::Response, generator) } #[derive(PartialEq, Eq)] diff --git a/rs/replicated_state/src/canister_state/queues/queue.rs b/rs/replicated_state/src/canister_state/queues/queue.rs index 6e874ada432..2a2d4a45398 100644 --- a/rs/replicated_state/src/canister_state/queues/queue.rs +++ b/rs/replicated_state/src/canister_state/queues/queue.rs @@ -1,7 +1,8 @@ // TODO(MR-569) Remove when `CanisterQueues` has been updated to use this. #![allow(dead_code)] -use super::message_pool::{self, Context, Kind, MessagePool, REQUEST_LIFETIME}; +use super::message_pool::{Kind, MessagePool, Reference, REQUEST_LIFETIME}; +use super::CanisterInput; use crate::StateError; use ic_base_types::CanisterId; use ic_protobuf::proxy::ProxyDecodeError; @@ -13,17 +14,18 @@ use ic_validate_eq::ValidateEq; use ic_validate_eq_derive::ValidateEq; use std::collections::{BTreeMap, VecDeque}; use std::convert::{From, TryFrom, TryInto}; +use std::fmt::Debug; use std::mem::size_of; use std::sync::Arc; #[cfg(test)] mod tests; -/// A FIFO queue with equal but separate capacities for requests and responses, -/// ensuring full-duplex communication up to its capacity. +/// A typed FIFO queue with equal but separate capacities for requests and +/// responses, ensuring full-duplex communication up to its capacity. /// -/// The queue holds weak references into a `MessagePool`. The messages that -/// these references point to may expire or be shed, resulting in stale +/// The queue holds typed weak references into a `MessageStore`. The messages +/// that these references point to may expire or be shed, resulting in stale /// references that are not immediately removed from the queue. Which is why the /// queue stats track "request slots" and "response slots" instead of "requests" /// and "responses"; and `len()` returns the length of the queue, not the number @@ -43,7 +45,7 @@ mod tests; /// the queue; so we must additionally explicitly limit the number of slots used /// by requests to the queue capacity. #[derive(Clone, Eq, PartialEq, Debug)] -pub(crate) struct CanisterQueue { +pub(crate) struct CanisterQueue { /// A FIFO queue of request and response weak references into the pool. /// /// Since responses may be enqueued at arbitrary points in time, reserved slots @@ -68,7 +70,7 @@ pub(crate) struct CanisterQueue { /// as dangling references to begin with. They are to be handled as /// `SYS_UNKNOWN` reject responses ("timeout" if their deadline expired, /// "drop" otherwise). - queue: VecDeque, + queue: VecDeque>, /// Maximum number of requests; or responses + reserved slots; that can be held /// in the queue at any one time. @@ -88,9 +90,20 @@ pub(crate) struct CanisterQueue { /// * `response_slots >= queue.iter().filter(|r| r.kind() == Kind::Response).count()` /// * `response_slots <= capacity` response_slots: usize, + + /// The type of item referenced by the queue. + marker: std::marker::PhantomData, } -impl CanisterQueue { +/// An `InputQueue` is a `CanisterQueue` holding references to `CanisterInput` +/// items, i.e. either pooled messages or compact responses. +pub(super) type InputQueue = CanisterQueue; + +/// An `OutputQueue` is a `CanisterQueue` holding references to outbound +/// `RequestOrResponse` items. +pub(super) type OutputQueue = CanisterQueue; + +impl CanisterQueue { /// Creates a new `CanisterQueue` with the given capacity. pub(super) fn new(capacity: usize) -> Self { Self { @@ -98,6 +111,7 @@ impl CanisterQueue { capacity, request_slots: 0, response_slots: 0, + marker: std::marker::PhantomData, } } @@ -121,7 +135,7 @@ impl CanisterQueue { /// Enqueues a request. /// /// Panics if there is no available request slot. - pub(super) fn push_request(&mut self, reference: message_pool::Id) { + pub(super) fn push_request(&mut self, reference: Reference) { debug_assert!(reference.kind() == Kind::Request); assert!(self.request_slots < self.capacity); @@ -181,7 +195,7 @@ impl CanisterQueue { /// Enqueues a response into a reserved slot, consuming the slot. /// /// Panics if there is no reserved response slot. - pub(super) fn push_response(&mut self, reference: message_pool::Id) { + pub(super) fn push_response(&mut self, reference: Reference) { debug_assert!(reference.kind() == Kind::Response); self.check_has_reserved_response_slot() .expect("No reserved response slot"); @@ -191,7 +205,7 @@ impl CanisterQueue { } /// Pops a reference from the queue. Returns `None` if the queue is empty. - pub(super) fn pop(&mut self) -> Option { + pub(super) fn pop(&mut self) -> Option> { let reference = self.queue.pop_front()?; if reference.kind() == Kind::Response { @@ -207,7 +221,7 @@ impl CanisterQueue { } /// Returns the next reference in the queue; or `None` if the queue is empty. - pub(super) fn peek(&self) -> Option { + pub(super) fn peek(&self) -> Option> { self.queue.front().cloned() } @@ -229,7 +243,7 @@ impl CanisterQueue { /// Discards all references at the front of the queue for which the predicate /// holds. Stops when it encounters the first reference for which the predicate /// is false. - pub(super) fn pop_while(&mut self, predicate: impl Fn(message_pool::Id) -> bool) { + pub(super) fn pop_while(&mut self, predicate: impl Fn(Reference) -> bool) { while let Some(reference) = self.peek() { if !predicate(reference) { break; @@ -276,13 +290,13 @@ impl CanisterQueue { } /// Returns an iterator over the underlying references. - pub(super) fn iter(&self) -> impl Iterator { + pub(super) fn iter(&self) -> impl Iterator> { self.queue.iter() } } -impl From<&CanisterQueue> for pb_queues::CanisterQueue { - fn from(item: &CanisterQueue) -> Self { +impl From<&CanisterQueue> for pb_queues::CanisterQueue { + fn from(item: &CanisterQueue) -> Self { Self { queue: item.queue.iter().map(Into::into).collect(), capacity: item.capacity as u64, @@ -291,24 +305,19 @@ impl From<&CanisterQueue> for pb_queues::CanisterQueue { } } -impl TryFrom<(pb_queues::CanisterQueue, Context)> for CanisterQueue { +impl TryFrom for CanisterQueue +where + Reference: TryFrom, +{ type Error = ProxyDecodeError; - fn try_from((item, context): (pb_queues::CanisterQueue, Context)) -> Result { - let queue: VecDeque = item + fn try_from(item: pb_queues::CanisterQueue) -> Result { + let queue: VecDeque> = item .queue .into_iter() .map(|queue_item| match queue_item.r { Some(pb_queues::canister_queue::queue_item::R::Reference(_)) => { - let reference = message_pool::Id::try_from(queue_item)?; - if reference.context() != context { - return Err(ProxyDecodeError::Other(format!( - "CanisterQueue: {:?} message in {:?} queue", - reference.context(), - context - ))); - } - Ok(reference) + Ok(Reference::::try_from(queue_item)?) } None => Err(ProxyDecodeError::MissingField("CanisterQueue::queue::r")), }) @@ -323,6 +332,7 @@ impl TryFrom<(pb_queues::CanisterQueue, Context)> for CanisterQueue { capacity: super::DEFAULT_QUEUE_CAPACITY, request_slots, response_slots: item.response_slots as usize, + marker: std::marker::PhantomData, }; res.check_invariants() @@ -331,10 +341,10 @@ impl TryFrom<(pb_queues::CanisterQueue, Context)> for CanisterQueue { } } -impl TryFrom<(InputQueue, &mut MessagePool)> for CanisterQueue { +impl TryFrom<(OldInputQueue, &mut MessagePool)> for InputQueue { type Error = ProxyDecodeError; - fn try_from((iq, pool): (InputQueue, &mut MessagePool)) -> Result { + fn try_from((iq, pool): (OldInputQueue, &mut MessagePool)) -> Result { let mut queue = VecDeque::with_capacity(iq.len()); for msg in iq.queue.queue.into_iter() { let reference = pool.insert_inbound(msg); @@ -346,6 +356,7 @@ impl TryFrom<(InputQueue, &mut MessagePool)> for CanisterQueue { capacity: iq.queue.capacity, request_slots: iq.queue.num_request_slots, response_slots: iq.queue.num_response_slots, + marker: std::marker::PhantomData, }; queue .check_invariants() @@ -354,10 +365,10 @@ impl TryFrom<(InputQueue, &mut MessagePool)> for CanisterQueue { } } -impl TryFrom<(OutputQueue, &mut MessagePool)> for CanisterQueue { +impl TryFrom<(OldOutputQueue, &mut MessagePool)> for OutputQueue { type Error = ProxyDecodeError; - fn try_from((oq, pool): (OutputQueue, &mut MessagePool)) -> Result { + fn try_from((oq, pool): (OldOutputQueue, &mut MessagePool)) -> Result { let mut deadline_range_ends = oq.deadline_range_ends.iter(); let mut deadline_range_end = deadline_range_ends.next(); @@ -402,6 +413,7 @@ impl TryFrom<(OutputQueue, &mut MessagePool)> for CanisterQueue { capacity: oq.queue.capacity, request_slots: oq.queue.num_request_slots - none_entries, response_slots: oq.queue.num_response_slots, + marker: std::marker::PhantomData, }; queue .check_invariants() @@ -410,11 +422,11 @@ impl TryFrom<(OutputQueue, &mut MessagePool)> for CanisterQueue { } } -impl TryFrom<(&CanisterQueue, &MessagePool)> for InputQueue { +impl TryFrom<(&CanisterQueue, &MessagePool)> for OldInputQueue { type Error = ProxyDecodeError; - fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { - let mut input_queue = InputQueue::new(q.capacity); + fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { + let mut input_queue = OldInputQueue::new(q.capacity); for reference in q.iter() { let msg = pool.get(*reference).ok_or_else(|| { ProxyDecodeError::Other(format!( @@ -441,11 +453,11 @@ impl TryFrom<(&CanisterQueue, &MessagePool)> for InputQueue { } } -impl TryFrom<(&CanisterQueue, &MessagePool)> for OutputQueue { +impl TryFrom<(&CanisterQueue, &MessagePool)> for OldOutputQueue { type Error = ProxyDecodeError; - fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { - let mut output_queue = OutputQueue::new(q.capacity); + fn try_from((q, pool): (&CanisterQueue, &MessagePool)) -> Result { + let mut output_queue = OldOutputQueue::new(q.capacity); let mut request_slots = 0; let mut response_slots = 0; for reference in q.iter() { @@ -467,8 +479,7 @@ impl TryFrom<(&CanisterQueue, &MessagePool)> for OutputQueue { match msg { RequestOrResponse::Request(req) => { let deadline = pool - .outbound_guaranteed_request_deadlines() - .get(reference) + .outbound_guaranteed_request_deadline(*reference) .cloned() .unwrap_or(req.deadline); // Safe to unwrap because we cannot exceed the queue capacity. @@ -502,7 +513,7 @@ impl TryFrom<(&CanisterQueue, &MessagePool)> for OutputQueue { /// either be a response or a request (including timed out requests). /// Since an item is either a request or a response, implementing /// `is_response()` is sufficient. -trait QueueItem { +trait OldQueueItem { /// Returns true if the queue item is a response. fn is_response(&self) -> bool; @@ -513,7 +524,7 @@ trait QueueItem { fn from_response(response: Arc) -> T; } -impl QueueItem for RequestOrResponse { +impl OldQueueItem for RequestOrResponse { fn is_response(&self) -> bool { matches!(*self, RequestOrResponse::Response(_)) } @@ -526,7 +537,7 @@ impl QueueItem for RequestOrResponse { } } -impl QueueItem> for Option { +impl OldQueueItem> for Option { fn is_response(&self) -> bool { matches!(*self, Some(RequestOrResponse::Response(_))) } @@ -554,7 +565,7 @@ impl QueueItem> for Option { /// slot, consuming the slot reservation. Attempting to push a response with no /// reserved slot available will produce an error. #[derive(Clone, Eq, PartialEq, Hash, Debug, ValidateEq)] -struct QueueWithReservation + std::clone::Clone + ValidateEq> { +struct QueueWithReservation + std::clone::Clone + ValidateEq> { /// A FIFO queue of all requests and responses. Since responses may be enqueued /// at arbitrary points in time, response reservations cannot be explicitly /// represented in `queue`. They only exist as the difference between @@ -570,7 +581,7 @@ struct QueueWithReservation + std::clone::Clone + ValidateEq> { num_response_slots: usize, } -impl + std::clone::Clone + ValidateEq> QueueWithReservation { +impl + std::clone::Clone + ValidateEq> QueueWithReservation { fn new(capacity: usize) -> Self { let queue = VecDeque::new(); @@ -621,7 +632,7 @@ impl + std::clone::Clone + ValidateEq> QueueWithReservation { if self.num_request_slots < self.capacity { self.num_request_slots += 1; self.queue - .push_back(>::from_request(request)); + .push_back(>::from_request(request)); debug_assert!(self.check_invariants()); Ok(()) } else { @@ -642,7 +653,7 @@ impl + std::clone::Clone + ValidateEq> QueueWithReservation { ) -> Result<(), (StateError, Arc)> { if self.reserved_slots() > 0 { self.queue - .push_back(>::from_response(response)); + .push_back(>::from_response(response)); debug_assert!(self.check_invariants()); Ok(()) } else { @@ -810,12 +821,12 @@ impl TryFrom for QueueWithReservation, } -impl InputQueue { +impl OldInputQueue { pub(super) fn new(capacity: usize) -> Self { Self { queue: QueueWithReservation::new(capacity), @@ -897,8 +908,8 @@ impl InputQueue { } } -impl From<&InputQueue> for pb_queues::InputOutputQueue { - fn from(q: &InputQueue) -> Self { +impl From<&OldInputQueue> for pb_queues::InputOutputQueue { + fn from(q: &OldInputQueue) -> Self { Self { queue: (&q.queue).into(), begin: 0, @@ -910,7 +921,7 @@ impl From<&InputQueue> for pb_queues::InputOutputQueue { } } -impl TryFrom for InputQueue { +impl TryFrom for OldInputQueue { type Error = ProxyDecodeError; fn try_from(q: pb_queues::InputOutputQueue) -> Result { @@ -938,7 +949,7 @@ impl TryFrom for InputQueue { /// front. This is ensured when a message is popped off the queue by also popping /// any subsequent `None` items. #[derive(Clone, Eq, PartialEq, Hash, Debug, ValidateEq)] -pub(crate) struct OutputQueue { +pub(crate) struct OldOutputQueue { #[validate_eq(CompareWithValidateEq)] queue: QueueWithReservation>, /// Queue begin index. @@ -961,7 +972,7 @@ pub(crate) struct OutputQueue { num_messages: usize, } -impl OutputQueue { +impl OldOutputQueue { pub(super) fn new(capacity: usize) -> Self { Self { queue: QueueWithReservation::new(capacity), @@ -1194,7 +1205,7 @@ impl OutputQueue { /// leaving `None` in their place and returning them one by one. pub(super) struct TimedOutRequestsIter<'a> { /// A mutable reference to the queue whose requests are to be timed out and returned. - q: &'a mut OutputQueue, + q: &'a mut OldOutputQueue, /// The time used to determine which requests should be considered timed out. /// This is compared to deadlines in q.deadline_range_ends. current_time: Time, @@ -1235,7 +1246,7 @@ impl<'a> Iterator for TimedOutRequestsIter<'a> { } } -impl std::iter::Iterator for OutputQueue { +impl std::iter::Iterator for OldOutputQueue { type Item = RequestOrResponse; fn next(&mut self) -> Option { @@ -1243,8 +1254,8 @@ impl std::iter::Iterator for OutputQueue { } } -impl From<&OutputQueue> for pb_queues::InputOutputQueue { - fn from(q: &OutputQueue) -> Self { +impl From<&OldOutputQueue> for pb_queues::InputOutputQueue { + fn from(q: &OldOutputQueue) -> Self { Self { queue: (&q.queue).into(), begin: q.begin as u64, @@ -1265,7 +1276,7 @@ impl From<&OutputQueue> for pb_queues::InputOutputQueue { } } -impl TryFrom for OutputQueue { +impl TryFrom for OldOutputQueue { type Error = ProxyDecodeError; fn try_from(q: pb_queues::InputOutputQueue) -> Result { diff --git a/rs/replicated_state/src/canister_state/queues/queue/tests.rs b/rs/replicated_state/src/canister_state/queues/queue/tests.rs index 85f0bec5bd4..18ae944a56d 100644 --- a/rs/replicated_state/src/canister_state/queues/queue/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/queue/tests.rs @@ -1,8 +1,7 @@ -use crate::canister_state::DEFAULT_QUEUE_CAPACITY; - use super::super::message_pool::tests::*; -use super::super::message_pool::Class; +use super::super::message_pool::{Class, InboundReference}; use super::*; +use crate::canister_state::DEFAULT_QUEUE_CAPACITY; use assert_matches::assert_matches; use ic_test_utilities_types::arbitrary; use ic_test_utilities_types::ids::{canister_test_id, message_test_id, user_test_id}; @@ -16,7 +15,7 @@ use std::time::Duration; #[test] fn canister_queue_constructor_test() { const CAPACITY: usize = 14; - let mut queue = CanisterQueue::new(CAPACITY); + let mut queue = InputQueue::new(CAPACITY); assert_eq!(0, queue.len()); assert!(!queue.has_used_slots()); @@ -34,9 +33,9 @@ fn canister_queue_constructor_test() { #[test] fn canister_queue_push_request_succeeds() { const CAPACITY: usize = 1; - let mut queue = CanisterQueue::new(CAPACITY); + let mut queue = InputQueue::new(CAPACITY); - let reference = new_request_message_id(13, Class::BestEffort); + let reference = new_request_reference(13, Class::BestEffort); queue.push_request(reference); assert_eq!(1, queue.len()); @@ -83,7 +82,7 @@ fn canister_queue_push_response_succeeds() { assert_eq!(Ok(()), queue.check_has_reserved_response_slot()); // Push response into reseerved slot. - let reference = new_response_message_id(13, GuaranteedResponse); + let reference = new_response_reference(13, GuaranteedResponse); queue.push_response(reference); assert_eq!(1, queue.len()); @@ -116,7 +115,7 @@ fn canister_queue_push_request_to_full_queue_fails() { const CAPACITY: usize = 2; let mut queue = CanisterQueue::new(CAPACITY); for i in 0..CAPACITY { - queue.push_request(new_request_message_id(i as u64, Class::BestEffort)); + queue.push_request(new_request_reference(i as u64, Class::BestEffort)); } assert_eq!(CAPACITY, queue.len()); @@ -130,7 +129,7 @@ fn canister_queue_push_request_to_full_queue_fails() { assert_eq!(0, queue.reserved_slots()); assert_eq!(Err(()), queue.check_has_reserved_response_slot()); - queue.push_request(new_request_message_id(13, Class::BestEffort)); + queue.push_request(new_request_reference(13, Class::BestEffort)); } /// Test that overfilling an output queue with slot reservations results in @@ -169,7 +168,7 @@ fn canister_queue_try_reserve_response_slot_in_full_queue_fails() { } else { GuaranteedResponse }; - queue.push_response(new_response_message_id(i as u64, class)); + queue.push_response(new_response_reference(i as u64, class)); } assert_eq!(2, queue.len()); @@ -193,11 +192,11 @@ fn canister_queue_try_reserve_response_slot_in_full_queue_fails() { fn canister_queue_full_duplex() { // First fill up the queue. const CAPACITY: usize = 2; - let mut queue = CanisterQueue::new(CAPACITY); + let mut queue = InputQueue::new(CAPACITY); for i in 0..CAPACITY as u64 { - queue.push_request(new_request_message_id(i * 2, Class::BestEffort)); + queue.push_request(new_request_reference(i * 2, Class::BestEffort)); queue.try_reserve_response_slot().unwrap(); - queue.push_response(new_response_message_id(i * 2 + 1, Class::BestEffort)); + queue.push_response(new_response_reference(i * 2 + 1, Class::BestEffort)); } assert_eq!(2 * CAPACITY, queue.len()); @@ -217,17 +216,17 @@ fn canister_queue_full_duplex() { #[test] #[should_panic(expected = "No reserved response slot")] fn canister_queue_push_without_reserved_slot_panics() { - let mut queue = CanisterQueue::new(10); - queue.push_response(new_response_message_id(13, Class::BestEffort)); + let mut queue = InputQueue::new(10); + queue.push_response(new_response_reference(13, Class::BestEffort)); } -/// Generator for an arbitrary message reference. -fn arbitrary_message_reference() -> impl Strategy + Clone { +/// Generator for an arbitrary inbound message reference. +fn arbitrary_message_reference() -> impl Strategy + Clone { prop_oneof![ - 1 => any::().prop_map(|gen| new_request_message_id(gen, Class::GuaranteedResponse)), - 1 => any::().prop_map(|gen| new_request_message_id(gen, Class::BestEffort)), - 1 => any::().prop_map(|gen| new_response_message_id(gen, Class::GuaranteedResponse)), - 1 => any::().prop_map(|gen| new_response_message_id(gen, Class::BestEffort)), + 1 => any::().prop_map(|gen| new_request_reference(gen, Class::GuaranteedResponse)), + 1 => any::().prop_map(|gen| new_request_reference(gen, Class::BestEffort)), + 1 => any::().prop_map(|gen| new_response_reference(gen, Class::GuaranteedResponse)), + 1 => any::().prop_map(|gen| new_response_reference(gen, Class::BestEffort)), ] } @@ -240,7 +239,7 @@ proptest! { ) ) { // Create a queue with large enough capacity. - let mut queue = CanisterQueue::new(20); + let mut queue = InputQueue::new(20); // Push all references onto the queue. for reference in references.iter() { @@ -297,7 +296,7 @@ proptest! { prop_assert_eq!(Ok(()), queue.check_invariants()); let encoded: pb_queues::CanisterQueue = (&queue).into(); - let decoded = (encoded, Context::Inbound).try_into().unwrap(); + let decoded = encoded.try_into().unwrap(); assert_eq!(queue, decoded); } @@ -305,42 +304,39 @@ proptest! { #[test] fn decode_inbound_message_in_output_queue_fails() { - // Queue with an inbound request. - let mut queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); - queue.push_request(new_request_message_id(13, Class::BestEffort)); + // Input queue with a request. + let mut queue = InputQueue::new(DEFAULT_QUEUE_CAPACITY); + queue.push_request(new_request_reference(13, Class::BestEffort)); let encoded: pb_queues::CanisterQueue = (&queue).into(); // Cannot be decoded as an output queue. assert_matches!( - CanisterQueue::try_from((encoded.clone(), Context::Outbound)), + OutputQueue::try_from(encoded.clone()), Err(ProxyDecodeError::Other(_)) ); // But can be decoded as an input queue. - assert_eq!(queue, (encoded, Context::Inbound).try_into().unwrap()); + assert_eq!(queue, encoded.try_into().unwrap()); } #[test] fn decode_with_invalid_response_slots_fails() { // Queue with two inbound responses. - let mut queue = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); + let mut queue = InputQueue::new(DEFAULT_QUEUE_CAPACITY); queue.try_reserve_response_slot().unwrap(); - queue.push_response(new_response_message_id(13, Class::BestEffort)); + queue.push_response(new_response_reference(13, Class::BestEffort)); queue.try_reserve_response_slot().unwrap(); - queue.push_response(new_response_message_id(14, Class::BestEffort)); + queue.push_response(new_response_reference(14, Class::BestEffort)); let encoded: pb_queues::CanisterQueue = (&queue).into(); // Can be decoded as is. - assert_eq!( - queue, - (encoded.clone(), Context::Inbound).try_into().unwrap() - ); + assert_eq!(queue, encoded.clone().try_into().unwrap()); // But fails to decode with a too low `response_slots` value. let mut too_few_response_slots = encoded.clone(); too_few_response_slots.response_slots = 1; assert_matches!( - CanisterQueue::try_from((too_few_response_slots.clone(), Context::Inbound)), + InputQueue::try_from(too_few_response_slots), Err(ProxyDecodeError::Other(_)) ); } @@ -363,7 +359,7 @@ fn canister_queue_try_from_input_queue() { // An `InputQueue` with a non-zero `begin`, a couple of requests, a response and // a reserved slot. - let mut input_queue = InputQueue::new(10); + let mut input_queue = OldInputQueue::new(10); input_queue.push(req1.clone().into()).unwrap(); input_queue.pop().unwrap(); input_queue.push(req2.clone().into()).unwrap(); @@ -382,7 +378,7 @@ fn canister_queue_try_from_input_queue() { expected_queue.try_reserve_response_slot().unwrap(); let mut pool = MessagePool::default(); - let queue: CanisterQueue = (input_queue, &mut pool).try_into().unwrap(); + let queue: InputQueue = (input_queue, &mut pool).try_into().unwrap(); assert_eq!((expected_pool, expected_queue), (pool, queue)); } @@ -405,7 +401,7 @@ fn canister_queue_try_from_output_queue() { // An `OutputQueue` with a non-zero `begin`, a response, a timed out request // (`None`), a couple of requests and a reserved slot. - let mut output_queue = OutputQueue::new(10); + let mut output_queue = OldOutputQueue::new(10); // Advance `begin`. output_queue.push_request(req1.clone().into(), d0).unwrap(); output_queue.pop().unwrap(); @@ -431,7 +427,7 @@ fn canister_queue_try_from_output_queue() { expected_queue.try_reserve_response_slot().unwrap(); let mut pool = MessagePool::default(); - let queue: CanisterQueue = (output_queue, &mut pool).try_into().unwrap(); + let queue: OutputQueue = (output_queue, &mut pool).try_into().unwrap(); assert_eq!((expected_pool, expected_queue), (pool, queue)); } @@ -457,14 +453,14 @@ fn input_queue_try_from_canister_queue() { queue.try_reserve_response_slot().unwrap(); // Expected `InputQueue`. - let mut expected_input_queue = InputQueue::new(10); + let mut expected_input_queue = OldInputQueue::new(10); expected_input_queue.push(req1.into()).unwrap(); expected_input_queue.push(req2.into()).unwrap(); expected_input_queue.reserve_slot().unwrap(); expected_input_queue.push(rep.into()).unwrap(); expected_input_queue.reserve_slot().unwrap(); - let input_queue: InputQueue = (&queue, &pool).try_into().unwrap(); + let input_queue: OldInputQueue = (&queue, &pool).try_into().unwrap(); assert_eq!(expected_input_queue, input_queue); } @@ -499,7 +495,7 @@ fn output_queue_try_from_canister_queue() { queue.try_reserve_response_slot().unwrap(); // Expected `OutputQueue`. The stale request and response are not preserved. - let mut expected_output_queue = OutputQueue::new(10); + let mut expected_output_queue = OldOutputQueue::new(10); expected_output_queue.push_request(req2.into(), d2).unwrap(); // `CanisterQueue` does not record when `req3` was pushed (at `t3`), so it // inherits `req2`'s deadline. @@ -508,7 +504,7 @@ fn output_queue_try_from_canister_queue() { expected_output_queue.push_response(rep.into()); expected_output_queue.reserve_slot().unwrap(); - let output_queue: OutputQueue = (&queue, &pool).try_into().unwrap(); + let output_queue: OldOutputQueue = (&queue, &pool).try_into().unwrap(); assert_eq!(expected_output_queue, output_queue); } @@ -516,7 +512,7 @@ fn output_queue_try_from_canister_queue() { #[test] fn input_queue_constructor_test() { let capacity: usize = 14; - let mut queue = InputQueue::new(capacity); + let mut queue = OldInputQueue::new(capacity); assert_eq!(queue.len(), 0); assert!(!queue.has_used_slots()); assert_eq!(queue.pop(), None); @@ -524,7 +520,7 @@ fn input_queue_constructor_test() { #[test] fn input_queue_with_message_is_not_empty() { - let mut input_queue = InputQueue::new(1); + let mut input_queue = OldInputQueue::new(1); input_queue .push(RequestBuilder::default().build().into()) @@ -535,7 +531,7 @@ fn input_queue_with_message_is_not_empty() { #[test] fn input_queue_with_reservation_is_not_empty() { - let mut input_queue = InputQueue::new(1); + let mut input_queue = OldInputQueue::new(1); input_queue.reserve_slot().unwrap(); assert_eq!(input_queue.len(), 0); @@ -546,7 +542,7 @@ fn input_queue_with_reservation_is_not_empty() { #[test] fn input_queue_pushed_messages_get_popped() { let capacity: usize = 4; - let mut input_queue = InputQueue::new(capacity); + let mut input_queue = OldInputQueue::new(capacity); let mut msg_queue = VecDeque::new(); for _ in 0..capacity { let req: RequestOrResponse = RequestBuilder::default().build().into(); @@ -565,7 +561,7 @@ fn input_queue_pushed_messages_get_popped() { #[test] fn input_queue_push_succeeds() { let capacity: usize = 1; - let mut input_queue = InputQueue::new(capacity); + let mut input_queue = OldInputQueue::new(capacity); // Push request. assert_eq!(input_queue.queue.available_request_slots(), 1); @@ -594,7 +590,7 @@ fn input_queue_push_succeeds() { fn input_queue_push_to_full_queue_fails() { // First fill up the queue. let capacity: usize = 2; - let mut input_queue = InputQueue::new(capacity); + let mut input_queue = OldInputQueue::new(capacity); for _ in 0..capacity { input_queue .push(RequestBuilder::default().build().into()) @@ -621,7 +617,7 @@ fn input_queue_push_to_full_queue_fails() { #[test] fn input_queue_push_response_without_reservation_fails() { - let mut queue = InputQueue::new(10); + let mut queue = OldInputQueue::new(10); queue .push(ResponseBuilder::default().build().into()) .unwrap_err(); @@ -629,7 +625,7 @@ fn input_queue_push_response_without_reservation_fails() { #[test] fn input_queue_decode_with_non_empty_deadlines_fails() { - let mut q = InputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); + let mut q = OldInputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); for _ in 0..2 { let _ = q.push(RequestOrResponse::Request( RequestBuilder::default().build().into(), @@ -642,19 +638,19 @@ fn input_queue_decode_with_non_empty_deadlines_fails() { deadline: 0, index: 0, }); - assert!(TryInto::::try_into(proto_queue).is_err()); + assert!(TryInto::::try_into(proto_queue).is_err()); } #[test] fn output_queue_constructor_test() { - let mut queue = OutputQueue::new(14); + let mut queue = OldOutputQueue::new(14); assert_eq!(queue.num_messages(), 0); assert_eq!(queue.pop(), None); } #[test] fn output_queue_with_message_is_not_empty() { - let mut queue = OutputQueue::new(14); + let mut queue = OldOutputQueue::new(14); queue .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) @@ -665,7 +661,7 @@ fn output_queue_with_message_is_not_empty() { #[test] fn output_queue_with_reservation_is_not_empty() { - let mut queue = OutputQueue::new(14); + let mut queue = OldOutputQueue::new(14); queue.reserve_slot().unwrap(); assert_eq!(queue.num_messages(), 0); @@ -676,7 +672,7 @@ fn output_queue_with_reservation_is_not_empty() { #[test] fn output_queue_push_request_succeeds() { let capacity: usize = 1; - let mut output_queue = OutputQueue::new(capacity); + let mut output_queue = OldOutputQueue::new(capacity); assert_eq!(output_queue.queue.available_request_slots(), 1); output_queue @@ -692,7 +688,7 @@ fn output_queue_push_request_succeeds() { #[test] fn output_queue_push_response_succeeds() { let capacity: usize = 1; - let mut output_queue = OutputQueue::new(capacity); + let mut output_queue = OldOutputQueue::new(capacity); assert_eq!(output_queue.queue.available_response_slots(), 1); output_queue.reserve_slot().unwrap(); @@ -710,7 +706,7 @@ fn output_queue_push_response_succeeds() { fn output_queue_push_to_full_queue_fails() { // First fill up the queue. let capacity: usize = 2; - let mut output_queue = OutputQueue::new(capacity); + let mut output_queue = OldOutputQueue::new(capacity); for _index in 0..capacity { output_queue .push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) @@ -738,7 +734,7 @@ fn output_queue_push_to_full_queue_fails() { #[test] #[should_panic(expected = "called `Result::unwrap()` on an `Err` value")] fn output_push_without_reserved_slot_fails() { - let mut queue = OutputQueue::new(10); + let mut queue = OldOutputQueue::new(10); queue.push_response(ResponseBuilder::default().build().into()); } @@ -748,7 +744,7 @@ fn output_push_without_reserved_slot_fails() { /// the queue begin index, but `pop` does (by 1). #[test] fn output_queue_explicit_push_and_pop_test() { - let mut q = OutputQueue::new(100); + let mut q = OldOutputQueue::new(100); assert_eq!(0, q.num_messages()); let test_request = Arc::::from(RequestBuilder::default().build()); @@ -807,7 +803,7 @@ proptest! { ) }) ) { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); + let mut q = OldOutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); let mut expected_deadline_range_ends = VecDeque::<(Time, usize)>::new(); let mut index = 1; @@ -848,9 +844,9 @@ prop_compose! { proptest::collection::vec(any::(), 0..=num_msgs), ) }) - ) -> OutputQueue { + ) -> OldOutputQueue { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); + let mut q = OldOutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); q.begin = begin; // Boundaries of execution rounds. @@ -890,7 +886,7 @@ prop_compose! { prop_compose! { /// Generator for an arbitrary time in the interval [min_deadline - 5, max_deadline + 5] /// for an arbitrary `OutputQueue`. Returns 0 if there are no deadlines in the queue. - fn arb_time_for_output_queue_timeouts(q: &OutputQueue) ( + fn arb_time_for_output_queue_timeouts(q: &OldOutputQueue) ( time in { // Find time for timing out in [min_deadline-5, max_deadline+5]. if let (Some((min_deadline, _)), Some((max_deadline, _))) = @@ -915,7 +911,7 @@ prop_compose! { fn arb_output_queue() ( (time, num_pop, mut q) in arb_output_queue_no_timeout(5..=20) .prop_flat_map(|q| (arb_time_for_output_queue_timeouts(&q), 0..3_usize, Just(q))) - ) -> OutputQueue { + ) -> OldOutputQueue { q.time_out_requests(time).count(); q.check_invariants(); @@ -982,7 +978,7 @@ proptest! { /// Check whether a timed out request produces back pressure. #[test] fn output_queue_check_back_pressure_with_timed_out_requests() { - let mut q = OutputQueue::new(1); + let mut q = OldOutputQueue::new(1); q.reserve_slot().unwrap(); q.push_response(Arc::new(ResponseBuilder::default().build())); @@ -1011,7 +1007,7 @@ proptest! { q in arb_output_queue() ) { let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - let deserialized_q: OutputQueue = proto_queue.try_into().expect("bad conversion"); + let deserialized_q: OldOutputQueue = proto_queue.try_into().expect("bad conversion"); prop_assert_eq!(q, deserialized_q); } @@ -1019,8 +1015,8 @@ proptest! { /// Generates a simple `OutputQueue` holding a specified number of requests, /// each with a unique deadline. -fn generate_test_queue(num_requests: usize) -> OutputQueue { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); +fn generate_test_queue(num_requests: usize) -> OldOutputQueue { + let mut q = OldOutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); for t in 1..=num_requests { q.push_request( RequestBuilder::default().build().into(), @@ -1051,12 +1047,12 @@ fn output_queue_decode_with_deadlines_not_strictly_sorted_fails() { let deadline = q.deadline_range_ends[0].0; q.deadline_range_ends[0].0 = q.deadline_range_ends[1].0; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); // Check swapped deadline range ends cause error. q.deadline_range_ends[1].0 = deadline; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); } #[test] @@ -1067,12 +1063,12 @@ fn output_queue_decode_with_deadline_indices_not_strictly_sorted_fails() { let index = q.deadline_range_ends[0].1; q.deadline_range_ends[0].1 = q.deadline_range_ends[1].1; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); // Check swapped deadline range ends cause error. q.deadline_range_ends[1].1 = index; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); } #[test] @@ -1083,17 +1079,17 @@ fn output_queue_decode_with_deadlines_index_out_of_bounds_fails() { // Check deadline index before the queue causes error. q.deadline_range_ends[0].1 = 0; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); // Check deadline index after the queue causes error. q.deadline_range_ends[0].1 = 3; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - assert!(OutputQueue::try_from(proto_queue).is_err()); + assert!(OldOutputQueue::try_from(proto_queue).is_err()); } #[test] fn output_queue_decode_with_none_head_fails() { - let mut q = OutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); + let mut q = OldOutputQueue::new(super::super::DEFAULT_QUEUE_CAPACITY); for _ in 0..2 { q.push_request(RequestBuilder::default().build().into(), UNIX_EPOCH) .unwrap(); @@ -1102,7 +1098,7 @@ fn output_queue_decode_with_none_head_fails() { let proto_queue: pb_queues::InputOutputQueue = (&q).into(); assert!(matches!( - OutputQueue::try_from(proto_queue).err(), + OldOutputQueue::try_from(proto_queue).err(), Some(ProxyDecodeError::Other(_)) )); } @@ -1114,8 +1110,8 @@ fn output_queue_roundtrip_from_vec_deque( num_request_slots: usize, num_response_slots: usize, num_messages: usize, -) -> Result { - let q = OutputQueue { +) -> Result { + let q = OldOutputQueue { queue: QueueWithReservation::> { queue, capacity: super::super::DEFAULT_QUEUE_CAPACITY, @@ -1129,7 +1125,7 @@ fn output_queue_roundtrip_from_vec_deque( }; let proto_queue: pb_queues::InputOutputQueue = (&q).into(); - TryInto::::try_into(proto_queue) + TryInto::::try_into(proto_queue) } #[test] diff --git a/rs/replicated_state/src/canister_state/queues/tests.rs b/rs/replicated_state/src/canister_state/queues/tests.rs index 51ef6af58c7..b528eae58b5 100644 --- a/rs/replicated_state/src/canister_state/queues/tests.rs +++ b/rs/replicated_state/src/canister_state/queues/tests.rs @@ -1,6 +1,6 @@ use super::input_schedule::testing::InputScheduleTesting; use super::message_pool::{MessageStats, REQUEST_LIFETIME}; -use super::queue::{InputQueue, OutputQueue}; +use super::queue::{OldInputQueue, OldOutputQueue}; use super::testing::{new_canister_output_queues_for_test, CanisterQueuesTesting}; use super::*; use crate::{CanisterState, InputQueueType::*, SchedulerState, SystemState}; @@ -765,7 +765,7 @@ impl CanisterQueuesMultiFixture { } fn pool_is_empty(&self) -> bool { - self.queues.pool.len() == 0 + self.queues.store.is_empty() } } @@ -1036,7 +1036,7 @@ fn test_peek_input_round_robin() { assert_eq!(queues.pop_input().unwrap(), peeked_input); assert!(!queues.has_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1159,7 +1159,7 @@ fn test_peek_input_with_stale_references() { assert_eq!(expected, queues.pop_input().unwrap()); assert!(!queues.has_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1175,7 +1175,7 @@ fn test_pop_input_with_stale_references() { assert_eq!(expected, queues.pop_input().unwrap()); assert!(!queues.has_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1203,7 +1203,7 @@ fn test_skip_input_with_stale_references() { assert_eq!(request_3, queues.pop_input().unwrap()); assert!(!queues.has_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } /// Produces a `CanisterQueues` with 3 local input queues and 3 remote input @@ -1283,7 +1283,7 @@ fn test_pop_input_with_empty_queue_in_input_schedule() { assert!(!queues.has_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); assert_eq!( Ok(()), queues.schedules_ok(&input_queue_type_from_local_canisters(vec![ @@ -1312,7 +1312,7 @@ fn test_pop_input_with_gced_queue_in_input_schedule() { assert!(!queues.has_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); assert_eq!(Ok(()), queues.schedules_ok(&|_| RemoteSubnet)); } @@ -1329,7 +1329,7 @@ fn test_peek_input_with_empty_queue_in_input_schedule() { assert_eq!(None, queues.peek_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1350,7 +1350,7 @@ fn test_peek_input_with_gced_queue_in_input_schedule() { assert_eq!(None, queues.peek_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1367,7 +1367,7 @@ fn test_skip_input_with_empty_queue_in_input_schedule() { assert_eq!(None, queues.peek_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1389,7 +1389,7 @@ fn test_skip_input_with_gced_queue_in_input_schedule() { assert_eq!(None, queues.peek_input()); assert_eq!(None, queues.pop_input()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1516,7 +1516,7 @@ fn test_output_into_iter() { } assert_eq!(0, queues.output_message_count()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -1764,8 +1764,8 @@ fn encode_non_default_pool() { .unwrap(); queues.pop_canister_input(RemoteSubnet).unwrap(); // Sanity check that the pool is empty but not equal to the default. - assert_eq!(0, queues.pool.len()); - assert_ne!(MessagePool::default(), queues.pool); + assert!(queues.store.is_empty()); + assert_ne!(MessageStoreImpl::default(), queues.store); // And a roundtrip encode preserves the `CanisterQueues` unaltered. let encoded: pb_queues::CanisterQueues = (&queues).into(); @@ -1806,7 +1806,7 @@ fn decode_backward_compatibility() { // // An `InputQueue` with a request, a response and a reserved slot. - let mut iq1 = InputQueue::new(DEFAULT_QUEUE_CAPACITY); + let mut iq1 = OldInputQueue::new(DEFAULT_QUEUE_CAPACITY); iq1.push(req.clone().into()).unwrap(); iq1.reserve_slot().unwrap(); iq1.push(rep.clone().into()).unwrap(); @@ -1815,15 +1815,15 @@ fn decode_backward_compatibility() { // Expected input queue. let mut expected_iq1 = CanisterQueue::new(DEFAULT_QUEUE_CAPACITY); // Enqueue a request and a response. - expected_iq1.push_request(expected_queues.pool.insert_inbound(req.clone().into())); + expected_iq1.push_request(expected_queues.store.insert_inbound(req.clone().into())); expected_iq1.try_reserve_response_slot().unwrap(); - expected_iq1.push_response(expected_queues.pool.insert_inbound(rep.clone().into())); + expected_iq1.push_response(expected_queues.store.insert_inbound(rep.clone().into())); // Make an extra response reservation. expected_iq1.try_reserve_response_slot().unwrap(); // An output queue with a response, a timed out request, a non-timed out request // and a reserved slot. - let mut oq1 = OutputQueue::new(DEFAULT_QUEUE_CAPACITY); + let mut oq1 = OldOutputQueue::new(DEFAULT_QUEUE_CAPACITY); oq1.reserve_slot().unwrap(); oq1.push_response(rep.clone().into()); oq1.push_request(req.clone().into(), d1).unwrap(); @@ -1836,11 +1836,13 @@ fn decode_backward_compatibility() { expected_oq1.try_reserve_response_slot().unwrap(); expected_oq1.push_response( expected_queues + .store .pool .insert_outbound_response(rep.clone().into()), ); expected_oq1.push_request( expected_queues + .store .pool .insert_outbound_request(req.clone().into(), t2), ); @@ -1870,7 +1872,7 @@ fn decode_backward_compatibility() { // // Input queue with a reserved slot. - let mut iq2 = InputQueue::new(DEFAULT_QUEUE_CAPACITY); + let mut iq2 = OldInputQueue::new(DEFAULT_QUEUE_CAPACITY); iq2.reserve_slot().unwrap(); // Expected input queue. @@ -1878,7 +1880,7 @@ fn decode_backward_compatibility() { expected_iq2.try_reserve_response_slot().unwrap(); // Empty output queue. - let oq2 = OutputQueue::new(DEFAULT_QUEUE_CAPACITY); + let oq2 = OldOutputQueue::new(DEFAULT_QUEUE_CAPACITY); queues_proto.input_queues.push(pb_queues::QueueEntry { canister_id: Some(remote_canister.into()), @@ -1947,7 +1949,7 @@ fn canister_queues_proto_with_inbound_responses() -> pb_queues::CanisterQueues { assert!(queues.shed_largest_message(&canister_test_id(13), &BTreeMap::new())); assert_eq!( Some(&CallbackId::from(3)), - queues.shed_responses.values().next() + queues.store.shed_responses.values().next() ); // Sanity check: roundtrip encode succeeds. @@ -2030,7 +2032,7 @@ fn decode_with_both_response_and_shed_response_for_reference() { assert_matches!( CanisterQueues::try_from((encoded, &StrictMetrics as &dyn CheckpointLoadingMetrics)), - Err(ProxyDecodeError::Other(msg)) if msg.contains("CanisterQueues: Both response and shed response for reference Id(") + Err(ProxyDecodeError::Other(msg)) if msg.contains("CanisterQueues: Multiple responses for Reference(") ); } @@ -2163,7 +2165,7 @@ fn test_stats_best_effort() { let mut expected_queue_stats = QueueStats::default(); assert_eq!(expected_queue_stats, queues.queue_stats); - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); // Best-effort requests and best-effort responses, to be enqueued one each into // an input and an output queue. @@ -2224,7 +2226,7 @@ fn test_stats_best_effort() { inbound_guaranteed_response_count: 0, outbound_message_count: 2, }, - queues.pool.message_stats() + queues.message_stats() ); // Pop the incoming request and the outgoing response. @@ -2253,7 +2255,7 @@ fn test_stats_best_effort() { inbound_guaranteed_response_count: 0, outbound_message_count: 1, }, - queues.pool.message_stats() + queues.message_stats() ); // Time out the one message with a deadline of less than 20 (the outgoing @@ -2289,7 +2291,7 @@ fn test_stats_best_effort() { inbound_guaranteed_response_count: 0, outbound_message_count: 0, }, - queues.pool.message_stats() + queues.message_stats() ); // But the `CanisterQueues` getter methods know that there are two responses. assert_eq!(2, queues.input_queues_message_count()); @@ -2310,7 +2312,7 @@ fn test_stats_best_effort() { // No changes in slot and memory reservations. assert_eq!(expected_queue_stats, queues.queue_stats); // And we have all-zero message stats. - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); } #[test] @@ -2319,7 +2321,7 @@ fn test_stats_guaranteed_response() { let mut expected_queue_stats = QueueStats::default(); assert_eq!(expected_queue_stats, queues.queue_stats); - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); // Guaranteed response requests and guaranteed responses, to be enqueued one // each into an input and an output queue. @@ -2380,7 +2382,7 @@ fn test_stats_guaranteed_response() { inbound_guaranteed_response_count: 1, outbound_message_count: 2, }, - queues.pool.message_stats() + queues.message_stats() ); // Pop the incoming request and the outgoing response. @@ -2409,7 +2411,7 @@ fn test_stats_guaranteed_response() { inbound_guaranteed_response_count: 1, outbound_message_count: 1, }, - queues.pool.message_stats() + queues.message_stats() ); // Time out the one message that has an (implicit) deadline (the outgoing @@ -2437,7 +2439,7 @@ fn test_stats_guaranteed_response() { }; assert_eq!(expected_queue_stats, queues.queue_stats); // And we have all-zero message stats. - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); // Consume the output queue slot reservation. queues.push_output_response(response4_.clone().into()); @@ -2445,7 +2447,7 @@ fn test_stats_guaranteed_response() { // Default stats throughout. assert_eq!(QueueStats::default(), queues.queue_stats); - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); } #[test] @@ -2454,7 +2456,7 @@ fn test_stats_oversized_requests() { let mut expected_queue_stats = QueueStats::default(); assert_eq!(expected_queue_stats, queues.queue_stats); - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); // One oversized best-effort request and one oversized guaranteed response // request, to be enqueued into both an input and an output queue. @@ -2510,7 +2512,7 @@ fn test_stats_oversized_requests() { inbound_guaranteed_response_count: 0, outbound_message_count: 2, }, - queues.pool.message_stats() + queues.message_stats() ); // Pop the incoming best-effort request and the incoming guaranteed request. @@ -2539,7 +2541,7 @@ fn test_stats_oversized_requests() { inbound_guaranteed_response_count: 0, outbound_message_count: 2, }, - queues.pool.message_stats() + queues.message_stats() ); // Shed the outgoing best-effort request and time out the outgoing guaranteed one. @@ -2569,7 +2571,7 @@ fn test_stats_oversized_requests() { // No change in slot and memory reservations. assert_eq!(expected_queue_stats, queues.queue_stats); // But back to all-zero message stats. - assert_eq!(&MessageStats::default(), queues.pool.message_stats()); + assert_eq!(&MessageStats::default(), queues.message_stats()); } /// Simulates sending an outgoing request and receiving an incoming response, @@ -2697,7 +2699,7 @@ fn test_reject_subnet_output_request() { // And after popping it, there are no messages or reserved slots left. queues.garbage_collect(); assert!(queues.canister_queues.is_empty()); - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -2786,7 +2788,7 @@ fn test_output_queues_for_each() { // No output left. assert!(!queues.has_output()); // And the pool is also empty. - assert!(queues.pool.len() == 0); + assert!(queues.store.is_empty()); } #[test] @@ -2823,22 +2825,22 @@ fn test_peek_output_with_stale_references() { assert!(queues.has_output()); // One message to canister 1. - let peeked = requests.get(2).unwrap().clone().into(); - assert_eq!(Some(&peeked), queues.peek_output(&canister1)); - assert_eq!(Some(peeked), queues.pop_canister_output(&canister1)); + let request2: RequestOrResponse = requests.get(2).unwrap().clone().into(); + assert_eq!(Some(&request2), queues.peek_output(&canister1)); + assert_eq!(Some(request2), queues.pop_canister_output(&canister1)); assert_eq!(None, queues.peek_output(&canister1)); // No message to canister 2. assert_eq!(None, queues.peek_output(&canister2)); // One message to canister 3. - let peeked = requests.get(3).unwrap().clone().into(); - assert_eq!(Some(&peeked), queues.peek_output(&canister3)); - assert_eq!(Some(peeked), queues.pop_canister_output(&canister3)); + let request3: RequestOrResponse = requests.get(3).unwrap().clone().into(); + assert_eq!(Some(&request3), queues.peek_output(&canister3)); + assert_eq!(Some(request3), queues.pop_canister_output(&canister3)); assert_eq!(None, queues.peek_output(&canister3)); assert!(!queues.has_output()); - assert!(queues.pool.len() == 2); + assert!(queues.store.pool.len() == 2); } // Must be duplicated here, because the `ic_test_utilities` one pulls in the @@ -2877,7 +2879,7 @@ fn output_into_iter_peek_and_next_consistent( prop_assert_eq!(output_iter.next(), None); prop_assert_eq!(raw_requests.len(), popped); - prop_assert!(canister_queues.pool.len() == 0); + prop_assert!(canister_queues.store.is_empty()); } #[test_strategy::proptest] @@ -2949,7 +2951,7 @@ fn output_into_iter_leaves_non_consumed_messages_untouched( // Ensure that there are no messages left in the canister queues. prop_assert_eq!(canister_queues.output_message_count(), 0); // And the pool is empty. - prop_assert!(canister_queues.pool.len() == 0); + prop_assert!(canister_queues.store.is_empty()); } #[test_strategy::proptest] @@ -3004,7 +3006,7 @@ fn output_into_iter_with_exclude_leaves_excluded_queues_untouched( // Ensure that there are no messages left in the canister queues. prop_assert_eq!(canister_queues.output_message_count(), 0); // And the pool is empty. - prop_assert!(canister_queues.pool.len() == 0); + prop_assert!(canister_queues.store.is_empty()); } #[test_strategy::proptest] @@ -3248,7 +3250,7 @@ fn time_out_messages_pushes_correct_reject_responses() { // Check that each canister has one request timed out in the output queue and one // reject response in the corresponding input queue. assert_eq!(1, canister_queues.queue_stats.input_queues_reserved_slots); - let message_stats = canister_queues.pool.message_stats(); + let message_stats = canister_queues.message_stats(); assert_eq!(3, message_stats.inbound_message_count); assert_eq!(2, message_stats.inbound_guaranteed_response_count); assert_eq!(1, message_stats.outbound_message_count); @@ -3264,9 +3266,9 @@ fn time_out_messages_pushes_correct_reject_responses() { .0; assert_eq!(1, input_queue_from_canister.len()); let reference = input_queue_from_canister.peek().unwrap(); - let reject_response = canister_queues.pool.get(reference).unwrap(); + let reject_response = canister_queues.store.get(reference); assert_eq!( - RequestOrResponse::from(Response { + CanisterInput::from(RequestOrResponse::from(Response { originator: own_canister_id, respondent: from_canister, originator_reply_callback: CallbackId::from(callback_id), @@ -3277,8 +3279,8 @@ fn time_out_messages_pushes_correct_reject_responses() { MR_SYNTHETIC_REJECT_MESSAGE_MAX_LEN )), deadline, - }), - *reject_response, + })), + reject_response, ); }; check_reject_response(own_canister_id, 0, NO_DEADLINE); @@ -3303,7 +3305,7 @@ fn time_out_messages_pushes_correct_reject_responses() { // Zero input queue reserved slots, 4 inbound responses, assert_eq!(0, canister_queues.queue_stats.input_queues_reserved_slots); - let message_stats = canister_queues.pool.message_stats(); + let message_stats = canister_queues.message_stats(); assert_eq!(4, message_stats.inbound_message_count); assert_eq!(3, message_stats.inbound_guaranteed_response_count); assert_eq!(0, message_stats.outbound_message_count);