Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Sep 18, 2024
1 parent 767a07e commit d346197
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 111 deletions.
141 changes: 56 additions & 85 deletions crates/prism/src/node_types/lightclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use async_trait::async_trait;
use prism_common::tree::Digest;
use prism_errors::{DataAvailabilityError, GeneralError};
use sp1_sdk::{ProverClient, SP1VerifyingKey};
use std::{self, sync::Arc, time::Duration};
use tokio::{task::spawn, time::interval};
use std::{self, sync::Arc};
use tokio::{sync::broadcast, task::spawn};

use crate::{da::DataAvailabilityLayer, node_types::NodeType, utils::verify_signature};

Expand Down Expand Up @@ -58,97 +58,68 @@ impl LightClient {
let start_height = self.start_height;
spawn(async move {
let mut current_position = start_height;
let mut ticker = interval(Duration::from_secs(1));
loop {
// target is updated when a new header is received
let target = match self.da.get_latest_height().await {
Ok(target) => target,
Err(e) => {
error!("failed to update sync target, retrying: {:?}", e);
continue;
}
};

debug!("updated sync target to height {}", target);
for i in current_position..target {
trace!("processing height: {}", i);
match self.da.get_snark(i + 1).await {
Ok(epoch_json) => {
if epoch_json.is_none() {
continue;
}

let finalized_epoch = epoch_json.unwrap();
debug!("light client: got epochs at height {}", i + 1);
let mut height_rx = self.da.subscribe_to_heights();

// todo: verify adjacency to last heights, <- for this we need some sort of storage of epochs
let _prev_commitment = &finalized_epoch.prev_commitment;
let _current_commitment = &finalized_epoch.current_commitment;
loop {
match height_rx.recv().await {
Ok(target) => {
debug!("updated sync target to height {}", target);
for i in current_position..target {
trace!("processing height: {}", i);
match self.da.get_snark(i + 1).await {
Ok(Some(finalized_epoch)) => {
debug!("light client: got epochs at height {}", i + 1);

// if the user does not add a verifying key, we will not verify the signature,
// but only log a warning on startup
if self.sequencer_pubkey.is_some() {
match verify_signature(
&finalized_epoch.clone(),
self.sequencer_pubkey.clone(),
) {
Ok(_) => {
trace!(
"valid signature for epoch {}",
finalized_epoch.height
)
}
Err(e) => {
panic!("invalid signature in epoch {}: {:?}", i, e)
// Signature verification
if let Some(pubkey) = &self.sequencer_pubkey {
match verify_signature(&finalized_epoch, Some(pubkey.clone())) {
Ok(_) => trace!("valid signature for epoch {}", finalized_epoch.height),
Err(e) => panic!("invalid signature in epoch {}: {:?}", i, e),
}
}
}
}

let prev_commitment = &finalized_epoch.prev_commitment;
let current_commitment = &finalized_epoch.current_commitment;
// Commitment verification
let prev_commitment = &finalized_epoch.prev_commitment;
let current_commitment = &finalized_epoch.current_commitment;
let mut public_values = finalized_epoch.proof.public_values.clone();
let proof_prev_commitment: Digest = public_values.read();
let proof_current_commitment: Digest = public_values.read();

let mut public_values = finalized_epoch.proof.public_values.clone();
let proof_prev_commitment: Digest = public_values.read();
let proof_current_commitment: Digest = public_values.read();

if prev_commitment != &proof_prev_commitment
|| current_commitment != &proof_current_commitment
{
error!(
"Commitment mismatch:
prev_commitment: {:?}, proof_prev_commitment: {:?},
current_commitment: {:?}, proof_current_commitment: {:?}",
prev_commitment,
proof_prev_commitment,
current_commitment,
proof_current_commitment
);
panic!("Commitment mismatch in epoch {}", finalized_epoch.height);
}
if prev_commitment != &proof_prev_commitment
|| current_commitment != &proof_current_commitment
{
error!(
"Commitment mismatch:
prev_commitment: {:?}, proof_prev_commitment: {:?},
current_commitment: {:?}, proof_current_commitment: {:?}",
prev_commitment, proof_prev_commitment,
current_commitment, proof_current_commitment
);
panic!("Commitment mismatch in epoch {}", finalized_epoch.height);
}

match self
.client
.verify(&finalized_epoch.proof, &self.verifying_key)
{
Ok(_) => {
info!(
"zkSNARK for epoch {} was validated successfully",
finalized_epoch.height
)
}
Err(err) => panic!(
"failed to validate epoch at height {}: {:?}",
finalized_epoch.height, err
),
}
}
Err(e) => {
debug!("light client: getting epoch: {}", e)
// SNARK verification
match self.client.verify(&finalized_epoch.proof, &self.verifying_key) {
Ok(_) => info!("zkSNARK for epoch {} was validated successfully", finalized_epoch.height),
Err(err) => panic!("failed to validate epoch at height {}: {:?}", finalized_epoch.height, err),
}
},
Ok(None) => {
debug!("no finalized epoch found at height: {}", i + 1);
},
Err(e) => debug!("light client: getting epoch: {}", e),
};
}
};
current_position = target;
},
Err(broadcast::error::RecvError::Closed) => {
error!("Height channel closed unexpectedly");
break;
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!("Lagged behind by {} messages", skipped);
},
}
ticker.tick().await; // only for testing purposes
current_position = target; // Update the current position to the latest target
}
})
.await
Expand Down
56 changes: 33 additions & 23 deletions crates/prism/src/node_types/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ed25519::Signature;
use ed25519_dalek::{Signer, SigningKey};
use jmt::KeyHash;
use prism_common::tree::{hash, Batch, Digest, Hasher, KeyDirectoryTree, Proof, SnarkableTree};
use std::{self, collections::VecDeque, str::FromStr, sync::Arc};
use std::{self, collections::VecDeque, str::FromStr, sync::Arc, thread::current};
use tokio::sync::{broadcast, Mutex};

use sp1_sdk::{ProverClient, SP1ProvingKey, SP1Stdin, SP1VerifyingKey};
Expand Down Expand Up @@ -136,6 +136,10 @@ impl Sequencer {
current_height += 1;
}

info!(
"finished historical sync from height {} to {}",
start_height, end_height
);
Ok(())
}

Expand All @@ -144,30 +148,32 @@ impl Sequencer {

loop {
let height = height_rx.recv().await?;
debug!("received height {}", height);

// Get pending operations
let pending_operations = {
let mut ops = self.pending_operations.lock().await;
std::mem::take(&mut *ops)
};

let op_count = pending_operations.len();

// If there are pending operations, submit them
if !pending_operations.is_empty() {
if !pending_operations.clone().is_empty() {
match self.da.submit_operations(pending_operations).await {
Ok(submitted_height) => {
info!(
"operation_loop: Submitted operations at height {}",
submitted_height
"post_batch_loop: submitted {} operations at height {}",
op_count, submitted_height
);
}
Err(e) => {
error!("operation_loop: Failed to submit operations: {}", e);
// TODO: Handle error (e.g., retry logic, returning operations to pending_operations)
error!("post_batch_loop: Failed to submit operations: {}", e);
}
}
} else {
debug!(
"operation_loop: No pending operations to submit at height {}",
"post_batch_loop: No pending operations to submit at height {}",
height
);
}
Expand Down Expand Up @@ -208,14 +214,21 @@ impl Sequencer {
let mut tree = self.tree.lock().await;
let prev_commitment = tree.get_commitment()?;

// once current_epoch > saved_epoch, the sequencer must create and post new FinalizedEpochs
if *current_epoch > saved_epoch {
debug!(
"processing height {}, saved_epoch: {}, current_epoch: {}",
height, saved_epoch, current_epoch
);

if !operations.is_empty() {
buffered_operations.push_back(operations);
}

if !buffered_operations.is_empty() && height > saved_epoch {
let all_ops: Vec<Operation> = buffered_operations.drain(..).flatten().collect();
*current_epoch = height;
self.finalize_new_epoch(*current_epoch, all_ops, &mut tree)
.await?;
}
// we haven't fully synced yet, the FinalizedEpoch is in the block
else if let Some(epoch) = epoch_result {
} else if let Some(epoch) = epoch_result {
self.process_existing_epoch(
epoch,
current_epoch,
Expand All @@ -225,14 +238,8 @@ impl Sequencer {
height,
)
.await?;
}
// there was no FinalizedEpoch in this block, so buffer its operations until we come across the next one
else {
buffered_operations.push_back(operations);
warn!(
"Epoch JSON not found for height {}. Operations buffered.",
height
);
} else {
debug!("No operations to process at height {}", height);
}

Ok(())
Expand Down Expand Up @@ -280,6 +287,8 @@ impl Sequencer {
operations: Vec<Operation>,
tree: &mut KeyDirectoryTree<Box<dyn Database>>,
) -> Result<Vec<Proof>> {
debug!("executing block with {} operations", operations.len());

let mut proofs = Vec::new();

for operation in operations {
Expand Down Expand Up @@ -314,9 +323,10 @@ impl Sequencer {
self.da.submit_snark(finalized_epoch).await?;

self.db.set_commitment(&height, &new_commitment)?;
self.db
.set_epoch(&height)
.context("Failed to set new epoch")?;
self.db.set_epoch(&height)?;

info!("Finalized new epoch at height {}", height);

Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions crates/prism/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn test_light_client_sequencer_talking() -> Result<()> {
std::env::set_var("RUST_LOG", "DEBUG");
pretty_env_logger::init();

let (da_layer, mut height_rx, mut _block_rx) = InMemoryDataAvailabilityLayer::new(10);
let (da_layer, mut height_rx, mut _block_rx) = InMemoryDataAvailabilityLayer::new(30);
let da_layer = Arc::new(da_layer);
let db = setup_db();
let cfg = Config::default();
Expand Down Expand Up @@ -128,12 +128,12 @@ async fn test_light_client_sequencer_talking() -> Result<()> {
}
}

tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(5000)).await;
}
});

while let Ok(height) = height_rx.recv().await {
if height == 60 {
if height == 5 {
break;
}
}
Expand Down

0 comments on commit d346197

Please sign in to comment.