Skip to content

Commit

Permalink
fix(consensus): avoid shutdown race
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-wright committed Feb 16, 2024
1 parent d48c9bb commit bdeb34b
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct Consensus<C: Collection> {
/// Called from the shutdown function to notify the start event loop to
/// exit.
shutdown_notify: Arc<Notify>,
/// To notify the epoch state when consensus is shutting down
shutdown_notify_epoch_state: Arc<Notify>,
/// bool indicating if narwhal is running
is_running: AtomicBool,
}
Expand Down Expand Up @@ -96,7 +98,7 @@ struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, N
pub_sub: P,
/// Narhwal sends payloads ready for broadcast to this receiver
rx_narwhal_batches: Option<mpsc::Receiver<(AuthenticStampedParcel, bool)>>,
/// To notify when consensus is shutting down
/// To notify when consensus is shutting down.
shutdown_notify: Arc<Notify>,
}

Expand Down Expand Up @@ -365,7 +367,8 @@ impl<C: Collection> WithStartAndShutdown for Consensus<C> {

/// Send the shutdown signal to the system.
async fn shutdown(&self) {
self.shutdown_notify.notify_waiters();
self.shutdown_notify.notify_one();
self.shutdown_notify_epoch_state.notify_one();
self.is_running.store(false, Ordering::Relaxed);
}
}
Expand Down Expand Up @@ -419,6 +422,7 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
));

let shutdown_notify = Arc::new(Notify::new());
let shutdown_notify_epoch_state = Arc::new(Notify::new());

let epoch_state = EpochState::new(
signer.get_ed25519_pk(),
Expand All @@ -430,14 +434,15 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
signer.get_socket(),
pubsub,
rx_narwhal_batches,
shutdown_notify.clone(),
shutdown_notify_epoch_state.clone(),
);

Ok(Self {
epoch_state: Mutex::new(Some(epoch_state)),
mempool_socket: TokioSpawn::spawn_async(forwarder),
reconfigure_notify,
shutdown_notify,
shutdown_notify_epoch_state,
is_running: AtomicBool::new(false),
})
}
Expand Down

0 comments on commit bdeb34b

Please sign in to comment.