Skip to content

Commit

Permalink
tests: integration testing
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Aug 1, 2024
1 parent 079b33f commit cfe61ec
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 83 deletions.
4 changes: 2 additions & 2 deletions src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Default for WebServerConfig {
fn default() -> Self {
WebServerConfig {
host: "127.0.0.1".to_string(),
port: 8080,
port: 8089,
}
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ pub async fn initialize_da_layer(
unreachable!() // This line should never be reached due to the return in the last iteration
}
DALayerOption::InMemory => {
let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1);
let (da_layer, _height_rx, _block_rx) = InMemoryDataAvailabilityLayer::new(1);
Ok(Arc::new(da_layer) as Arc<dyn DataAvailabilityLayer + 'static>)
}
DALayerOption::None => Err(anyhow!(PrismError::ConfigError(
Expand Down
40 changes: 28 additions & 12 deletions src/da/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,40 @@ use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio::time::{interval, Duration};

#[derive(Clone, Debug)]
pub struct Block {
pub height: u64,
pub operations: Vec<Operation>,
pub epochs: Vec<FinalizedEpoch>,
}

#[derive(Clone)]
pub struct InMemoryDataAvailabilityLayer {
blocks: Arc<RwLock<Vec<Block>>>,
pending_operations: Arc<RwLock<Vec<Operation>>>,
pending_epochs: Arc<RwLock<Vec<FinalizedEpoch>>>,
latest_height: Arc<RwLock<u64>>,
height_update_tx: broadcast::Sender<u64>,
block_update_tx: broadcast::Sender<Block>,
block_time: u64,
}

struct Block {
height: u64,
operations: Vec<Operation>,
epochs: Vec<FinalizedEpoch>,
}

impl InMemoryDataAvailabilityLayer {
pub fn new(block_time: u64) -> (Self, broadcast::Receiver<u64>) {
let (tx, rx) = broadcast::channel(100);
pub fn new(block_time: u64) -> (Self, broadcast::Receiver<u64>, broadcast::Receiver<Block>) {
let (height_tx, height_rx) = broadcast::channel(100);
let (block_tx, block_rx) = broadcast::channel(100);
(
Self {
blocks: Arc::new(RwLock::new(Vec::new())),
pending_operations: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(Vec::new())),
latest_height: Arc::new(RwLock::new(0)),
height_update_tx: tx,
height_update_tx: height_tx,
block_update_tx: block_tx,
block_time,
},
rx,
height_rx,
block_rx,
)
}

Expand All @@ -55,12 +60,23 @@ impl InMemoryDataAvailabilityLayer {
operations: std::mem::take(&mut *pending_operations),
epochs: std::mem::take(&mut *pending_epochs),
};
blocks.push(new_block);
debug!(
"new block produced at height {} with {} operations and {} snarks",
new_block.height,
new_block.operations.len(),
new_block.epochs.len()
);
blocks.push(new_block.clone());

// Notify subscribers of the new height
// Notify subscribers of the new height and block
let _ = self.height_update_tx.send(*latest_height);
let _ = self.block_update_tx.send(new_block);
}
}

pub fn subscribe_blocks(&self) -> broadcast::Receiver<Block> {
self.block_update_tx.subscribe()
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/da/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub mod celestia;
pub mod memory;

// FinalizedEpoch is the data structure that represents the finalized epoch data, and is posted to the DA layer.
#[derive(BorshSerialize, BorshDeserialize, Clone)]
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug)]
pub struct FinalizedEpoch {
pub height: u64,
pub prev_commitment: Hash,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ pub mod error;
pub mod node_types;
pub mod storage;
pub mod utils;
mod webserver;
pub mod webserver;
#[macro_use]
extern crate log;
82 changes: 74 additions & 8 deletions src/node_types/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ impl NodeType for Sequencer {
self.da.start().await.context("Failed to start DA layer")?;

let sync_loop = self.clone().sync_loop();
let da_loop = self.clone().da_loop();
let snark_loop = self.clone().post_snarks_loop();
let operation_loop = self.clone().post_operations_loop();

let ws_self = self.clone();
let ws = ws_self.ws.start(self.clone());

tokio::select! {
res = sync_loop => Ok(res.context("sync loop failed")?),
res = da_loop => Ok(res.context("DA loop failed")?),
res = snark_loop => Ok(res.context("DA loop failed")?),
res = operation_loop => Ok(res.context("Operation loop failed")?),
res = ws => Ok(res.context("WebServer failed")?),
}
}
Expand Down Expand Up @@ -115,6 +117,10 @@ impl Sequencer {
}
};

if current_position == target {
continue;
}

debug!("updated sync target to height {}", target);
while current_position < target {
trace!("processing height: {}", current_position);
Expand Down Expand Up @@ -156,8 +162,62 @@ impl Sequencer {
.await
}

// da_loop is responsible for submitting finalized epochs to the DA layer.
async fn da_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
pub async fn post_operations_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
info!("Starting operation posting loop");
let mut ticker = interval(std::time::Duration::from_secs(1)); // Adjust the interval as needed
let mut last_processed_height = 0;

spawn(async move {
loop {
ticker.tick().await;

// Check for new block
let current_height = match self.da.get_latest_height().await {
Ok(height) => height,
Err(e) => {
error!("operation_loop: Failed to get latest height: {}", e);
continue;
}
};

// If there's a new block
if current_height > last_processed_height {
// Get pending operations
let pending_operations = {
let mut ops = self.pending_operations.lock().await;
std::mem::take(&mut *ops)
};

// If there are pending operations, submit them
if !pending_operations.is_empty() {
match self.da.submit_operations(pending_operations).await {
Ok(submitted_height) => {
info!(
"operation_loop: Submitted operations at height {}",
submitted_height
);
last_processed_height = submitted_height;
}
Err(e) => {
error!("operation_loop: Failed to submit operations: {}", e);
// TODO: Handle error (e.g., retry logic, returning operations to pending_operations)
}
}
} else {
debug!(
"operation_loop: No pending operations to submit at height {}",
current_height
);
last_processed_height = current_height;
}
}
}
})
.await
}

// post_snarks_loop is responsible for submitting finalized epochs to the DA layer.
async fn post_snarks_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
info!("starting da submission loop");
let mut ticker = interval(DA_RETRY_INTERVAL);
spawn(async move {
Expand All @@ -170,6 +230,11 @@ impl Sequencer {
}
};

// don't post to DA layer if no epochs have been finalized
if epochs.is_empty() {
continue;
}

let mut retry_counter = 0;
loop {
if retry_counter > DA_RETRY_COUNT {
Expand Down Expand Up @@ -322,6 +387,7 @@ impl Sequencer {
hashed_id
))?;

debug!("updating hashchain for user id {}", id.clone());
self.db
.update_hashchain(operation, &current_chain)
.context(format!(
Expand Down Expand Up @@ -416,7 +482,7 @@ mod tests {

// Helper function to create a test Sequencer instance
async fn create_test_sequencer() -> Arc<Sequencer> {
let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1);
let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
let da_layer = Arc::new(da_layer);
let db = Arc::new(setup_db());
let signing_key = create_signing_key();
Expand Down Expand Up @@ -459,7 +525,7 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_validate_and_queue_update() {
let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1);
let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
let da_layer = Arc::new(da_layer);
let db = Arc::new(setup_db());
let sequencer = Arc::new(
Expand All @@ -485,7 +551,7 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_queued_update_gets_finalized() {
let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1);
let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
let da_layer = Arc::new(da_layer);
let db = Arc::new(setup_db());
let signing_key = create_signing_key();
Expand Down Expand Up @@ -525,7 +591,7 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_validate_invalid_update_fails() {
let (da_layer, _rx) = InMemoryDataAvailabilityLayer::new(1);
let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
let da_layer = Arc::new(da_layer);
let db = Arc::new(setup_db());
let sequencer = Arc::new(
Expand Down
Loading

0 comments on commit cfe61ec

Please sign in to comment.