Skip to content

Commit

Permalink
summonerd: clean up tracing for coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
hdevalence committed Oct 31, 2023
1 parent 6ae3445 commit 01c2c3b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
14 changes: 9 additions & 5 deletions tools/summonerd/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ impl Coordinator {
pub async fn run<P: Phase + 'static>(mut self) -> Result<()> {
loop {
let participant_count = self.queue.len().await;
tracing::debug!(
tracing::info!(
participant_count = participant_count,
"top of coordinator loop"
"starting ceremony round"
);
// 1. Inform all participants of their position
self.queue.inform_all().await?;
Expand All @@ -33,12 +33,16 @@ impl Coordinator {
}
tokio::time::sleep(Duration::from_secs(QUEUE_SLEEP_TIME_SECS)).await;
};
tracing::info!(
address = ?contributor.address().display_short_form(),
"requesting contribution from participant"
);
// 3. Get their contribution, or time out.
self.contribute::<P>(contributor).await?;
}
}

#[tracing::instrument(skip_all, fields(address = ?contributor.address()))]
#[tracing::instrument(skip_all, fields(address = ?contributor.address().display_short_form()))]
async fn contribute<P: Phase>(&mut self, contributor: Participant) -> Result<()> {
let address = contributor.address();
match tokio::time::timeout(
Expand All @@ -57,15 +61,14 @@ impl Coordinator {
}
}

#[tracing::instrument(skip_all)]
async fn contribute_inner<P: Phase>(&mut self, mut contributor: Participant) -> Result<()> {
let address = contributor.address();
let parent = P::current_crs(&self.storage)
.await?
.expect("the phase should've been initialized by now");
let maybe = contributor.contribute::<P>(&parent).await?;
if let Some(unvalidated) = maybe {
tracing::debug!("validating contribution");
tracing::info!("validating contribution");
let root = P::fetch_root(&self.storage).await?;
let maybe_contribution = tokio::task::spawn_blocking(move || {
if let Some(contribution) = P::validate(&root, unvalidated) {
Expand All @@ -76,6 +79,7 @@ impl Coordinator {
None
})
.await?;
tracing::info!("saving contribution");
if let Some(contribution) = maybe_contribution {
P::commit_contribution(&self.storage, address, contribution).await?;
contributor
Expand Down
10 changes: 8 additions & 2 deletions tools/summonerd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::io::Read;
use std::net::SocketAddr;
use storage::Storage;
use tonic::transport::Server;
use tracing::Instrument;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;

Expand Down Expand Up @@ -175,9 +176,14 @@ impl Opt {
.await?;
let queue = ParticipantQueue::new();
let coordinator = Coordinator::new(storage.clone(), queue.clone());
let coordinator_span = tracing::error_span!("coordinator");
let coordinator_handle = match marker {
PhaseMarker::P1 => tokio::spawn(coordinator.run::<Phase1>()),
PhaseMarker::P2 => tokio::spawn(coordinator.run::<Phase2>()),
PhaseMarker::P1 => {
tokio::spawn(coordinator.run::<Phase1>().instrument(coordinator_span))
}
PhaseMarker::P2 => {
tokio::spawn(coordinator.run::<Phase2>().instrument(coordinator_span))
}
};
let service = CoordinatorService::new(knower, storage.clone(), queue, marker);
let grpc_server = Server::builder().add_service(
Expand Down
3 changes: 2 additions & 1 deletion tools/summonerd/src/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ impl Participant {
})
}

#[tracing::instrument(skip(self, parent))]
pub async fn contribute<P: Phase>(
&mut self,
parent: &P::CRS,
) -> Result<Option<P::RawContribution>> {
tracing::info!("sending ContributeNow message to participant");
self.tx
.send(Ok(ParticipateResponse {
msg: Some(ResponseMsg::ContributeNow(ContributeNow {
Expand All @@ -89,6 +89,7 @@ impl Participant {
msg: Some(RequestMsg::Contribution(contribution)),
}) = msg
{
tracing::info!("got Contribution message from participant, deserializing...");
let deserialized =
tokio::task::spawn_blocking(move || P::deserialize_contribution(contribution))
.await??;
Expand Down

0 comments on commit 01c2c3b

Please sign in to comment.