Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Summonerd: Implement phase1 contributions #3201

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified crates/proto/src/gen/proto_descriptor.bin.no_lfs
Binary file not shown.
2 changes: 2 additions & 0 deletions tools/summonerd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rust-version = "1.65"
anyhow = "1"
ark-groth16 = "0.4"
ark-serialize = "0.4"
async-trait = "0.1.52"
atty = "0.2"
bytes = "1"
camino = "1"
Expand All @@ -35,6 +36,7 @@ penumbra-proof-setup = { path = "../../crates/crypto/proof-setup/" }
penumbra-proto = { path = "../../crates/proto/" }
penumbra-view = { path = "../../crates/view/" }
rand = "0.8"
rand_core = "0.6.4"
tokio = { version = "1.22", features = ["full"] }
tokio-stream = "0.1"
tonic = "0.10"
Expand Down
49 changes: 24 additions & 25 deletions tools/summonerd/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use rand::rngs::OsRng;
use tokio::sync::mpsc::{self};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

use crate::{participant::Participant, storage::Storage};

/// Wait time of 10 minutes
const CONTRIBUTION_TIME_SECS: u64 = 10 * 60;
use crate::{participant::Participant, phase::Phase, storage::Storage};

struct ContributionHandler {
storage: Storage,
Expand Down Expand Up @@ -40,7 +37,7 @@ impl ContributionHandler {
}

#[tracing::instrument(skip(self))]
pub async fn run(mut self) -> Result<()> {
pub async fn run<P: Phase>(mut self) -> Result<()> {
loop {
tracing::debug!("start of contribution handler loop");
let (who, participant) = match self.start_contribution_rx.recv().await {
Expand All @@ -51,16 +48,20 @@ impl ContributionHandler {
Some((w, p)) => (w, p),
};
tracing::debug!(?who, "waiting for contribution");
self.contribute(who, participant).await?;
self.contribute::<P>(who, participant).await?;
self.done_contribution_tx.send(()).await?;
}
}

#[tracing::instrument(skip(self, participant))]
async fn contribute(&mut self, contributor: Address, participant: Participant) -> Result<()> {
async fn contribute<P: Phase>(
&mut self,
contributor: Address,
participant: Participant,
) -> Result<()> {
match tokio::time::timeout(
Duration::from_secs(CONTRIBUTION_TIME_SECS),
self.contribute_inner(contributor, participant),
Duration::from_secs(P::CONTRIBUTION_TIME_SECS),
self.contribute_inner::<P>(contributor, participant),
)
.await
{
Expand All @@ -74,28 +75,26 @@ impl ContributionHandler {
}

#[tracing::instrument(skip(self, participant))]
async fn contribute_inner(
async fn contribute_inner<P: Phase>(
&mut self,
contributor: Address,
mut participant: Participant,
) -> Result<()> {
let parent = self
.storage
.phase2_current_crs()
let parent = P::current_crs(&self.storage)
.await?
.expect("phase2 should've been initialized by now");
let maybe = participant.contribute(&parent).await?;
.expect("the phase should've been initialized by now");
let maybe = participant.contribute::<P>(&parent).await?;
if let Some(unvalidated) = maybe {
tracing::debug!("validating contribution");
if let Some(contribution) =
unvalidated.validate(&mut OsRng, &self.storage.phase_2_root().await?)
{
if contribution.is_linked_to(&parent) {
self.storage
.commit_contribution(contributor, contribution)
.await?;
if let Some(contribution) = P::validate(
&mut OsRng,
&P::fetch_root(&self.storage).await?,
unvalidated,
) {
if P::is_linked_to(&contribution, &parent) {
P::commit_contribution(&self.storage, contributor, contribution).await?;
participant
.confirm(self.storage.current_slot().await?)
.confirm(self.storage.current_slot(P::MARKER).await?)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -181,15 +180,15 @@ impl Coordinator {
)
}

pub async fn run(mut self) -> Result<()> {
pub async fn run<P: Phase + 'static>(mut self) -> Result<()> {
enum Event {
NewParticipant(Participant, Amount),
ContributionDone,
}

let (contribution_handler, start_contribution_tx, done_contribution_rx) =
ContributionHandler::new(self.storage);
tokio::spawn(contribution_handler.run());
tokio::spawn(contribution_handler.run::<P>());
// Merge the events from both being notified of new participants, and of completed
// contributions.
let mut stream = ReceiverStream::new(self.new_participant_rx)
Expand Down
23 changes: 20 additions & 3 deletions tools/summonerd/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod coordinator;
mod participant;
mod penumbra_knower;
mod phase;
mod server;
mod storage;

Expand Down Expand Up @@ -32,6 +33,9 @@ use tonic::transport::Server;
use tracing_subscriber::{prelude::*, EnvFilter};
use url::Url;

use crate::phase::Phase1;
use crate::phase::Phase2;
use crate::phase::PhaseMarker;
use crate::{penumbra_knower::PenumbraKnower, server::CoordinatorService};
use penumbra_proof_setup::all::{Phase1CeremonyCRS, Phase1RawCeremonyCRS};

Expand Down Expand Up @@ -79,6 +83,8 @@ enum Command {
},
/// Start the coordinator.
Start {
#[clap(long, display_order = 100)]
phase: u8,
#[clap(long, display_order = 700)]
storage_dir: Utf8PathBuf,
#[clap(long, display_order = 800)]
Expand Down Expand Up @@ -107,22 +113,33 @@ impl Opt {
Ok(())
}
Command::Start {
phase,
storage_dir,
fvk,
node,
listen,
} => {
let marker = match phase {
1 => PhaseMarker::P1,
2 => PhaseMarker::P2,
_ => anyhow::bail!("Phase must be 1 or 2."),
};
let storage = Storage::load_or_initialize(ceremony_db(&storage_dir)).await?;
// Check if we've transitioned, for a nice error message
if storage.transition_extra_information().await?.is_none() {
if marker == PhaseMarker::P2
&& storage.transition_extra_information().await?.is_none()
{
anyhow::bail!("Please run the transition command before this command 8^)");
}
let knower =
PenumbraKnower::load_or_initialize(storage_dir.join("penumbra.db"), &fvk, node)
.await?;
let (coordinator, participant_tx) = Coordinator::new(storage.clone());
let coordinator_handle = tokio::spawn(coordinator.run());
let service = CoordinatorService::new(knower, storage, participant_tx);
let coordinator_handle = match marker {
PhaseMarker::P1 => tokio::spawn(coordinator.run::<Phase1>()),
PhaseMarker::P2 => tokio::spawn(coordinator.run::<Phase2>()),
};
let service = CoordinatorService::new(knower, storage, participant_tx, marker);
let grpc_server =
Server::builder()
.accept_http1(true)
Expand Down
13 changes: 7 additions & 6 deletions tools/summonerd/src/participant.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Context, Result};
use penumbra_keys::Address;
use penumbra_num::Amount;
use penumbra_proof_setup::all::{Phase2CeremonyCRS, Phase2RawCeremonyContribution};
use penumbra_proto::{
penumbra::tools::summoning::v1alpha1::{
self as pb,
Expand All @@ -15,6 +14,8 @@ use penumbra_proto::{
use tokio::sync::mpsc;
use tonic::{Status, Streaming};

use crate::phase::Phase;

pub struct Participant {
address: Address,
rx: Streaming<pb::ParticipateRequest>,
Expand Down Expand Up @@ -63,14 +64,14 @@ impl Participant {
}

#[tracing::instrument(skip(self, parent))]
pub async fn contribute(
pub async fn contribute<P: Phase>(
&mut self,
parent: &Phase2CeremonyCRS,
) -> Result<Option<Phase2RawCeremonyContribution>> {
parent: &P::CRS,
) -> Result<Option<P::RawContribution>> {
self.tx
.send(Ok(ParticipateResponse {
msg: Some(ResponseMsg::ContributeNow(ContributeNow {
parent: Some(parent.clone().try_into()?),
parent: Some(P::serialize_crs(parent.clone())?),
})),
}))
.await?;
Expand All @@ -79,7 +80,7 @@ impl Participant {
msg: Some(RequestMsg::Contribution(contribution)),
}) = msg
{
Ok(Some(Phase2RawCeremonyContribution::try_from(contribution)?))
Ok(Some(P::deserialize_contribution(contribution)?))
} else {
Ok(None)
}
Expand Down
Loading
Loading