From 640237e414f9fc0798d2cbadcc278db0dc52f5c4 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Tue, 17 Oct 2023 00:29:40 -0700 Subject: [PATCH] Summonerd: Implement phase1 contributions --- Cargo.lock | 2 + tools/summonerd/Cargo.toml | 2 + tools/summonerd/src/coordinator.rs | 49 ++++---- tools/summonerd/src/main.rs | 23 +++- tools/summonerd/src/participant.rs | 13 ++- tools/summonerd/src/phase.rs | 173 +++++++++++++++++++++++++++++ tools/summonerd/src/server.rs | 6 +- tools/summonerd/src/storage.rs | 58 +++++++--- 8 files changed, 273 insertions(+), 53 deletions(-) create mode 100644 tools/summonerd/src/phase.rs diff --git a/Cargo.lock b/Cargo.lock index 8f6d773805..3721a00f51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7545,6 +7545,7 @@ dependencies = [ "anyhow", "ark-groth16", "ark-serialize", + "async-trait", "atty", "bytes", "camino", @@ -7564,6 +7565,7 @@ dependencies = [ "r2d2", "r2d2_sqlite", "rand 0.8.5", + "rand_core 0.6.4", "tokio", "tokio-stream", "tonic", diff --git a/tools/summonerd/Cargo.toml b/tools/summonerd/Cargo.toml index e3ad90cb78..e5e12f11bc 100644 --- a/tools/summonerd/Cargo.toml +++ b/tools/summonerd/Cargo.toml @@ -18,6 +18,7 @@ rust-version = "1.65" anyhow = "1" ark-groth16 = "0.4" ark-serialize = "0.4" +async-trait = "0.1.52" atty = "0.2" bytes = "1" camino = "1" @@ -35,6 +36,7 @@ penumbra-proof-setup = { path = "../../crates/crypto/proof-setup/" } penumbra-proto = { path = "../../crates/proto/" } penumbra-view = { path = "../../crates/view/" } rand = "0.8" +rand_core = "0.6.4" tokio = { version = "1.22", features = ["full"] } tokio-stream = "0.1" tonic = "0.10" diff --git a/tools/summonerd/src/coordinator.rs b/tools/summonerd/src/coordinator.rs index fb453befd4..f26d15c98b 100644 --- a/tools/summonerd/src/coordinator.rs +++ b/tools/summonerd/src/coordinator.rs @@ -7,10 +7,7 @@ use rand::rngs::OsRng; use tokio::sync::mpsc::{self}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; -use crate::{participant::Participant, storage::Storage}; - -/// Wait time of 10 minutes -const CONTRIBUTION_TIME_SECS: u64 = 10 * 60; +use crate::{participant::Participant, phase::Phase, storage::Storage}; struct ContributionHandler { storage: Storage, @@ -40,7 +37,7 @@ impl ContributionHandler { } #[tracing::instrument(skip(self))] - pub async fn run(mut self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { loop { tracing::debug!("start of contribution handler loop"); let (who, participant) = match self.start_contribution_rx.recv().await { @@ -51,16 +48,20 @@ impl ContributionHandler { Some((w, p)) => (w, p), }; tracing::debug!(?who, "waiting for contribution"); - self.contribute(who, participant).await?; + self.contribute::

(who, participant).await?; self.done_contribution_tx.send(()).await?; } } #[tracing::instrument(skip(self, participant))] - async fn contribute(&mut self, contributor: Address, participant: Participant) -> Result<()> { + async fn contribute( + &mut self, + contributor: Address, + participant: Participant, + ) -> Result<()> { match tokio::time::timeout( - Duration::from_secs(CONTRIBUTION_TIME_SECS), - self.contribute_inner(contributor, participant), + Duration::from_secs(P::CONTRIBUTION_TIME_SECS), + self.contribute_inner::

