Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement persistent storage in summonerd #3152

Merged
merged 5 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions crates/crypto/proof-setup/src/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ impl From<Phase2CeremonyCRS> for Phase2RawCeremonyCRS {
}
}

impl TryInto<pb::CeremonyCrs> for Phase2CeremonyCRS {
impl TryFrom<Phase2CeremonyCRS> for pb::CeremonyCrs {
type Error = anyhow::Error;

fn try_into(self) -> Result<pb::CeremonyCrs> {
Phase2RawCeremonyCRS::from(self).try_into()
fn try_from(data: Phase2CeremonyCRS) -> Result<pb::CeremonyCrs> {
Phase2RawCeremonyCRS::from(data).try_into()
}
}

Expand Down Expand Up @@ -413,11 +413,11 @@ impl From<Phase2CeremonyContribution> for Phase2RawCeremonyContribution {
}
}

impl TryInto<pb::participate_request::Contribution> for Phase2CeremonyContribution {
impl TryFrom<Phase2CeremonyContribution> for pb::participate_request::Contribution {
type Error = anyhow::Error;

fn try_into(self) -> Result<pb::participate_request::Contribution> {
Phase2RawCeremonyContribution::from(self).try_into()
fn try_from(data: Phase2CeremonyContribution) -> Result<pb::participate_request::Contribution> {
Phase2RawCeremonyContribution::from(data).try_into()
}
}

Expand Down
5 changes: 5 additions & 0 deletions tools/summonerd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ tower = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2"
r2d2 = "0.8"
# Depending on our fork of r2d2-sqlite, which updates the rusqlite dependency to 0.29
r2d2_sqlite = { version = "0.22", git = "https://github.com/penumbra-zone/r2d2-sqlite.git", features = [
"bundled",
] }

[build-dependencies]
vergen = "5"
Expand Down
2 changes: 1 addition & 1 deletion tools/summonerd/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Coordinator {
if let Some(contribution) = contribution.validate(&mut OsRng, &self.storage.root().await?) {
if contribution.is_linked_to(&parent) {
self.storage
.commit_contribution(contributor, &contribution)
.commit_contribution(contributor, contribution)
.await?;
participant
.confirm(self.storage.current_slot().await?)
Expand Down
14 changes: 10 additions & 4 deletions tools/summonerd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ mod server;
mod storage;

use anyhow::Result;
use camino::Utf8PathBuf;
use clap::Parser;
use console_subscriber::ConsoleLayer;
use coordinator::Coordinator;
use metrics_tracing_context::MetricsLayer;
use penumbra_proof_setup::all::Phase2CeremonyCRS;
use penumbra_proto::tools::summoning::v1alpha1::ceremony_coordinator_service_server::CeremonyCoordinatorServiceServer;
use std::net::SocketAddr;
use storage::Storage;
Expand Down Expand Up @@ -39,6 +39,8 @@ struct Opt {
enum Command {
/// Start the coordinator.
Start {
#[clap(long, display_order = 800)]
storage_path: Utf8PathBuf,
#[clap(long, display_order = 900, default_value = "127.0.0.1:8081")]
listen: SocketAddr,
},
Expand All @@ -47,9 +49,13 @@ enum Command {
impl Opt {
async fn exec(self) -> Result<()> {
match self.cmd {
Command::Start { listen } => {
let root = Phase2CeremonyCRS::root()?;
let storage = Storage::new(root);
Command::Start {
listen,
storage_path,
} => {
//let root = Phase2CeremonyCRS::root()?;
//let storage = Storage::new(root);
let storage = Storage::load_or_initialize(storage_path).await?;
let (coordinator, participant_tx) = Coordinator::new(storage.clone());
let coordinator_handle = tokio::spawn(coordinator.run());
let service = CoordinatorService::new(storage, participant_tx);
Expand Down
148 changes: 122 additions & 26 deletions tools/summonerd/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,81 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use anyhow::Result;
use camino::Utf8Path;
use penumbra_keys::Address;
use penumbra_proof_setup::all::{Phase2CeremonyCRS, Phase2CeremonyContribution};
use tokio::sync::Mutex;
use penumbra_proof_setup::all::{
Phase2CeremonyCRS, Phase2CeremonyContribution, Phase2RawCeremonyCRS,
Phase2RawCeremonyContribution,
};
use penumbra_proto::{
penumbra::tools::summoning::v1alpha1::{
self as pb, participate_request::Contribution as PBContribution,
},
Message,
};
use r2d2_sqlite::{rusqlite::OpenFlags, SqliteConnectionManager};
use tokio::task::spawn_blocking;

#[derive(Clone)]
pub struct Storage {
// TODO: This should have methods for persisting all of the state of the coordinator,
// using a sqlite database.
crs: Arc<Mutex<Phase2CeremonyCRS>>,
slot: Arc<AtomicU64>,
root: Phase2CeremonyCRS,
pool: r2d2::Pool<SqliteConnectionManager>,
}

impl Storage {
pub fn new(root: Phase2CeremonyCRS) -> Self {
Self {
crs: Arc::new(Mutex::new(root.clone())),
slot: Arc::new(AtomicU64::new(0)),
root,
/// If the database at `storage_path` exists, [`Self::load`] it, otherwise, [`Self::initialize`] it.
pub async fn load_or_initialize(storage_path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
if storage_path.as_ref().exists() {
return Self::load(storage_path).await;
}

Self::initialize(storage_path).await
}

pub async fn root(&self) -> Result<Phase2CeremonyCRS> {
Ok(self.root.clone())
pub async fn initialize(storage_path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
// Connect to the database (or create it)
let pool = Self::connect(storage_path)?;

spawn_blocking(move || {
// In one database transaction, populate everything
let mut conn = pool.get()?;
let tx = conn.transaction()?;

// Create the tables
tx.execute_batch(include_str!("storage/schema.sql"))?;
// TODO: Remove this in favor of a specific command for initializing root
let root = Phase2CeremonyCRS::root()?;
tx.execute(
"INSERT INTO phase2_contributions VALUES (0, 1, ?1, NULL)",
[pb::CeremonyCrs::try_from(root)?.encode_to_vec()],
)?;

tx.commit()?;

Ok(Storage { pool })
})
.await?
}

fn connect(path: impl AsRef<Utf8Path>) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
let manager = SqliteConnectionManager::file(path.as_ref())
.with_flags(
// Don't allow opening URIs, because they can change the behavior of the database; we
// just want to open normal filepaths.
OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
)
.with_init(|conn| {
// We use `prepare_cached` a fair amount: this is an overestimate of the number
// of cached prepared statements likely to be used.
conn.set_prepared_statement_cache_capacity(32);
Ok(())
});
Ok(r2d2::Pool::new(manager)?)
}

pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
let storage = Self {
pool: Self::connect(path)?,
};

Ok(storage)
}

pub async fn can_contribute(&self, _address: Address) -> Result<()> {
Expand All @@ -39,21 +87,69 @@ impl Storage {
}

pub async fn current_crs(&self) -> Result<Phase2CeremonyCRS> {
Ok(self.crs.lock().await.clone())
let mut conn = self.pool.get()?;
let tx = conn.transaction()?;
let (is_root, contribution_or_crs) = tx.query_row(
"SELECT is_root, contribution_or_crs FROM phase2_contributions ORDER BY slot DESC LIMIT 1",
[],
|row| Ok((row.get::<usize, bool>(0)?, row.get::<usize, Vec<u8>>(1)?)),
)?;
let crs = if is_root {
Phase2RawCeremonyCRS::try_from(pb::CeremonyCrs::decode(
contribution_or_crs.as_slice(),
)?)?
.assume_valid()
} else {
Phase2RawCeremonyContribution::try_from(PBContribution::decode(
contribution_or_crs.as_slice(),
)?)?
.assume_valid()
.new_elements()
};
Ok(crs)
}

// TODO: Add other stuff here
pub async fn commit_contribution(
&self,
_contributor: Address,
contribution: &Phase2CeremonyContribution,
contributor: Address,
contribution: Phase2CeremonyContribution,
) -> Result<()> {
*self.crs.lock().await = contribution.new_elements();
self.slot.fetch_add(1, Ordering::SeqCst);
let mut conn = self.pool.get()?;
let tx = conn.transaction()?;
let contributor_bytes = contributor.to_vec();
tx.execute(
"INSERT INTO phase2_contributions VALUES(NULL, 0, ?1, ?2)",
[
PBContribution::try_from(contribution)?.encode_to_vec(),
contributor_bytes,
],
)?;
tx.commit()?;
Ok(())
}

pub async fn current_slot(&self) -> Result<u64> {
Ok(self.slot.load(Ordering::SeqCst))
let mut conn = self.pool.get()?;
let tx = conn.transaction()?;
let out = tx
.query_row("SELECT MAX(slot) FROM phase2_contributions", [], |row| {
row.get::<usize, Option<u64>>(0)
})?
.unwrap_or(0);
Ok(out)
}

pub async fn root(&self) -> Result<Phase2CeremonyCRS> {
let mut conn = self.pool.get()?;
let tx = conn.transaction()?;
let data = tx.query_row(
"SELECT contribution_or_crs FROM phase2_contributions WHERE is_root LIMIT 1",
[],
|row| row.get::<usize, Vec<u8>>(0),
)?;
Ok(
Phase2RawCeremonyCRS::try_from(pb::CeremonyCrs::decode(data.as_slice())?)?
.assume_valid(),
)
}
}
10 changes: 10 additions & 0 deletions tools/summonerd/src/storage/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- used for storing phase 2 contribution information
CREATE TABLE phase2_contributions (
slot INTEGER PRIMARY KEY,
-- 1 if this is a root
is_root INTEGER NOT NULL,
-- If this is the root, will be just the elements, and not a full contribution
contribution_or_crs BLOB NOT NULL,
-- NULL in the specific case that this is the root
address BLOB
);
Loading