Skip to content

Commit

Permalink
posting to DA layer
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 29, 2024
1 parent 2050fec commit 6f1266f
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 49 deletions.
5 changes: 3 additions & 2 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use indexed_merkle_tree::{sha256_mod, Hash};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`Operation`] represents a state transition in the system.
// In a blockchain analogy, this would be the full set of our transaction types.
pub enum Operation {
Expand All @@ -24,7 +25,7 @@ pub enum Operation {
},
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshSerialize, BorshDeserialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`AccountSource`] represents the source of an account. See adr-002 for more information.
pub enum AccountSource {
SignedBySequencer { signature: String },
Expand Down
83 changes: 79 additions & 4 deletions src/da/celestia.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
common::Operation,
consts::CHANNEL_BUFFER_SIZE,
da::{DataAvailabilityLayer, FinalizedEpoch},
error::{DAResult, DataAvailabilityError, GeneralError},
Expand All @@ -25,9 +26,19 @@ impl TryFrom<&Blob> for FinalizedEpoch {
}
}

impl TryFrom<&Blob> for Operation {
type Error = GeneralError;

fn try_from(value: &Blob) -> Result<Self, GeneralError> {
from_slice::<Self>(&value.data)
.map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e)))
}
}

pub struct CelestiaConnection {
pub client: celestia_rpc::Client,
pub namespace_id: Namespace,
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,

synctarget_tx: Arc<Sender<u64>>,
synctarget_rx: Arc<Mutex<Receiver<u64>>>,
Expand Down Expand Up @@ -72,7 +83,9 @@ impl CelestiaConnection {

Ok(CelestiaConnection {
client,
namespace_id,
snark_namespace: namespace_id,
// TODO: pass in second namespace
operation_namespace: namespace_id,
synctarget_tx: Arc::new(tx),
synctarget_rx: Arc::new(Mutex::new(rx)),
})
Expand Down Expand Up @@ -100,7 +113,7 @@ impl DataAvailabilityLayer for CelestiaConnection {

async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await {
match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
Expand Down Expand Up @@ -129,6 +142,7 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> DAResult<u64> {
if epochs.is_empty() {
return Err(DataAvailabilityError::GeneralError(
Expand All @@ -147,7 +161,7 @@ impl DataAvailabilityLayer for CelestiaConnection {
epoch.height, e
)))
})?;
Blob::new(self.namespace_id, data).map_err(|e| {
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
Expand All @@ -172,6 +186,67 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
trace!("searching for operations on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace]).await {
Ok(blobs) => {
let mut operations = Vec::new();
for blob in blobs.iter() {
match Operation::try_from(blob) {
Ok(operation) => operations.push(operation),
Err(_) => {
debug!(
"marshalling blob from height {} to operation failed: {:?}",
height, &blob
)
}
}
}
Ok(operations)
}
Err(err) => Err(DataAvailabilityError::DataRetrievalError(
height,
format!("getting operations from da layer: {}", err),
)
.into()),
}
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
debug!("posting {} operations to DA layer", operations.len());
let blobs: Result<Vec<Blob>, DataAvailabilityError> = operations
.iter()
.map(|operation| {
let data = borsh::to_vec(operation).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing operation {}: {}",
operation, e
)))
})?;
Blob::new(self.operation_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
// todo: fucking submission error is yikes, we need anyhow
0,
err.to_string(),
)),
}
}

async fn start(&self) -> DAResult<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
Expand Down
96 changes: 93 additions & 3 deletions src/da/mock.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::error::{DAResult, DataAvailabilityError};
use crate::{
common::Operation,
error::{DAResult, DataAvailabilityError},
};
use async_trait::async_trait;
use fs2::FileExt;
use serde_json::{json, Value};
Expand Down Expand Up @@ -27,6 +30,14 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer {
Ok(vec![])
}

async fn get_operations(&self, _: u64) -> DAResult<Vec<Operation>> {
Ok(vec![])
}

