Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Coupling block sync to DAG state #3386

Draft
wants to merge 5 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,16 @@ impl<N: Network> BFT<N> {
return Ok(());
}

/* Proceeding to commit the leader. */
info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
// Commit the leader certificate if the primary is not syncing.
if !IS_SYNCING {
/* Proceeding to commit the leader. */
info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
// Commit the leader certificate, and all previous leader certificates since the last committed round.
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await?;
}

Ok(())

// Commit the leader certificate, and all previous leader certificates since the last committed round.
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
}

/// Commits the leader certificate, and all previous leader certificates since the last committed round.
Expand Down Expand Up @@ -814,6 +819,8 @@ impl<N: Network> BFT<N> {
mut rx_primary_certificate,
mut rx_sync_bft_dag_at_bootup,
mut rx_sync_bft,
mut rx_commit_bft,
mut rx_is_recently_committed,
} = bft_receiver;

// Process the current round from the primary.
Expand Down Expand Up @@ -855,6 +862,30 @@ impl<N: Network> BFT<N> {
callback.send(result).ok();
}
});

// Process the request to commit the leader certificate.
let self_ = self.clone();
self.spawn(async move {
while let Some((certificate, callback)) = rx_commit_bft.recv().await {
// Update the DAG with the certificate.
let result = self_.commit_leader_certificate::<true, true>(certificate).await;
// Send the callback **after** updating the DAG.
// Note: We must await the DAG update before proceeding.
callback.send(result).ok();
}
});

// Process the request to check if the batch certificate was recently committed.
let self_ = self.clone();
self.spawn(async move {
while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await {
// Check if the certificate was recently committed.
let is_committed = self_.dag.read().is_recently_committed(round, certificate_id);
Copy link
Collaborator

@ljedrz ljedrz Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a lot of hassle just to perform this one check (in its own task with a dedicated channel), especially with both the BFT and Dag being clonable 🤔... it does make it async, but do we expect the recent_committed_ids collection to grow large enough to make it blocking?

// Send the callback **after** updating the DAG.
// Note: We must await the DAG update before proceeding.
callback.send(is_committed).ok();
}
});
}

/// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
Expand Down
32 changes: 29 additions & 3 deletions node/bft/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use snarkvm::{
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
puzzle::{Solution, SolutionID},
},
prelude::Result,
prelude::{Field, Result},
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -66,6 +66,8 @@ pub struct BFTSender<N: Network> {
pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub tx_commit_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub tx_is_recently_committed: mpsc::Sender<((u64, Field<N>), oneshot::Sender<bool>)>,
}

impl<N: Network> BFTSender<N> {
Expand Down Expand Up @@ -98,6 +100,26 @@ impl<N: Network> BFTSender<N> {
// Await the callback to continue.
callback_receiver.await?
}

/// Sends the leader certificate to the BFT to commit.
pub async fn send_commit_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the certificate to the BFT.
self.tx_commit_bft.send((certificate, callback_sender)).await?;
// Await the callback to continue.
callback_receiver.await?
}

/// Sends the certificate round and ID to the BFT to receive a callback on whether the certificate was recently committed.
pub async fn send_sync_is_recently_committed(&self, round: u64, certificate_id: Field<N>) -> Result<bool> {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the round and certificate ID to the BFT.
self.tx_is_recently_committed.send(((round, certificate_id), callback_sender)).await?;
// Await the callback to continue.
Ok(callback_receiver.await?)
}
}

#[derive(Debug)]
Expand All @@ -106,6 +128,8 @@ pub struct BFTReceiver<N: Network> {
pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub rx_commit_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
pub rx_is_recently_committed: mpsc::Receiver<((u64, Field<N>), oneshot::Sender<bool>)>,
}

/// Initializes the BFT channels.
Expand All @@ -114,9 +138,11 @@ pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_commit_bft, rx_commit_bft) = mpsc::channel(MAX_CHANNEL_SIZE);
let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE);

let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft };
let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft };
let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_commit_bft, tx_is_recently_committed };
let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_commit_bft, rx_is_recently_committed };

(sender, receiver)
}
Expand Down
35 changes: 34 additions & 1 deletion node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,46 @@ impl<N: Network> Sync<N> {
continue;
}

if let Authority::Quorum(subdag) = block.authority() {
// Retrieve the leader certificate of the subdag.
let leader_certificate = subdag.leader_certificate();
let leader_round = leader_certificate.round();
let leader_author = leader_certificate.author();
let leader_id = leader_certificate.id();

// If a BFT sender was provided, commit the leader certificate.
if let Some(bft_sender) = self.bft_sender.get() {
// Send the leader certificate to the BFT.
if let Err(e) = bft_sender.send_commit_bft(leader_certificate.clone()).await {
bail!("Sync - {e}");
};

// Ensure that leader certificate was recently committed in the DAG.
match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await {
Ok(is_recently_committed) => {
if !is_recently_committed {
bail!(
"Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.",
);
}
debug!(
"Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.",
);
Comment on lines +503 to +511
Copy link

@gluax gluax Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a nit but the boolean can be moved into the match statement you get a compile warning if missing a branch and this has better readability IMO.

i.e:

Ok(true) => {
  debug!("...");
}
Ok(false) => {
  bail!("...");
}
// rest of cases

}
Err(e) => {
bail!("Sync - Failed to check if leader certificate was recently committed - {e}");
}
};
}
}

// Advance the ledger state.
let self_ = self.clone();
tokio::task::spawn_blocking(move || {
// Check the next block.
self_.ledger.check_next_block(&block)?;
// Attempt to advance to the next block.
self_.ledger.advance_to_next_block(&block)?;

// Sync the height with the block.
self_.storage.sync_height_with_block(block.height());
// Sync the round with the block.
Expand Down