From cfe61ec6f07ae0b583f942548d2d1b80d6c34768 Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 1 Aug 2024 21:28:06 +0200 Subject: [PATCH] tests: integration testing --- src/cfg.rs | 4 +- src/da/memory.rs | 40 +++++--- src/da/mod.rs | 2 +- src/lib.rs | 2 +- src/node_types/sequencer.rs | 82 ++++++++++++++-- tests/integration_tests.rs | 186 ++++++++++++++++++++++++------------ 6 files changed, 233 insertions(+), 83 deletions(-) diff --git a/src/cfg.rs b/src/cfg.rs index b3da1cbf..d3217979 100644 --- a/src/cfg.rs +++ b/src/cfg.rs @@ -90,7 +90,7 @@ impl Default for WebServerConfig { fn default() -> Self { WebServerConfig { host: "127.0.0.1".to_string(), - port: 8080, + port: 8089, } } } @@ -298,7 +298,7 @@ pub async fn initialize_da_layer( unreachable!() // This line should never be reached due to the return in the last iteration } DALayerOption::InMemory => { - let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1); + let (da_layer, _height_rx, _block_rx) = InMemoryDataAvailabilityLayer::new(1); Ok(Arc::new(da_layer) as Arc) } DALayerOption::None => Err(anyhow!(PrismError::ConfigError( diff --git a/src/da/memory.rs b/src/da/memory.rs index 88bf0369..78efea9a 100644 --- a/src/da/memory.rs +++ b/src/da/memory.rs @@ -8,6 +8,13 @@ use std::sync::Arc; use tokio::sync::{broadcast, RwLock}; use tokio::time::{interval, Duration}; +#[derive(Clone, Debug)] +pub struct Block { + pub height: u64, + pub operations: Vec, + pub epochs: Vec, +} + #[derive(Clone)] pub struct InMemoryDataAvailabilityLayer { blocks: Arc>>, @@ -15,28 +22,26 @@ pub struct InMemoryDataAvailabilityLayer { pending_epochs: Arc>>, latest_height: Arc>, height_update_tx: broadcast::Sender, + block_update_tx: broadcast::Sender, block_time: u64, } -struct Block { - height: u64, - operations: Vec, - epochs: Vec, -} - impl InMemoryDataAvailabilityLayer { - pub fn new(block_time: u64) -> (Self, broadcast::Receiver) { - let (tx, rx) = broadcast::channel(100); + pub fn new(block_time: u64) -> (Self, broadcast::Receiver, broadcast::Receiver) { + let (height_tx, height_rx) = broadcast::channel(100); + let (block_tx, block_rx) = broadcast::channel(100); ( Self { blocks: Arc::new(RwLock::new(Vec::new())), pending_operations: Arc::new(RwLock::new(Vec::new())), pending_epochs: Arc::new(RwLock::new(Vec::new())), latest_height: Arc::new(RwLock::new(0)), - height_update_tx: tx, + height_update_tx: height_tx, + block_update_tx: block_tx, block_time, }, - rx, + height_rx, + block_rx, ) } @@ -55,12 +60,23 @@ impl InMemoryDataAvailabilityLayer { operations: std::mem::take(&mut *pending_operations), epochs: std::mem::take(&mut *pending_epochs), }; - blocks.push(new_block); + debug!( + "new block produced at height {} with {} operations and {} snarks", + new_block.height, + new_block.operations.len(), + new_block.epochs.len() + ); + blocks.push(new_block.clone()); - // Notify subscribers of the new height + // Notify subscribers of the new height and block let _ = self.height_update_tx.send(*latest_height); + let _ = self.block_update_tx.send(new_block); } } + + pub fn subscribe_blocks(&self) -> broadcast::Receiver { + self.block_update_tx.subscribe() + } } #[async_trait] diff --git a/src/da/mod.rs b/src/da/mod.rs index 7449b4ff..3f10be05 100644 --- a/src/da/mod.rs +++ b/src/da/mod.rs @@ -15,7 +15,7 @@ pub mod celestia; pub mod memory; // FinalizedEpoch is the data structure that represents the finalized epoch data, and is posted to the DA layer. -#[derive(BorshSerialize, BorshDeserialize, Clone)] +#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)] pub struct FinalizedEpoch { pub height: u64, pub prev_commitment: Hash, diff --git a/src/lib.rs b/src/lib.rs index 334ea039..7d93911b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,6 @@ pub mod error; pub mod node_types; pub mod storage; pub mod utils; -mod webserver; +pub mod webserver; #[macro_use] extern crate log; diff --git a/src/node_types/sequencer.rs b/src/node_types/sequencer.rs index a39c2883..ee937817 100644 --- a/src/node_types/sequencer.rs +++ b/src/node_types/sequencer.rs @@ -60,14 +60,16 @@ impl NodeType for Sequencer { self.da.start().await.context("Failed to start DA layer")?; let sync_loop = self.clone().sync_loop(); - let da_loop = self.clone().da_loop(); + let snark_loop = self.clone().post_snarks_loop(); + let operation_loop = self.clone().post_operations_loop(); let ws_self = self.clone(); let ws = ws_self.ws.start(self.clone()); tokio::select! { res = sync_loop => Ok(res.context("sync loop failed")?), - res = da_loop => Ok(res.context("DA loop failed")?), + res = snark_loop => Ok(res.context("DA loop failed")?), + res = operation_loop => Ok(res.context("Operation loop failed")?), res = ws => Ok(res.context("WebServer failed")?), } } @@ -115,6 +117,10 @@ impl Sequencer { } }; + if current_position == target { + continue; + } + debug!("updated sync target to height {}", target); while current_position < target { trace!("processing height: {}", current_position); @@ -156,8 +162,62 @@ impl Sequencer { .await } - // da_loop is responsible for submitting finalized epochs to the DA layer. - async fn da_loop(self: Arc) -> Result<(), tokio::task::JoinError> { + pub async fn post_operations_loop(self: Arc) -> Result<(), tokio::task::JoinError> { + info!("Starting operation posting loop"); + let mut ticker = interval(std::time::Duration::from_secs(1)); // Adjust the interval as needed + let mut last_processed_height = 0; + + spawn(async move { + loop { + ticker.tick().await; + + // Check for new block + let current_height = match self.da.get_latest_height().await { + Ok(height) => height, + Err(e) => { + error!("operation_loop: Failed to get latest height: {}", e); + continue; + } + }; + + // If there's a new block + if current_height > last_processed_height { + // Get pending operations + let pending_operations = { + let mut ops = self.pending_operations.lock().await; + std::mem::take(&mut *ops) + }; + + // If there are pending operations, submit them + if !pending_operations.is_empty() { + match self.da.submit_operations(pending_operations).await { + Ok(submitted_height) => { + info!( + "operation_loop: Submitted operations at height {}", + submitted_height + ); + last_processed_height = submitted_height; + } + Err(e) => { + error!("operation_loop: Failed to submit operations: {}", e); + // TODO: Handle error (e.g., retry logic, returning operations to pending_operations) + } + } + } else { + debug!( + "operation_loop: No pending operations to submit at height {}", + current_height + ); + last_processed_height = current_height; + } + } + } + }) + .await + } + + // post_snarks_loop is responsible for submitting finalized epochs to the DA layer. + async fn post_snarks_loop(self: Arc) -> Result<(), tokio::task::JoinError> { info!("starting da submission loop"); let mut ticker = interval(DA_RETRY_INTERVAL); spawn(async move { @@ -170,6 +230,11 @@ impl Sequencer { } }; + // don't post to DA layer if no epochs have been finalized + if epochs.is_empty() { + continue; + } + let mut retry_counter = 0; loop { if retry_counter > DA_RETRY_COUNT { @@ -322,6 +387,7 @@ impl Sequencer { hashed_id ))?; + debug!("updating hashchain for user id {}", id.clone()); self.db .update_hashchain(operation, ¤t_chain) .context(format!( @@ -416,7 +482,7 @@ mod tests { // Helper function to create a test Sequencer instance async fn create_test_sequencer() -> Arc { - let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1); + let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); let da_layer = Arc::new(da_layer); let db = Arc::new(setup_db()); let signing_key = create_signing_key(); @@ -459,7 +525,7 @@ mod tests { #[tokio::test] #[serial] async fn test_validate_and_queue_update() { - let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1); + let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); let da_layer = Arc::new(da_layer); let db = Arc::new(setup_db()); let sequencer = Arc::new( @@ -485,7 +551,7 @@ mod tests { #[tokio::test] #[serial] async fn test_queued_update_gets_finalized() { - let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1); + let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); let da_layer = Arc::new(da_layer); let db = Arc::new(setup_db()); let signing_key = create_signing_key(); @@ -525,7 +591,7 @@ mod tests { #[tokio::test] #[serial] async fn test_validate_invalid_update_fails() { - let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1); + let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1); let da_layer = Arc::new(da_layer); let db = Arc::new(setup_db()); let sequencer = Arc::new( diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 218de6b5..cb91d9be 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,73 +1,141 @@ -use std::time::{Duration, Instant}; - -use bellman::groth16; -use bls12_381::Bls12; -use indexed_merkle_tree::{ - node::Node, - sha256_mod, - tree::{IndexedMerkleTree, Proof}, +use base64::{engine::general_purpose::STANDARD as engine, Engine as _}; +use ed25519_dalek::{Signer, SigningKey}; +use keystore_rs::create_signing_key; +use prism::{ + cfg::{Config, RedisConfig}, + common::{AccountSource, Operation}, + da::memory::InMemoryDataAvailabilityLayer, + node_types::{lightclient::LightClient, sequencer::Sequencer, NodeType}, + storage::{Database, RedisConnection}, + webserver::OperationInput, }; -use prism::{circuits::BatchMerkleProofCircuit, utils::validate_epoch}; use rand::rngs::OsRng; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use std::{sync::Arc, time::Duration}; -fn generate_test_tree(size: usize, node_count: usize) -> Duration { - let mut tree = IndexedMerkleTree::new_with_size(size).unwrap(); - - let prev_commitment = tree.get_commitment().unwrap(); - let mut proofs = Vec::with_capacity(node_count); - let mut insertion_times: Vec = Vec::with_capacity(node_count); - for i in 0..node_count { - let mut leaf = Node::new_leaf( - true, - sha256_mod(&[(i + 1) as u8]), - sha256_mod(&[i as u8]), - Node::TAIL, - ); - - let start = Instant::now(); - let proof = tree.insert_node(&mut leaf).unwrap(); - let end = Instant::now(); - insertion_times.push(end.duration_since(start)); - proofs.push(Proof::Insert(proof)) +fn create_new_account_operation(id: String, value: String, key: &SigningKey) -> OperationInput { + let incoming = Operation::CreateAccount { + id: id.clone(), + value: value.clone(), + source: AccountSource::SignedBySequencer { + signature: key.sign(format!("{}{}", id, value).as_bytes()).to_string(), + }, + }; + let content = serde_json::to_string(&incoming).unwrap(); + let sig = key.sign(content.clone().as_bytes()); + OperationInput { + operation: incoming, + signed_operation: sig.to_string(), + public_key: engine.encode(key.verifying_key().to_bytes()), } - println!( - "{}x{} Average Insertion Time: {:?}", - size, - node_count, - insertion_times.iter().sum::() / node_count as u32 - ); +} - let current_commitment = tree.get_commitment().unwrap(); +fn create_update_operation(id: String, value: String) -> OperationInput { + let key = create_signing_key(); + let incoming = Operation::Add { id, value }; + let content = serde_json::to_string(&incoming).unwrap(); + let sig = key.sign(content.clone().as_bytes()); + OperationInput { + operation: incoming, + signed_operation: sig.to_string(), + public_key: engine.encode(key.verifying_key().to_bytes()), + } +} - let start = Instant::now(); - let batched_proof = - BatchMerkleProofCircuit::new(&prev_commitment, ¤t_commitment, proofs).unwrap(); +#[tokio::test] +async fn test_light_client_sequencer_talking() { + std::env::set_var("RUST_LOG", "DEBUG"); + pretty_env_logger::init(); - let rng = &mut OsRng; - let params = - groth16::generate_random_parameters::(batched_proof.clone(), rng).unwrap(); - let proof = groth16::create_random_proof(batched_proof.clone(), ¶ms, rng).unwrap(); - let end = Instant::now(); + let (da_layer, mut height_rx, mut block_rx) = InMemoryDataAvailabilityLayer::new(1); + let da_layer = Arc::new(da_layer); + let db = Arc::new(setup_db()); + let cfg = Config::default(); + let signing_key = create_signing_key(); + let pubkey = engine.encode(signing_key.verifying_key().to_bytes()); - let result = validate_epoch( - &prev_commitment, - ¤t_commitment, - proof.clone(), - params.vk, + let sequencer = Arc::new( + Sequencer::new( + db.clone(), + da_layer.clone(), + cfg.clone(), + signing_key.clone(), + ) + .unwrap(), ); - assert!(result.is_ok()); - assert_eq!(result.unwrap(), proof); - end.duration_since(start) -} + let lightclient = Arc::new(LightClient::new( + da_layer, + cfg.celestia_config.unwrap(), + Some(pubkey), + )); + + let seq_1 = sequencer.clone(); + tokio::spawn(async move { + seq_1.start().await.unwrap(); + }); + + tokio::spawn(async move { + lightclient.clone().start().await.unwrap(); + }); + + let seq = sequencer.clone(); + tokio::spawn(async move { + let mut rng = StdRng::from_entropy(); + let mut accounts = Vec::new(); + let mut i = 0; -#[test] -fn test_prover_time() { - // add more test cases while benchmarking, obviously - let test_cases: Vec<(usize, usize)> = vec![(usize::pow(2, 13), 8)]; + loop { + let seq_clone = seq.clone(); + // Create 1 or 2 new accounts + let num_new_accounts = rng.gen_range(1..=10); + for _ in 0..num_new_accounts { + let seq_i = seq_clone.clone(); + let new_acc = create_new_account_operation( + format!("{}@gmail.com", i), + format!("key_{}", i), + &signing_key, + ); + seq_i.validate_and_queue_update(&new_acc).await.unwrap(); + accounts.push(format!("{}@gmail.com", i)); + i += 1; + } - for (size, node_count) in test_cases { - let duration = generate_test_tree(size, node_count); - println!("{}x{}: Proof Time {:?}", size, node_count, duration) + // Update 5 random existing accounts (if we have at least 5) + if accounts.len() >= 5 { + for _ in 0..5 { + let seq_i = seq_clone.clone(); + let account_index = rng.gen_range(0..accounts.len()); + let account_id = accounts[account_index].clone(); + let update_op = create_update_operation( + account_id, + format!("updated_key_{}", rng.gen::()), + ); + seq_i.validate_and_queue_update(&update_op).await.unwrap(); + } + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + + while let Ok(height) = height_rx.recv().await { + if height == 60 { + break; + } } + + teardown_db(db.clone()) +} + +// set up redis connection and flush database before each test +fn setup_db() -> RedisConnection { + let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap(); + redis_connection.flush_database().unwrap(); + redis_connection +} + +// flush database after each test +fn teardown_db(redis_connections: Arc) { + redis_connections.flush_database().unwrap(); }