async fn submit_operations(&self, _: Vec<Operation>) -> DAResult<u64> {
Ok(0)
}

async fn submit_snarks(&self, _: Vec<FinalizedEpoch>) -> DAResult<u64> {
Ok(0)
}
Expand All @@ -41,11 +52,13 @@ impl DataAvailabilityLayer for NoopDataAvailabilityLayer {
/// This allows to write and test the functionality of systems that interact with a data availability layer without the need for an actual external service or network like we do with Celestia.
///
/// This implementation is intended for testing and development only and should not be used in production environments. It provides a way to test the interactions with the data availability layer without the overhead of real network communication or data persistence.
pub struct LocalDataAvailabilityLayer {}
pub struct LocalDataAvailabilityLayer {
pub op_height: u64,
}

impl LocalDataAvailabilityLayer {
pub fn new() -> Self {
LocalDataAvailabilityLayer {}
LocalDataAvailabilityLayer { op_height: 0 }
}
}

Expand All @@ -65,6 +78,83 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
Ok(0) // header starts always at zero in test cases
}

async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
let mut file = File::open("operations.json").expect("Unable to open operations file");
let mut contents = String::new();
file.lock_exclusive()
.expect("Unable to lock operations file");
file.read_to_string(&mut contents)
.expect("Unable to read operations file");

let data: Value =
serde_json::from_str(&contents).expect("Invalid JSON format in operations file");

if let Some(operations) = data.get(height.to_string()) {
let operations_hex = operations
.as_str()
.expect("Operations value is not a string");
let operations_bytes =
hex::decode(operations_hex).expect("Invalid hex string for operations");

let result_operations: Result<Vec<Operation>, _> = borsh::from_slice(&operations_bytes);

file.unlock().expect("Unable to unlock operations file");
Ok(result_operations.expect("Wrong format for operations"))
} else {
file.unlock().expect("Unable to unlock operations file");
Err(DataAvailabilityError::DataRetrievalError(
height,
"Could not get operations from DA layer".to_string(),
))
}
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open("operations.json")
.expect("Unable to open operations file");

let mut contents = String::new();

file.lock_exclusive()
.expect("Unable to lock operations file");
info!("operations file locked");

file.read_to_string(&mut contents)
.expect("Unable to read operations file");

let mut data: Value = if contents.is_empty() {
json!({})
} else {
serde_json::from_str(&contents).expect("Invalid JSON format in operations file")
};

// Add new operations to existing json-file data
data[self.op_height.to_string()] =
hex::encode(borsh::to_vec(&operations).expect("Unable to serialize operations")).into();

// Reset the file pointer to the beginning of the file
file.seek(std::io::SeekFrom::Start(0))
.expect("Unable to seek to start of operations file");

// Write the updated data into the file
file.write_all(data.to_string().as_bytes())
.expect("Unable to write operations file");

// Truncate the file to the current pointer to remove any extra data
file.set_len(data.to_string().as_bytes().len() as u64)
.expect("Unable to set operations file length");

file.unlock().expect("Unable to unlock operations file");
info!("operations file unlocked");

Ok(self.op_height + 1)
}

async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
let mut file = File::open("data.json").expect("Unable to open file");
let mut contents = String::new();
Expand Down
3 changes: 3 additions & 0 deletions src/da/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
common::Operation,
error::{DAResult, GeneralError, PrismResult},
utils::SignedContent,
zk_snark::{Bls12Proof, VerifyingKey},
Expand Down Expand Up @@ -53,5 +54,7 @@ pub trait DataAvailabilityLayer: Send + Sync {
async fn initialize_sync_target(&self) -> DAResult<u64>;
async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>>;
async fn submit_snarks(&self, epoch: Vec<FinalizedEpoch>) -> DAResult<u64>;
async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>>;
async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64>;
async fn start(&self) -> DAResult<()>;
}
Loading

0 comments on commit 6f1266f

Please sign in to comment.