feat: instrument code with tracing spans (#178)
## What ❔

This PR instruments all interesting* places within the system.

**\*** "interesting" is admittedly subjective; I used my understanding of the code to judge what should and shouldn't be covered. Feedback is welcome.

## Why ❔

Better observability: spans attach context to log events and make it possible to attribute time spent to specific operations.
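
The PR leans on two standard `tracing` patterns: annotating functions with `#[tracing::instrument]`, and wrapping futures (such as the body of a loop iteration) with `Instrument::instrument`. Below is a minimal sketch of both patterns, not taken from this codebase: the `sign_batch` function and the span names are illustrative, and it assumes `tokio`, `tracing`, and `tracing-subscriber` as dependencies.

```rust
use tracing::Instrument;

// Pattern 1: the attribute macro opens a span for every call. `skip_all`
// keeps arguments out of the span (no `Debug` bound required); the fields
// we care about are recorded explicitly via `fields(...)`.
#[tracing::instrument(skip_all, fields(%batch_number))]
async fn sign_batch(batch_number: u64) {
    // Events emitted here are attached to the `sign_batch` span.
    tracing::info!("publishing attestation");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    // Pattern 2: wrap a future in a span by hand. Wrapping the loop body
    // gives every iteration its own span instead of one span covering the
    // whole long-lived task.
    for batch_number in 0..3u64 {
        async {
            sign_batch(batch_number).await;
        }
        .instrument(tracing::info_span!("attestation_iter", %batch_number))
        .await;
    }
}
```

The per-iteration `async { ... }.instrument(...)` shape is the same one applied to the BFT, attestation, and connection-maintenance loops in the diff below.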
itegulov authored Aug 8, 2024
1 parent 635d3b7 commit e9f8dfe
Showing 14 changed files with 264 additions and 149 deletions.
41 changes: 26 additions & 15 deletions node/actors/bft/src/lib.rs
@@ -19,6 +19,7 @@ use crate::io::{InputMessage, OutputMessage};
 use anyhow::Context;
 pub use config::Config;
 use std::sync::Arc;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope};
 use zksync_consensus_network::io::ConsensusReq;
 use zksync_consensus_roles::validator;
@@ -90,24 +91,34 @@ impl Config {
             // This is the infinite loop where the consensus actually runs. The validator waits for either
             // a message from the network or for a timeout, and processes each accordingly.
             loop {
-                let InputMessage::Network(req) = pipe.recv.recv(ctx).await?;
-                use validator::ConsensusMsg as M;
-                match &req.msg.msg {
-                    M::ReplicaPrepare(_) => {
-                        // This is a hacky way to do a clone. This is necessary since we don't want to derive
-                        // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
-                        let (ack, _) = oneshot::channel();
-                        let new_req = ConsensusReq {
-                            msg: req.msg.clone(),
-                            ack,
-                        };
-
-                        replica_send.send(new_req);
-                        leader_send.send(req);
-                    }
-                    M::ReplicaCommit(_) => leader_send.send(req),
-                    M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
-                }
+                async {
+                    let InputMessage::Network(req) = pipe
+                        .recv
+                        .recv(ctx)
+                        .instrument(tracing::info_span!("wait_for_message"))
+                        .await?;
+                    use validator::ConsensusMsg as M;
+                    match &req.msg.msg {
+                        M::ReplicaPrepare(_) => {
+                            // This is a hacky way to do a clone. This is necessary since we don't want to derive
+                            // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
+                            let (ack, _) = oneshot::channel();
+                            let new_req = ConsensusReq {
+                                msg: req.msg.clone(),
+                                ack,
+                            };
+
+                            replica_send.send(new_req);
+                            leader_send.send(req);
+                        }
+                        M::ReplicaCommit(_) => leader_send.send(req),
+                        M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
+                    }
+
+                    ctx::Ok(())
+                }
+                .instrument(tracing::info_span!("bft_iter"))
+                .await?;
             }
         })
         .await;
75 changes: 45 additions & 30 deletions node/actors/executor/src/attestation.rs
@@ -3,6 +3,7 @@
 use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_network::gossip::{
     AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
@@ -58,56 +59,70 @@ impl AttesterRunner {
         self.status.mark_changed();
 
         loop {
-            let Some(batch_number) = sync::changed(ctx, &mut self.status)
-                .await?
-                .next_batch_to_attest
-            else {
-                continue;
-            };
-
-            tracing::info!(%batch_number, "attestation status");
-
-            // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
-            // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
-            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
-            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
-            self.batch_store
-                .wait_until_persisted(ctx, batch_number)
-                .await?;
-
-            // Try to get the next batch to sign; the commitment might not be available just yet.
-            let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
-
-            // The certificates might be collected out of order because of how gossip works;
-            // we could query the DB to see if we already have a QC, or we can just go ahead
-            // and publish our vote, and let others ignore it.
-
-            tracing::info!(%batch_number, "publishing attestation");
-
-            // We only have to publish a vote once; future peers can pull it from the register.
-            self.publisher
-                .publish(attesters, &genesis, &self.attester.key, batch)
-                .await
-                .context("publish")?;
+            async {
+                let Some(batch_number) = sync::changed(ctx, &mut self.status)
+                    .instrument(tracing::info_span!("wait_for_attestation_status"))
+                    .await?
+                    .next_batch_to_attest
+                else {
+                    return Ok(());
+                };
+
+                tracing::info!(%batch_number, "attestation status");
+
+                // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
+                // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
+                // which for nodes that get the blocks through BFT should happen after execution. Nodes which
+                // rely on batch sync don't participate in attestations as they need the batch on L1 first.
+                self.batch_store
+                    .wait_until_persisted(ctx, batch_number)
+                    .await?;
+
+                // Try to get the next batch to sign; the commitment might not be available just yet.
+                let batch = AttesterRunner::wait_for_batch_to_sign(
+                    ctx,
+                    batch_number,
+                    &self.batch_store,
+                    self.poll_interval,
+                )
+                .await?;
+
+                // The certificates might be collected out of order because of how gossip works;
+                // we could query the DB to see if we already have a QC, or we can just go ahead
+                // and publish our vote, and let others ignore it.
+
+                tracing::info!(%batch_number, "publishing attestation");
+
+                // We only have to publish a vote once; future peers can pull it from the register.
+                self.publisher
+                    .publish(attesters, &genesis, &self.attester.key, batch)
+                    .await
+                    .context("publish")?;
+
+                ctx::Ok(())
+            }
+            .instrument(tracing::info_span!("attestation_iter"))
+            .await?;
         }
     }
 
     /// Wait for the batch commitment to become available.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
     async fn wait_for_batch_to_sign(
-        &self,
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
+        batch_store: &BatchStore,
+        poll_interval: time::Duration,
     ) -> ctx::Result<attester::Batch> {
         loop {
-            if let Some(batch) = self
-                .batch_store
+            if let Some(batch) = batch_store
                 .batch_to_sign(ctx, number)
                 .await
                 .context("batch_to_sign")?
             {
                 return Ok(batch);
             } else {
-                ctx.sleep(self.poll_interval).await?;
+                ctx.sleep(poll_interval).await?;
             }
         }
     }
2 changes: 2 additions & 0 deletions node/actors/network/src/consensus/handshake/mod.rs
@@ -58,6 +58,7 @@ pub(super) enum Error {
     Stream(#[from] ctx::Error),
 }
 
+#[tracing::instrument(name = "handshake::outbound", skip_all)]
 pub(super) async fn outbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
@@ -93,6 +94,7 @@ pub(super) async fn outbound(
     Ok(())
 }
 
+#[tracing::instrument(name = "handshake::inbound", skip_all)]
 pub(super) async fn inbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
40 changes: 21 additions & 19 deletions node/actors/network/src/consensus/mod.rs
@@ -163,7 +163,7 @@ impl Network {
 
     /// Performs handshake of an inbound stream.
    /// Closes the stream if there is another inbound stream opened from the same validator.
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_inbound_stream", skip_all)]
     pub(crate) async fn run_inbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -195,7 +195,7 @@
         res
     }
 
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_outbound_stream", skip_all, fields(?peer, %addr))]
     async fn run_outbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -282,6 +282,7 @@ impl Network {
         res
     }
 
+    #[tracing::instrument(skip_all)]
     async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let addr = *self
             .gossip
@@ -295,7 +296,6 @@
                 format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
             })?;
         self.run_outbound_stream(ctx, &self.key.public(), addr)
-            .instrument(tracing::info_span!("loopback", ?addr))
             .await
     }
 
@@ -314,24 +314,26 @@
         }
         let addrs = &mut self.gossip.validator_addrs.subscribe();
         let mut addr = None;
+
         while ctx.is_active() {
-            // Wait for a new address, or retry with the old one after timeout.
-            if let Ok(new) =
-                sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
-                    addrs.get(peer).map(|x| x.msg.addr) != addr
-                })
-                .await
-            {
-                addr = new.get(peer).map(|x| x.msg.addr);
-            }
-            let Some(addr) = addr else { continue };
-            if let Err(err) = self
-                .run_outbound_stream(ctx, peer, addr)
-                .instrument(tracing::info_span!("out", ?addr))
-                .await
-            {
-                tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
-            }
+            async {
+                // Wait for a new address, or retry with the old one after timeout.
+                if let Ok(new) =
+                    sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
+                        addrs.get(peer).map(|x| x.msg.addr) != addr
+                    })
+                    .instrument(tracing::info_span!("wait_for_address"))
+                    .await
+                {
+                    addr = new.get(peer).map(|x| x.msg.addr);
+                }
+                let Some(addr) = addr else { return };
+                if let Err(err) = self.run_outbound_stream(ctx, peer, addr).await {
+                    tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
+                }
+            }
+            .instrument(tracing::info_span!("maintain_connection_iter"))
+            .await;
         }
     }
 }
2 changes: 2 additions & 0 deletions node/actors/network/src/gossip/batch_votes.rs
@@ -286,6 +286,7 @@ impl BatchVotesWatch {
     }
 
     /// Set the minimum batch number on the votes and discard old data.
+    #[tracing::instrument(skip_all, fields(%min_batch_number))]
     pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) {
         let this = self.0.lock().await;
         this.send_modify(|votes| votes.set_min_batch_number(min_batch_number));
@@ -308,6 +309,7 @@ impl fmt::Debug for BatchVotesPublisher {
 
 impl BatchVotesPublisher {
     /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network.
+    #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))]
     pub async fn publish(
         &self,
         attesters: &attester::Committee,