From 26d4c7e09f7f9499de54265612388e3f2f71608c Mon Sep 17 00:00:00 2001
From: sebasti810
Date: Mon, 29 Jul 2024 16:17:41 +0200
Subject: [PATCH] refac: sequencer and lc error handling, refactored sync loop

---
 src/node_types/lightclient.rs | 146 +++++++++++++++++----------------
 src/node_types/sequencer.rs   | 148 +++++++++++++++-------------------
 2 files changed, 144 insertions(+), 150 deletions(-)

diff --git a/src/node_types/lightclient.rs b/src/node_types/lightclient.rs
index 2f91b2d..b75e1dd 100644
--- a/src/node_types/lightclient.rs
+++ b/src/node_types/lightclient.rs
@@ -2,7 +2,9 @@ use crate::{
     cfg::CelestiaConfig,
     error::{DataAvailabilityError, GeneralError, PrismResult},
 };
+use anyhow::Context;
 use async_trait::async_trait;
+use celestia_types::Height;
 use std::{self, sync::Arc, time::Duration};
 use tokio::{task::spawn, time::interval};
 
@@ -23,13 +25,15 @@ pub struct LightClient {
 impl NodeType for LightClient {
     async fn start(self: Arc<Self>) -> PrismResult<()> {
         // start listening for new headers to update sync target
-        match self.da.start().await {
-            Ok(_) => (),
-            Err(e) => return Err(DataAvailabilityError::InitializationError(e.to_string()).into()),
-        };
+        self.da
+            .start()
+            .await
+            .context("Failed to start DataAvailabilityLayer")
+            .map_err(|e| DataAvailabilityError::InitializationError(e.to_string()))?;
 
         self.sync_loop()
             .await
+            .context("Sync loop failed")
             .map_err(|e| GeneralError::InitializationError(e.to_string()).into())
     }
 }
@@ -64,75 +68,81 @@ impl LightClient {
                 };
                 debug!("updated sync target to height {}", target);
 
-                for i in current_position..target {
-                    trace!("processing height: {}", i);
-                    match self.da.get(i + 1).await {
-                        Ok(epoch_json_vec) => {
-                            if !epoch_json_vec.is_empty() {
-                                debug!("light client: got epochs at height {}", i + 1);
-                            }
-
-                            // todo: verify adjacency to last heights, <- for this we need some sort of storage of epochs
-                            for epoch_json in epoch_json_vec {
-                                let prev_commitment = &epoch_json.prev_commitment;
-                                let current_commitment = &epoch_json.current_commitment;
-
-                                let proof = match epoch_json.proof.clone().try_into() {
-                                    Ok(proof) => proof,
-                                    Err(e) => {
-                                        error!("failed to deserialize proof, skipping a blob at height {}: {:?}", i, e);
-                                        continue;
-                                    }
-                                };
-
-                                // TODO(@distractedm1nd): i don't know rust yet but this seems like non-idiomatic rust -
-                                // is there not a Trait that can satisfy these properties for us?
-                                let verifying_key = match epoch_json.verifying_key.clone().try_into() {
-                                    Ok(vk) => vk,
-                                    Err(e) => {
-                                        error!("failed to deserialize verifying key, skipping a blob at height {}: {:?}", i, e);
-                                        continue;
-                                    }
-                                };
-
-                                // if the user does not add a verifying key, we will not verify the signature,
-                                // but only log a warning on startup
-                                if self.verifying_key.is_some() {
-                                    match verify_signature(
-                                        &epoch_json.clone(),
-                                        self.verifying_key.clone(),
-                                    ) {
-                                        Ok(_) => trace!("valid signature for epoch {}", epoch_json.height),
-                                        Err(e) => {
-                                            panic!("invalid signature in epoch {}: {:?}", i, e)
-                                        }
-                                    }
-                                }
-
-                                match validate_epoch(
-                                    prev_commitment,
-                                    current_commitment,
-                                    proof,
-                                    verifying_key,
-                                ) {
-                                    Ok(_) => {
-                                        info!(
-                                            "zkSNARK for epoch {} was validated successfully",
-                                            epoch_json.height
-                                        )
-                                    }
-                                    Err(err) => panic!("failed to validate epoch: {:?}", err),
-                                }
-                            }
-                        }
+
+                for height in current_position..target {
+                    match self.process_height(height + 1).await {
+                        Ok(_) => {}
                         Err(e) => {
-                            debug!("light client: getting epoch: {}", e)
+                            error!("Error processing height {}: {:?}", height + 1, e);
+                            // @distractedm1nd: should we break the loop here? or continue? retry?
+                            continue;
                         }
-                    };
+                    }
                 }
+
                 ticker.tick().await; // only for testing purposes
                 current_position = target; // Update the current position to the latest target
             }
-        }).await
+        })
+        .await
     }
