diff --git a/crates/prism/src/da/celestia.rs b/crates/prism/src/da/celestia.rs
index bd4e47d..facd55c 100644
--- a/crates/prism/src/da/celestia.rs
+++ b/crates/prism/src/da/celestia.rs
@@ -9,14 +9,14 @@
 use celestia_rpc::{BlobClient, Client, HeaderClient};
 use celestia_types::{blob::GasPrice, nmt::Namespace, Blob};
 use prism_common::operation::Operation;
 use prism_errors::{DataAvailabilityError, GeneralError};
-use std::{self, sync::Arc};
-use tokio::{
+use std::{
+    self,
     sync::{
-        mpsc::{channel, Receiver, Sender},
-        Mutex,
+        atomic::{AtomicU64, Ordering},
+        Arc,
     },
-    task::spawn,
 };
+use tokio::{sync::broadcast, task::spawn};
 
 use bincode;
 
@@ -35,14 +35,12 @@ pub struct CelestiaConnection {
     pub snark_namespace: Namespace,
     pub operation_namespace: Namespace,
 
-    sync_target_tx: Arc<Sender<u64>>,
-    sync_target_rx: Arc<Mutex<Receiver<u64>>>,
+    height_update_tx: broadcast::Sender<u64>,
+    sync_target: Arc<AtomicU64>,
 }
 
 impl CelestiaConnection {
     pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> Result<Self> {
-        let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);
-
         let client = Client::new(&config.connection_string, auth_token)
             .await
             .context("Failed to initialize websocket connection")
@@ -61,12 +59,14 @@ impl CelestiaConnection {
             None => snark_namespace,
         };
 
+        let (height_update_tx, _) = broadcast::channel(100);
+
         Ok(CelestiaConnection {
             client,
             snark_namespace,
             operation_namespace,
-            sync_target_tx: Arc::new(tx),
-            sync_target_rx: Arc::new(Mutex::new(rx)),
+            height_update_tx,
+            sync_target: Arc::new(AtomicU64::new(0)),
         })
     }
 }
@@ -86,41 +86,38 @@ fn create_namespace(namespace_hex: &str) -> Result<Namespace> {
 #[async_trait]
 impl DataAvailabilityLayer for CelestiaConnection {
     async fn get_latest_height(&self) -> Result<u64> {
-        match self.sync_target_rx.lock().await.recv().await {
-            Some(height) => Ok(height),
-            None => Err(anyhow!(DataAvailabilityError::ChannelReceiveError)),
-        }
+        Ok(self.sync_target.load(Ordering::Relaxed))
     }
 
     async fn initialize_sync_target(&self) -> Result<u64> {
-        HeaderClient::header_network_head(&self.client)
+        let height = HeaderClient::header_network_head(&self.client)
             .await
             .context("Failed to get network head from DA layer")
-            .map(|extended_header| extended_header.header.height.value())
+            .map(|extended_header| extended_header.header.height.value())?;
+
+        self.sync_target.store(height, Ordering::Relaxed);
+        Ok(height)
     }
 
-    async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
+    async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
         trace!("searching for epoch on da layer at height {}", height);
+
         match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
-            Ok(blobs) => {
-                let mut epochs = Vec::new();
-                for blob in blobs.iter() {
-                    match FinalizedEpoch::try_from(blob) {
-                        Ok(epoch_json) => epochs.push(epoch_json),
-                        Err(_) => {
-                            GeneralError::ParsingError(format!(
-                                "marshalling blob from height {} to epoch json: {:?}",
-                                height, &blob
-                            ));
-                        }
-                    }
-                }
-                Ok(epochs)
-            }
+            Ok(blobs) => blobs
+                .into_iter()
+                .next()
+                .map(|blob| {
+                    FinalizedEpoch::try_from(&blob).map_err(|_| {
+                        anyhow!(GeneralError::ParsingError(format!(
+                            "marshalling blob from height {} to epoch json: {:?}",
+                            height, &blob
+                        )))
+                    })
+                })
+                .transpose(),
             Err(err) => {
-                // todo: this is a hack to handle a broken error from celestia-node that will be fixed in v0.15.0
                 if err.to_string().contains("blob: not found") {
-                    Ok(vec![])
+                    Ok(None)
                 } else {
                     Err(anyhow!(DataAvailabilityError::DataRetrievalError(
                         height,
@@ -131,38 +128,22 @@ impl DataAvailabilityLayer for CelestiaConnection {
         }
     }
 
-    async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
-        if epochs.is_empty() {
-            bail!("no epochs provided for submission");
-        }
+    async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
+        debug!("posting epoch {} to da layer", epoch.height);
 
-        debug!("posting {} epochs to da layer", epochs.len());
+        let data = bincode::serialize(&epoch).map_err(|e| {
+            DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
+                "serializing epoch {}: {}",
+                epoch.height, e
+            )))
+        })?;
 
-        let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
-            .iter()
-            .map(|epoch| {
-                let data = bincode::serialize(epoch).map_err(|e| {
-                    DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
-                        "serializing epoch {}: {}",
-                        epoch.height, e
-                    )))
-                })?;
-                Blob::new(self.snark_namespace, data).map_err(|e| {
-                    DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
-                        e.to_string(),
-                    ))
-                })
-            })
-            .collect();
-
-        let blobs = blobs?;
-
-        for (i, blob) in blobs.iter().enumerate() {
-            trace!("blob {}: {:?}", i, blob);
-        }
+        let blob = Blob::new(self.snark_namespace, data).map_err(|e| {
+            DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
+        })?;
 
         self.client
-            .blob_submit(&blobs, GasPrice::from(-1.0))
+            .blob_submit(&[blob], GasPrice::from(-1.0))
             .await
             .map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
     }
@@ -230,35 +211,29 @@ impl DataAvailabilityLayer for CelestiaConnection {
             .map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
     }
 
+    fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
+        self.height_update_tx.subscribe()
+    }
+
     async fn start(&self) -> Result<()> {
         let mut header_sub = HeaderClient::header_subscribe(&self.client)
             .await
-            .context("Failed to subscribe to headers from DA layer")
-            .map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?;
+            .context("Failed to subscribe to headers from DA layer")?;
+
+        let sync_target = self.sync_target.clone();
+        let height_update_tx = self.height_update_tx.clone();
 
-        let synctarget_buffer = self.sync_target_tx.clone();
         spawn(async move {
            while let Some(extended_header_result) = header_sub.next().await {
                match extended_header_result {
                    Ok(extended_header) => {
                        let height = extended_header.header.height.value();
-                        match synctarget_buffer.send(height).await {
-                            Ok(_) => {
-                                debug!("sent sync target update for height {}", height);
-                            }
-                            Err(_) => {
-                                DataAvailabilityError::SyncTargetError(format!(
-                                    "sending sync target update message for height {}",
-                                    height
-                                ));
-                            }
-                        }
+                        sync_target.store(height, Ordering::Relaxed);
+                        let _ = height_update_tx.send(height);
+                        debug!("updated sync target for height {}", height);
                    }
                    Err(e) => {
-                        DataAvailabilityError::NetworkError(format!(
-                            "retrieving header from da layer: {}",
-                            e
-                        ));
+                        error!("Error retrieving header from DA layer: {}", e);
                    }
                }
            }
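The Celestia connection above replaces the old mpsc sync-target channel with two primitives: an `AtomicU64` that always holds the last seen network height, and a `broadcast` channel that pushes every new height to any number of subscribers. A minimal caller sketch under assumed names (`config` and `auth_token` are placeholders; the methods are the ones defined above):

    let da = CelestiaConnection::new(&config, Some(&auth_token)).await?;
    da.initialize_sync_target().await?;      // seed the atomic from the current network head
    da.start().await?;                       // spawn the header subscription that keeps it fresh
    let tip = da.get_latest_height().await?; // now a cheap Relaxed load that never blocks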
diff --git a/crates/prism/src/da/memory.rs b/crates/prism/src/da/memory.rs
index cc4651a..1d249e6 100644
--- a/crates/prism/src/da/memory.rs
+++ b/crates/prism/src/da/memory.rs
@@ -2,7 +2,7 @@ use crate::da::{DataAvailabilityLayer, FinalizedEpoch};
 use anyhow::Result;
 use async_trait::async_trait;
 use prism_common::operation::Operation;
-use std::sync::Arc;
+use std::{collections::VecDeque, sync::Arc};
 use tokio::{
     sync::{broadcast, RwLock},
     time::{interval, Duration},
@@ -12,14 +12,14 @@ pub struct Block {
     pub height: u64,
     pub operations: Vec<Operation>,
-    pub epochs: Vec<FinalizedEpoch>,
+    pub epoch: Option<FinalizedEpoch>,
 }
 
 #[derive(Clone)]
 pub struct InMemoryDataAvailabilityLayer {
     blocks: Arc<RwLock<Vec<Block>>>,
     pending_operations: Arc<RwLock<Vec<Operation>>>,
-    pending_epochs: Arc<RwLock<Vec<FinalizedEpoch>>>,
+    pending_epochs: Arc<RwLock<VecDeque<FinalizedEpoch>>>,
     latest_height: Arc<RwLock<u64>>,
     height_update_tx: broadcast::Sender<u64>,
     block_update_tx: broadcast::Sender<Block>,
@@ -34,7 +34,7 @@ impl InMemoryDataAvailabilityLayer {
         Self {
             blocks: Arc::new(RwLock::new(Vec::new())),
             pending_operations: Arc::new(RwLock::new(Vec::new())),
-            pending_epochs: Arc::new(RwLock::new(Vec::new())),
+            pending_epochs: Arc::new(RwLock::new(VecDeque::new())),
             latest_height: Arc::new(RwLock::new(0)),
             height_update_tx: height_tx,
             block_update_tx: block_tx,
@@ -58,13 +58,12 @@ impl InMemoryDataAvailabilityLayer {
             let new_block = Block {
                 height: *latest_height,
                 operations: std::mem::take(&mut *pending_operations),
-                epochs: std::mem::take(&mut *pending_epochs),
+                epoch: pending_epochs.pop_front(),
             };
             debug!(
-                "new block produced at height {} with {} operations and {} snarks",
+                "new block produced at height {} with {} operations",
                 new_block.height,
                 new_block.operations.len(),
-                new_block.epochs.len()
             );
             blocks.push(new_block.clone());
@@ -81,6 +80,10 @@ impl InMemoryDataAvailabilityLayer {
 
 #[async_trait]
 impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
+    fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
+        self.height_update_tx.subscribe()
+    }
+
     async fn get_latest_height(&self) -> Result<u64> {
         Ok(*self.latest_height.read().await)
     }
@@ -89,18 +92,18 @@ impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
         self.get_latest_height().await
     }
 
-    async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
+    async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
         let blocks = self.blocks.read().await;
         Ok(blocks
             .iter()
             .find(|block| block.height == height)
-            .map(|block| block.epochs.clone())
+            .map(|block| block.epoch.clone())
             .unwrap_or_default())
     }
 
-    async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
+    async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
         let mut pending_epochs = self.pending_epochs.write().await;
-        pending_epochs.extend(epochs);
+        pending_epochs.push_back(epoch);
         self.get_latest_height().await
     }
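Behavioral note on the in-memory layer: `submit_snark` only queues the epoch, and the block producer drains at most one queued epoch per block, which is why `Block::epoch` became an `Option`. A test-style sketch, assuming `new(1)` returns the layer plus height and block receivers (as the sequencer tests later in this diff destructure it) and that the block producer is running; `finalized_epoch` is a placeholder value:

    let (da, _height_rx, mut block_rx) = InMemoryDataAvailabilityLayer::new(1);
    let da = Arc::new(da);
    da.start().await?;                       // may be a no-op if new() already spawns the producer
    da.submit_snark(finalized_epoch).await?; // push_back onto the pending VecDeque
    let block = block_rx.recv().await?;      // the next block pops at most one epoch
    println!("block {} carries an epoch: {}", block.height, block.epoch.is_some());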
diff --git a/crates/prism/src/da/mod.rs b/crates/prism/src/da/mod.rs
index 7fc90c8..8620792 100644
--- a/crates/prism/src/da/mod.rs
+++ b/crates/prism/src/da/mod.rs
@@ -8,6 +8,7 @@ use prism_errors::GeneralError;
 use serde::{Deserialize, Serialize};
 use sp1_sdk::SP1ProofWithPublicValues;
 use std::{self, str::FromStr};
+use tokio::sync::broadcast;
 
 pub mod celestia;
 pub mod memory;
@@ -50,9 +51,10 @@ impl SignedContent for FinalizedEpoch {
 pub trait DataAvailabilityLayer: Send + Sync {
     async fn get_latest_height(&self) -> Result<u64>;
     async fn initialize_sync_target(&self) -> Result<u64>;
-    async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>>;
-    async fn submit_snarks(&self, epoch: Vec<FinalizedEpoch>) -> Result<u64>;
+    async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>>;
+    async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64>;
     async fn get_operations(&self, height: u64) -> Result<Vec<Operation>>;
     async fn submit_operations(&self, operations: Vec<Operation>) -> Result<u64>;
     async fn start(&self) -> Result<()>;
+    fn subscribe_to_heights(&self) -> broadcast::Receiver<u64>;
 }
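With this trait shape, consumers pair the push-style notification (`subscribe_to_heights`) with per-height pulls. A sketch against any `DataAvailabilityLayer` implementation; `handle_epoch` is a hypothetical callback:

    let mut heights = da.subscribe_to_heights();
    while let Ok(height) = heights.recv().await {
        let operations = da.get_operations(height).await?;
        // Under the new API at most one FinalizedEpoch exists per height.
        if let Some(epoch) = da.get_snark(height).await? {
            handle_epoch(epoch, &operations)?;
        }
    }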
diff --git a/crates/prism/src/node_types/lightclient.rs b/crates/prism/src/node_types/lightclient.rs
index bb2f43d..8458826 100644
--- a/crates/prism/src/node_types/lightclient.rs
+++ b/crates/prism/src/node_types/lightclient.rs
@@ -72,68 +72,74 @@ impl LightClient {
                     debug!("updated sync target to height {}", target);
 
                     for i in current_position..target {
                         trace!("processing height: {}", i);
-                        match self.da.get_snarks(i + 1).await {
-                            Ok(epoch_json_vec) => {
-                                if !epoch_json_vec.is_empty() {
-                                    debug!("light client: got epochs at height {}", i + 1);
+                        match self.da.get_snark(i + 1).await {
+                            Ok(epoch_json) => {
+                                if epoch_json.is_none() {
+                                    continue;
                                 }
+                                let finalized_epoch = epoch_json.unwrap();
+                                debug!("light client: got epoch at height {}", i + 1);
 
                                 // todo: verify adjacency to last heights, <- for this we need some sort of storage of epochs
-                                for epoch_json in epoch_json_vec {
-                                    let _prev_commitment = &epoch_json.prev_commitment;
-                                    let _current_commitment = &epoch_json.current_commitment;
+                                let _prev_commitment = &finalized_epoch.prev_commitment;
+                                let _current_commitment = &finalized_epoch.current_commitment;
 
-                                    // if the user does not add a verifying key, we will not verify the signature,
-                                    // but only log a warning on startup
-                                    if self.sequencer_pubkey.is_some() {
-                                        match verify_signature(
-                                            &epoch_json.clone(),
-                                            self.sequencer_pubkey.clone(),
-                                        ) {
-                                            Ok(_) => trace!(
+                                // if the user does not add a verifying key, we will not verify the signature,
+                                // but only log a warning on startup
+                                if self.sequencer_pubkey.is_some() {
+                                    match verify_signature(
+                                        &finalized_epoch.clone(),
+                                        self.sequencer_pubkey.clone(),
+                                    ) {
+                                        Ok(_) => {
+                                            trace!(
                                                 "valid signature for epoch {}",
-                                                epoch_json.height
-                                            ),
-                                            Err(e) => {
-                                                panic!("invalid signature in epoch {}: {:?}", i, e)
-                                            }
+                                                finalized_epoch.height
+                                            )
+                                        }
+                                        Err(e) => {
+                                            panic!("invalid signature in epoch {}: {:?}", i, e)
                                         }
                                     }
+                                }
 
-                                    let prev_commitment = &epoch_json.prev_commitment;
-                                    let current_commitment = &epoch_json.current_commitment;
+                                let prev_commitment = &finalized_epoch.prev_commitment;
+                                let current_commitment = &finalized_epoch.current_commitment;
 
-                                    let mut public_values = epoch_json.proof.public_values.clone();
-                                    let proof_prev_commitment: Digest = public_values.read();
-                                    let proof_current_commitment: Digest = public_values.read();
+                                let mut public_values = finalized_epoch.proof.public_values.clone();
+                                let proof_prev_commitment: Digest = public_values.read();
+                                let proof_current_commitment: Digest = public_values.read();
 
-                                    if prev_commitment != &proof_prev_commitment
-                                        || current_commitment != &proof_current_commitment
-                                    {
-                                        error!(
-                                            "Commitment mismatch:
+                                if prev_commitment != &proof_prev_commitment
+                                    || current_commitment != &proof_current_commitment
+                                {
+                                    error!(
+                                        "Commitment mismatch:
                                         prev_commitment: {:?}, proof_prev_commitment: {:?},
                                         current_commitment: {:?}, proof_current_commitment: {:?}",
-                                            prev_commitment,
-                                            proof_prev_commitment,
-                                            current_commitment,
-                                            proof_current_commitment
-                                        );
-                                        panic!("Commitment mismatch in epoch {}", epoch_json.height);
-                                    }
+                                        prev_commitment,
+                                        proof_prev_commitment,
+                                        current_commitment,
+                                        proof_current_commitment
+                                    );
+                                    panic!("Commitment mismatch in epoch {}", finalized_epoch.height);
+                                }
 
-                                    match self.client.verify(&epoch_json.proof, &self.verifying_key) {
-                                        Ok(_) => {
-                                            info!(
-                                                "zkSNARK for epoch {} was validated successfully",
-                                                epoch_json.height
-                                            )
-                                        }
-                                        Err(err) => panic!(
-                                            "failed to validate epoch at height {}: {:?}",
-                                            epoch_json.height, err
-                                        ),
+                                match self
+                                    .client
+                                    .verify(&finalized_epoch.proof, &self.verifying_key)
+                                {
+                                    Ok(_) => {
+                                        info!(
+                                            "zkSNARK for epoch {} was validated successfully",
+                                            finalized_epoch.height
+                                        )
                                     }
+                                    Err(err) => panic!(
+                                        "failed to validate epoch at height {}: {:?}",
+                                        finalized_epoch.height, err
+                                    ),
                                 }
                             }
                             Err(e) => {
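Condensed, the light client's per-epoch check above has three steps: read the two digests the proof commits to, compare them against the epoch's claimed commitments, then verify the SNARK itself. A sketch using the same names as the code above:

    let mut public_values = finalized_epoch.proof.public_values.clone();
    let proof_prev: Digest = public_values.read();  // first committed output
    let proof_curr: Digest = public_values.read();  // second committed output
    assert_eq!(finalized_epoch.prev_commitment, proof_prev);
    assert_eq!(finalized_epoch.current_commitment, proof_curr);
    self.client.verify(&finalized_epoch.proof, &self.verifying_key)?;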
diff --git a/crates/prism/src/node_types/sequencer.rs b/crates/prism/src/node_types/sequencer.rs
index e81fff8..5dfea98 100644
--- a/crates/prism/src/node_types/sequencer.rs
+++ b/crates/prism/src/node_types/sequencer.rs
@@ -1,12 +1,13 @@
-use anyhow::{Context, Result};
+use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use ed25519::Signature;
 use ed25519_dalek::{Signer, SigningKey};
 use jmt::KeyHash;
 use prism_common::tree::{hash, Batch, Digest, Hasher, KeyDirectoryTree, Proof, SnarkableTree};
-use std::{self, str::FromStr, sync::Arc};
+use std::{self, collections::VecDeque, str::FromStr, sync::Arc};
 use tokio::{
     sync::{
+        broadcast,
         mpsc::{channel, Receiver, Sender},
         Mutex,
     },
@@ -55,9 +56,6 @@ pub struct Sequencer {
 
     proving_key: SP1ProvingKey,
     verifying_key: SP1VerifyingKey,
-
-    epoch_buffer_tx: Arc<Sender<FinalizedEpoch>>,
-    epoch_buffer_rx: Arc<Mutex<Receiver<FinalizedEpoch>>>,
 }
 
 #[async_trait]
@@ -65,17 +63,13 @@ impl NodeType for Sequencer {
     async fn start(self: Arc<Self>) -> Result<()> {
         self.da.start().await.context("Failed to start DA layer")?;
 
-        let sync_loop = self.clone().sync_loop();
-        let snark_loop = self.clone().post_snarks_loop();
-        let operation_loop = self.clone().post_operations_loop();
+        let main_loop = self.clone().main_loop();
 
         let ws_self = self.clone();
         let ws = ws_self.ws.start(self.clone());
 
         tokio::select! {
-            res = sync_loop => Ok(res.context("sync loop failed")?),
-            res = snark_loop => Ok(res.context("DA loop failed")?),
-            res = operation_loop => Ok(res.context("Operation loop failed")?),
+            res = main_loop => Ok(res.context("main loop failed")?),
             res = ws => Ok(res.context("WebServer failed")?),
         }
     }
@@ -88,7 +82,6 @@ impl Sequencer {
         cfg: Config,
         key: SigningKey,
     ) -> Result<Sequencer> {
-        let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);
         let ws = cfg.webserver.context("Missing webserver configuration")?;
         let start_height = cfg.celestia_config.unwrap_or_default().start_height;
 
@@ -109,219 +102,198 @@ impl Sequencer {
             prover_client: Arc::new(Mutex::new(prover_client)),
             tree,
             pending_operations: Arc::new(Mutex::new(Vec::new())),
-            epoch_buffer_tx: Arc::new(tx),
-            epoch_buffer_rx: Arc::new(Mutex::new(rx)),
         })
     }
 
-    // sync_loop is responsible for downloading operations from the DA layer
-    async fn sync_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
-        let self_clone = self.clone();
-        info!("starting operation sync loop");
-        let epoch_buffer = self.epoch_buffer_tx.clone();
-        spawn(async move {
-            let mut current_position = self_clone.start_height;
-            loop {
-                // target is updated when a new header is received
-                let target = match self_clone.da.get_latest_height().await {
-                    Ok(target) => target,
-                    Err(e) => {
-                        error!("failed to update sync target, retrying: {:?}", e);
-                        continue;
-                    }
-                };
+    async fn main_loop(self: Arc<Self>) -> Result<()> {
+        let mut height_rx = self.da.subscribe_to_heights();
+        let current_height = height_rx.recv().await?;
+        let historical_sync_height = current_height - 1;
 
-                if current_position == target {
-                    continue;
-                }
+        self.sync_range(self.start_height, historical_sync_height)
+            .await?;
+        self.real_time_sync(height_rx).await
+    }
 
-                debug!("updated sync target to height {}", target);
-                while current_position < target {
-                    trace!("processing height: {}", current_position);
-                    match self_clone.da.get_operations(current_position + 1).await {
-                        Ok(operations) => {
-                            if !operations.is_empty() {
-                                debug!(
-                                    "sequencer: got operations at height {}",
-                                    current_position + 1
-                                );
-                            }
-
-                            let epoch = match self_clone.finalize_epoch(operations).await {
-                                Ok(e) => e,
-                                Err(e) => {
-                                    error!("sequencer_loop: finalizing epoch: {}", e);
-                                    continue;
-                                }
-                            };
-
-                            info!("sequencer_loop: finalized epoch {}", epoch.height);
-                            match epoch_buffer.send(epoch).await {
-                                Ok(_) => {
-                                    current_position += 1;
-                                }
-                                Err(e) => {
-                                    error!("sequencer_loop: sending epoch to buffer: {}", e);
-                                }
-                            }
-                        }
-                        Err(e) => {
-                            debug!("light client: getting epoch: {}", e)
-                        }
-                    };
-                }
-                current_position = target; // Update the current position to the latest target
-            }
-        })
-        .await
+    async fn sync_range(&self, start_height: u64, end_height: u64) -> Result<()> {
+        let saved_epoch = self.db.get_epoch()?;
+        let mut current_epoch: u64 = 0;
+        let mut buffered_operations: VecDeque<Vec<Operation>> = VecDeque::new();
+        let mut current_height = start_height;
+
+        while current_height < end_height {
+            let height = current_height + 1;
+            let operations = self.da.get_operations(height).await?;
+            let epoch_result = self.da.get_snark(height).await?;
+
+            self.process_height(
+                height,
+                operations,
+                epoch_result,
+                &mut current_epoch,
+                &mut buffered_operations,
+                saved_epoch,
+            )
+            .await?;
+
+            current_height += 1;
+        }
+
+        Ok(())
+    }
-    pub async fn post_operations_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
-        info!("Starting operation posting loop");
-        let mut ticker = interval(std::time::Duration::from_secs(1)); // Adjust the interval as needed
-        let mut last_processed_height = 0;
-
-        spawn(async move {
-            loop {
-                ticker.tick().await;
-
-                // Check for new block
-                let current_height = match self.da.get_latest_height().await {
-                    Ok(height) => height,
-                    Err(e) => {
-                        error!("operation_loop: Failed to get latest height: {}", e);
-                        continue;
-                    }
-                };
-
-                // If there's a new block
-                if current_height > last_processed_height {
-                    // Get pending operations
-                    let pending_operations = {
-                        let mut ops = self.pending_operations.lock().await;
-                        std::mem::take(&mut *ops)
-                    };
-
-                    // If there are pending operations, submit them
-                    if !pending_operations.is_empty() {
-                        match self.da.submit_operations(pending_operations).await {
-                            Ok(submitted_height) => {
-                                info!(
-                                    "operation_loop: Submitted operations at height {}",
-                                    submitted_height
-                                );
-                                last_processed_height = submitted_height;
-                            }
-                            Err(e) => {
-                                error!("operation_loop: Failed to submit operations: {}", e);
-                                // TODO: Handle error (e.g., retry logic, returning operations to pending_operations)
-                            }
-                        }
-                    } else {
-                        debug!(
-                            "operation_loop: No pending operations to submit at height {}",
-                            current_height
-                        );
-                        last_processed_height = current_height;
-                    }
-                }
-            }
-        })
-        .await
+    async fn real_time_sync(&self, mut height_rx: broadcast::Receiver<u64>) -> Result<()> {
+        let saved_epoch = self.db.get_epoch()?;
+        let mut current_epoch: u64 = saved_epoch;
+        let mut buffered_operations: VecDeque<Vec<Operation>> = VecDeque::new();
+
+        loop {
+            let height = height_rx.recv().await?;
+            let operations = self.da.get_operations(height).await?;
+            let epoch_result = self.da.get_snark(height).await?;
+
+            self.process_height(
+                height,
+                operations,
+                epoch_result,
+                &mut current_epoch,
+                &mut buffered_operations,
+                saved_epoch,
+            )
+            .await?;
+        }
     }
 
-    // post_snarks_loop is responsible for submitting finalized epochs to the DA layer.
-    async fn post_snarks_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
-        info!("starting da submission loop");
-        let mut ticker = interval(DA_RETRY_INTERVAL);
-        spawn(async move {
-            loop {
-                let epochs = match self.receive_finalized_epochs().await {
-                    Ok(epochs) => epochs,
-                    Err(e) => {
-                        error!("da_loop: getting finalized epochs: {}", e);
-                        continue;
-                    }
-                };
+    async fn process_height(
+        &self,
+        height: u64,
+        operations: Vec<Operation>,
+        epoch_result: Option<FinalizedEpoch>,
+        current_epoch: &mut u64,
+        buffered_operations: &mut VecDeque<Vec<Operation>>,
+        saved_epoch: u64,
+    ) -> Result<()> {
+        let mut tree = self.tree.lock().await;
+        let prev_commitment = tree.get_commitment()?;
+
+        // once current_epoch > saved_epoch, the sequencer must create and post new FinalizedEpochs
+        if *current_epoch > saved_epoch {
+            let all_ops: Vec<Operation> = buffered_operations.drain(..).flatten().collect();
+            self.finalize_new_epoch(*current_epoch, all_ops, &mut tree)
+                .await?;
+        }
+        // we haven't fully synced yet, the FinalizedEpoch is in the block
+        else if let Some(epoch) = epoch_result {
+            self.process_existing_epoch(
+                epoch,
+                current_epoch,
+                buffered_operations,
+                &mut tree,
+                prev_commitment,
+                height,
+            )
+            .await?;
+        }
+        // there was no FinalizedEpoch in this block, so buffer its operations until we come across the next one
+        else {
+            buffered_operations.push_back(operations);
+            warn!(
+                "Epoch JSON not found for height {}. Operations buffered.",
+                height
+            );
+        }
 
-                // don't post to DA layer if no epochs have been finalized
-                if epochs.is_empty() {
-                    continue;
-                }
+        Ok(())
+    }
 
-                let mut retry_counter = 0;
-                loop {
-                    if retry_counter > DA_RETRY_COUNT {
-                        // todo: graceful shutdown
-                        panic!("da_loop: too many retries, giving up");
-                    }
-                    match self.da.submit_snarks(epochs.clone()).await {
-                        Ok(height) => {
-                            info!("da_loop: submitted epoch at height {}", height);
-                            break;
-                        }
-                        Err(e) => {
-                            // code = NotFound means the account is not funded
-                            if e.to_string().contains("rpc error: code = NotFound") {
-                                panic!("da_loop: celestia account not funded, causing: {}", e);
-                            }
-                            error!("da_loop: submitting epoch: {}", e);
-                            retry_counter += 1;
-                        }
-                    };
-                    ticker.tick().await;
+    async fn process_existing_epoch(
+        &self,
+        epoch: FinalizedEpoch,
+        current_epoch: &mut u64,
+        buffered_operations: &mut VecDeque<Vec<Operation>>,
+        tree: &mut KeyDirectoryTree<Box<dyn Database>>,
+        prev_commitment: Digest,
+        height: u64,
+    ) -> Result<()> {
+        if epoch.height != *current_epoch {
+            return Err(anyhow!(
+                "Epoch height mismatch: expected {}, got {}",
+                current_epoch,
+                epoch.height
+            ));
+        }
+        if epoch.prev_commitment != prev_commitment {
+            return Err(anyhow!("Commitment mismatch at epoch {}", current_epoch));
+        }
+
+        while let Some(buffered_ops) = buffered_operations.pop_front() {
+            self.execute_block(buffered_ops, tree).await?;
+        }
+
+        let new_commitment = tree.get_commitment()?;
+        if epoch.current_commitment != new_commitment {
+            return Err(anyhow!("Commitment mismatch at epoch {}", current_epoch));
+        }
+
+        debug!(
+            "Processed height {}. New commitment: {:?}",
+            height, new_commitment
+        );
+        *current_epoch += 1;
+        Ok(())
+    }
+    async fn execute_block(
+        &self,
+        operations: Vec<Operation>,
+        tree: &mut KeyDirectoryTree<Box<dyn Database>>,
+    ) -> Result<Vec<Proof>> {
+        let mut proofs = Vec::new();
+
+        for operation in operations {
+            match self.process_operation(&operation, tree).await {
+                Ok(proof) => proofs.push(proof),
+                Err(e) => {
+                    // Log the error and continue with the next operation
+                    warn!("Failed to process operation: {:?}. Error: {}", operation, e);
                 }
             }
-        })
-        .await
-    }
+        }
 
-    pub async fn get_commitment(&self) -> Result<Digest> {
-        let tree = self.tree.lock().await;
-        tree.get_commitment().context("Failed to get commitment")
+        Ok(proofs)
     }
 
-    // finalize_epoch is responsible for finalizing the pending epoch and returning the epoch json to be posted on the DA layer.
-    pub async fn finalize_epoch(&self, operations: Vec<Operation>) -> Result<FinalizedEpoch> {
-        let epoch = match self.db.get_epoch() {
-            Ok(epoch) => epoch + 1,
-            Err(_) => 0,
-        };
+    async fn finalize_new_epoch(
+        &self,
+        height: u64,
+        operations: Vec<Operation>,
+        tree: &mut KeyDirectoryTree<Box<dyn Database>>,
+    ) -> Result<()> {
+        let prev_commitment = tree.get_commitment()?;
 
-        let prev_commitment = if epoch > 0 {
-            let prev_epoch = epoch - 1;
-            let hash_string = self.db.get_commitment(&prev_epoch).context(format!(
-                "Failed to get commitment for previous epoch {}",
-                prev_epoch
-            ))?;
-            Digest::from_hex(&hash_string).context("Failed to parse commitment")?
-        } else {
-            self.get_commitment().await?
-        };
+        let proofs = self.execute_block(operations, tree).await?;
 
-        let mut proofs = Vec::new();
-        for entry in operations.iter() {
-            let proof = self.process_operation(entry).await?;
-            proofs.push(proof);
-        }
+        let new_commitment = tree.get_commitment()?;
+        self.db.set_commitment(&height, &new_commitment);
 
-        let current_commitment = {
-            let tree = self.tree.lock().await;
-            tree.get_commitment()
-                .context("Failed to get current commitment")?
-        };
+        let finalized_epoch = self
+            .prove_epoch(height, prev_commitment, new_commitment, proofs)
+            .await?;
 
-        self.db
-            .set_epoch(&epoch)
-            .context("Failed to set new epoch")?;
-        // add the commitment for the operations ran since the last epoch
-        self.db
-            .set_commitment(&epoch, &current_commitment)
-            .context("Failed to add commitment for new epoch")?;
+        self.da.submit_snark(finalized_epoch).await?;
+        Ok(())
+    }
 
+    async fn prove_epoch(
+        &self,
+        height: u64,
+        prev_commitment: Digest,
+        new_commitment: Digest,
+        proofs: Vec<Proof>,
+    ) -> Result<FinalizedEpoch> {
         let batch = Batch {
             prev_root: prev_commitment,
-            new_root: current_commitment,
+            new_root: new_commitment,
             proofs,
         };
@@ -330,21 +302,21 @@ impl Sequencer {
 
         let client = self.prover_client.lock().await;
 
-        info!("generating proof for epoch height {}", epoch);
+        info!("generating proof for epoch height {}", height);
         #[cfg(not(feature = "plonk"))]
         let proof = client.prove(&self.proving_key, stdin).run()?;
         #[cfg(feature = "plonk")]
         let proof = client.prove(&self.proving_key, stdin).plonk().run()?;
-        info!("successfully generated proof for epoch height {}", epoch);
+        info!("successfully generated proof for epoch height {}", height);
 
         client.verify(&proof, &self.verifying_key)?;
-        info!("verified proof for epoch height {}", epoch);
+        info!("verified proof for epoch height {}", height);
 
         let epoch_json = FinalizedEpoch {
-            height: epoch,
+            height,
             prev_commitment,
-            current_commitment,
+            current_commitment: new_commitment,
             proof,
             signature: None,
         };
@@ -360,28 +332,17 @@ impl Sequencer {
         Ok(epoch_json_with_signature)
     }
 
-    // receive_finalized_epochs empties the epoch buffer into a vector and returns it.
-    async fn receive_finalized_epochs(&self) -> Result<Vec<FinalizedEpoch>> {
-        let mut epochs = Vec::new();
-        let mut receiver = self.epoch_buffer_rx.lock().await;
-
-        while let Ok(epoch) = receiver.try_recv() {
-            epochs.push(epoch);
-        }
-
-        Ok(epochs)
-    }
-
-    #[cfg(test)]
-    pub async fn send_finalized_epoch(&self, epoch: &FinalizedEpoch) -> Result<()> {
-        self.epoch_buffer_tx
-            .send(epoch.clone())
-            .await
-            .map_err(|_| DataAvailabilityError::ChannelClosed.into())
+    pub async fn get_commitment(&self) -> Result<Digest> {
+        let tree = self.tree.lock().await;
+        tree.get_commitment().context("Failed to get commitment")
     }
 
     /// Updates the state from an already verified pending operation.
-    async fn process_operation(&self, operation: &Operation) -> Result<Proof> {
+    async fn process_operation(
+        &self,
+        operation: &Operation,
+        tree: &mut KeyDirectoryTree<Box<dyn Database>>,
+    ) -> Result<Proof> {
         match operation {
             Operation::Add { id, .. } | Operation::Revoke { id, .. } => {
                 // verify that the hashchain already exists
@@ -390,7 +351,6 @@ impl Sequencer {
                     .get_hashchain(id)
                     .context(format!("Failed to get hashchain for ID {}", id))?;
 
-                let mut tree = self.tree.lock().await;
                 let hashed_id = hash(id.as_bytes());
 
                 let previous_hash = current_chain.last().context("Hashchain is empty")?.hash;
@@ -441,7 +401,6 @@ impl Sequencer {
                     operation
                 ))?;
 
-                let mut tree = self.tree.lock().await;
                 let hashed_id = hash(id.as_bytes());
 
                 Ok(Proof::Insert(
@@ -463,7 +422,6 @@ impl Sequencer {
         Ok(())
     }
 }
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -476,16 +434,16 @@ mod tests {
     use keystore_rs::create_signing_key;
     use serial_test::serial;
 
-    // set up redis connection and flush database before each test
+    // Helper function to set up redis connection and flush database before each test
     fn setup_db() -> RedisConnection {
         let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap();
         redis_connection.flush_database().unwrap();
         redis_connection
     }
 
-    // flush database after each test
-    fn teardown_db(redis_connections: Arc<Box<dyn Database>>) {
-        redis_connections.flush_database().unwrap();
+    // Helper function to flush database after each test
+    fn teardown_db(redis_connection: Arc<Box<dyn Database>>) {
+        redis_connection.flush_database().unwrap();
     }
 
     // Helper function to create a test Sequencer instance
@@ -499,7 +457,7 @@ mod tests {
         )
     }
 
-    fn create_new_account_operation(id: String, value: String, key: SigningKey) -> OperationInput {
+    fn create_new_account_operation(id: String, value: String, key: &SigningKey) -> OperationInput {
         let incoming = Operation::CreateAccount {
             id: id.clone(),
             value: value.clone(),
@@ -533,321 +491,131 @@ mod tests {
     #[tokio::test]
     #[serial]
     async fn test_validate_and_queue_update() {
-        let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
-        let da_layer = Arc::new(da_layer);
-        let db: Arc<Box<dyn Database>> = Arc::new(Box::new(setup_db()));
-        let sequencer = Arc::new(
-            Sequencer::new(
-                db.clone(),
-                da_layer,
-                Config::default(),
-                create_signing_key(),
-            )
-            .unwrap(),
-        );
+        let sequencer = create_test_sequencer().await;
 
         let update_entry =
-            create_update_operation("test@deltadevs.xyz".to_string(), "test".to_string());
+            create_update_operation("test@example.com".to_string(), "test".to_string());
 
         sequencer
+            .clone()
             .validate_and_queue_update(&update_entry)
             .await
             .unwrap();
-        teardown_db(db);
+
+        let pending_ops = sequencer.pending_operations.lock().await;
+        assert_eq!(pending_ops.len(), 1);
+
+        teardown_db(sequencer.db.clone());
     }
     #[tokio::test]
     #[serial]
-    async fn test_queued_update_gets_finalized() {
-        let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
-        let da_layer = Arc::new(da_layer);
-        let db: Arc<Box<dyn Database>> = Arc::new(Box::new(setup_db()));
-        let signing_key = create_signing_key();
-        let sequencer = Arc::new(
-            Sequencer::new(db.clone(), da_layer, Config::default(), signing_key.clone()).unwrap(),
-        );
-
-        let id = "test@deltadevs.xyz".to_string();
-        let update_entry =
-            create_new_account_operation(id.clone(), "test".to_string(), signing_key.clone());
+    async fn test_process_operation() {
+        let sequencer = create_test_sequencer().await;
+        let mut tree = sequencer.tree.lock().await;
 
-        sequencer
-            .clone()
-            .validate_and_queue_update(&update_entry)
+        // Test CreateAccount operation
+        let create_op = create_new_account_operation(
+            "user@example.com".to_string(),
+            "initial".to_string(),
+            &sequencer.key,
+        )
+        .operation;
+        let proof = sequencer
+            .process_operation(&create_op, &mut tree)
             .await
             .unwrap();
+        assert!(matches!(proof, Proof::Insert(_)));
 
-        // hashchain doesn't exist yet, because operation is only queued
-        let hashchain = sequencer.db.get_hashchain(id.as_str());
-        assert!(hashchain.is_err());
-
-        let pending_operations = sequencer.pending_operations.lock().await.clone();
-        let prev_commitment = sequencer.get_commitment().await.unwrap();
-        sequencer.finalize_epoch(pending_operations).await.unwrap();
-        let new_commitment = sequencer.get_commitment().await.unwrap();
-        assert_ne!(prev_commitment, new_commitment);
-
-        let hashchain = sequencer.db.get_hashchain(id.as_str());
-        let value = hashchain.unwrap().get(0).operation.value();
-        assert_eq!(value, "test");
-
-        teardown_db(db);
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_validate_invalid_update_fails() {
-        let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
-        let da_layer = Arc::new(da_layer);
-        let db: Arc<Box<dyn Database>> = Arc::new(Box::new(setup_db()));
-        let sequencer = Arc::new(
-            Sequencer::new(
-                db.clone(),
-                da_layer,
-                Config::default(),
-                create_signing_key(),
-            )
-            .unwrap(),
-        );
+        // Test Add operation
+        let add_op = Operation::Add {
+            id: "user@example.com".to_string(),
+            value: "new_value".to_string(),
+        };
+        let proof = sequencer
+            .process_operation(&add_op, &mut tree)
+            .await
+            .unwrap();
+        assert!(matches!(proof, Proof::Update(_)));
 
-        let mut update_entry =
-            create_update_operation("test@deltadevs.xyz".to_string(), "test".to_string());
-        let second_signer =
-            create_update_operation("abcd".to_string(), "test".to_string()).public_key;
-        update_entry.public_key = second_signer;
+        // Test Revoke operation
+        let revoke_op = Operation::Revoke {
+            id: "user@example.com".to_string(),
+            value: "initial".to_string(),
+        };
+        let proof = sequencer
+            .process_operation(&revoke_op, &mut tree)
+            .await
+            .unwrap();
+        assert!(matches!(proof, Proof::Update(_)));
 
-        let res = sequencer.validate_and_queue_update(&update_entry).await;
-        assert!(res.is_err());
-        teardown_db(db);
+        teardown_db(sequencer.db.clone());
     }
 
     #[tokio::test]
     #[serial]
-    async fn test_finalize_epoch_first_epoch() {
+    async fn test_execute_block() {
         let sequencer = create_test_sequencer().await;
+        let mut tree = sequencer.tree.lock().await;
+
         let operations = vec![
             create_new_account_operation(
                 "user1@example.com".to_string(),
                 "value1".to_string(),
-                sequencer.key.clone(),
+                &sequencer.key,
             )
             .operation,
             create_new_account_operation(
                 "user2@example.com".to_string(),
                 "value2".to_string(),
-                sequencer.key.clone(),
+                &sequencer.key,
             )
             .operation,
+            Operation::Add {
+                id: "user1@example.com".to_string(),
+                value: "new_value1".to_string(),
+            },
         ];
-        let prev_commitment = sequencer.get_commitment().await.unwrap();
-        let epoch = sequencer.finalize_epoch(operations).await.unwrap();
-        assert_eq!(epoch.height, 0);
-        assert_eq!(epoch.prev_commitment, prev_commitment);
-        assert_eq!(
-            epoch.current_commitment,
-            sequencer.get_commitment().await.unwrap()
-        );
+        let proofs = sequencer
+            .execute_block(operations, &mut tree)
+            .await
+            .unwrap();
+        assert_eq!(proofs.len(), 3);
+
+        teardown_db(sequencer.db.clone());
     }
 
     #[tokio::test]
     #[serial]
-    async fn test_finalize_epoch_multiple_epochs() {
+    async fn test_finalize_new_epoch() {
         let sequencer = create_test_sequencer().await;
+        let mut tree = sequencer.tree.lock().await;
 
-        // First epoch
-        let operations1 = vec![
+        let operations = vec![
             create_new_account_operation(
                 "user1@example.com".to_string(),
                 "value1".to_string(),
-                sequencer.key.clone(),
+                &sequencer.key,
             )
             .operation,
-        ];
-        let epoch1 = sequencer.finalize_epoch(operations1).await.unwrap();
-
-        // Second epoch
-        let operations2 = vec![
             create_new_account_operation(
                 "user2@example.com".to_string(),
                 "value2".to_string(),
-                sequencer.key.clone(),
+                &sequencer.key,
             )
             .operation,
         ];
-        let epoch2 = sequencer.finalize_epoch(operations2).await.unwrap();
-
-        assert_eq!(epoch2.height, 1);
-        assert_eq!(epoch2.prev_commitment, epoch1.current_commitment);
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_commitment_verification() {
-        let sequencer = create_test_sequencer().await;
-
-        // First epoch
-        let operations1 = vec![
-            create_new_account_operation(
-                "user1@example.com".to_string(),
-                "value1".to_string(),
-                sequencer.key.clone(),
-            )
-            .operation,
-        ];
-        let epoch1 = sequencer.finalize_epoch(operations1).await.unwrap();
-
-        let mut public_values = epoch1.proof.public_values.clone();
-        let proof_prev_commitment: Digest = public_values.read();
-        let proof_current_commitment: Digest = public_values.read();
-
-        assert_eq!(
-            &epoch1.prev_commitment, &proof_prev_commitment,
-            "Previous commitment mismatch"
-        );
-        assert_eq!(
-            &epoch1.current_commitment, &proof_current_commitment,
-            "Current commitment mismatch"
-        );
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_process_operation_add() {
-        let sequencer = create_test_sequencer().await;
-
-        // First, create an account
-        let create_op = create_new_account_operation(
-            "user@example.com".to_string(),
-            "initial".to_string(),
-            sequencer.key.clone(),
-        )
-        .operation;
-        sequencer.process_operation(&create_op).await.unwrap();
-
-        // Then, add a new value
-        let add_op = Operation::Add {
-            id: "user@example.com".to_string(),
-            value: "new_value".to_string(),
-        };
-        let proof = sequencer.process_operation(&add_op).await.unwrap();
-
-        assert!(matches!(proof, Proof::Update(_)));
-        let hashchain = sequencer.db.get_hashchain("user@example.com").unwrap();
-        assert_eq!(hashchain.len(), 2);
-        assert_eq!(hashchain.get(1).operation.value(), "new_value");
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_process_operation_revoke() {
-        let sequencer = create_test_sequencer().await;
-
-        // First, create an account
-        let create_op = create_new_account_operation(
-            "user@example.com".to_string(),
-            "initial".to_string(),
-            sequencer.key.clone(),
-        )
-        .operation;
-        sequencer.process_operation(&create_op).await.unwrap();
-
-        // Then, revoke a value
-        let revoke_op = Operation::Revoke {
-            id: "user@example.com".to_string(),
-            value: "initial".to_string(),
-        };
-        let proof = sequencer.process_operation(&revoke_op).await.unwrap();
-
-        assert!(matches!(proof, Proof::Update(_)));
-
-        let hashchain = sequencer.db.get_hashchain("user@example.com").unwrap();
-        assert_eq!(hashchain.len(), 2);
-        assert!(matches!(
-            hashchain.get(1).operation,
-            Operation::Revoke { .. }
-        ));
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_process_operation_create_account_duplicate() {
-        let sequencer = create_test_sequencer().await;
-
-        // Create an account
-        let create_op = create_new_account_operation(
-            "user@example.com".to_string(),
-            "initial".to_string(),
-            sequencer.key.clone(),
-        )
-        .operation;
-        sequencer.process_operation(&create_op).await.unwrap();
-
-        // Try to create the same account again
-        let result = sequencer.process_operation(&create_op).await;
-        assert!(result.is_err());
-    }
-
-    #[tokio::test]
-    #[serial]
-    async fn test_receive_finalized_epochs() {
-        let sequencer = create_test_sequencer().await;
-
-        // Create some realistic operations
-        let op1 = create_new_account_operation(
-            "user1@example.com".to_string(),
-            "value1".to_string(),
-            sequencer.key.clone(),
-        )
-        .operation;
-        let op2 = create_new_account_operation(
-            "user2@example.com".to_string(),
-            "value2".to_string(),
-            sequencer.key.clone(),
-        )
-        .operation;
-        let op3 = Operation::Add {
-            id: "user1@example.com".to_string(),
-            value: "new_value1".to_string(),
-        };
-
-        // Create FinalizedEpoch instances
-        let epoch1 = sequencer.finalize_epoch(vec![op1]).await.unwrap();
-        let epoch2 = sequencer.finalize_epoch(vec![op2, op3]).await.unwrap();
-
-        // Send the epochs to the sequencer
-        sequencer.send_finalized_epoch(&epoch1).await.unwrap();
-        sequencer.send_finalized_epoch(&epoch2).await.unwrap();
-
-        // Receive and verify the epochs
-        let received_epochs = sequencer.receive_finalized_epochs().await.unwrap();
-        assert_eq!(received_epochs.len(), 2);
-
-        // Verify first epoch
-        assert_eq!(received_epochs[0].height, epoch1.height);
-        assert_eq!(received_epochs[0].prev_commitment, epoch1.prev_commitment);
-        assert_eq!(
-            received_epochs[0].current_commitment,
-            epoch1.current_commitment
-        );
-
-        // Verify second epoch
-        assert_eq!(received_epochs[1].height, epoch2.height);
-        assert_eq!(received_epochs[1].prev_commitment, epoch2.prev_commitment);
-        assert_eq!(
-            received_epochs[1].current_commitment,
-            epoch2.current_commitment
-        );
+        let prev_commitment = tree.get_commitment().unwrap();
+        sequencer
+            .finalize_new_epoch(0, operations, &mut tree)
+            .await
+            .unwrap();
 
-        // Verify that the epochs are connected
-        assert_eq!(
-            received_epochs[1].prev_commitment,
-            received_epochs[0].current_commitment
-        );
+        let new_commitment = tree.get_commitment().unwrap();
+        assert_ne!(prev_commitment, new_commitment);
 
-        // Verify that the buffer is now empty
-        let empty_epochs = sequencer.receive_finalized_epochs().await.unwrap();
-        assert!(empty_epochs.is_empty());
+        teardown_db(sequencer.db.clone());
     }
 }
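One behavioral note on the sequencer's loops: `broadcast::Receiver::recv` returns `Err(RecvError::Lagged(n))` when a subscriber falls behind the channel's retention window, so the `?` in `real_time_sync` above aborts the loop instead of catching up. A more tolerant receive might look like this sketch (not part of the diff):

    use tokio::sync::broadcast::error::RecvError;

    let height = match height_rx.recv().await {
        Ok(h) => h,
        Err(RecvError::Lagged(skipped)) => {
            warn!("height subscription lagged, skipped {} updates", skipped);
            continue; // the next recv() resumes from the oldest retained height
        }
        Err(RecvError::Closed) => return Ok(()),
    };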
diff --git a/crates/prism/src/storage.rs b/crates/prism/src/storage.rs
index ff465f1..af2fd13 100644
--- a/crates/prism/src/storage.rs
+++ b/crates/prism/src/storage.rs
@@ -14,7 +14,7 @@ use std::{
     time::Duration,
 };
 
-use crate::cfg::RedisConfig;
+use crate::{cfg::RedisConfig, da::FinalizedEpoch};
 use prism_common::{
     hashchain::{Hashchain, HashchainEntry},
     operation::Operation,
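The storage.rs hunk is truncated after the import change, but the sequencer code above calls `self.db.get_epoch()`, `set_epoch`, `get_commitment`, `set_commitment`, `get_hashchain`, and `flush_database`. A hypothetical sketch of that slice of the `Database` trait, inferred only from those call sites (the real definition lives in storage.rs and may differ):

    // Inferred from call sites in sequencer.rs; hypothetical, not the actual trait.
    pub trait Database: Send + Sync {
        fn get_epoch(&self) -> Result<u64>;
        fn set_epoch(&self, epoch: &u64) -> Result<()>;
        fn get_commitment(&self, epoch: &u64) -> Result<String>;
        fn set_commitment(&self, epoch: &u64, commitment: &Digest) -> Result<()>;
        fn get_hashchain(&self, id: &str) -> Result<Hashchain>;
        fn flush_database(&self) -> Result<()>;
    }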