Skip to content

Commit

Permalink
fix: resolved events sending
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaMasych committed Aug 21, 2024
1 parent 55dc0b4 commit 2ebd22c
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions src/party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,21 +269,17 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
self.value_selector.select(&self.parties_voted_before)
}

/// Start the next ballot. It's expected from the external system to re-run ballot protocol in
/// case of failed ballot.
pub async fn launch_ballot(&mut self) -> Result<Option<V>, BallotError> {
self.prepare_next_ballot();

self.status = PartyStatus::Launched;

let launch1a_timer = time::sleep(self.cfg.launch1a_timeout);
let launch1b_timer = time::sleep(self.cfg.launch1b_timeout);
let launch2a_timer = time::sleep(self.cfg.launch2a_timeout);
let launch2av_timer = time::sleep(self.cfg.launch2av_timeout);
let launch2b_timer = time::sleep(self.cfg.launch2a_timeout);
let launch2b_timer = time::sleep(self.cfg.launch2b_timeout);
let finalize_timer = time::sleep(self.cfg.finalize_timeout);

// Prevent the timers from firing immediately
tokio::pin!(
launch1a_timer,
launch1b_timer,
Expand All @@ -293,54 +289,66 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
finalize_timer
);

let mut launch1a_fired = false;
let mut launch1b_fired = false;
let mut launch2a_fired = false;
let mut launch2av_fired = false;
let mut launch2b_fired = false;
let mut finalize_fired = false;

while self.is_launched() {
tokio::select! {
_ = &mut launch1a_timer => {
_ = &mut launch1a_timer, if !launch1a_fired => {
self.event_sender.send(PartyEvent::Launch1a).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Launch1a event".into())
})?;
launch1a_fired = true;
},
_ = &mut launch1b_timer => {
_ = &mut launch1b_timer, if !launch1b_fired => {
self.event_sender.send(PartyEvent::Launch1b).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Launch1b event".into())
})?;
launch1b_fired = true;
},
_ = &mut launch2a_timer => {
_ = &mut launch2a_timer, if !launch2a_fired => {
self.event_sender.send(PartyEvent::Launch2a).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Launch2a event".into())
})?;
launch2a_fired = true;
},
_ = &mut launch2av_timer => {
_ = &mut launch2av_timer, if !launch2av_fired => {
self.event_sender.send(PartyEvent::Launch2av).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Launch2av event".into())
})?;
launch2av_fired = true;
},
_ = &mut launch2b_timer => {
_ = &mut launch2b_timer, if !launch2b_fired => {
self.event_sender.send(PartyEvent::Launch2b).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Launch2b event".into())
})?;
launch2b_fired = true;
},
_ = &mut finalize_timer => {
_ = &mut finalize_timer, if !finalize_fired => {
self.event_sender.send(PartyEvent::Finalize).map_err(|_| {
self.status = PartyStatus::Failed;
BallotError::Communication("Failed to send Finalize event".into())
})?;
finalize_fired = true;
},
msg_wire = self.msg_in_receiver.recv() => {
if let Some(msg_wire) = msg_wire {
if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) {
self.status = PartyStatus::Failed;
return Err(err);
}
} else {
// Handle the case where the channel has been closed
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Message channel closed".into()));
}else if self.msg_in_receiver.is_closed(){
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("msg-in channel closed".into()));
}
},
event = self.event_receiver.recv() => {
Expand All @@ -349,10 +357,9 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
self.status = PartyStatus::Failed;
return Err(err);
}
} else {
// Handle the case where the channel has been closed
}else if self.event_receiver.is_closed(){
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Event channel closed".into()));
return Err(BallotError::Communication("event receiver channel closed".into()));
}
},
}
Expand Down Expand Up @@ -1066,48 +1073,45 @@ mod tests {
}

#[tokio::test]
#[ignore] // This test is unfinished, turning off temporarily.
async fn test_launch_ballot_timeouts() {
async fn test_launch_ballot_events() {
// Pause the Tokio time so we can manipulate it
time::pause();

// Set up the Party with necessary configuration
let cfg = default_config();

let mut party =
Party::<MockValue, MockValueSelector>::new(0, cfg.clone(), MockValueSelector).0;

// Create the event and message channels to substitute for testing.
let (event_sender, mut event_receiver) = unbounded_channel();

// Need to return all 3 values, so that they don't get dropped
// and associated channels don't get closed.
let (mut party, _msg_out_receiver, _msg_in_sender) =
Party::<MockValue, MockValueSelector>::new(0, cfg.clone(), MockValueSelector);

// Same here, we would like to not lose party's event_receiver, so that test doesn't fail.
let _event_sender = party.event_sender;
party.event_sender = event_sender;
// Note that we don't change party.event_receiver

// Spawn the launch_ballot function in a separate task
let ballot_task = tokio::spawn(async move {
let _ballot_task = tokio::spawn(async move {
party.launch_ballot().await.unwrap();
});

// Fast-forward time and check that the correct event is sent after each interval
// Sequential time advance and event check
time::advance(cfg.launch1a_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a);

time::advance(cfg.launch1b_timeout).await;
time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b);

time::advance(cfg.launch2a_timeout).await;
time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a);

time::advance(cfg.launch2av_timeout).await;
time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av);

time::advance(cfg.launch2b_timeout).await;
time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b);

time::advance(cfg.finalize_timeout).await;
time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await;
assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize);

// Ensure that the task completes successfully
ballot_task.await.unwrap();
}
}

0 comments on commit 2ebd22c

Please sign in to comment.