+
+    async fn process_height(&self, height: u64) -> PrismResult<()> {
+        let epoch_json_vec = self
+            .da
+            .get(height)
+            .await
+            .context(format!("Failed to get epoch at height {}", height))?;
+
+        if !epoch_json_vec.is_empty() {
+            debug!("Light client: got epochs at height {}", height);
+        }
+
+        for epoch_json in epoch_json_vec {
+            self.process_epoch(epoch_json, height).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn process_epoch(
+        &self,
+        epoch_json: crate::da::FinalizedEpoch,
+        height: u64,
+    ) -> PrismResult<()> {
+        let proof = epoch_json
+            .proof
+            .clone()
+            .try_into()
+            .context(format!("Failed to deserialize proof at height {}", height))?;
+
+        let verifying_key = epoch_json
+            .verifying_key
+            .clone()
+            .try_into()
+            .context(format!(
+                "Failed to deserialize verifying key at height {}",
+                height
+            ))?;
+
+        if let Some(ref vk) = self.verifying_key {
+            verify_signature(&epoch_json, Some(vk.clone()))
+                .context(format!("Invalid signature in epoch at height {}", height))?;
+            trace!("Valid signature for epoch {}", epoch_json.height);
+        }
+
+        validate_epoch(
+            &epoch_json.prev_commitment,
+            &epoch_json.current_commitment,
+            proof,
+            verifying_key,
+        )
+        .context(format!("Failed to validate epoch at height {}", height))?;
+
+        info!(
+            "zkSNARK for epoch {} was validated successfully",
+            epoch_json.height
+        );
+        Ok(())
+    }
 }
diff --git a/src/node_types/sequencer.rs b/src/node_types/sequencer.rs
index 2804ab2..4baae6a 100644
--- a/src/node_types/sequencer.rs
+++ b/src/node_types/sequencer.rs
@@ -1,3 +1,4 @@
+use anyhow::Context;
 use async_trait::async_trait;
 use ed25519::Signature;
 use ed25519_dalek::{Signer, SigningKey};
@@ -47,9 +48,7 @@ pub struct Sequencer {
 #[async_trait]
 impl NodeType for Sequencer {
     async fn start(self: Arc<Self>) -> PrismResult<()> {
-        if let Err(e) = self.da.start().await {
-            return Err(DataAvailabilityError::InitializationError(e.to_string()).into());
-        }
+        self.da.start().await.context("Failed to start DA layer")?;
 
         let main_loop = self.clone().main_loop();
         let da_loop = self.clone().da_loop();
@@ -58,9 +57,9 @@ impl NodeType for Sequencer {
         let ws = ws_self.ws.start(self.clone());
 
         tokio::select! {
-            _ = main_loop => Ok(()),
-            _ = da_loop => Ok(()),
-            _ = ws => Ok(()),
+            res = main_loop => Ok(res.context("Main loop failed")?),
+            res = da_loop => Ok(res.context("DA loop failed")?),
+            res = ws => Ok(res.context("WebServer failed")?),
         }
     }
 }
@@ -74,27 +73,17 @@ impl Sequencer {
     ) -> PrismResult<Sequencer> {
         let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);
 
-        let epoch_duration = match cfg.epoch_time {
-            Some(epoch_time) => epoch_time,
-            None => {
-                return Err(GeneralError::MissingArgumentError("epoch_time".to_string()).into());
-            }
-        };
+        let epoch_duration = cfg
+            .epoch_time
+            .context("Missing epoch_time in configuration")?;
 
-        let ws = match cfg.webserver {
-            Some(webserver) => WebServer::new(webserver),
-            None => {
-                return Err(
-                    GeneralError::MissingArgumentError("webserver config".to_string()).into(),
-                );
-            }
-        };
+        let ws = cfg.webserver.context("Missing webserver configuration")?;
 
         Ok(Sequencer {
             db,
             da,
             epoch_duration,
-            ws,
+            ws: WebServer::new(ws),
             key,
             tree: Arc::new(Mutex::new(IndexedMerkleTree::new_with_size(1024).unwrap())),
             pending_entries: Arc::new(Mutex::new(Vec::new())),
@@ -178,29 +167,22 @@ impl Sequencer {
 
     pub async fn get_commitment(&self) -> PrismResult<Hash> {
         let tree = self.tree.lock().await;
-        tree.get_commitment().map_err(|e| e.into())
+        tree.get_commitment()
+            .context("Failed to get commitment")
+            .map_err(|e| e.into())
     }
 
     // finalize_epoch is responsible for finalizing the pending epoch and returning the epoch json to be posted on the DA layer.
     pub async fn finalize_epoch(&self) -> PrismResult<FinalizedEpoch> {
-        let epoch = match self.db.get_epoch() {
-            Ok(epoch) => epoch + 1,
-            Err(_) => 0,
-        };
+        let epoch = self.db.get_epoch().unwrap_or(0) + 1;
 
         let prev_commitment = if epoch > 0 {
             let prev_epoch = epoch - 1;
-            match self.db.get_commitment(&prev_epoch) {
-                Ok(commitment) => Hash::from_hex(commitment.as_str()).unwrap(),
-                Err(e) => {
-                    return Err(DatabaseError::ReadError(format!(
-                        "commitment for prev epoch {:?}: {:?}",
-                        prev_epoch,
-                        e.to_string()
-                    ))
-                    .into());
-                }
-            }
+            let hash_string = self.db.get_commitment(&prev_epoch).context(format!(
+                "Failed to get commitment for previous epoch {}",
+                prev_epoch
+            ))?;
+            Hash::from_hex(&hash_string).context("Failed to parse commitment")?
         } else {
             self.get_commitment().await?
         };
@@ -209,16 +191,24 @@ impl Sequencer {
 
         let current_commitment = {
             let tree = self.tree.lock().await;
-            tree.get_commitment().map_err(PrismError::MerkleTree)?
+            tree.get_commitment()
+                .context("Failed to get current commitment")?
         };
 
-        self.db.set_epoch(&epoch)?;
+        self.db
+            .set_epoch(&epoch)
+            .context("Failed to set new epoch")?;
         // add the commitment for the operations ran since the last epoch
-        self.db.add_commitment(&epoch, &current_commitment)?;
+        self.db
+            .add_commitment(&epoch, &current_commitment)
+            .context("Failed to add commitment for new epoch")?;
 
         let batch_circuit =
-            BatchMerkleProofCircuit::new(&prev_commitment, &current_commitment, proofs)?;
-        let (proof, verifying_key) = batch_circuit.create_and_verify_snark()?;
+            BatchMerkleProofCircuit::new(&prev_commitment, &current_commitment, proofs)
+                .context("Failed to create BatchMerkleProofCircuit")?;
+        let (proof, verifying_key) = batch_circuit
+            .create_and_verify_snark()
+            .context("Failed to create and verify snark")?;
 
         let epoch_json = FinalizedEpoch {
             height: epoch,
@@ -229,8 +219,8 @@ impl Sequencer {
             signature: None,
         };
 
-        let serialized_epoch_json_without_signature = borsh::to_vec(&epoch_json)
-            .map_err(|e| GeneralError::ParsingError(format!("epoch: {}", e)))?;
+        let serialized_epoch_json_without_signature =
+            borsh::to_vec(&epoch_json).context("Failed to serialize epoch json")?;
         let signature = self
             .key
             .sign(serialized_epoch_json_without_signature.as_slice())
@@ -241,10 +231,13 @@ impl Sequencer {
     }
 
     async fn receive_finalized_epoch(&self) -> PrismResult<FinalizedEpoch> {
-        match self.epoch_buffer_rx.lock().await.recv().await {
-            Some(epoch) => Ok(epoch),
-            None => Err(DataAvailabilityError::ChannelReceiveError.into()),
-        }
+        self.epoch_buffer_rx
+            .lock()
+            .await
+            .recv()
+            .await
+            .context("Failed to receive finalized epoch")
+            .map_err(|e| e.into())
     }
 
     async fn finalize_pending_entries(&self) -> PrismResult<Vec<Proof>> {
@@ -263,22 +256,20 @@ impl Sequencer {
         match operation {
             Operation::Add { id, value } | Operation::Revoke { id, value } => {
                 // verify that the hashchain already exists
-                let mut current_chain = self.db.get_hashchain(id).map_err(|e| {
-                    DatabaseError::NotFoundError(format!("hashchain for ID {}: {}", id, e))
-                })?;
+                let mut current_chain = self
+                    .db
+                    .get_hashchain(id)
+                    .context(format!("Failed to get hashchain for ID {}", id))?;
 
                 let mut tree = self.tree.lock().await;
                 let hashed_id = sha256_mod(id.as_bytes());
 
-                let node = tree.find_leaf_by_label(&hashed_id).ok_or_else(|| {
-                    // TODO: before merging, change error type
-                    GeneralError::DecodingError(format!(
-                        "node with label {} not found in the tree",
-                        hashed_id
-                    ))
-                })?;
+                let node = tree.find_leaf_by_label(&hashed_id).context(format!(
+                    "Node with label {} not found in the tree",
+                    hashed_id
+                ))?;
 
-                let previous_hash = current_chain.last().unwrap().hash;
+                let previous_hash = current_chain.last().context("Hashchain is empty")?.hash;
                 let new_chain_entry = HashchainEntry::new(operation.clone(), previous_hash);
                 current_chain.push(new_chain_entry.clone());
 
@@ -290,25 +281,22 @@ impl Sequencer {
                     node.get_next(),
                 );
 
-                let index = tree.find_node_index(&node).ok_or_else(|| {
-                    GeneralError::DecodingError(format!(
-                        "node with label {} not found in the tree, but has a hashchain entry",
-                        hashed_id
-                    ))
-                })?;
+                let index = tree.find_node_index(&node).context(format!(
+                    "Node with label {} not found in the tree, but has a hashchain entry",
+                    hashed_id
+                ))?;
 
                 self.db
                     .update_hashchain(operation, &current_chain)
-                    .map_err(|e| {
-                        PrismError::Database(DatabaseError::WriteError(format!(
-                            "hashchain for incoming operation {:?}: {:?}",
-                            operation, e
-                        )))
-                    })?;
+                    .context(format!(
+                        "Failed to update hashchain for operation {:?}",
+                        operation
+                    ))?;
 
                 // TODO: Possible optimization: cache the last update proof for each id for serving the proofs
                 tree.update_node(index, updated_node)
                     .map(Proof::Update)
+                    .context("Failed to update node in tree")
                     .map_err(|e| e.into())
             }
             Operation::CreateAccount { id, value, source } => {
@@ -316,11 +304,8 @@ impl Sequencer {
                 match source {
                     // TODO: use Signature, not String
                     AccountSource::SignedBySequencer { signature } => {
-                        let sig = Signature::from_str(signature).map_err(|_| {
-                            PrismError::General(GeneralError::ParsingError(
-                                "sequencer's signature on operation".to_string(),
-                            ))
-                        })?;
+                        let sig = Signature::from_str(signature)
+                            .context("Failed to parse sequencer's signature")?;
                         self.key
                             .verify(format!("{}{}", id, value).as_bytes(), &sig)
                             .map_err(|e| PrismError::General(GeneralError::InvalidSignature(e)))
@@ -342,12 +327,10 @@ impl Sequencer {
                 // question: why do we not need to do this for Operation::Add/Revoke as well?
                 self.db
                     .update_hashchain(operation, &new_chain)
-                    .map_err(|e| {
-                        PrismError::Database(DatabaseError::WriteError(format!(
-                            "hashchain for incoming operation {:?}: {:?}",
-                            operation, e
-                        )))
-                    })?;
+                    .context(format!(
+                        "Failed to create hashchain for operation {:?}",
+                        operation
+                    ))?;
 
                 let mut tree = self.tree.lock().await;
                 let hashed_id = sha256_mod(id.as_bytes());
@@ -356,6 +339,7 @@ impl Sequencer {
                     Node::new_leaf(true, hashed_id, new_chain.first().unwrap().hash, Node::TAIL);
                 tree.insert_node(&mut node)
                     .map(Proof::Insert)
+                    .context("Failed to insert node into tree")
                     .map_err(|e| e.into())
             }
         }