diff --git a/Cargo.toml b/Cargo.toml index b7901f9a..d9b56a64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,4 +52,4 @@ keystore = { git = "https://github.com/deltadevsde/keystore", rev = "40f21a41afd pyroscope = "0.5.7" pyroscope_pprofrs = "0.2.7" toml = "0.8.14" -dirs = "5.0.1" \ No newline at end of file +dirs = "5.0.1" diff --git a/src/cfg.rs b/src/cfg.rs index 4f01f187..73ec25b8 100644 --- a/src/cfg.rs +++ b/src/cfg.rs @@ -5,7 +5,9 @@ use dotenvy::dotenv; use serde::{Deserialize, Serialize}; use std::{fs, path::Path, sync::Arc}; -use crate::da::{CelestiaConnection, DataAvailabilityLayer, LocalDataAvailabilityLayer}; +use crate::da::{ + celestia::CelestiaConnection, mock::LocalDataAvailabilityLayer, DataAvailabilityLayer, +}; #[derive(Clone, Debug, Subcommand, Deserialize)] pub enum Commands { diff --git a/src/da/celestia.rs b/src/da/celestia.rs new file mode 100644 index 00000000..a52698c3 --- /dev/null +++ b/src/da/celestia.rs @@ -0,0 +1,205 @@ +use crate::da::{DataAvailabilityLayer, EpochJson}; +use crate::{ + consts::CHANNEL_BUFFER_SIZE, + error::{DAResult, DataAvailabilityError, GeneralError}, +}; +use async_trait::async_trait; +use celestia_rpc::{BlobClient, Client, HeaderClient}; +use celestia_types::{blob::GasPrice, nmt::Namespace, Blob}; +use std::{self, sync::Arc}; +use tokio::{ + sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, + }, + task::spawn, +}; + +impl TryFrom<&Blob> for EpochJson { + type Error = GeneralError; + + fn try_from(value: &Blob) -> Result { + // convert blob data to utf8 string + let data_str = String::from_utf8(value.data.clone()).map_err(|e| { + GeneralError::EncodingError(format!("encoding blob data to utf8 string: {}", e)) + })?; + + serde_json::from_str(&data_str) + .map_err(|e| GeneralError::DecodingError(format!("epoch json: {}", e))) + } +} + +pub struct CelestiaConnection { + pub client: celestia_rpc::Client, + pub namespace_id: Namespace, + + synctarget_tx: Arc>, + synctarget_rx: Arc>>, +} + +impl CelestiaConnection { + // TODO: Should take config + pub async fn new( + connection_string: &String, + auth_token: Option<&str>, + namespace_hex: &String, + ) -> DAResult { + let (tx, rx) = channel(CHANNEL_BUFFER_SIZE); + + let client = Client::new(&connection_string, auth_token) + .await + .map_err(|e| { + DataAvailabilityError::ConnectionError(format!( + "websocket initialization failed: {}", + e + )) + })?; + + let decoded_hex = match hex::decode(namespace_hex) { + Ok(hex) => hex, + Err(e) => { + return Err(DataAvailabilityError::GeneralError( + GeneralError::DecodingError(format!( + "decoding namespace '{}': {}", + namespace_hex, e + )), + )) + } + }; + + let namespace_id = Namespace::new_v0(&decoded_hex).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!( + "creating namespace '{}': {}", + namespace_hex, e + ))) + })?; + + Ok(CelestiaConnection { + client, + namespace_id, + synctarget_tx: Arc::new(tx), + synctarget_rx: Arc::new(Mutex::new(rx)), + }) + } +} + +#[async_trait] +impl DataAvailabilityLayer for CelestiaConnection { + async fn get_message(&self) -> DAResult { + match self.synctarget_rx.lock().await.recv().await { + Some(height) => Ok(height), + None => Err(DataAvailabilityError::ChannelReceiveError), + } + } + + async fn initialize_sync_target(&self) -> DAResult { + match HeaderClient::header_network_head(&self.client).await { + Ok(extended_header) => Ok(extended_header.header.height.value()), + Err(err) => Err(DataAvailabilityError::NetworkError(format!( + "getting network head from da layer: {}", + err + ))), + } + } + + async fn get(&self, height: u64) -> DAResult> { + trace!("searching for epoch on da layer at height {}", height); + match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await { + Ok(blobs) => { + let mut epochs = Vec::new(); + for blob in blobs.iter() { + match EpochJson::try_from(blob) { + Ok(epoch_json) => epochs.push(epoch_json), + Err(_) => { + DataAvailabilityError::GeneralError(GeneralError::ParsingError( + format!( + "marshalling blob from height {} to epoch json: {}", + height, + serde_json::to_string(&blob).unwrap() + ), + )); + } + } + } + Ok(epochs) + } + Err(err) => { + // todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0 + if err.to_string().contains("blob: not found") { + Ok(vec![]) + } else { + Err(DataAvailabilityError::DataRetrievalError( + height, + format!("getting epoch from da layer: {}", err), + )) + } + } + } + } + + async fn submit(&self, epoch: &EpochJson) -> DAResult { + debug!("posting epoch {} to da layer", epoch.height); + + let data = serde_json::to_string(&epoch).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!( + "serializing epoch json: {}", + e + ))) + })?; + let blob = Blob::new(self.namespace_id.clone(), data.into_bytes()).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string())) + })?; + trace!("blob: {:?}", serde_json::to_string(&blob).unwrap()); + match self + .client + .blob_submit(&[blob.clone()], GasPrice::from(-1.0)) + .await + { + Ok(height) => Ok(height), + Err(err) => Err(DataAvailabilityError::SubmissionError( + epoch.height, + err.to_string(), + )), + } + } + + async fn start(&self) -> DAResult<()> { + let mut header_sub = HeaderClient::header_subscribe(&self.client) + .await + .map_err(|e| { + DataAvailabilityError::NetworkError(format!( + "subscribing to headers from da layer: {}", + e + )) + })?; + + let synctarget_buffer = self.synctarget_tx.clone(); + spawn(async move { + while let Some(extended_header_result) = header_sub.next().await { + match extended_header_result { + Ok(extended_header) => { + let height = extended_header.header.height.value(); + match synctarget_buffer.send(height).await { + Ok(_) => { + debug!("sent sync target update for height {}", height); + } + Err(_) => { + DataAvailabilityError::SyncTargetError(format!( + "sending sync target update message for height {}", + height + )); + } + } + } + Err(e) => { + DataAvailabilityError::NetworkError(format!( + "retrieving header from da layer: {}", + e + )); + } + } + } + }); + Ok(()) + } +} diff --git a/src/da.rs b/src/da/mock.rs similarity index 57% rename from src/da.rs rename to src/da/mock.rs index e1c7a194..492d489b 100644 --- a/src/da.rs +++ b/src/da/mock.rs @@ -1,95 +1,14 @@ -use crate::{ - consts::CHANNEL_BUFFER_SIZE, - error::{DAResult, DataAvailabilityError, DeimosResult, GeneralError}, - utils::Signable, - zk_snark::{Bls12Proof, VerifyingKey}, -}; +use crate::error::{DAResult, DataAvailabilityError}; use async_trait::async_trait; -use celestia_rpc::{BlobClient, Client, HeaderClient}; -use celestia_types::{blob::GasPrice, nmt::Namespace, Blob}; -use ed25519::Signature; use fs2::FileExt; -use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::{ self, fs::{File, OpenOptions}, io::{Read, Seek, Write}, - str::FromStr, - sync::Arc, -}; -use tokio::{ - sync::{ - mpsc::{channel, Receiver, Sender}, - Mutex, - }, - task::spawn, }; -#[derive(Serialize, Deserialize, Clone)] -pub struct EpochJson { - pub height: u64, - pub prev_commitment: String, - pub current_commitment: String, - pub proof: Bls12Proof, - pub verifying_key: VerifyingKey, - pub signature: Option, -} - -impl TryFrom<&Blob> for EpochJson { - type Error = GeneralError; - - fn try_from(value: &Blob) -> Result { - // convert blob data to utf8 string - let data_str = String::from_utf8(value.data.clone()).map_err(|e| { - GeneralError::EncodingError(format!("encoding blob data to utf8 string: {}", e)) - })?; - - serde_json::from_str(&data_str) - .map_err(|e| GeneralError::DecodingError(format!("epoch json: {}", e))) - } -} - -impl Signable for EpochJson { - fn get_signature(&self) -> DeimosResult { - match &self.signature { - Some(signature) => Signature::from_str(signature) - .map_err(|e| GeneralError::ParsingError(format!("signature: {}", e)).into()), - None => Err(GeneralError::MissingArgumentError("signature".to_string()).into()), - } - } - - fn get_content_to_sign(&self) -> DeimosResult { - let mut copy = self.clone(); - copy.signature = None; - serde_json::to_string(©).map_err(|e| GeneralError::EncodingError(e.to_string()).into()) - } - - fn get_public_key(&self) -> DeimosResult { - //TODO(@distractedm1nd): the below comment isn't good enough of an argument to not return the public key, it should be fixed - - // for epoch json the public key to verify is the one from the sequencer which should be already be public and known from every light client - // so if we use this function there should be an error - Err(GeneralError::MissingArgumentError("public key".to_string()).into()) - } -} - -#[async_trait] -pub trait DataAvailabilityLayer: Send + Sync { - async fn get_message(&self) -> DAResult; - async fn initialize_sync_target(&self) -> DAResult; - async fn get(&self, height: u64) -> DAResult>; - async fn submit(&self, epoch: &EpochJson) -> DAResult; - async fn start(&self) -> DAResult<()>; -} - -pub struct CelestiaConnection { - pub client: celestia_rpc::Client, - pub namespace_id: Namespace, - - synctarget_tx: Arc>, - synctarget_rx: Arc>>, -} +use crate::da::{DataAvailabilityLayer, EpochJson}; /// The `NoopDataAvailabilityLayer` is a mock implementation of the `DataAvailabilityLayer` trait. pub struct NoopDataAvailabilityLayer {} @@ -124,173 +43,6 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer { /// This implementation is intended for testing and development only and should not be used in production environments. It provides a way to test the interactions with the data availability layer without the overhead of real network communication or data persistence. pub struct LocalDataAvailabilityLayer {} -impl CelestiaConnection { - // TODO: Should take config - pub async fn new( - connection_string: &String, - auth_token: Option<&str>, - namespace_hex: &String, - ) -> DAResult { - let (tx, rx) = channel(CHANNEL_BUFFER_SIZE); - - let client = Client::new(&connection_string, auth_token) - .await - .map_err(|e| { - DataAvailabilityError::ConnectionError(format!( - "websocket initialization failed: {}", - e - )) - })?; - - let decoded_hex = match hex::decode(namespace_hex) { - Ok(hex) => hex, - Err(e) => { - return Err(DataAvailabilityError::GeneralError( - GeneralError::DecodingError(format!( - "decoding namespace '{}': {}", - namespace_hex, e - )), - )) - } - }; - - let namespace_id = Namespace::new_v0(&decoded_hex).map_err(|e| { - DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!( - "creating namespace '{}': {}", - namespace_hex, e - ))) - })?; - - Ok(CelestiaConnection { - client, - namespace_id, - synctarget_tx: Arc::new(tx), - synctarget_rx: Arc::new(Mutex::new(rx)), - }) - } -} - -#[async_trait] -impl DataAvailabilityLayer for CelestiaConnection { - async fn get_message(&self) -> DAResult { - match self.synctarget_rx.lock().await.recv().await { - Some(height) => Ok(height), - None => Err(DataAvailabilityError::ChannelReceiveError), - } - } - - async fn initialize_sync_target(&self) -> DAResult { - match HeaderClient::header_network_head(&self.client).await { - Ok(extended_header) => Ok(extended_header.header.height.value()), - Err(err) => Err(DataAvailabilityError::NetworkError(format!( - "getting network head from da layer: {}", - err - ))), - } - } - - async fn get(&self, height: u64) -> DAResult> { - trace!("searching for epoch on da layer at height {}", height); - match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await { - Ok(blobs) => { - let mut epochs = Vec::new(); - for blob in blobs.iter() { - match EpochJson::try_from(blob) { - Ok(epoch_json) => epochs.push(epoch_json), - Err(_) => { - DataAvailabilityError::GeneralError(GeneralError::ParsingError( - format!( - "marshalling blob from height {} to epoch json: {}", - height, - serde_json::to_string(&blob).unwrap() - ), - )); - } - } - } - Ok(epochs) - } - Err(err) => { - // todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0 - if err.to_string().contains("blob: not found") { - Ok(vec![]) - } else { - Err(DataAvailabilityError::DataRetrievalError( - height, - format!("getting epoch from da layer: {}", err), - )) - } - } - } - } - - async fn submit(&self, epoch: &EpochJson) -> DAResult { - debug!("posting epoch {} to da layer", epoch.height); - - let data = serde_json::to_string(&epoch).map_err(|e| { - DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!( - "serializing epoch json: {}", - e - ))) - })?; - let blob = Blob::new(self.namespace_id.clone(), data.into_bytes()).map_err(|e| { - DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string())) - })?; - trace!("blob: {:?}", serde_json::to_string(&blob).unwrap()); - match self - .client - .blob_submit(&[blob.clone()], GasPrice::from(-1.0)) - .await - { - Ok(height) => Ok(height), - Err(err) => Err(DataAvailabilityError::SubmissionError( - epoch.height, - err.to_string(), - )), - } - } - - async fn start(&self) -> DAResult<()> { - let mut header_sub = HeaderClient::header_subscribe(&self.client) - .await - .map_err(|e| { - DataAvailabilityError::NetworkError(format!( - "subscribing to headers from da layer: {}", - e - )) - })?; - - let synctarget_buffer = self.synctarget_tx.clone(); - spawn(async move { - while let Some(extended_header_result) = header_sub.next().await { - match extended_header_result { - Ok(extended_header) => { - let height = extended_header.header.height.value(); - match synctarget_buffer.send(height).await { - Ok(_) => { - debug!("sent sync target update for height {}", height); - } - Err(_) => { - DataAvailabilityError::SyncTargetError(format!( - "sending sync target update message for height {}", - height - )); - } - } - } - Err(e) => { - DataAvailabilityError::NetworkError(format!( - "retrieving header from da layer: {}", - e - )); - } - } - } - }); - Ok(()) - } -} - impl LocalDataAvailabilityLayer { pub fn new() -> Self { LocalDataAvailabilityLayer {} @@ -379,12 +131,12 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer { } #[cfg(test)] -mod da_tests { +mod tests { use crate::{ utils::validate_epoch, zk_snark::{ deserialize_custom_to_verifying_key, deserialize_proof, serialize_proof, - serialize_verifying_key_to_custom, BatchMerkleProofCircuit, VerifyingKey, + serialize_verifying_key_to_custom, BatchMerkleProofCircuit, Bls12Proof, VerifyingKey, }, }; diff --git a/src/da/mod.rs b/src/da/mod.rs new file mode 100644 index 00000000..4f8df6c2 --- /dev/null +++ b/src/da/mod.rs @@ -0,0 +1,55 @@ +use crate::{ + error::{DAResult, DeimosResult, GeneralError}, + utils::Signable, + zk_snark::{Bls12Proof, VerifyingKey}, +}; +use async_trait::async_trait; +use ed25519::Signature; +use serde::{Deserialize, Serialize}; +use std::{self, str::FromStr}; + +pub mod celestia; +pub mod mock; + +#[derive(Serialize, Deserialize, Clone)] +pub struct EpochJson { + pub height: u64, + pub prev_commitment: String, + pub current_commitment: String, + pub proof: Bls12Proof, + pub verifying_key: VerifyingKey, + pub signature: Option, +} + +impl Signable for EpochJson { + fn get_signature(&self) -> DeimosResult { + match &self.signature { + Some(signature) => Signature::from_str(signature) + .map_err(|e| GeneralError::ParsingError(format!("signature: {}", e)).into()), + None => Err(GeneralError::MissingArgumentError("signature".to_string()).into()), + } + } + + fn get_content_to_sign(&self) -> DeimosResult { + let mut copy = self.clone(); + copy.signature = None; + serde_json::to_string(©).map_err(|e| GeneralError::EncodingError(e.to_string()).into()) + } + + fn get_public_key(&self) -> DeimosResult { + //TODO(@distractedm1nd): the below comment isn't good enough of an argument to not return the public key, it should be fixed + + // for epoch json the public key to verify is the one from the sequencer which should be already be public and known from every light client + // so if we use this function there should be an error + Err(GeneralError::MissingArgumentError("public key".to_string()).into()) + } +} + +#[async_trait] +pub trait DataAvailabilityLayer: Send + Sync { + async fn get_message(&self) -> DAResult; + async fn initialize_sync_target(&self) -> DAResult; + async fn get(&self, height: u64) -> DAResult>; + async fn submit(&self, epoch: &EpochJson) -> DAResult; + async fn start(&self) -> DAResult<()>; +} diff --git a/src/main.rs b/src/main.rs index 07c6c61a..75bb7edb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,7 @@ use clap::Parser; use keystore::{KeyChain, KeyStore, KeyStoreType}; use crate::cfg::{CommandLineArgs, Commands}; -use node_types::{LightClient, NodeType, Sequencer}; +use node_types::{lightclient::LightClient, sequencer::Sequencer, NodeType}; use std::sync::Arc; use storage::RedisConnections; diff --git a/src/node_types/lightclient.rs b/src/node_types/lightclient.rs new file mode 100644 index 00000000..4d99cf43 --- /dev/null +++ b/src/node_types/lightclient.rs @@ -0,0 +1,123 @@ +use crate::{ + cfg::CelestiaConfig, + error::{DataAvailabilityError, DeimosResult}, +}; +use async_trait::async_trait; +use std::{self, sync::Arc, time::Duration}; +use tokio::{task::spawn, time::interval}; + +use crate::{ + da::DataAvailabilityLayer, + node_types::NodeType, + utils::{validate_epoch, verify_signature}, + zk_snark::{deserialize_custom_to_verifying_key, deserialize_proof}, +}; + +pub struct LightClient { + pub da: Arc, + // verifying_key is the [`VerifyingKey`] used to verify epochs from the prover/sequencer + pub verifying_key: Option, + start_height: u64, +} + +#[async_trait] +impl NodeType for LightClient { + async fn start(self: Arc) -> DeimosResult<()> { + // start listening for new headers to update sync target + self.da.start().await.unwrap(); + + info!("starting main light client loop"); + + // todo: persist current_position in datastore + let start_height = self.start_height; + spawn(async move { + let mut current_position = start_height; + let mut ticker = interval(Duration::from_secs(1)); + loop { + // target is updated when a new header is received + let target = self.da.get_message().await.unwrap(); + debug!("updated sync target to height {}", target); + for i in current_position..target { + trace!("processing height: {}", i); + match self.da.get(i + 1).await { + Ok(epoch_json_vec) => { + if epoch_json_vec.len() > 0 { + debug!("light client: got epochs at height {}", i + 1); + } + + // Verify adjacency to last heights, <- for this we need some sort of storage of epochs + // Verify zk proofs, + for epoch_json in epoch_json_vec { + let prev_commitment = &epoch_json.prev_commitment; + let current_commitment = &epoch_json.current_commitment; + let proof = match deserialize_proof(&epoch_json.proof) { + Ok(proof) => proof, + Err(e) => { + error!("failed to deserialize proof, skipping a blob at height {}: {:?}", i, e); + continue; + }, + }; + + // TODO(@distractedm1nd): i don't know rust yet but this seems like non-idiomatic rust - + // is there not a Trait that can satisfy these properties for us? + let verifying_key = match deserialize_custom_to_verifying_key(&epoch_json.verifying_key) { + Ok(vk) => vk, + Err(e) => { + error!("failed to deserialize verifying key, skipping a blob at height {}: {:?}", i, e); + continue; + }, + }; + + // if the user does not add a verifying key, we will not verify the signature, + // but only log a warning on startup + if self.verifying_key.is_some() { + match verify_signature(&epoch_json.clone(), self.verifying_key.clone()) { + Ok(i) => trace!("valid signature for epoch {}", i), + Err(e) => { + panic!("invalid signature in epoch {}: {:?}", i, e) + } + } + } + + match validate_epoch( + &prev_commitment, + ¤t_commitment, + proof, + verifying_key, + ) { + Ok(_) => { + info!("zkSNARK for epoch {} was validated successfully", epoch_json.height) + } + Err(err) => panic!("failed to validate epoch: {:?}", err), + } + } + } + Err(e) => { + debug!("light client: getting epoch: {}", e) + } + }; + } + ticker.tick().await; // only for testing purposes + current_position = target; // Update the current position to the latest target + } + }) + .await + .map_err(|_| { + DataAvailabilityError::InitializationError("failed to initialize".to_string()).into() + }) + } +} + +impl LightClient { + pub fn new( + da: Arc, + cfg: CelestiaConfig, + sequencer_pub_key: Option, + ) -> LightClient { + LightClient { + da, + verifying_key: sequencer_pub_key, + start_height: cfg.start_height, + } + } +} diff --git a/src/node_types/mod.rs b/src/node_types/mod.rs new file mode 100644 index 00000000..c5e7a2f9 --- /dev/null +++ b/src/node_types/mod.rs @@ -0,0 +1,75 @@ +use crate::error::DeimosResult; +use async_trait::async_trait; +use std::{self, sync::Arc}; + +pub mod lightclient; +pub mod sequencer; + +#[async_trait] +pub trait NodeType { + async fn start(self: Arc) -> DeimosResult<()>; + // async fn stop(&self) -> Result<(), String>; +} + +#[cfg(test)] +mod tests { + use crate::{storage::UpdateEntryJson, utils::verify_signature}; + use base64::{engine::general_purpose, Engine as _}; + + fn setup_signature(valid_signature: bool) -> UpdateEntryJson { + let signed_message = if valid_signature { + "NRtq1sgoxllsPvljXZd5f4DV7570PdA9zWHa4ych2jBCDU1uUYXZvW72BS9O+C68hptk/4Y34sTJj4x92gq9DHsiaWQiOiJDb3NSWE9vU0xHN2E4c0NHeDc4S2h0ZkxFdWl5Tlk3TDRrc0Z0NzhtcDJNPSIsIm9wZXJhdGlvbiI6IkFkZCIsInZhbHVlIjoiMjE3OWM0YmIzMjc0NDQ1NGE0OTlhYTMwZTI0NTJlMTZhODcwMGQ5ODQyYjI5ZThlODcyN2VjMzczNWMwYjdhNiJ9".to_string() + } else { + "QVmk3wgoxllsPvljXZd5f4DV7570PdA9zWHa4ych2jBCDU1uUYXZvW72BS9O+C68hptk/4Y34sTJj4x92gq9DHsiaWQiOiJDb3NSWE9vU0xHN2E4c0NHeDc4S2h0ZkxFdWl5Tlk3TDRrc0Z0NzhtcDJNPSIsIm9wZXJhdGlvbiI6IkFkZCIsInZhbHVlIjoiMjE3OWM0YmIzMjc0NDQ1NGE0OTlhYTMwZTI0NTJlMTZhODcwMGQ5ODQyYjI5ZThlODcyN2VjMzczNWMwYjdhNiJ9".to_string() + }; + let id_public_key = "CosRXOoSLG7a8sCGx78KhtfLEuiyNY7L4ksFt78mp2M=".to_string(); + + UpdateEntryJson { + id: id_public_key.clone(), + signed_message, + public_key: id_public_key, + } + } + + #[test] + fn test_verify_valid_signature() { + let signature_with_key = setup_signature(true); + + let result = verify_signature( + &signature_with_key, + Some(signature_with_key.public_key.clone()), + ); + + assert!(result.is_ok()); + } + + #[test] + fn test_verify_invalid_signature() { + let signature_with_key = setup_signature(false); + + let result = verify_signature( + &signature_with_key, + Some(signature_with_key.public_key.clone()), + ); + assert!(result.is_err()); + } + + #[test] + fn test_verify_short_message() { + let signature_with_key = setup_signature(true); + + let short_message = + general_purpose::STANDARD.encode(&"this is a short message".to_string()); + + let signature_with_key = UpdateEntryJson { + signed_message: short_message, + ..signature_with_key + }; + + let result = verify_signature( + &signature_with_key, + Some(signature_with_key.public_key.clone()), + ); + assert!(result.is_err()); + } +} diff --git a/src/node_types.rs b/src/node_types/sequencer.rs similarity index 66% rename from src/node_types.rs rename to src/node_types/sequencer.rs index d6431c06..e1fc3a62 100644 --- a/src/node_types.rs +++ b/src/node_types/sequencer.rs @@ -1,5 +1,4 @@ use crate::{ - cfg::CelestiaConfig, consts::{CHANNEL_BUFFER_SIZE, DA_RETRY_COUNT, DA_RETRY_INTERVAL}, error::{DataAvailabilityError, DeimosResult}, }; @@ -21,21 +20,13 @@ use crate::{ cfg::Config, da::{DataAvailabilityLayer, EpochJson}, error::{DeimosError, GeneralError}, + node_types::NodeType, storage::{ChainEntry, Database, IncomingEntry, Operation, UpdateEntryJson}, - utils::{validate_epoch, verify_signature}, + utils::verify_signature, webserver::WebServer, - zk_snark::{ - deserialize_custom_to_verifying_key, deserialize_proof, serialize_proof, - serialize_verifying_key_to_custom, BatchMerkleProofCircuit, - }, + zk_snark::{serialize_proof, serialize_verifying_key_to_custom, BatchMerkleProofCircuit}, }; -#[async_trait] -pub trait NodeType { - async fn start(self: Arc) -> DeimosResult<()>; - // async fn stop(&self) -> Result<(), String>; -} - pub struct Sequencer { pub db: Arc, pub da: Arc, @@ -47,13 +38,6 @@ pub struct Sequencer { epoch_buffer_rx: Arc>>, } -pub struct LightClient { - pub da: Arc, - // verifying_key is the [`VerifyingKey`] used to verify epochs from the prover/sequencer - pub verifying_key: Option, - start_height: u64, -} - #[async_trait] impl NodeType for Sequencer { async fn start(self: Arc) -> DeimosResult<()> { @@ -85,108 +69,6 @@ impl NodeType for Sequencer { } } -#[async_trait] -impl NodeType for LightClient { - async fn start(self: Arc) -> DeimosResult<()> { - // start listening for new headers to update sync target - self.da.start().await.unwrap(); - - info!("starting main light client loop"); - - // todo: persist current_position in datastore - let start_height = self.start_height; - spawn(async move { - let mut current_position = start_height; - let mut ticker = interval(Duration::from_secs(1)); - loop { - // target is updated when a new header is received - let target = self.da.get_message().await.unwrap(); - debug!("updated sync target to height {}", target); - for i in current_position..target { - trace!("processing height: {}", i); - match self.da.get(i + 1).await { - Ok(epoch_json_vec) => { - if epoch_json_vec.len() > 0 { - debug!("light client: got epochs at height {}", i + 1); - } - - // Verify adjacency to last heights, <- for this we need some sort of storage of epochs - // Verify zk proofs, - for epoch_json in epoch_json_vec { - let prev_commitment = &epoch_json.prev_commitment; - let current_commitment = &epoch_json.current_commitment; - let proof = match deserialize_proof(&epoch_json.proof) { - Ok(proof) => proof, - Err(e) => { - error!("failed to deserialize proof, skipping a blob at height {}: {:?}", i, e); - continue; - }, - }; - - // TODO(@distractedm1nd): i don't know rust yet but this seems like non-idiomatic rust - - // is there not a Trait that can satisfy these properties for us? - let verifying_key = match deserialize_custom_to_verifying_key(&epoch_json.verifying_key) { - Ok(vk) => vk, - Err(e) => { - error!("failed to deserialize verifying key, skipping a blob at height {}: {:?}", i, e); - continue; - }, - }; - - // if the user does not add a verifying key, we will not verify the signature, - // but only log a warning on startup - if self.verifying_key.is_some() { - match verify_signature(&epoch_json.clone(), self.verifying_key.clone()) { - Ok(i) => trace!("valid signature for epoch {}", i), - Err(e) => { - panic!("invalid signature in epoch {}: {:?}", i, e) - } - } - } - - match validate_epoch( - &prev_commitment, - ¤t_commitment, - proof, - verifying_key, - ) { - Ok(_) => { - info!("zkSNARK for epoch {} was validated successfully", epoch_json.height) - } - Err(err) => panic!("failed to validate epoch: {:?}", err), - } - } - } - Err(e) => { - debug!("light client: getting epoch: {}", e) - } - }; - } - ticker.tick().await; // only for testing purposes - current_position = target; // Update the current position to the latest target - } - }) - .await - .map_err(|_| { - DataAvailabilityError::InitializationError("failed to initialize".to_string()).into() - }) - } -} - -impl LightClient { - pub fn new( - da: Arc, - cfg: CelestiaConfig, - sequencer_pub_key: Option, - ) -> LightClient { - LightClient { - da, - verifying_key: sequencer_pub_key, - start_height: cfg.start_height, - } - } -} - impl Sequencer { pub fn new( db: Arc, @@ -524,67 +406,3 @@ impl Sequencer { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::storage::UpdateEntryJson; - use base64::{engine::general_purpose, Engine as _}; - - fn setup_signature(valid_signature: bool) -> UpdateEntryJson { - let signed_message = if valid_signature { - "NRtq1sgoxllsPvljXZd5f4DV7570PdA9zWHa4ych2jBCDU1uUYXZvW72BS9O+C68hptk/4Y34sTJj4x92gq9DHsiaWQiOiJDb3NSWE9vU0xHN2E4c0NHeDc4S2h0ZkxFdWl5Tlk3TDRrc0Z0NzhtcDJNPSIsIm9wZXJhdGlvbiI6IkFkZCIsInZhbHVlIjoiMjE3OWM0YmIzMjc0NDQ1NGE0OTlhYTMwZTI0NTJlMTZhODcwMGQ5ODQyYjI5ZThlODcyN2VjMzczNWMwYjdhNiJ9".to_string() - } else { - "QVmk3wgoxllsPvljXZd5f4DV7570PdA9zWHa4ych2jBCDU1uUYXZvW72BS9O+C68hptk/4Y34sTJj4x92gq9DHsiaWQiOiJDb3NSWE9vU0xHN2E4c0NHeDc4S2h0ZkxFdWl5Tlk3TDRrc0Z0NzhtcDJNPSIsIm9wZXJhdGlvbiI6IkFkZCIsInZhbHVlIjoiMjE3OWM0YmIzMjc0NDQ1NGE0OTlhYTMwZTI0NTJlMTZhODcwMGQ5ODQyYjI5ZThlODcyN2VjMzczNWMwYjdhNiJ9".to_string() - }; - let id_public_key = "CosRXOoSLG7a8sCGx78KhtfLEuiyNY7L4ksFt78mp2M=".to_string(); - - UpdateEntryJson { - id: id_public_key.clone(), - signed_message, - public_key: id_public_key, - } - } - - #[test] - fn test_verify_valid_signature() { - let signature_with_key = setup_signature(true); - - let result = verify_signature( - &signature_with_key, - Some(signature_with_key.public_key.clone()), - ); - - assert!(result.is_ok()); - } - - #[test] - fn test_verify_invalid_signature() { - let signature_with_key = setup_signature(false); - - let result = verify_signature( - &signature_with_key, - Some(signature_with_key.public_key.clone()), - ); - assert!(result.is_err()); - } - - #[test] - fn test_verify_short_message() { - let signature_with_key = setup_signature(true); - - let short_message = - general_purpose::STANDARD.encode(&"this is a short message".to_string()); - - let signature_with_key = UpdateEntryJson { - signed_message: short_message, - ..signature_with_key - }; - - let result = verify_signature( - &signature_with_key, - Some(signature_with_key.public_key.clone()), - ); - assert!(result.is_err()); - } -} diff --git a/src/webserver.rs b/src/webserver.rs index cd75cf9a..afb2fbcc 100644 --- a/src/webserver.rs +++ b/src/webserver.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use crate::{ cfg::WebServerConfig, error::DatabaseError, - node_types::Sequencer, + node_types::sequencer::Sequencer, storage::{ChainEntry, DerivedEntry, Entry, UpdateEntryJson}, utils::{is_not_revoked, validate_proof}, zk_snark::{serialize_proof, BatchMerkleProofCircuit, HashChainEntryCircuit},