diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f8d9382..4e6ca37 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + - epic/* push: branches: - main diff --git a/Cargo.toml b/Cargo.toml index 4383852..4d16495 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = "0.9.0-alpha.2" log = "0.4.22" rkyv = { version = "0.7.44", features = ["validation"]} serde = { version = "1.0.207", features = ["derive"] } bincode = "1.3.3" +tokio = { version = "1.39.2", features = ["full", "test-util"] } +rand = "0.9.0-alpha.2" +seeded-random = "0.6.0" \ No newline at end of file diff --git a/src/party.rs b/src/party.rs index 85f6b6f..c32ddd1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -6,18 +6,18 @@ use crate::message::{ MessagePacket, MessageRouting, ProtocolMessage, }; use crate::{Value, ValueSelector}; -use rand::prelude::StdRng; -use rand::prelude::*; -use rand::Rng; use rkyv::{AlignedVec, Deserialize, Infallible}; -use std::cmp::PartialEq; +use seeded_random::{Random, Seed}; +use std::cmp::{Ordering, PartialEq}; use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::time::{self, Duration}; /// BPCon configuration. Includes ballot time bounds and other stuff. +#[derive(PartialEq, Eq, Debug, Clone)] pub struct BPConConfig { /// Parties weights: `party_weights[i]` corresponds to the i-th party weight pub party_weights: Vec, @@ -25,62 +25,31 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - /// Leader of the ballot, computed using seed obtained from config. - leader: u64, - // TODO: define other config fields. -} + /// Timeout before ballot is launched. + /// Differs from `launch1a_timeout` having another status and not listening + /// to external events and messages. + pub launch_timeout: Duration, -impl BPConConfig { - /// Create new config instance. - pub fn new(party_weights: Vec, threshold: u128) -> Self { - let mut cfg = Self { - party_weights, - threshold, - leader: 0, - }; - cfg.leader = cfg.compute_leader().unwrap(); + /// Timeout before 1a stage is launched. + pub launch1a_timeout: Duration, - cfg - } + /// Timeout before 1b stage is launched. + pub launch1b_timeout: Duration, - /// Compute leader in a weighed randomized manner. - /// Uses seed from the config, making it deterministic. - fn compute_leader(&self) -> Result { - let seed = self.compute_seed(); + /// Timeout before 2a stage is launched. + pub launch2a_timeout: Duration, - let total_weight: u64 = self.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection("Zero weight sum".into())); - } - - // Use the seed from the config to create a deterministic random number generator. - let mut rng = StdRng::seed_from_u64(seed); + /// Timeout before 2av stage is launched. + pub launch2av_timeout: Duration, - let random_value: u64 = rng.gen_range(0..total_weight); + /// Timeout before 2b stage is launched. + pub launch2b_timeout: Duration, - let mut cumulative_weight = 0; - for (i, &weight) in self.party_weights.iter().enumerate() { - cumulative_weight += weight; - if random_value < cumulative_weight { - return Ok(i as u64); - } - } - Err(BallotError::LeaderElection("Election failed".into())) - } + /// Timeout before finalization stage is launched. + pub finalize_timeout: Duration, - /// Compute seed for randomized leader election. - fn compute_seed(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - - // Hash each field that should contribute to the seed - self.party_weights.hash(&mut hasher); - self.threshold.hash(&mut hasher); - - // You can add more fields as needed - - // Generate the seed from the hash - hasher.finish() - } + /// Timeout for a graceful period to help parties with latency. + pub grace_period: Duration, } /// Party status defines the statuses of the ballot for the particular participant @@ -99,7 +68,7 @@ pub(crate) enum PartyStatus { } /// Party events is used for the ballot flow control. -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub(crate) enum PartyEvent { Launch1a, Launch1b, @@ -110,6 +79,7 @@ pub(crate) enum PartyEvent { } /// A struct to keep track of senders and the cumulative weight of their messages. +#[derive(PartialEq, Debug)] struct MessageRoundState { senders: HashSet, weight: u128, @@ -142,6 +112,92 @@ impl MessageRoundState { } } +/// Trait incorporating logic for leader election. +pub trait LeaderElector>: Send { + /// Get leader for current ballot. + /// Returns id of the elected party or error. + fn get_leader(&self, party: &Party) -> Result; +} + +pub struct DefaultLeaderElector {} + +impl DefaultLeaderElector { + /// Compute seed for randomized leader election. + fn compute_seed>(party: &Party) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + party.cfg.party_weights.hash(&mut hasher); + party.cfg.threshold.hash(&mut hasher); + party.ballot.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } + + /// Hash the seed to a value within a given range. + fn hash_to_range(seed: u64, range: u64) -> u64 { + // Select the `k` suck that value 2^k >= `range` and 2^k is the smallest. + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + // The following algorithm selects a random u64 value using `ChaCha12Rng` + // and reduces the result to the k-bits such that 2^k >= `range` the closes power of to the `range`. + // After we check if the result lies in [0..`range`) or [`range`..2^k). + // In the first case result is an acceptable value generated uniformly. + // In the second case we repeat the process again with the incremented iterations counter. + // Ref: Practical Cryptography 1st Edition by Niels Ferguson, Bruce Schneier, paragraph 10.8 + let rng = Random::from_seed(Seed::unsafe_new(seed)); + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + // Executing this loop does not require a large number of iterations. + // Check tests for more info + } + } +} + +impl> LeaderElector for DefaultLeaderElector { + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + fn get_leader(&self, party: &Party) -> Result { + let seed = DefaultLeaderElector::compute_seed(party); + + let total_weight: u64 = party.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Generate a random number in the range [0, total_weight) + let random_value = DefaultLeaderElector::hash_to_range(seed, total_weight); + + // Use binary search to find the corresponding participant + let mut cumulative_weights = vec![0; party.cfg.party_weights.len()]; + cumulative_weights[0] = party.cfg.party_weights[0]; + + for i in 1..party.cfg.party_weights.len() { + cumulative_weights[i] = cumulative_weights[i - 1] + party.cfg.party_weights[i]; + } + + match cumulative_weights.binary_search_by(|&weight| { + if random_value < weight { + Ordering::Greater + } else { + Ordering::Less + } + }) { + Ok(index) | Err(index) => Ok(index as u64), + } + } +} /// Party of the BPCon protocol that executes ballot. /// /// The communication between party and external @@ -158,12 +214,12 @@ pub struct Party> { pub id: u64, /// Communication queues. - msg_in_receiver: Receiver, - msg_out_sender: Sender, + msg_in_receiver: UnboundedReceiver, + msg_out_sender: UnboundedSender, /// Query to receive and send events that run ballot protocol - event_receiver: Receiver, - event_sender: Sender, + event_receiver: UnboundedReceiver, + event_sender: UnboundedSender, /// BPCon config (e.g. ballot time bounds, parties weights, etc.). cfg: BPConConfig, @@ -171,12 +227,18 @@ pub struct Party> { /// Main functional for value selection. value_selector: VS, + /// Main functional for leader election. + elector: Box>, + /// Status of the ballot execution status: PartyStatus, /// Current ballot number ballot: u64, + /// Current ballot leader + leader: u64, + /// Last ballot where party submitted 2b message last_ballot_voted: Option, @@ -208,10 +270,15 @@ impl> Party { id: u64, cfg: BPConConfig, value_selector: VS, - ) -> (Self, Receiver, Sender) { - let (event_sender, event_receiver) = channel(); - let (msg_in_sender, msg_in_receiver) = channel(); - let (msg_out_sender, msg_out_receiver) = channel(); + elector: Box>, + ) -> ( + Self, + UnboundedReceiver, + UnboundedSender, + ) { + let (event_sender, event_receiver) = unbounded_channel(); + let (msg_in_sender, msg_in_receiver) = unbounded_channel(); + let (msg_out_sender, msg_out_receiver) = unbounded_channel(); ( Self { @@ -222,8 +289,10 @@ impl> Party { event_sender, cfg, value_selector, + elector, status: PartyStatus::None, ballot: 0, + leader: 0, last_ballot_voted: None, last_value_voted: None, parties_voted_before: HashMap::new(), @@ -262,65 +331,114 @@ impl> Party { self.value_selector.select(&self.parties_voted_before) } - /// Start the next ballot. It's expected from the external system to re-run ballot protocol in - /// case of failed ballot. pub async fn launch_ballot(&mut self) -> Result, BallotError> { - self.prepare_next_ballot(); + self.prepare_next_ballot()?; + time::sleep(self.cfg.launch_timeout).await; - while self.is_launched() { - if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { - if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + self.status = PartyStatus::Launched; - if let Ok(event) = self.event_receiver.try_recv() { - if let Err(err) = self.follow_event(event) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); + let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); + let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); + let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); + let launch2b_timer = time::sleep(self.cfg.launch2b_timeout); + let finalize_timer = time::sleep(self.cfg.finalize_timeout); + + tokio::pin!( + launch1a_timer, + launch1b_timer, + launch2a_timer, + launch2av_timer, + launch2b_timer, + finalize_timer + ); + + let mut launch1a_fired = false; + let mut launch1b_fired = false; + let mut launch2a_fired = false; + let mut launch2av_fired = false; + let mut launch2b_fired = false; + let mut finalize_fired = false; - // TODO: Emit events to run ballot protocol according to the ballot configuration - self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1b event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2av event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2b event".into()) - })?; - - self.event_sender.send(PartyEvent::Finalize).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Finalize event".into()) - })?; + while self.is_launched() { + tokio::select! { + _ = &mut launch1a_timer, if !launch1a_fired => { + self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1a event".into()) + })?; + launch1a_fired = true; + }, + _ = &mut launch1b_timer, if !launch1b_fired => { + self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1b event".into()) + })?; + launch1b_fired = true; + }, + _ = &mut launch2a_timer, if !launch2a_fired => { + self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2a event".into()) + })?; + launch2a_fired = true; + }, + _ = &mut launch2av_timer, if !launch2av_fired => { + self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2av event".into()) + })?; + launch2av_fired = true; + }, + _ = &mut launch2b_timer, if !launch2b_fired => { + self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2b event".into()) + })?; + launch2b_fired = true; + }, + _ = &mut finalize_timer, if !finalize_fired => { + self.event_sender.send(PartyEvent::Finalize).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Finalize event".into()) + })?; + finalize_fired = true; + }, + msg_wire = self.msg_in_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; + if let Some(msg_wire) = msg_wire { + if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { + self.status = PartyStatus::Failed; + return Err(err); + } + }else if self.msg_in_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("msg-in channel closed".into())); + } + }, + event = self.event_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; + if let Some(event) = event { + if let Err(err) = self.follow_event(event) { + self.status = PartyStatus::Failed; + return Err(err); + } + }else if self.event_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("event receiver channel closed".into())); + } + }, + } } Ok(self.get_value_selected()) } /// Prepare state before running a ballot. - fn prepare_next_ballot(&mut self) { + fn prepare_next_ballot(&mut self) -> Result<(), BallotError> { self.status = PartyStatus::None; self.ballot += 1; + self.leader = self.elector.get_leader(self)?; // Clean state self.parties_voted_before = HashMap::new(); @@ -334,12 +452,18 @@ impl> Party { while self.msg_in_receiver.try_recv().is_ok() {} self.status = PartyStatus::Launched; + Ok(()) } /// Update party's state based on message type. fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { ProtocolMessage::Msg1a => { + if self.status != PartyStatus::Launched { + return Err(BallotError::InvalidState( + "Received Msg1a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -356,13 +480,18 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { + if self.status != PartyStatus::Passed1a { + return Err(BallotError::InvalidState( + "Received Msg1b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -406,6 +535,11 @@ impl> Party { } } ProtocolMessage::Msg2a => { + if self.status != PartyStatus::Passed1b { + return Err(BallotError::InvalidState( + "Received Msg2a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -422,7 +556,7 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } @@ -443,6 +577,11 @@ impl> Party { } } ProtocolMessage::Msg2av => { + if self.status != PartyStatus::Passed2a { + return Err(BallotError::InvalidState( + "Received Msg2av message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -484,6 +623,11 @@ impl> Party { } } ProtocolMessage::Msg2b => { + if self.status != PartyStatus::Passed2av { + return Err(BallotError::InvalidState( + "Received Msg2b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -526,7 +670,7 @@ impl> Party { "Cannot launch 1a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { @@ -576,7 +720,7 @@ impl> Party { "Cannot launch 2a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { @@ -649,7 +793,10 @@ impl> Party { mod tests { use super::*; + use rand::Rng; + use seeded_random::{Random, Seed}; use std::collections::HashMap; + use std::thread; // Mock implementation of Value #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -670,18 +817,36 @@ mod tests { } } + fn default_config() -> BPConConfig { + BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + launch_timeout: Duration::from_secs(0), + launch1a_timeout: Duration::from_secs(0), + launch1b_timeout: Duration::from_secs(10), + launch2a_timeout: Duration::from_secs(20), + launch2av_timeout: Duration::from_secs(30), + launch2b_timeout: Duration::from_secs(40), + finalize_timeout: Duration::from_secs(50), + grace_period: Duration::from_secs(1), + } + } + #[test] fn test_compute_leader_determinism() { - let party_weights = vec![1, 2, 7]; // Weighted case - let threshold = 7; // example threshold - - // Initialize the configuration once - let config = BPConConfig::new(party_weights.clone(), threshold); + let cfg = default_config(); + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; // Compute the leader multiple times - let leader1 = config.compute_leader().unwrap(); - let leader2 = config.compute_leader().unwrap(); - let leader3 = config.compute_leader().unwrap(); + let leader1 = party.elector.get_leader(&party).unwrap(); + let leader2 = party.elector.get_leader(&party).unwrap(); + let leader3 = party.elector.get_leader(&party).unwrap(); // All leaders should be the same due to deterministic seed assert_eq!( @@ -695,29 +860,45 @@ mod tests { } #[test] - #[should_panic] fn test_compute_leader_zero_weights() { - let party_weights = vec![0, 0, 0]; - let threshold = 1; // example threshold + let mut cfg = default_config(); + cfg.party_weights = vec![0, 0, 0]; + + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; - // Create the config, which will attempt to compute the leader - BPConConfig::new(party_weights, threshold); + match party.elector.get_leader(&party) { + Err(BallotError::LeaderElection(_)) => { + // The test passes if the error is of type LeaderElection + } + _ => panic!("Expected BallotError::LeaderElection"), + } } #[test] fn test_update_state_msg1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Launched; party.ballot = 1; + // Must send this message from leader of the ballot. + party.leader = 0; // this party's id + let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { - sender: 1, + sender: 0, receivers: vec![2, 3], is_broadcast: false, msg_type: ProtocolMessage::Msg1a, @@ -736,12 +917,14 @@ mod tests { #[test] fn test_update_state_msg1b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], // Total weight is 6 - threshold: 4, // Threshold is 4 - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1a; party.ballot = 1; @@ -795,21 +978,26 @@ mod tests { #[test] fn test_update_state_msg2a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1b; party.ballot = 1; + // Must send this message from leader of the ballot. + party.leader = 0; // this party's id + let msg = Message2aContent { ballot: 1, value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing = MessageRouting { - sender: 1, + sender: 0, receivers: vec![0], is_broadcast: false, msg_type: ProtocolMessage::Msg2a, @@ -829,12 +1017,14 @@ mod tests { #[test] fn test_update_state_msg2av() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2a; party.ballot = 1; party.value_2a = Some(MockValue(42)); @@ -887,12 +1077,14 @@ mod tests { #[test] fn test_update_state_msg2b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 1, - }; - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let cfg = default_config(); + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2av; party.ballot = 1; @@ -954,13 +1146,14 @@ mod tests { #[test] fn test_follow_event_launch1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; + let cfg = default_config(); let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new(0, cfg, MockValueSelector); + Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -975,18 +1168,18 @@ mod tests { #[test] fn test_ballot_reset_after_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; - let (mut party, _, _) = - Party::::new(0, cfg, MockValueSelector); + let cfg = default_config(); + let (mut party, _, _) = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Failed; party.ballot = 1; - party.prepare_next_ballot(); + party.prepare_next_ballot().unwrap(); // Check that state has been reset assert_eq!(party.status, PartyStatus::Launched); @@ -1001,13 +1194,18 @@ mod tests { #[test] fn test_follow_event_communication_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - leader: 0, - }; - let (mut party, msg_out_receiver, _) = - Party::::new(0, cfg, MockValueSelector); + let cfg = default_config(); + + // This party id is precomputed for this specific party_weights, threshold and ballot. + // Because we need leader to send 1a. + let party_id = 0; + + let (mut party, msg_out_receiver, _) = Party::::new( + party_id, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -1026,4 +1224,123 @@ mod tests { _ => panic!("Expected BallotError::Communication, got {:?}", result), } } + + #[tokio::test] + async fn test_launch_ballot_events() { + // Pause the Tokio time so we can manipulate it + time::pause(); + + // Set up the Party with necessary configuration + let cfg = default_config(); + let (event_sender, mut event_receiver) = unbounded_channel(); + + // Need to return all 3 values, so that they don't get dropped + // and associated channels don't get closed. + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new( + 0, + cfg.clone(), + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); + + // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. + let _event_sender = party.event_sender; + party.event_sender = event_sender; + + // Spawn the launch_ballot function in a separate task + let _ballot_task = tokio::spawn(async move { + party.launch_ballot().await.unwrap(); + }); + + // Sequential time advance and event check + time::advance(cfg.launch1a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); + + time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b); + + time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a); + + time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av); + + time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b); + + time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); + } + + fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { + assert!(range > 1); + + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + let rng = Random::from_seed(Seed::unsafe_new(seed)); + + let mut iteration = 1u64; + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + + iteration += 1; + assert!(iteration <= 50) + } + } + + #[test] + #[ignore] // Ignoring since it takes a while to run + fn test_hash_range_random() { + // test the uniform distribution + + const N: usize = 37; + const M: i64 = 10000000; + + let mut cnt1: [i64; N] = [0; N]; + + for _ in 0..M { + let mut rng = rand::thread_rng(); + let seed: u64 = rng.random(); + + let res1 = debug_hash_to_range_new(seed, N as u64); + assert!(res1 < N as u64); + + cnt1[res1 as usize] += 1; + } + + println!("1: {:?}", cnt1); + + let mut avg1: i64 = 0; + + for item in cnt1.iter().take(N) { + avg1 += (M / (N as i64) - item).abs(); + } + + avg1 /= N as i64; + + println!("Avg 1: {}", avg1); + } + + #[test] + fn test_rng() { + let rng1 = Random::from_seed(Seed::unsafe_new(123456)); + let rng2 = Random::from_seed(Seed::unsafe_new(123456)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + + thread::sleep(Duration::from_secs(2)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + } }