Skip to content

Commit

Permalink
refactor: [MR-603] Typed canister queues and references (#1697)
Browse files Browse the repository at this point in the history
[MR-603]: Assign type parameters to canister queues and references, to
designate them as either input/inbound or output/outbound.

Ensures that input queues can only hold inbound references; and output
queues can only hold outbound references. Implements separate logic (for
inbound and outbound references) for determining staleness, lookup and
removal.

[MR-603]:
https://dfinity.atlassian.net/browse/MR-603?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
  • Loading branch information
alin-at-dfinity authored Oct 1, 2024
1 parent e6ca551 commit 3221c59
Show file tree
Hide file tree
Showing 9 changed files with 1,169 additions and 694 deletions.
6 changes: 3 additions & 3 deletions rs/messaging/tests/queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl SubnetPairProxy {
&self.local_canister_id,
&self.remote_canister_id,
)
.map(|iter| iter.collect::<Vec<_>>())
.map(|iter| iter.cloned().collect::<Vec<_>>())
}

/// Generates a snapshot of the output queue on the remote canister and
Expand All @@ -189,7 +189,7 @@ impl SubnetPairProxy {
&self.remote_canister_id,
&self.local_canister_id,
)
.map(|iter| iter.collect::<Vec<_>>())
.map(|iter| iter.cloned().collect::<Vec<_>>())
}

/// Build backpressure on `local_env` until a minimum number of requests are found in the
Expand Down Expand Up @@ -272,7 +272,7 @@ fn get_output_queue_iter<'a>(
state: &'a ReplicatedState,
local_canister_id: &CanisterId,
remote_canister_id: &'a CanisterId,
) -> Option<impl Iterator<Item = RequestOrResponse> + 'a> {
) -> Option<impl Iterator<Item = &'a RequestOrResponse>> {
state
.canister_states
.get(local_canister_id)
Expand Down
847 changes: 496 additions & 351 deletions rs/replicated_state/src/canister_state/queues.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::queue::CanisterQueue;
use super::queue::InputQueue;
use super::CanisterQueues;
use crate::{InputQueueType, InputSource};
use ic_protobuf::proxy::ProxyDecodeError;
Expand Down Expand Up @@ -174,7 +174,7 @@ impl InputSchedule {
/// Time complexity: `O(n * log(n))`.
pub(super) fn test_invariants<'a>(
&self,
input_queues: impl Iterator<Item = (&'a CanisterId, &'a CanisterQueue)>,
input_queues: impl Iterator<Item = (&'a CanisterId, &'a InputQueue)>,
input_queue_type_fn: &dyn Fn(&CanisterId) -> InputQueueType,
) -> Result<(), String> {
let mut local_schedule: BTreeSet<_> = self.local_sender_schedule.iter().collect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,12 @@ fn test_reschedule_in_local_while_in_remote() {
#[test]
fn test_invariants() {
// Generates input queues with the given sizes for the given canisters.
fn input_queues_for_test(
queue_sizes: Vec<(CanisterId, u8)>,
) -> Vec<(CanisterId, CanisterQueue)> {
fn input_queues_for_test(queue_sizes: Vec<(CanisterId, u8)>) -> Vec<(CanisterId, InputQueue)> {
let mut pool = MessagePool::default();
queue_sizes
.into_iter()
.map(|(canister_id, size)| {
let mut queue = CanisterQueue::new(500);
let mut queue = InputQueue::new(500);
for _ in 0..size {
let id = pool.insert_inbound(RequestBuilder::default().build().into());
queue.push_request(id);
Expand Down
Loading

0 comments on commit 3221c59

Please sign in to comment.