diff --git a/src/common.rs b/src/common.rs index d5f6f8b1..6a990417 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,8 +1,9 @@ use indexed_merkle_tree::{sha256_mod, Hash}; use serde::{Deserialize, Serialize}; use std::fmt::Display; +use borsh::{BorshDeserialize, BorshSerialize}; -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug, PartialEq)] // An [`Operation`] represents a state transition in the system. // In a blockchain analogy, this would be the full set of our transaction types. pub enum Operation { @@ -24,7 +25,7 @@ pub enum Operation { }, } -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Clone, BorshSerialize, BorshDeserialize, Serialize, Deserialize, Debug, PartialEq)] // An [`AccountSource`] represents the source of an account. See adr-002 for more information. pub enum AccountSource { SignedBySequencer { signature: String }, diff --git a/src/da/celestia.rs b/src/da/celestia.rs index 04adeb5a..723d8069 100644 --- a/src/da/celestia.rs +++ b/src/da/celestia.rs @@ -1,4 +1,5 @@ use crate::{ + common::Operation, consts::CHANNEL_BUFFER_SIZE, da::{DataAvailabilityLayer, FinalizedEpoch}, error::{DAResult, DataAvailabilityError, GeneralError}, @@ -25,9 +26,19 @@ impl TryFrom<&Blob> for FinalizedEpoch { } } +impl TryFrom<&Blob> for Operation { + type Error = GeneralError; + + fn try_from(value: &Blob) -> Result { + from_slice::(&value.data) + .map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e))) + } +} + pub struct CelestiaConnection { pub client: celestia_rpc::Client, - pub namespace_id: Namespace, + pub snark_namespace: Namespace, + pub operation_namespace: Namespace, synctarget_tx: Arc>, synctarget_rx: Arc>>, @@ -72,7 +83,9 @@ impl CelestiaConnection { Ok(CelestiaConnection { client, - namespace_id, + snark_namespace: namespace_id, + // TODO: pass in second namespace + operation_namespace: namespace_id, synctarget_tx: 
Arc::new(tx), synctarget_rx: Arc::new(Mutex::new(rx)), }) @@ -100,7 +113,7 @@ impl DataAvailabilityLayer for CelestiaConnection { async fn get_snarks(&self, height: u64) -> DAResult> { trace!("searching for epoch on da layer at height {}", height); - match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await { + match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await { Ok(blobs) => { let mut epochs = Vec::new(); for blob in blobs.iter() { @@ -129,6 +142,7 @@ impl DataAvailabilityLayer for CelestiaConnection { } } } + async fn submit_snarks(&self, epochs: Vec) -> DAResult { if epochs.is_empty() { return Err(DataAvailabilityError::GeneralError( @@ -147,7 +161,7 @@ impl DataAvailabilityLayer for CelestiaConnection { epoch.height, e ))) })?; - Blob::new(self.namespace_id, data).map_err(|e| { + Blob::new(self.snark_namespace, data).map_err(|e| { DataAvailabilityError::GeneralError(GeneralError::BlobCreationError( e.to_string(), )) @@ -172,6 +186,67 @@ impl DataAvailabilityLayer for CelestiaConnection { } } + async fn get_operations(&self, height: u64) -> DAResult> { + trace!("searching for operations on da layer at height {}", height); + match BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace]).await { + Ok(blobs) => { + let mut operations = Vec::new(); + for blob in blobs.iter() { + match Operation::try_from(blob) { + Ok(operation) => operations.push(operation), + Err(_) => { + debug!( + "marshalling blob from height {} to operation failed: {:?}", + height, &blob + ) + } + } + } + Ok(operations) + } + Err(err) => Err(DataAvailabilityError::DataRetrievalError( + height, + format!("getting operations from da layer: {}", err), + ) + .into()), + } + } + + async fn submit_operations(&self, operations: Vec) -> DAResult { + debug!("posting {} operations to DA layer", operations.len()); + let blobs: Result, DataAvailabilityError> = operations + .iter() + .map(|operation| { + let data = 
borsh::to_vec(operation).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!( + "serializing operation {}: {}", + operation, e + ))) + })?; + Blob::new(self.operation_namespace, data).map_err(|e| { + DataAvailabilityError::GeneralError(GeneralError::BlobCreationError( + e.to_string(), + )) + }) + }) + .collect(); + + let blobs = blobs?; + + for (i, blob) in blobs.iter().enumerate() { + trace!("blob {}: {:?}", i, blob); + } + + match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await { + Ok(height) => Ok(height), + Err(err) => Err(DataAvailabilityError::SubmissionError( + // todo: SubmissionError forces a placeholder height of 0 here; switch to anyhow for richer error context + 0, + err.to_string(), + )), + } + } + async fn start(&self) -> DAResult<()> { let mut header_sub = HeaderClient::header_subscribe(&self.client) .await diff --git a/src/da/mock.rs b/src/da/mock.rs index 8a835a3a..2a1533e5 100644 --- a/src/da/mock.rs +++ b/src/da/mock.rs @@ -1,4 +1,7 @@ -use crate::error::{DAResult, DataAvailabilityError}; +use crate::{ + common::Operation, + error::{DAResult, DataAvailabilityError}, +}; use async_trait::async_trait; use fs2::FileExt; use serde_json::{json, Value}; @@ -27,6 +30,14 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer { Ok(vec![]) } + async fn get_operations(&self, _: u64) -> DAResult> { + Ok(vec![]) + } + + async fn submit_operations(&self, _: Vec) -> DAResult { + Ok(0) + } + async fn submit_snarks(&self, _: Vec) -> DAResult { Ok(0) } @@ -41,11 +52,13 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer { /// This allows to write and test the functionality of systems that interact with a data availability layer without the need for an actual external service or network like we do with Celestia. /// /// This implementation is intended for testing and development only and should not be used in production environments. 
It provides a way to test the interactions with the data availability layer without the overhead of real network communication or data persistence. -pub struct LocalDataAvailabilityLayer {} +pub struct LocalDataAvailabilityLayer { + pub op_height: u64, +} impl LocalDataAvailabilityLayer { pub fn new() -> Self { - LocalDataAvailabilityLayer {} + LocalDataAvailabilityLayer { op_height: 0 } } } @@ -65,6 +78,83 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer { Ok(0) // header starts always at zero in test cases } + async fn get_operations(&self, height: u64) -> DAResult> { + let mut file = File::open("operations.json").expect("Unable to open operations file"); + let mut contents = String::new(); + file.lock_exclusive() + .expect("Unable to lock operations file"); + file.read_to_string(&mut contents) + .expect("Unable to read operations file"); + + let data: Value = + serde_json::from_str(&contents).expect("Invalid JSON format in operations file"); + + if let Some(operations) = data.get(height.to_string()) { + let operations_hex = operations + .as_str() + .expect("Operations value is not a string"); + let operations_bytes = + hex::decode(operations_hex).expect("Invalid hex string for operations"); + + let result_operations: Result, _> = borsh::from_slice(&operations_bytes); + + file.unlock().expect("Unable to unlock operations file"); + Ok(result_operations.expect("Wrong format for operations")) + } else { + file.unlock().expect("Unable to unlock operations file"); + Err(DataAvailabilityError::DataRetrievalError( + height, + "Could not get operations from DA layer".to_string(), + )) + } + } + + async fn submit_operations(&self, operations: Vec) -> DAResult { + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open("operations.json") + .expect("Unable to open operations file"); + + let mut contents = String::new(); + + file.lock_exclusive() + .expect("Unable to lock operations file"); + info!("operations file 
locked"); + + file.read_to_string(&mut contents) + .expect("Unable to read operations file"); + + let mut data: Value = if contents.is_empty() { + json!({}) + } else { + serde_json::from_str(&contents).expect("Invalid JSON format in operations file") + }; + + // Add new operations to existing json-file data + data[self.op_height.to_string()] = + hex::encode(borsh::to_vec(&operations).expect("Unable to serialize operations")).into(); + + // Reset the file pointer to the beginning of the file + file.seek(std::io::SeekFrom::Start(0)) + .expect("Unable to seek to start of operations file"); + + // Write the updated data into the file + file.write_all(data.to_string().as_bytes()) + .expect("Unable to write operations file"); + + // Truncate the file to the current pointer to remove any extra data + file.set_len(data.to_string().as_bytes().len() as u64) + .expect("Unable to set operations file length"); + + file.unlock().expect("Unable to unlock operations file"); + info!("operations file unlocked"); + + Ok(self.op_height + 1) + } + async fn get_snarks(&self, height: u64) -> DAResult> { let mut file = File::open("data.json").expect("Unable to open file"); let mut contents = String::new(); diff --git a/src/da/mod.rs b/src/da/mod.rs index c6521b2d..c2897501 100644 --- a/src/da/mod.rs +++ b/src/da/mod.rs @@ -1,4 +1,5 @@ use crate::{ + common::Operation, error::{DAResult, GeneralError, PrismResult}, utils::SignedContent, zk_snark::{Bls12Proof, VerifyingKey}, @@ -53,5 +54,7 @@ pub trait DataAvailabilityLayer: Send + Sync { async fn initialize_sync_target(&self) -> DAResult; async fn get_snarks(&self, height: u64) -> DAResult>; async fn submit_snarks(&self, epoch: Vec) -> DAResult; + async fn get_operations(&self, height: u64) -> DAResult>; + async fn submit_operations(&self, operations: Vec) -> DAResult; async fn start(&self) -> DAResult<()>; } diff --git a/src/node_types/sequencer.rs b/src/node_types/sequencer.rs index 1f74b211..ace10752 100644 --- 
a/src/node_types/sequencer.rs +++ b/src/node_types/sequencer.rs @@ -7,7 +7,7 @@ use indexed_merkle_tree::{ tree::{IndexedMerkleTree, Proof}, Hash, }; -use std::{self, str::FromStr, sync::Arc, time::Duration}; +use std::{self, str::FromStr, sync::Arc}; use tokio::{ sync::{ mpsc::{channel, Receiver, Sender}, @@ -37,7 +37,9 @@ pub struct Sequencer { pub ws: WebServer, pub key: SigningKey, - pending_entries: Arc>>, + // pending_entries is a buffer for operations that have not yet been + // posted to the DA layer. + pub pending_entries: Arc>>, tree: Arc>, epoch_buffer_tx: Arc>, @@ -51,14 +53,14 @@ impl NodeType for Sequencer { return Err(DataAvailabilityError::InitializationError(e.to_string()).into()); } - let main_loop = self.clone().main_loop(); + let sync_loop = self.clone().sync_loop(); let da_loop = self.clone().da_loop(); let ws_self = self.clone(); let ws = ws_self.ws.start(self.clone()); tokio::select! { - _ = main_loop => Ok(()), + _ = sync_loop => Ok(()), _ = da_loop => Ok(()), _ = ws => Ok(()), } @@ -103,34 +105,60 @@ impl Sequencer { }) } - // main_loop is responsible for finalizing epochs every epoch length and writing them to the buffer for DA submission. 
- async fn main_loop(self: Arc) -> Result<(), tokio::task::JoinError> { - info!("starting main sequencer loop"); + async fn sync_loop(self: Arc) -> Result<(), tokio::task::JoinError> { + info!("starting operation sync loop"); + // todo: make self.start_height + let start_height = 1000; let epoch_buffer = self.epoch_buffer_tx.clone(); - let mut ticker = interval(Duration::from_secs(self.epoch_duration)); spawn(async move { + let mut current_position = start_height; loop { - ticker.tick().await; - match self.finalize_epoch().await { - Ok(epoch) => { - let epoch_height = match self.db.get_epoch() { - Ok(epoch) => epoch, - Err(e) => { - error!("sequencer_loop: getting epoch from db: {}", e); - continue; + // target is updated when a new header is received + let target = match self.da.get_latest_height().await { + Ok(target) => target, + Err(e) => { + error!("failed to update sync target, retrying: {:?}", e); + continue; + } + }; + + debug!("updated sync target to height {}", target); + while current_position < target { + trace!("processing height: {}", current_position); + match self.da.get_operations(current_position + 1).await { + Ok(operations) => { + if !operations.is_empty() { + debug!( + "sequencer: got operations at height {}", + current_position + 1 + ); } - }; - info!("sequencer_loop: finalized epoch {}", epoch_height); - match epoch_buffer.send(epoch).await { - Ok(_) => (), - Err(e) => { - error!("sequencer_loop: sending epoch to buffer: {}", e); + // TODO: doesn't this copy the Vec? 
Seems very inefficient + let epoch = match self.finalize_epoch(operations).await { + Ok(e) => e, + Err(e) => { + error!("sequencer_loop: finalizing epoch: {}", e); + continue; + } + }; + + info!("sequencer_loop: finalized epoch {}", epoch.height); + match epoch_buffer.send(epoch).await { + Ok(_) => { + current_position += 1; + } + Err(e) => { + error!("sequencer_loop: sending epoch to buffer: {}", e); + } + } + } - } - Err(e) => error!("sequencer_loop: finalizing epoch: {}", e), + Err(e) => { + debug!("sequencer_loop: getting operations: {}", e) + } + }; } + current_position = target; // Update the current position to the latest target } }) .await @@ -181,8 +209,8 @@ impl Sequencer { tree.get_commitment().map_err(|e| e.into()) } - // finalize_epoch is responsible for finalizing the pending epoch and returning the [`FinalizedEpoch`] to be posted on the DA layer. - pub async fn finalize_epoch(&self) -> PrismResult { + // finalize_epoch is responsible for finalizing the pending epoch and returning the [`FinalizedEpoch`] to be posted on the DA layer. + pub async fn finalize_epoch(&self, operations: Vec) -> PrismResult { let epoch = match self.db.get_epoch() { Ok(epoch) => epoch + 1, Err(_) => 0, @@ -205,7 +233,11 @@ impl Sequencer { self.get_commitment().await? }; - let proofs = self.finalize_pending_entries().await?; + let mut proofs = Vec::new(); + for entry in operations.iter() { + let proof = self.process_operation(entry).await?; + proofs.push(proof); + } let current_commitment = { let tree = self.tree.lock().await; @@ -247,18 +279,6 @@ impl Sequencer { } } - // finalize_pending_entries processes all pending entries and returns the proofs. 
- async fn finalize_pending_entries(&self) -> PrismResult> { - let mut pending_entries = self.pending_entries.lock().await; - let mut proofs = Vec::new(); - for entry in pending_entries.iter() { - let proof = self.process_operation(entry).await?; - proofs.push(proof); - } - pending_entries.clear(); - Ok(proofs) - } - /// Updates the state from an already verified pending operation. async fn process_operation(&self, operation: &Operation) -> PrismResult { match operation { @@ -476,8 +496,9 @@ mod tests { let hashchain = sequencer.db.get_hashchain(id.as_str()); assert!(hashchain.is_err()); + let pending_entries = sequencer.pending_entries.lock().await.clone(); let prev_commitment = sequencer.get_commitment().await.unwrap(); - sequencer.finalize_epoch().await.unwrap(); + sequencer.finalize_epoch(pending_entries).await.unwrap(); let new_commitment = sequencer.get_commitment().await.unwrap(); assert_ne!(prev_commitment, new_commitment);