feat(wip): based sequencing #101

Merged
merged 9 commits on Jul 30, 2024
Changes from 3 commits
5 changes: 3 additions & 2 deletions src/common.rs
@@ -1,8 +1,9 @@
use indexed_merkle_tree::{sha256_mod, Hash};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`Operation`] represents a state transition in the system.
// In a blockchain analogy, this would be the full set of our transaction types.
pub enum Operation {
@@ -24,7 +25,7 @@ pub enum Operation {
},
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshSerialize, BorshDeserialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`AccountSource`] represents the source of an account. See adr-002 for more information.
pub enum AccountSource {
SignedBySequencer { signature: String },
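The Borsh derives added to `Operation` and `AccountSource` above are what allow these types to be packed into DA blobs (see the `borsh::to_vec` / `from_slice` calls in `src/da/celestia.rs` below). A minimal round-trip sketch, using a hypothetical stand-in enum rather than the real `Operation` variants:

```rust
use borsh::{BorshDeserialize, BorshSerialize};

// Hypothetical stand-in for `Operation`; the real variants live in src/common.rs.
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
enum ExampleOp {
    CreateEntry { id: String, value: String },
}

fn main() {
    let op = ExampleOp::CreateEntry {
        id: "acc-1".into(),
        value: "hello".into(),
    };
    // `borsh::to_vec` produces the raw bytes that would be placed in a blob...
    let bytes = borsh::to_vec(&op).expect("serialization should not fail");
    // ...and `borsh::from_slice` recovers the value on the read path.
    let decoded: ExampleOp = borsh::from_slice(&bytes).expect("valid Borsh bytes");
    assert_eq!(op, decoded);
}
```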
136 changes: 114 additions & 22 deletions src/da/celestia.rs
@@ -1,4 +1,5 @@
use crate::{
common::Operation,
consts::CHANNEL_BUFFER_SIZE,
da::{DataAvailabilityLayer, FinalizedEpoch},
error::{DAResult, DataAvailabilityError, GeneralError},
@@ -25,9 +26,19 @@ impl TryFrom<&Blob> for FinalizedEpoch {
}
}

impl TryFrom<&Blob> for Operation {
type Error = GeneralError;

fn try_from(value: &Blob) -> Result<Self, GeneralError> {
from_slice::<Self>(&value.data)
.map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e)))
}
}

pub struct CelestiaConnection {
pub client: celestia_rpc::Client,
pub namespace_id: Namespace,
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,

synctarget_tx: Arc<Sender<u64>>,
synctarget_rx: Arc<Mutex<Receiver<u64>>>,
@@ -72,7 +83,9 @@ impl CelestiaConnection {

Ok(CelestiaConnection {
client,
namespace_id,
snark_namespace: namespace_id,
// TODO: pass in second namespace
operation_namespace: namespace_id,
synctarget_tx: Arc::new(tx),
synctarget_rx: Arc::new(Mutex::new(rx)),
})
@@ -98,9 +111,9 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn get(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await {
match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
@@ -130,27 +143,106 @@
}
}

async fn submit(&self, epoch: &FinalizedEpoch) -> DAResult<u64> {
debug!("posting epoch {} to da layer", epoch.height);
async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> DAResult<u64> {
if epochs.is_empty() {
return Err(DataAvailabilityError::GeneralError(
GeneralError::MissingArgumentError("No epochs provided for submission".to_string()),
));
}

let data = borsh::to_vec(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch: {}",
e
)))
})?;
let blob = Blob::new(self.namespace_id, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;
trace!("blob: {:?}", &blob);
match self
.client
.blob_submit(&[blob.clone()], GasPrice::from(-1.0))
.await
{
debug!("posting {} epochs to da layer", epochs.len());

let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
.iter()
.map(|epoch| {
let data = borsh::to_vec(epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}

let last_epoch_height = epochs.last().map(|e| e.height).unwrap_or(0);

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
last_epoch_height,
err.to_string(),
)),
}
}

async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
trace!("searching for operations on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace]).await {
Ok(blobs) => {
let mut operations = Vec::new();
for blob in blobs.iter() {
match Operation::try_from(blob) {
Ok(operation) => operations.push(operation),
Err(_) => {
debug!(
"marshalling blob from height {} to operation failed: {:?}",
height, &blob
)
}
}
}
Ok(operations)
}
// TODO: replace this error
Err(err) => Err(DataAvailabilityError::DataRetrievalError(
height,
format!("getting operations from da layer: {}", err),
)
.into()),
}
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
debug!("posting {} operations to DA layer", operations.len());
let blobs: Result<Vec<Blob>, DataAvailabilityError> = operations
.iter()
.map(|operation| {
let data = borsh::to_vec(operation).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing operation {}: {}",
operation, e
)))
})?;
Blob::new(self.operation_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
epoch.height,
// TODO: SubmissionError is awkward here (no epoch height available); we need anyhow
0,
err.to_string(),
)),
}
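Taken together, the celestia.rs changes split the single `namespace_id` into `snark_namespace` and `operation_namespace` (currently the same value, per the TODO) and move both submission paths to batches. A rough caller-side sketch, assuming only the updated `DataAvailabilityLayer` trait signatures shown above; the helper name `post_and_read_back` is hypothetical:

```rust
use crate::{common::Operation, da::DataAvailabilityLayer, error::DAResult};

// Post a batch of operations, then read the same DA height back.
// A real caller would wait for the corresponding header before reading.
async fn post_and_read_back<D: DataAvailabilityLayer>(
    da: &D,
    ops: Vec<Operation>,
) -> DAResult<Vec<Operation>> {
    // Lands in `operation_namespace`; returns the inclusion height.
    let height = da.submit_operations(ops).await?;
    // Decodes every operation blob found at that height.
    da.get_operations(height).await
}
```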
121 changes: 108 additions & 13 deletions src/da/mock.rs
@@ -1,4 +1,7 @@
use crate::error::{DAResult, DataAvailabilityError};
use crate::{
common::Operation,
error::{DAResult, DataAvailabilityError},
};
use async_trait::async_trait;
use fs2::FileExt;
use serde_json::{json, Value};
@@ -23,11 +26,19 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer {
Ok(0)
}

async fn get(&self, _: u64) -> DAResult<Vec<FinalizedEpoch>> {
async fn get_snarks(&self, _: u64) -> DAResult<Vec<FinalizedEpoch>> {
Ok(vec![])
}

async fn get_operations(&self, _: u64) -> DAResult<Vec<Operation>> {
Ok(vec![])
}

async fn submit(&self, _: &FinalizedEpoch) -> DAResult<u64> {
async fn submit_operations(&self, _: Vec<Operation>) -> DAResult<u64> {
Ok(0)
}

async fn submit_snarks(&self, _: Vec<FinalizedEpoch>) -> DAResult<u64> {
Ok(0)
}

@@ -41,11 +52,13 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer {
/// This allows writing and testing the functionality of systems that interact with a data availability layer without the need for an actual external service or network, as is required with Celestia.
///
/// This implementation is intended for testing and development only and should not be used in production environments. It provides a way to test the interactions with the data availability layer without the overhead of real network communication or data persistence.
pub struct LocalDataAvailabilityLayer {}
pub struct LocalDataAvailabilityLayer {
pub op_height: u64,
}

impl LocalDataAvailabilityLayer {
pub fn new() -> Self {
LocalDataAvailabilityLayer {}
LocalDataAvailabilityLayer { op_height: 0 }
}
}

@@ -65,7 +78,84 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
Ok(0) // header starts always at zero in test cases
}

async fn get(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
let mut file = File::open("operations.json").expect("Unable to open operations file");
let mut contents = String::new();
file.lock_exclusive()
.expect("Unable to lock operations file");
file.read_to_string(&mut contents)
.expect("Unable to read operations file");

let data: Value =
serde_json::from_str(&contents).expect("Invalid JSON format in operations file");

if let Some(operations) = data.get(height.to_string()) {
let operations_hex = operations
.as_str()
.expect("Operations value is not a string");
let operations_bytes =
hex::decode(operations_hex).expect("Invalid hex string for operations");

let result_operations: Result<Vec<Operation>, _> = borsh::from_slice(&operations_bytes);

file.unlock().expect("Unable to unlock operations file");
Ok(result_operations.expect("Wrong format for operations"))
} else {
file.unlock().expect("Unable to unlock operations file");
Err(DataAvailabilityError::DataRetrievalError(
height,
"Could not get operations from DA layer".to_string(),
))
}
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open("operations.json")
.expect("Unable to open operations file");

let mut contents = String::new();

file.lock_exclusive()
.expect("Unable to lock operations file");
info!("operations file locked");

file.read_to_string(&mut contents)
.expect("Unable to read operations file");

let mut data: Value = if contents.is_empty() {
json!({})
} else {
serde_json::from_str(&contents).expect("Invalid JSON format in operations file")
};

// Add the new operations to the existing JSON data
data[self.op_height.to_string()] =
hex::encode(borsh::to_vec(&operations).expect("Unable to serialize operations")).into();

// Reset the file pointer to the beginning of the file
file.seek(std::io::SeekFrom::Start(0))
.expect("Unable to seek to start of operations file");

// Write the updated data into the file
file.write_all(data.to_string().as_bytes())
.expect("Unable to write operations file");

// Truncate the file to the current pointer to remove any extra data
file.set_len(data.to_string().as_bytes().len() as u64)
.expect("Unable to set operations file length");

file.unlock().expect("Unable to unlock operations file");
info!("operations file unlocked");

Ok(self.op_height + 1)
}

async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
let mut file = File::open("data.json").expect("Unable to open file");
let mut contents = String::new();
file.lock_exclusive().expect("Unable to lock file");
@@ -91,7 +181,12 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
}
}

async fn submit(&self, epoch: &FinalizedEpoch) -> DAResult<u64> {
async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> DAResult<u64> {
// we only expect one epoch to be submitted
assert!(epochs.len() == 1);

let epoch = epochs.first().expect("No epoch to submit");

let mut file = OpenOptions::new()
.read(true)
.write(true)
@@ -261,14 +356,14 @@ mod tests {
);

sequencer_layer
.submit(&FinalizedEpoch {
.submit_snarks(vec![FinalizedEpoch {
height: 1,
prev_commitment,
current_commitment: tree.get_commitment().unwrap(),
proof: bls12proof,
verifying_key: vk,
signature: None,
})
}])
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(65)).await;
@@ -293,14 +388,14 @@
vec![second_insert_zk_snark, third_insert_zk_snark],
);
sequencer_layer
.submit(&FinalizedEpoch {
.submit_snarks(vec![FinalizedEpoch {
height: 2,
prev_commitment,
current_commitment: tree.get_commitment().unwrap(),
proof,
verifying_key: vk,
signature: None,
})
}])
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(65)).await;
@@ -310,7 +405,7 @@
debug!("light client started");
let light_client_layer = LocalDataAvailabilityLayer::new();
loop {
let epoch = light_client_layer.get(1).await.unwrap();
let epoch = light_client_layer.get_snarks(1).await.unwrap();
// verify proofs
verify_epoch_json(epoch);
debug!("light client verified epoch 1");
@@ -319,7 +414,7 @@
tokio::time::sleep(tokio::time::Duration::from_secs(70)).await;

// The light client reads proofs and commitments
let epoch = light_client_layer.get(2).await.unwrap();
let epoch = light_client_layer.get_snarks(2).await.unwrap();
// verify proofs
verify_epoch_json(epoch);
debug!("light client verified epoch 2");
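For reference, the `LocalDataAvailabilityLayer` above persists each submitted batch in `operations.json`, keyed by DA height, with the value holding the hex-encoded Borsh bytes of the `Vec<Operation>`. A small read-side sketch under that assumption; the helper `batch_bytes_at` is not part of the PR:

```rust
use serde_json::Value;

// Extract the raw Borsh bytes for one height from the mock's
// { "<height>": "<hex(borsh(Vec<Operation>))>" } layout.
fn batch_bytes_at(contents: &str, height: u64) -> Option<Vec<u8>> {
    let data: Value = serde_json::from_str(contents).ok()?;
    let hex_str = data.get(height.to_string())?.as_str()?;
    hex::decode(hex_str).ok()
}

// `borsh::from_slice::<Vec<Operation>>(&bytes)` then recovers the batch,
// mirroring `get_operations` in the mock.
```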