(contributor, participant), ) .await { @@ -74,28 +75,26 @@ impl ContributionHandler { } #[tracing::instrument(skip(self, participant))] - async fn contribute_inner( + async fn contribute_inner( &mut self, contributor: Address, mut participant: Participant, ) -> Result<()> { - let parent = self - .storage - .phase2_current_crs() + let parent = P::current_crs(&self.storage) .await? - .expect("phase2 should've been initialized by now"); - let maybe = participant.contribute(&parent).await?; + .expect("the phase should've been initialized by now"); + let maybe = participant.contribute::

(&parent).await?; if let Some(unvalidated) = maybe { tracing::debug!("validating contribution"); - if let Some(contribution) = - unvalidated.validate(&mut OsRng, &self.storage.phase_2_root().await?) - { - if contribution.is_linked_to(&parent) { - self.storage - .commit_contribution(contributor, contribution) - .await?; + if let Some(contribution) = P::validate( + &mut OsRng, + &P::fetch_root(&self.storage).await?, + unvalidated, + ) { + if P::is_linked_to(&contribution, &parent) { + P::commit_contribution(&self.storage, contributor, contribution).await?; participant - .confirm(self.storage.current_slot().await?) + .confirm(self.storage.current_slot(P::MARKER).await?) .await?; return Ok(()); } @@ -181,7 +180,7 @@ impl Coordinator { ) } - pub async fn run(mut self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { enum Event { NewParticipant(Participant, Amount), ContributionDone, @@ -189,7 +188,7 @@ impl Coordinator { let (contribution_handler, start_contribution_tx, done_contribution_rx) = ContributionHandler::new(self.storage); - tokio::spawn(contribution_handler.run()); + tokio::spawn(contribution_handler.run::

()); // Merge the events from both being notified of new participants, and of completed // contributions. let mut stream = ReceiverStream::new(self.new_participant_rx) diff --git a/tools/summonerd/src/main.rs b/tools/summonerd/src/main.rs index e15878d4f7..22a23da09b 100644 --- a/tools/summonerd/src/main.rs +++ b/tools/summonerd/src/main.rs @@ -1,6 +1,7 @@ mod coordinator; mod participant; mod penumbra_knower; +mod phase; mod server; mod storage; @@ -32,6 +33,9 @@ use tonic::transport::Server; use tracing_subscriber::{prelude::*, EnvFilter}; use url::Url; +use crate::phase::Phase1; +use crate::phase::Phase2; +use crate::phase::PhaseMarker; use crate::{penumbra_knower::PenumbraKnower, server::CoordinatorService}; use penumbra_proof_setup::all::{Phase1CeremonyCRS, Phase1RawCeremonyCRS}; @@ -79,6 +83,8 @@ enum Command { }, /// Start the coordinator. Start { + #[clap(long, display_order = 100)] + phase: u8, #[clap(long, display_order = 700)] storage_dir: Utf8PathBuf, #[clap(long, display_order = 800)] @@ -107,22 +113,33 @@ impl Opt { Ok(()) } Command::Start { + phase, storage_dir, fvk, node, listen, } => { + let marker = match phase { + 1 => PhaseMarker::P1, + 2 => PhaseMarker::P2, + _ => anyhow::bail!("Phase must be 1 or 2."), + }; let storage = Storage::load_or_initialize(ceremony_db(&storage_dir)).await?; // Check if we've transitioned, for a nice error message - if storage.transition_extra_information().await?.is_none() { + if marker == PhaseMarker::P2 + && storage.transition_extra_information().await?.is_none() + { anyhow::bail!("Please run the transition command before this command 8^)"); } let knower = PenumbraKnower::load_or_initialize(storage_dir.join("penumbra.db"), &fvk, node) .await?; let (coordinator, participant_tx) = Coordinator::new(storage.clone()); - let coordinator_handle = tokio::spawn(coordinator.run()); - let service = CoordinatorService::new(knower, storage, participant_tx); + let coordinator_handle = match marker { + PhaseMarker::P1 => tokio::spawn(coordinator.run::()), + PhaseMarker::P2 => tokio::spawn(coordinator.run::()), + }; + let service = CoordinatorService::new(knower, storage, participant_tx, marker); let grpc_server = Server::builder() .accept_http1(true) diff --git a/tools/summonerd/src/participant.rs b/tools/summonerd/src/participant.rs index 469db485b2..7d932572d4 100644 --- a/tools/summonerd/src/participant.rs +++ b/tools/summonerd/src/participant.rs @@ -1,7 +1,6 @@ use anyhow::{Context, Result}; use penumbra_keys::Address; use penumbra_num::Amount; -use penumbra_proof_setup::all::{Phase2CeremonyCRS, Phase2RawCeremonyContribution}; use penumbra_proto::{ penumbra::tools::summoning::v1alpha1::{ self as pb, @@ -15,6 +14,8 @@ use penumbra_proto::{ use tokio::sync::mpsc; use tonic::{Status, Streaming}; +use crate::phase::Phase; + pub struct Participant { address: Address, rx: Streaming, @@ -63,14 +64,14 @@ impl Participant { } #[tracing::instrument(skip(self, parent))] - pub async fn contribute( + pub async fn contribute( &mut self, - parent: &Phase2CeremonyCRS, - ) -> Result> { + parent: &P::CRS, + ) -> Result> { self.tx .send(Ok(ParticipateResponse { msg: Some(ResponseMsg::ContributeNow(ContributeNow { - parent: Some(parent.clone().try_into()?), + parent: Some(P::serialize_crs(parent.clone())?), })), })) .await?; @@ -79,7 +80,7 @@ impl Participant { msg: Some(RequestMsg::Contribution(contribution)), }) = msg { - Ok(Some(Phase2RawCeremonyContribution::try_from(contribution)?)) + Ok(Some(P::deserialize_contribution(contribution)?)) } else { Ok(None) } diff --git a/tools/summonerd/src/phase.rs b/tools/summonerd/src/phase.rs new file mode 100644 index 0000000000..3d605733f6 --- /dev/null +++ b/tools/summonerd/src/phase.rs @@ -0,0 +1,173 @@ +use anyhow::Result; +use async_trait::async_trait; +use penumbra_keys::Address; +use penumbra_proof_setup::all::{ + Phase1CeremonyCRS, Phase1CeremonyContribution, Phase1RawCeremonyContribution, + Phase2CeremonyCRS, Phase2CeremonyContribution, Phase2RawCeremonyContribution, +}; +use penumbra_proto::tools::summoning::v1alpha1::{ + participate_request::Contribution as PBContribution, CeremonyCrs, +}; +use rand_core::CryptoRngCore; + +use crate::storage::Storage; + +/// A simple marker for which phase we're in, which some code can depend on at runtime. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PhaseMarker { + P1, + P2, +} + +/// A utility trait to exist solely for plumbing around code that's varies with phases. +/// +/// This contains some types and constants, along with various stub methods that are +/// simple one-liners, or should be. +#[async_trait] +pub trait Phase { + /// The type for the elements. + type CRS: Clone + Send + Sync; + + /// The type for unvalidated contributions. + type RawContribution: Send + Sync; + + /// The type for validated contributions. + type Contribution: Send + Sync; + + /// The constant value for the marker we use, for runtime dispatch. + const MARKER: PhaseMarker; + + /// The amount of time we should wait for a contribution. + /// + /// This varies since one phase is more expensive than the other. + const CONTRIBUTION_TIME_SECS: u64; + + /// Serialize the CRS value, in a potentially failing way. + fn serialize_crs(data: Self::CRS) -> Result; + + /// Deserialize a contribution, without validation. + fn deserialize_contribution(data: PBContribution) -> Result; + + /// Validate a contribution, using some randomness, and the root for the phase. + /// + /// Note: this can be expensive. + fn validate( + rng: &mut impl CryptoRngCore, + root: &Self::CRS, + contribution: Self::RawContribution, + ) -> Option; + + /// Check if a contribution is linked to some parent elements. + fn is_linked_to(contribution: &Self::Contribution, parent: &Self::CRS) -> bool; + + /// Fetch the root for this phase from storage. + async fn fetch_root(storage: &Storage) -> Result; + + /// Fetch the latest elements for this phase from storage. + async fn current_crs(storage: &Storage) -> Result>; + + /// Commit a contribution to the right phase table in storage. + async fn commit_contribution( + storage: &Storage, + contributor: Address, + contribution: Self::Contribution, + ) -> Result<()>; +} + +pub struct Phase1; + +#[async_trait] +impl Phase for Phase1 { + type CRS = Phase1CeremonyCRS; + type RawContribution = Phase1RawCeremonyContribution; + type Contribution = Phase1CeremonyContribution; + const MARKER: PhaseMarker = PhaseMarker::P1; + const CONTRIBUTION_TIME_SECS: u64 = 20 * 60; + + fn serialize_crs(data: Self::CRS) -> Result { + data.try_into() + } + + fn deserialize_contribution(data: PBContribution) -> Result { + data.try_into() + } + + fn validate( + rng: &mut impl CryptoRngCore, + _root: &Self::CRS, + contribution: Self::RawContribution, + ) -> Option { + contribution.validate(rng) + } + + fn is_linked_to(contribution: &Self::Contribution, parent: &Self::CRS) -> bool { + contribution.is_linked_to(parent) + } + + async fn fetch_root(storage: &Storage) -> Result { + Ok(storage.phase1_root().await?) + } + + async fn current_crs(storage: &Storage) -> Result> { + Ok(storage.phase1_current_crs().await?) + } + + async fn commit_contribution( + storage: &Storage, + contributor: Address, + contribution: Self::Contribution, + ) -> Result<()> { + Ok(storage + .phase1_commit_contribution(contributor, contribution) + .await?) + } +} + +pub struct Phase2; + +#[async_trait] +impl Phase for Phase2 { + type CRS = Phase2CeremonyCRS; + type RawContribution = Phase2RawCeremonyContribution; + type Contribution = Phase2CeremonyContribution; + const MARKER: PhaseMarker = PhaseMarker::P2; + const CONTRIBUTION_TIME_SECS: u64 = 10 * 60; + + fn serialize_crs(data: Self::CRS) -> Result { + data.try_into() + } + + fn deserialize_contribution(data: PBContribution) -> Result { + data.try_into() + } + + fn validate( + rng: &mut impl CryptoRngCore, + root: &Self::CRS, + contribution: Self::RawContribution, + ) -> Option { + contribution.validate(rng, root) + } + + fn is_linked_to(contribution: &Self::Contribution, parent: &Self::CRS) -> bool { + contribution.is_linked_to(parent) + } + + async fn fetch_root(storage: &Storage) -> Result { + Ok(storage.phase2_root().await?) + } + + async fn current_crs(storage: &Storage) -> Result> { + Ok(storage.phase2_current_crs().await?) + } + + async fn commit_contribution( + storage: &Storage, + contributor: Address, + contribution: Self::Contribution, + ) -> Result<()> { + Ok(storage + .phase2_commit_contribution(contributor, contribution) + .await?) + } +} diff --git a/tools/summonerd/src/server.rs b/tools/summonerd/src/server.rs index d1b6558fc1..2a031223ab 100644 --- a/tools/summonerd/src/server.rs +++ b/tools/summonerd/src/server.rs @@ -11,6 +11,7 @@ use tonic::{async_trait, Request, Response, Status, Streaming}; use crate::{ participant::Participant, penumbra_knower::PenumbraKnower, + phase::PhaseMarker, storage::{ContributionAllowed, Storage}, }; @@ -19,6 +20,7 @@ pub struct CoordinatorService { knower: PenumbraKnower, storage: Storage, participant_tx: mpsc::Sender<(Participant, Amount)>, + marker: PhaseMarker, } impl CoordinatorService { @@ -26,11 +28,13 @@ impl CoordinatorService { knower: PenumbraKnower, storage: Storage, participant_tx: mpsc::Sender<(Participant, Amount)>, + marker: PhaseMarker, ) -> Self { Self { knower, storage, participant_tx, + marker, } } } @@ -65,7 +69,7 @@ impl server::CeremonyCoordinatorService for CoordinatorService { // Errors are on our end, None is on their end let amount = match self .storage - .can_contribute(&self.knower, &address) + .can_contribute(&self.knower, &address, self.marker) .await .map_err(|e| { Status::internal(format!("failed to look up contributor metadata {:#}", e)) diff --git a/tools/summonerd/src/storage.rs b/tools/summonerd/src/storage.rs index 904c74332c..672ca8afe7 100644 --- a/tools/summonerd/src/storage.rs +++ b/tools/summonerd/src/storage.rs @@ -3,9 +3,9 @@ use camino::Utf8Path; use penumbra_keys::Address; use penumbra_num::Amount; use penumbra_proof_setup::all::{ - AllExtraTransitionInformation, Phase1CeremonyCRS, Phase1RawCeremonyCRS, - Phase1RawCeremonyContribution, Phase2CeremonyCRS, Phase2CeremonyContribution, - Phase2RawCeremonyCRS, Phase2RawCeremonyContribution, + AllExtraTransitionInformation, Phase1CeremonyCRS, Phase1CeremonyContribution, + Phase1RawCeremonyCRS, Phase1RawCeremonyContribution, Phase2CeremonyCRS, + Phase2CeremonyContribution, Phase2RawCeremonyCRS, Phase2RawCeremonyContribution, }; use penumbra_proto::{ penumbra::tools::summoning::v1alpha1::{ @@ -19,7 +19,7 @@ use r2d2_sqlite::{ }; use tokio::task::spawn_blocking; -use crate::penumbra_knower::PenumbraKnower; +use crate::{penumbra_knower::PenumbraKnower, phase::PhaseMarker}; const MIN_BID_AMOUNT_U64: u64 = 1u64; const MAX_STRIKES: u64 = 3u64; @@ -165,6 +165,7 @@ impl Storage { &self, knower: &PenumbraKnower, address: &Address, + marker: PhaseMarker, ) -> Result { // Criteria: // - Bid more than min amount @@ -177,13 +178,13 @@ impl Storage { let has_contributed = { let mut conn = self.pool.get()?; let tx = conn.transaction()?; - tx.query_row( - "SELECT 1 FROM phase2_contributions WHERE address = ?1", - [address.to_vec()], - |_| Ok(()), - ) - .optional()? - .is_some() + let query = match marker { + PhaseMarker::P1 => "SELECT 1 FROM phase1_contributions WHERE address = ?1", + PhaseMarker::P2 => "SELECT 1 FROM phase2_contributions WHERE address = ?1", + }; + tx.query_row(query, [address.to_vec()], |_| Ok(())) + .optional()? + .is_some() }; if has_contributed { return Ok(ContributionAllowed::AlreadyContributed); @@ -248,7 +249,26 @@ impl Storage { Ok(Some(crs)) } - pub async fn commit_contribution( + pub async fn phase1_commit_contribution( + &self, + contributor: Address, + contribution: Phase1CeremonyContribution, + ) -> Result<()> { + let mut conn = self.pool.get()?; + let tx = conn.transaction()?; + let contributor_bytes = contributor.to_vec(); + tx.execute( + "INSERT INTO phase1_contributions VALUES(NULL, 0, ?1, ?2)", + [ + PBContribution::try_from(contribution)?.encode_to_vec(), + contributor_bytes, + ], + )?; + tx.commit()?; + Ok(()) + } + + pub async fn phase2_commit_contribution( &self, contributor: Address, contribution: Phase2CeremonyContribution, @@ -267,20 +287,22 @@ impl Storage { Ok(()) } - pub async fn current_slot(&self) -> Result { + pub async fn current_slot(&self, marker: PhaseMarker) -> Result { let mut conn = self.pool.get()?; let tx = conn.transaction()?; + let query = match marker { + PhaseMarker::P1 => "SELECT MAX(slot) from phase1_contributions", + PhaseMarker::P2 => "SELECT MAX(slot) from phase2_contributions", + }; let out = tx - .query_row("SELECT MAX(slot) FROM phase2_contributions", [], |row| { - row.get::>(0) - })? + .query_row(query, [], |row| row.get::>(0))? .unwrap_or(0); Ok(out) } /// Get Phase 1 root. #[allow(dead_code)] - pub async fn phase_1_root(&self) -> Result { + pub async fn phase1_root(&self) -> Result { let mut conn = self.pool.get()?; let tx = conn.transaction()?; let data = tx.query_row( @@ -295,7 +317,7 @@ impl Storage { } /// Get Phase 2 root. - pub async fn phase_2_root(&self) -> Result { + pub async fn phase2_root(&self) -> Result { let mut conn = self.pool.get()?; let tx = conn.transaction()?; let data = tx.query_row(