From 4322a91823bc4ad6ce0a72ad0ce7194b8d5d1792 Mon Sep 17 00:00:00 2001 From: Nikita Masych <92444221+NikitaMasych@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:48:14 +0300 Subject: [PATCH] feat: implementing configuration for ballots (#7) * feat: moved leader computation to party, added ballot number to seed * feat: add configuration of timebounds for events, async * feat: use hash-based leader election instead of rand * fix: resolved events sending * feat: added status checks to update_state * feat: add timeout handling for latency between parties * feat: add opportunity to configure prior to launch timeout * feat: changed logic for leader election * feat: refactor leader election components * Rewriting the hash_to_range function (#8) * rewriting the hash_to_range function to achieve better uniform distribution. Now it uses seeded random based on ChaCha12Rng instead of DefaultHash as before. Also, it fixes distribution in range by executing selection several times to achieve uniform distribution when range != 2^k * Adding comments * fix: resolved linter issues/failing tests --------- Co-authored-by: Nikita Masych --------- Co-authored-by: Oleg Fomenko <35123037+olegfomenko@users.noreply.github.com> --- .github/workflows/rust.yml | 1 + Cargo.toml | 4 +- src/party.rs | 677 +++++++++++++++++++++++++++---------- 3 files changed, 501 insertions(+), 181 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f8d9382..4e6ca37 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + - epic/* push: branches: - main diff --git a/Cargo.toml b/Cargo.toml index 4383852..4d16495 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = "0.9.0-alpha.2" log = "0.4.22" rkyv = { version = "0.7.44", features = ["validation"]} serde = { version = "1.0.207", features = ["derive"] } bincode = "1.3.3" +tokio = { version = "1.39.2", features = ["full", "test-util"] } +rand = "0.9.0-alpha.2" +seeded-random = "0.6.0" \ No newline at end of file diff --git a/src/party.rs b/src/party.rs index 85f6b6f..c32ddd1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -6,18 +6,18 @@ use crate::message::{ MessagePacket, MessageRouting, ProtocolMessage, }; use crate::{Value, ValueSelector}; -use rand::prelude::StdRng; -use rand::prelude::*; -use rand::Rng; use rkyv::{AlignedVec, Deserialize, Infallible}; -use std::cmp::PartialEq; +use seeded_random::{Random, Seed}; +use std::cmp::{Ordering, PartialEq}; use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::time::{self, Duration}; /// BPCon configuration. Includes ballot time bounds and other stuff. +#[derive(PartialEq, Eq, Debug, Clone)] pub struct BPConConfig { /// Parties weights: `party_weights[i]` corresponds to the i-th party weight pub party_weights: Vec, @@ -25,62 +25,31 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - /// Leader of the ballot, computed using seed obtained from config. - leader: u64, - // TODO: define other config fields. -} + /// Timeout before ballot is launched. + /// Differs from `launch1a_timeout` having another status and not listening + /// to external events and messages. + pub launch_timeout: Duration, -impl BPConConfig { - /// Create new config instance. - pub fn new(party_weights: Vec, threshold: u128) -> Self { - let mut cfg = Self { - party_weights, - threshold, - leader: 0, - }; - cfg.leader = cfg.compute_leader().unwrap(); + /// Timeout before 1a stage is launched. + pub launch1a_timeout: Duration, - cfg - } + /// Timeout before 1b stage is launched. + pub launch1b_timeout: Duration, - /// Compute leader in a weighed randomized manner. - /// Uses seed from the config, making it deterministic. - fn compute_leader(&self) -> Result { - let seed = self.compute_seed(); + /// Timeout before 2a stage is launched. + pub launch2a_timeout: Duration, - let total_weight: u64 = self.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection("Zero weight sum".into())); - } - - // Use the seed from the config to create a deterministic random number generator. - let mut rng = StdRng::seed_from_u64(seed); + /// Timeout before 2av stage is launched. + pub launch2av_timeout: Duration, - let random_value: u64 = rng.gen_range(0..total_weight); + /// Timeout before 2b stage is launched. + pub launch2b_timeout: Duration, - let mut cumulative_weight = 0; - for (i, &weight) in self.party_weights.iter().enumerate() { - cumulative_weight += weight; - if random_value < cumulative_weight { - return Ok(i as u64); - } - } - Err(BallotError::LeaderElection("Election failed".into())) - } + /// Timeout before finalization stage is launched. + pub finalize_timeout: Duration, - /// Compute seed for randomized leader election. - fn compute_seed(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - - // Hash each field that should contribute to the seed - self.party_weights.hash(&mut hasher); - self.threshold.hash(&mut hasher); - - // You can add more fields as needed - - // Generate the seed from the hash - hasher.finish() - } + /// Timeout for a graceful period to help parties with latency. + pub grace_period: Duration, } /// Party status defines the statuses of the ballot for the particular participant @@ -99,7 +68,7 @@ pub(crate) enum PartyStatus { } /// Party events is used for the ballot flow control. -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub(crate) enum PartyEvent { Launch1a, Launch1b, @@ -110,6 +79,7 @@ pub(crate) enum PartyEvent { } /// A struct to keep track of senders and the cumulative weight of their messages. +#[derive(PartialEq, Debug)] struct MessageRoundState { senders: HashSet, weight: u128, @@ -142,6 +112,92 @@ impl MessageRoundState { } } +/// Trait incorporating logic for leader election. +pub trait LeaderElector>: Send { + /// Get leader for current ballot. + /// Returns id of the elected party or error. + fn get_leader(&self, party: &Party) -> Result; +} + +pub struct DefaultLeaderElector {} + +impl DefaultLeaderElector { + /// Compute seed for randomized leader election. + fn compute_seed>(party: &Party) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + party.cfg.party_weights.hash(&mut hasher); + party.cfg.threshold.hash(&mut hasher); + party.ballot.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } + + /// Hash the seed to a value within a given range. + fn hash_to_range(seed: u64, range: u64) -> u64 { + // Select the `k` suck that value 2^k >= `range` and 2^k is the smallest. + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + // The following algorithm selects a random u64 value using `ChaCha12Rng` + // and reduces the result to the k-bits such that 2^k >= `range` the closes power of to the `range`. + // After we check if the result lies in [0..`range`) or [`range`..2^k). + // In the first case result is an acceptable value generated uniformly. + // In the second case we repeat the process again with the incremented iterations counter. + // Ref: Practical Cryptography 1st Edition by Niels Ferguson, Bruce Schneier, paragraph 10.8 + let rng = Random::from_seed(Seed::unsafe_new(seed)); + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + // Executing this loop does not require a large number of iterations. + // Check tests for more info + } + } +} + +impl> LeaderElector for DefaultLeaderElector { + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + fn get_leader(&self, party: &Party) -> Result { + let seed = DefaultLeaderElector::compute_seed(party); + + let total_weight: u64 = party.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Generate a random number in the range [0, total_weight) + let random_value = DefaultLeaderElector::hash_to_range(seed, total_weight); + + // Use binary search to find the corresponding participant + let mut cumulative_weights = vec![0; party.cfg.party_weights.len()]; + cumulative_weights[0] = party.cfg.party_weights[0]; + + for i in 1..party.cfg.party_weights.len() { + cumulative_weights[i] = cumulative_weights[i - 1] + party.cfg.party_weights[i]; + } + + match cumulative_weights.binary_search_by(|&weight| { + if random_value < weight { + Ordering::Greater + } else { + Ordering::Less + } + }) { + Ok(index) | Err(index) => Ok(index as u64), + } + } +} /// Party of the BPCon protocol that executes ballot. /// /// The communication between party and external @@ -158,12 +214,12 @@ pub struct Party> { pub id: u64, /// Communication queues. - msg_in_receiver: Receiver, - msg_out_sender: Sender, + msg_in_receiver: UnboundedReceiver, + msg_out_sender: UnboundedSender, /// Query to receive and send events that run ballot protocol - event_receiver: Receiver, - event_sender: Sender, + event_receiver: UnboundedReceiver, + event_sender: UnboundedSender, /// BPCon config (e.g. ballot time bounds, parties weights, etc.). cfg: BPConConfig, @@ -171,12 +227,18 @@ pub struct Party> { /// Main functional for value selection. value_selector: VS, + /// Main functional for leader election. + elector: Box>, + /// Status of the ballot execution status: PartyStatus, /// Current ballot number ballot: u64, + /// Current ballot leader + leader: u64, + /// Last ballot where party submitted 2b message last_ballot_voted: Option, @@ -208,10 +270,15 @@ impl> Party { id: u64, cfg: BPConConfig, value_selector: VS, - ) -> (Self, Receiver, Sender) { - let (event_sender, event_receiver) = channel(); - let (msg_in_sender, msg_in_receiver) = channel(); - let (msg_out_sender, msg_out_receiver) = channel(); + elector: Box>, + ) -> ( + Self, + UnboundedReceiver, + UnboundedSender, + ) { + let (event_sender, event_receiver) = unbounded_channel(); + let (msg_in_sender, msg_in_receiver) = unbounded_channel(); + let (msg_out_sender, msg_out_receiver) = unbounded_channel(); ( Self { @@ -222,8 +289,10 @@ impl> Party { event_sender, cfg, value_selector, + elector, status: PartyStatus::None, ballot: 0, + leader: 0, last_ballot_voted: None, last_value_voted: None, parties_voted_before: HashMap::new(), @@ -262,65 +331,114 @@ impl> Party { self.value_selector.select(&self.parties_voted_before) } - /// Start the next ballot. It's expected from the external system to re-run ballot protocol in - /// case of failed ballot. pub async fn launch_ballot(&mut self) -> Result, BallotError> { - self.prepare_next_ballot(); + self.prepare_next_ballot()?; + time::sleep(self.cfg.launch_timeout).await; - while self.is_launched() { - if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { - if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + self.status = PartyStatus::Launched; - if let Ok(event) = self.event_receiver.try_recv() { - if let Err(err) = self.follow_event(event) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); + let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); + let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); + let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); + let launch2b_timer = time::sleep(self.cfg.launch2b_timeout); + let finalize_timer = time::sleep(self.cfg.finalize_timeout); + + tokio::pin!( + launch1a_timer, + launch1b_timer, + launch2a_timer, + launch2av_timer, + launch2b_timer, + finalize_timer + ); + + let mut launch1a_fired = false; + let mut launch1b_fired = false; + let mut launch2a_fired = false; + let mut launch2av_fired = false; + let mut launch2b_fired = false; + let mut finalize_fired = false; - // TODO: Emit events to run ballot protocol according to the ballot configuration - self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1b event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2av event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2b event".into()) - })?; - - self.event_sender.send(PartyEvent::Finalize).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Finalize event".into()) - })?; + while self.is_launched() { + tokio::select! { + _ = &mut launch1a_timer, if !launch1a_fired => { + self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1a event".into()) + })?; + launch1a_fired = true; + }, + _ = &mut launch1b_timer, if !launch1b_fired => { + self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1b event".into()) + })?; + launch1b_fired = true; + }, + _ = &mut launch2a_timer, if !launch2a_fired => { + self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2a event".into()) + })?; + launch2a_fired = true; + }, + _ = &mut launch2av_timer, if !launch2av_fired => { + self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2av event".into()) + })?; + launch2av_fired = true; + }, + _ = &mut launch2b_timer, if !launch2b_fired => { + self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2b event".into()) + })?; + launch2b_fired = true; + }, + _ = &mut finalize_timer, if !finalize_fired => { + self.event_sender.send(PartyEvent::Finalize).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Finalize event".into()) + })?; + finalize_fired = true; + }, + msg_wire = self.msg_in_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; + if let Some(msg_wire) = msg_wire { + if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { + self.status = PartyStatus::Failed; + return Err(err); + } + }else if self.msg_in_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("msg-in channel closed".into())); + } + }, + event = self.event_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; + if let Some(event) = event { + if let Err(err) = self.follow_event(event) { + self.status = PartyStatus::Failed; + return Err(err); + } + }else if self.event_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("event receiver channel closed".into())); + } + }, + } } Ok(self.get_value_selected()) } /// Prepare state before running a ballot. - fn prepare_next_ballot(&mut self) { + fn prepare_next_ballot(&mut self) -> Result<(), BallotError> { self.status = PartyStatus::None; self.ballot += 1; + self.leader = self.elector.get_leader(self)?; // Clean state self.parties_voted_before = HashMap::new(); @@ -334,12 +452,18 @@ impl> Party { while self.msg_in_receiver.try_recv().is_ok() {} self.status = PartyStatus::Launched; + Ok(()) } /// Update party's state based on message type. fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { ProtocolMessage::Msg1a => { + if self.status != PartyStatus::Launched { + return Err(BallotError::InvalidState( + "Received Msg1a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -356,13 +480,18 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { + if self.status != PartyStatus::Passed1a { + return Err(BallotError::InvalidState( + "Received Msg1b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -406,6 +535,11 @@ impl> Party { } } ProtocolMessage::Msg2a => { + if self.status != PartyStatus::Passed1b { + return Err(BallotError::InvalidState( + "Received Msg2a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -422,7 +556,7 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } @@ -443,6 +577,11 @@ impl> Party { } } ProtocolMessage::Msg2av => { + if self.status != PartyStatus::Passed2a { + return Err(BallotError::InvalidState( + "Received Msg2av message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -484,6 +623,11 @@ impl> Party { } } ProtocolMessage::Msg2b => { + if self.status != PartyStatus::Passed2av { + return Err(BallotError::InvalidState( + "Received Msg2b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -526,7 +670,7 @@ impl> Party { "Cannot launch 1a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { @@ -576,7 +720,7 @@ impl> Party { "Cannot launch 2a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { @@ -649,7 +793,10 @@ impl> Party { mod tests { use super::*; + use rand::Rng; + use seeded_random::{Random, Seed}; use std::collections::HashMap; + use std::thread; // Mock implementation of Value #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -670,18 +817,36 @@ mod tests { } } + fn default_config() -> BPConConfig { + BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + launch_timeout: Duration::from_secs(0), + launch1a_timeout: Duration::from_secs(0), + launch1b_timeout: Duration::from_secs(10), + launch2a_timeout: Duration::from_secs(20), + launch2av_timeout: Duration::from_secs(30), + launch2b_timeout: Duration::from_secs(40), + finalize_timeout: Duration::from_secs(50), + grace_period: Duration::from_secs(1), + } + } + #[test] fn test_compute_leader_determinism() { - let party_weights = vec![1, 2, 7]; // Weighted case - let threshold = 7; // example threshold - - // Initialize the configuration once - let config = BPConConfig::new(party_weights.clone(), threshold); + let cfg = default_config(); + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; // Compute the leader multiple times - let leader1 = config.compute_leader().unwrap(); - let leader2 = config.compute_leader().unwrap(); - let leader3 = config.compute_leader().unwrap(); + let leader1 = party.elector.get_leader(&party).unwrap(); + let leader2 = party.elector.get_leader(&party).unwrap(); + let leader3 = party.elector.get_leader(&party).unwrap(); // All leaders should be the same due to deterministic seed assert_eq!( @@ -695,29 +860,45 @@ mod tests { } #[test] - #[should_panic] fn test_compute_leader_zero_weights() { - let party_weights = vec![0, 0, 0]; - let threshold = 1; // example threshold + let mut cfg = default_config(); + cfg.party_weights = vec![0, 0, 0]; + + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; - // Create the config, which will attempt to compute the leader - BPConConfig::new(party_weights, threshold); + match party.elector.get_leader(&party) { + Err(BallotError::LeaderElection(_)) => { + // The test passes if the error is of type LeaderElection + } + _ => panic!("Expected BallotError::LeaderElection"), + } } #[test] fn test_update_state_msg1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Launched; party.ballot = 1; + // Must send this message from leader of the ballot. + party.leader = 0; // this party's id + let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { - sender: 1, + sender: 0, receivers: vec![2, 3], is_broadcast: false, msg_type: ProtocolMessage::Msg1a, @@ -736,12 +917,14 @@ mod tests { #[test] fn test_update_state_msg1b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], // Total weight is 6 - threshold: 4, // Threshold is 4 - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1a; party.ballot = 1; @@ -795,21 +978,26 @@ mod tests { #[test] fn test_update_state_msg2a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1b; party.ballot = 1; + // Must send this message from leader of the ballot. + party.leader = 0; // this party's id + let msg = Message2aContent { ballot: 1, value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing = MessageRouting { - sender: 1, + sender: 0, receivers: vec![0], is_broadcast: false, msg_type: ProtocolMessage::Msg2a, @@ -829,12 +1017,14 @@ mod tests { #[test] fn test_update_state_msg2av() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2a; party.ballot = 1; party.value_2a = Some(MockValue(42)); @@ -887,12 +1077,14 @@ mod tests { #[test] fn test_update_state_msg2b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2av; party.ballot = 1; @@ -954,13 +1146,14 @@ mod tests { #[test] fn test_follow_event_launch1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; + let cfg = default_config(); let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new(0, cfg, MockValueSelector); + Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -975,18 +1168,18 @@ mod tests { #[test] fn test_ballot_reset_after_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; - let (mut party, _, _) = - Party::::new(0, cfg, MockValueSelector); + let cfg = default_config(); + let (mut party, _, _) = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Failed; party.ballot = 1; - party.prepare_next_ballot(); + party.prepare_next_ballot().unwrap(); // Check that state has been reset assert_eq!(party.status, PartyStatus::Launched); @@ -1001,13 +1194,18 @@ mod tests { #[test] fn test_follow_event_communication_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; - let (mut party, msg_out_receiver, _) = - Party::::new(0, cfg, MockValueSelector); + let cfg = default_config(); + + // This party id is precomputed for this specific party_weights, threshold and ballot. + // Because we need leader to send 1a. + let party_id = 0; + + let (mut party, msg_out_receiver, _) = Party::::new( + party_id, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -1026,4 +1224,123 @@ mod tests { _ => panic!("Expected BallotError::Communication, got {:?}", result), } } + + #[tokio::test] + async fn test_launch_ballot_events() { + // Pause the Tokio time so we can manipulate it + time::pause(); + + // Set up the Party with necessary configuration + let cfg = default_config(); + let (event_sender, mut event_receiver) = unbounded_channel(); + + // Need to return all 3 values, so that they don't get dropped + // and associated channels don't get closed. + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new( + 0, + cfg.clone(), + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); + + // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. + let _event_sender = party.event_sender; + party.event_sender = event_sender; + + // Spawn the launch_ballot function in a separate task + let _ballot_task = tokio::spawn(async move { + party.launch_ballot().await.unwrap(); + }); + + // Sequential time advance and event check + time::advance(cfg.launch1a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); + + time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b); + + time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a); + + time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av); + + time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b); + + time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); + } + + fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { + assert!(range > 1); + + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + let rng = Random::from_seed(Seed::unsafe_new(seed)); + + let mut iteration = 1u64; + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + + iteration += 1; + assert!(iteration <= 50) + } + } + + #[test] + #[ignore] // Ignoring since it takes a while to run + fn test_hash_range_random() { + // test the uniform distribution + + const N: usize = 37; + const M: i64 = 10000000; + + let mut cnt1: [i64; N] = [0; N]; + + for _ in 0..M { + let mut rng = rand::thread_rng(); + let seed: u64 = rng.random(); + + let res1 = debug_hash_to_range_new(seed, N as u64); + assert!(res1 < N as u64); + + cnt1[res1 as usize] += 1; + } + + println!("1: {:?}", cnt1); + + let mut avg1: i64 = 0; + + for item in cnt1.iter().take(N) { + avg1 += (M / (N as i64) - item).abs(); + } + + avg1 /= N as i64; + + println!("Avg 1: {}", avg1); + } + + #[test] + fn test_rng() { + let rng1 = Random::from_seed(Seed::unsafe_new(123456)); + let rng2 = Random::from_seed(Seed::unsafe_new(123456)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + + thread::sleep(Duration::from_secs(2)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + } }