Skip to content

Commit

Permalink
Summonerd: Implement phase1 contributions
Browse files Browse the repository at this point in the history
  • Loading branch information
cronokirby committed Oct 17, 2023
1 parent 475e748 commit 640237e
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 53 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions tools/summonerd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rust-version = "1.65"
anyhow = "1"
ark-groth16 = "0.4"
ark-serialize = "0.4"
async-trait = "0.1.52"
atty = "0.2"
bytes = "1"
camino = "1"
Expand All @@ -35,6 +36,7 @@ penumbra-proof-setup = { path = "../../crates/crypto/proof-setup/" }
penumbra-proto = { path = "../../crates/proto/" }
penumbra-view = { path = "../../crates/view/" }
rand = "0.8"
rand_core = "0.6.4"
tokio = { version = "1.22", features = ["full"] }
tokio-stream = "0.1"
tonic = "0.10"
Expand Down
49 changes: 24 additions & 25 deletions tools/summonerd/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use rand::rngs::OsRng;
use tokio::sync::mpsc::{self};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

use crate::{participant::Participant, storage::Storage};

/// Wait time of 10 minutes
const CONTRIBUTION_TIME_SECS: u64 = 10 * 60;
use crate::{participant::Participant, phase::Phase, storage::Storage};

struct ContributionHandler {
storage: Storage,
Expand Down Expand Up @@ -40,7 +37,7 @@ impl ContributionHandler {
}

#[tracing::instrument(skip(self))]
pub async fn run(mut self) -> Result<()> {
pub async fn run<P: Phase>(mut self) -> Result<()> {
loop {
tracing::debug!("start of contribution handler loop");
let (who, participant) = match self.start_contribution_rx.recv().await {
Expand All @@ -51,16 +48,20 @@ impl ContributionHandler {
Some((w, p)) => (w, p),
};
tracing::debug!(?who, "waiting for contribution");
self.contribute(who, participant).await?;
self.contribute::<P>(who, participant).await?;
self.done_contribution_tx.send(()).await?;
}
}

#[tracing::instrument(skip(self, participant))]
async fn contribute(&mut self, contributor: Address, participant: Participant) -> Result<()> {
async fn contribute<P: Phase>(
&mut self,
contributor: Address,
participant: Participant,
) -> Result<()> {
match tokio::time::timeout(
Duration::from_secs(CONTRIBUTION_TIME_SECS),
self.contribute_inner(contributor, participant),
Duration::from_secs(P::CONTRIBUTION_TIME_SECS),
self.contribute_inner::<P>(contributor, participant),
)
.await
{
Expand All @@ -74,28 +75,26 @@ impl ContributionHandler {
}

#[tracing::instrument(skip(self, participant))]
async fn contribute_inner(
async fn contribute_inner<P: Phase>(
&mut self,
contributor: Address,
mut participant: Participant,
) -> Result<()> {
let parent = self
.storage
.phase2_current_crs()
let parent = P::current_crs(&self.storage)
.await?
.expect("phase2 should've been initialized by now");
let maybe = participant.contribute(&parent).await?;
.expect("the phase should've been initialized by now");
let maybe = participant.contribute::<P>(&parent).await?;
if let Some(unvalidated) = maybe {
tracing::debug!("validating contribution");
if let Some(contribution) =
unvalidated.validate(&mut OsRng, &self.storage.phase_2_root().await?)
{
if contribution.is_linked_to(&parent) {
self.storage
.commit_contribution(contributor, contribution)
.await?;
if let Some(contribution) = P::validate(
&mut OsRng,
&P::fetch_root(&self.storage).await?,
unvalidated,
) {
if P::is_linked_to(&contribution, &parent) {
P::commit_contribution(&self.storage, contributor, contribution).await?;
participant
.confirm(self.storage.current_slot().await?)
.confirm(self.storage.current_slot(P::MARKER).await?)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -181,15 +180,15 @@ impl Coordinator {
)
}

pub async fn run(mut self) -> Result<()> {
pub async fn run<P: Phase + 'static>(mut self) -> Result<()> {
enum Event {
NewParticipant(Participant, Amount),
ContributionDone,
}

let (contribution_handler, start_contribution_tx, done_contribution_rx) =
ContributionHandler::new(self.storage);
tokio::spawn(contribution_handler.run());
tokio::spawn(contribution_handler.run::<P>());
// Merge the events from both being notified of new participants, and of completed
// contributions.
let mut stream = ReceiverStream::new(self.new_participant_rx)
Expand Down
23 changes: 20 additions & 3 deletions tools/summonerd/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod coordinator;
mod participant;
mod penumbra_knower;
mod phase;
mod server;
mod storage;

Expand Down Expand Up @@ -32,6 +33,9 @@ use tonic::transport::Server;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;

use crate::phase::Phase1;
use crate::phase::Phase2;
use crate::phase::PhaseMarker;
use crate::{penumbra_knower::PenumbraKnower, server::CoordinatorService};
use penumbra_proof_setup::all::{Phase1CeremonyCRS, Phase1RawCeremonyCRS};

Expand Down Expand Up @@ -79,6 +83,8 @@ enum Command {
},
/// Start the coordinator.
Start {
#[clap(long, display_order = 100)]
phase: u8,
#[clap(long, display_order = 700)]
storage_dir: Utf8PathBuf,
#[clap(long, display_order = 800)]
Expand Down Expand Up @@ -107,22 +113,33 @@ impl Opt {
Ok(())
}
Command::Start {
phase,
storage_dir,
fvk,
node,
listen,
} => {
let marker = match phase {
1 => PhaseMarker::P1,
2 => PhaseMarker::P2,
_ => anyhow::bail!("Phase must be 1 or 2."),
};
let storage = Storage::load_or_initialize(ceremony_db(&storage_dir)).await?;
// Check if we've transitioned, for a nice error message
if storage.transition_extra_information().await?.is_none() {
if marker == PhaseMarker::P2
&& storage.transition_extra_information().await?.is_none()
{
anyhow::bail!("Please run the transition command before this command 8^)");
}
let knower =
PenumbraKnower::load_or_initialize(storage_dir.join("penumbra.db"), &fvk, node)
.await?;
let (coordinator, participant_tx) = Coordinator::new(storage.clone());
let coordinator_handle = tokio::spawn(coordinator.run());
let service = CoordinatorService::new(knower, storage, participant_tx);
let coordinator_handle = match marker {
PhaseMarker::P1 => tokio::spawn(coordinator.run::<Phase1>()),
PhaseMarker::P2 => tokio::spawn(coordinator.run::<Phase2>()),
};
let service = CoordinatorService::new(knower, storage, participant_tx, marker);
let grpc_server =
Server::builder()
.accept_http1(true)
Expand Down
13 changes: 7 additions & 6 deletions tools/summonerd/src/participant.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Context, Result};
use penumbra_keys::Address;
use penumbra_num::Amount;
use penumbra_proof_setup::all::{Phase2CeremonyCRS, Phase2RawCeremonyContribution};
use penumbra_proto::{
penumbra::tools::summoning::v1alpha1::{
self as pb,
Expand All @@ -15,6 +14,8 @@ use penumbra_proto::{
use tokio::sync::mpsc;
use tonic::{Status, Streaming};

use crate::phase::Phase;

pub struct Participant {
address: Address,
rx: Streaming<pb::ParticipateRequest>,
Expand Down Expand Up @@ -63,14 +64,14 @@ impl Participant {
}

#[tracing::instrument(skip(self, parent))]
pub async fn contribute(
pub async fn contribute<P: Phase>(
&mut self,
parent: &Phase2CeremonyCRS,
) -> Result<Option<Phase2RawCeremonyContribution>> {
parent: &P::CRS,
) -> Result<Option<P::RawContribution>> {
self.tx
.send(Ok(ParticipateResponse {
msg: Some(ResponseMsg::ContributeNow(ContributeNow {
parent: Some(parent.clone().try_into()?),
parent: Some(P::serialize_crs(parent.clone())?),
})),
}))
.await?;
Expand All @@ -79,7 +80,7 @@ impl Participant {
msg: Some(RequestMsg::Contribution(contribution)),
}) = msg
{
Ok(Some(Phase2RawCeremonyContribution::try_from(contribution)?))
Ok(Some(P::deserialize_contribution(contribution)?))
} else {
Ok(None)
}
Expand Down
Loading

0 comments on commit 640237e

Please sign in to comment.