Skip to content

Commit

Permalink
Rewrite collator assignment to support parathreads (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmpolaczyk authored Jan 22, 2024
1 parent f135910 commit 496d044
Show file tree
Hide file tree
Showing 17 changed files with 1,433 additions and 371 deletions.
436 changes: 436 additions & 0 deletions pallets/collator-assignment/src/assignment.rs

Large diffs are not rendered by default.

199 changes: 60 additions & 139 deletions pallets/collator-assignment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@

pub use pallet::*;
use {
crate::weights::WeightInfo,
crate::{
assignment::{Assignment, ChainNumCollators},
weights::WeightInfo,
},
dp_collator_assignment::AssignedCollators,
frame_support::pallet_prelude::*,
frame_system::pallet_prelude::BlockNumberFor,
Expand All @@ -60,6 +63,7 @@ use {
},
};

mod assignment;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
pub mod weights;
Expand Down Expand Up @@ -164,6 +168,7 @@ pub mod pallet {
// We get the containerChains that we will have at the target session
let mut container_chain_ids =
T::ContainerChains::session_container_chains(target_session_index);
let mut parathreads = T::ContainerChains::session_parathreads(target_session_index);
let num_total_registered_paras = container_chain_ids.len() as u32;
// Remove the containerChains that do not have enough credits for block production
T::RemoveParaIdsWithNoCredits::remove_para_ids_with_no_credits(
Expand All @@ -176,11 +181,47 @@ pub mod pallet {
if random_seed != [0; 32] {
let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed);
collators.shuffle(&mut rng);
// TODO: in the future, instead of shuffling the list of para ids, we need to use the priority fee to
// determine priority
container_chain_ids.shuffle(&mut rng);
parathreads.shuffle(&mut rng);
}

// We read current assigned collators
let old_assigned = Self::read_assigned_collators();
let orchestrator_chain = ChainNumCollators {
para_id: T::SelfParaId::get(),
min_collators: T::HostConfiguration::min_collators_for_orchestrator(
target_session_index,
),
max_collators: T::HostConfiguration::max_collators_for_orchestrator(
target_session_index,
),
};
// Initialize list of chains as `[container1, container2, parathread1, parathread2]`.
// The order means priority: the first chain in the list will be the first one to get assigned collators.
// Chains will not be assigned less than `min_collators`, except the orchestrator chain.
// First all chains will be assigned `min_collators`, and then the first one will be assigned up to `max`,
// then the second one, and so on.
let mut chains = vec![];
let collators_per_container =
T::HostConfiguration::collators_per_container(target_session_index);
for para_id in &container_chain_ids {
chains.push(ChainNumCollators {
para_id: *para_id,
min_collators: collators_per_container,
max_collators: collators_per_container,
});
}
let collators_per_parathread =
T::HostConfiguration::collators_per_parathread(target_session_index);
for para_id in &parathreads {
chains.push(ChainNumCollators {
para_id: *para_id,
min_collators: collators_per_parathread,
max_collators: collators_per_parathread,
});
}
// We assign new collators
// we use the config scheduled at the target_session_index
let new_assigned =
Expand All @@ -197,15 +238,10 @@ pub mod pallet {
target_session: target_session_index,
});

Self::assign_collators_rotate_all(
Assignment::<T>::assign_collators_rotate_all(
collators,
&container_chain_ids,
T::HostConfiguration::min_collators_for_orchestrator(target_session_index)
as usize,
T::HostConfiguration::max_collators_for_orchestrator(target_session_index)
as usize,
T::HostConfiguration::collators_per_container(target_session_index)
as usize,
orchestrator_chain,
chains,
)
} else {
log::info!(
Expand All @@ -220,19 +256,26 @@ pub mod pallet {
target_session: target_session_index,
});

Self::assign_collators_always_keep_old(
Assignment::<T>::assign_collators_always_keep_old(
collators,
&container_chain_ids,
T::HostConfiguration::min_collators_for_orchestrator(target_session_index)
as usize,
T::HostConfiguration::max_collators_for_orchestrator(target_session_index)
as usize,
T::HostConfiguration::collators_per_container(target_session_index)
as usize,
orchestrator_chain,
chains,
old_assigned.clone(),
)
};

let new_assigned = match new_assigned {
Ok(x) => x,
Err(e) => {
log::error!(
"Error in collator assignment, will keep previous assignment. {:?}",
e
);

old_assigned.clone()
}
};

let mut pending = PendingCollatorContainerChain::<T>::get();
let old_assigned_changed = old_assigned != new_assigned;
let mut pending_changed = false;
Expand Down Expand Up @@ -267,128 +310,6 @@ pub mod pallet {
}
}

/// Recompute collator assignment from scratch. If the list of collators and the list of
/// container chains are shuffled, this returns a random assignment.
fn assign_collators_rotate_all(
collators: Vec<T::AccountId>,
container_chain_ids: &[ParaId],
min_num_orchestrator_chain: usize,
max_num_orchestrator_chain: usize,
num_each_container_chain: usize,
) -> AssignedCollators<T::AccountId> {
// This is just the "always_keep_old" algorithm but with an empty "old"
let old_assigned = Default::default();

Self::assign_collators_always_keep_old(
collators,
container_chain_ids,
min_num_orchestrator_chain,
max_num_orchestrator_chain,
num_each_container_chain,
old_assigned,
)
}

/// Assign new collators to missing container_chains.
/// Old collators always have preference to remain on the same chain.
/// If there are no missing collators, nothing is changed.
///
/// `container_chain_ids` should be shuffled or at least rotated on every session to ensure
/// a fair distribution, because the order of that list affects container chain priority:
/// the first container chain on that list will be the first one to get new collators.
fn assign_collators_always_keep_old(
collators: Vec<T::AccountId>,
container_chain_ids: &[ParaId],
min_num_orchestrator_chain: usize,
max_num_orchestrator_chain: usize,
num_each_container_chain: usize,
old_assigned: AssignedCollators<T::AccountId>,
) -> AssignedCollators<T::AccountId> {
// TODO: the performance of this function is sad, could be improved by having sets of
// old_collators and new_collators instead of doing array.contains() every time.
let mut new_assigned = old_assigned;
new_assigned.remove_collators_not_in_list(&collators);
new_assigned.remove_container_chains_not_in_list(container_chain_ids);
let extra_orchestrator_collators =
new_assigned.remove_orchestrator_chain_excess_collators(min_num_orchestrator_chain);
// Only need to do this if the config params change
new_assigned.remove_container_chain_excess_collators(num_each_container_chain);

// Collators that are not present in old_assigned
// This is used to keep track of which collators are old and which ones are new, to keep
// the old collators on the same chain if possible.
let mut new_collators = vec![];
for c in collators {
if !new_assigned.find_collator(&c) && !extra_orchestrator_collators.contains(&c) {
new_collators.push(c);
}
}

// Fill orchestrator chain collators up to min_num_orchestrator_chain
// Give priority to invulnerables
let num_missing_orchestrator_collators =
min_num_orchestrator_chain.saturating_sub(new_assigned.orchestrator_chain.len());
let invulnerables_for_orchestrator = T::RemoveInvulnerables::remove_invulnerables(
&mut new_collators,
num_missing_orchestrator_collators,
);
new_assigned.fill_orchestrator_chain_collators(
min_num_orchestrator_chain,
&mut invulnerables_for_orchestrator.into_iter(),
);
// If there are no enough invulnerables, or if the invulnerables are currently assigned to other chains,
// fill orchestrator chain with regular collators
let mut new_collators = new_collators.into_iter();
new_assigned
.fill_orchestrator_chain_collators(min_num_orchestrator_chain, &mut new_collators);

// Fill container chain collators using new collators and also the extra
// collators that were previously assigned to the orchestrator chain,
// but give preference to new collators
let mut extra_orchestrator_collators = extra_orchestrator_collators.into_iter();
let mut new_plus_extra_collators = new_collators
.by_ref()
.chain(&mut extra_orchestrator_collators);

new_assigned.add_and_fill_new_container_chains_in_order(
num_each_container_chain,
container_chain_ids,
&mut new_plus_extra_collators,
);

// Fill orchestrator chain collators back up to max_num_orchestrator_chain,
// but give preference to collators that were already there
let mut extra_collators_plus_new = extra_orchestrator_collators
.by_ref()
.chain(&mut new_collators);
new_assigned.fill_orchestrator_chain_collators(
max_num_orchestrator_chain,
&mut extra_collators_plus_new,
);

// Reorganize container chain collators to fill the maximum number of container
// chains. For example, if num_each_container_chain == 2 and the number of collators
// in each container chain is
// [1, 1, 1, 1, 1]
// Then we can convert that into
// [2, 2, 0, 0, 0]
// and assign 1 extra collator to the orchestrator chain, if needed.
let incomplete_container_chains_collators = new_assigned
.reorganize_incomplete_container_chains_collators(
container_chain_ids,
num_each_container_chain,
);

// Assign collators from container chains that do not reach
// "num_each_container_chain" to orchestrator chain
new_assigned.fill_orchestrator_chain_collators(
max_num_orchestrator_chain,
&mut incomplete_container_chains_collators.into_iter(),
);

new_assigned
}

// Returns the assigned collators as read from storage.
// If there is any item in PendingCollatorContainerChain, returns that element.
// Otherwise, reads and returns the current CollatorContainerChain
Expand Down
32 changes: 29 additions & 3 deletions pallets/collator-assignment/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use {
},
frame_system as system,
parity_scale_codec::{Decode, Encode},
sp_core::H256,
sp_core::{Get, H256},
sp_runtime::{
traits::{BlakeTwo256, IdentityLookup},
BuildStorage,
Expand Down Expand Up @@ -111,9 +111,13 @@ pub struct Mocks {
pub min_orchestrator_chain_collators: u32,
pub max_orchestrator_chain_collators: u32,
pub collators_per_container: u32,
pub collators_per_parathread: u32,
pub collators: Vec<u64>,
pub container_chains: Vec<u32>,
pub parathreads: Vec<u32>,
pub random_seed: [u8; 32],
// None means 5
pub full_rotation_period: Option<u32>,
}

impl mock_data::Config for Test {}
Expand All @@ -123,7 +127,7 @@ impl mock_data::Config for Test {}
pub struct HostConfigurationGetter;

parameter_types! {
pub const ParachainId: ParaId = ParaId::new(200);
pub const ParachainId: ParaId = ParaId::new(1000);
}

impl pallet_collator_assignment::GetHostConfiguration<u32> for HostConfigurationGetter {
Expand All @@ -138,6 +142,10 @@ impl pallet_collator_assignment::GetHostConfiguration<u32> for HostConfiguration
fn collators_per_container(_session_index: u32) -> u32 {
MockData::mock().collators_per_container
}

fn collators_per_parathread(_session_index: u32) -> u32 {
MockData::mock().collators_per_parathread
}
}

pub struct CollatorsGetter;
Expand All @@ -160,6 +168,15 @@ impl tp_traits::GetSessionContainerChains<u32> for ContainerChainsGetter {
.collect()
}

fn session_parathreads(_session_index: u32) -> Vec<ParaId> {
MockData::mock()
.parathreads
.iter()
.cloned()
.map(ParaId::from)
.collect()
}

#[cfg(feature = "runtime-benchmarks")]
fn set_session_container_chains(_session_index: u32, para_ids: &[ParaId]) {
MockData::mutate(|mocks| {
Expand All @@ -184,13 +201,22 @@ parameter_types! {
pub const CollatorRotationSessionPeriod: u32 = 5;
}

pub struct MockCollatorRotationSessionPeriod;

impl Get<u32> for MockCollatorRotationSessionPeriod {
fn get() -> u32 {
MockData::mock().full_rotation_period.unwrap_or(5)
}
}

impl pallet_collator_assignment::Config for Test {
type RuntimeEvent = RuntimeEvent;
type SessionIndex = u32;
type HostConfiguration = HostConfigurationGetter;
type ContainerChains = ContainerChainsGetter;
type SelfParaId = ParachainId;
type ShouldRotateAllCollators = RotateCollatorsEveryNSessions<CollatorRotationSessionPeriod>;
type ShouldRotateAllCollators =
RotateCollatorsEveryNSessions<MockCollatorRotationSessionPeriod>;
type GetRandomnessForNextBlock = MockGetRandomnessForNextBlock;
type RemoveInvulnerables = RemoveAccountIdsAbove100;
type RemoveParaIdsWithNoCredits = RemoveParaIdsAbove5000;
Expand Down
Loading

0 comments on commit 496d044

Please sign in to comment.