Skip to content

Commit

Permalink
sequencer cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Sep 18, 2024
1 parent 6a94cef commit a1328f7
Show file tree
Hide file tree
Showing 6 changed files with 392 additions and 638 deletions.
137 changes: 56 additions & 81 deletions crates/prism/src/da/celestia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use celestia_rpc::{BlobClient, Client, HeaderClient};
use celestia_types::{blob::GasPrice, nmt::Namespace, Blob};
use prism_common::operation::Operation;
use prism_errors::{DataAvailabilityError, GeneralError};
use std::{self, sync::Arc};
use tokio::{
use std::{
self,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
atomic::{AtomicU64, Ordering},
Arc,
},
task::spawn,
};
use tokio::{sync::broadcast, task::spawn};

use bincode;

Expand All @@ -35,14 +35,12 @@ pub struct CelestiaConnection {
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,

sync_target_tx: Arc<Sender<u64>>,
sync_target_rx: Arc<Mutex<Receiver<u64>>>,
height_update_tx: broadcast::Sender<u64>,
sync_target: Arc<AtomicU64>,
}

impl CelestiaConnection {
pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> Result<Self> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let client = Client::new(&config.connection_string, auth_token)
.await
.context("Failed to initialize websocket connection")
Expand All @@ -61,12 +59,14 @@ impl CelestiaConnection {
None => snark_namespace,
};

let (height_update_tx, _) = broadcast::channel(100);

Ok(CelestiaConnection {
client,
snark_namespace,
operation_namespace,
sync_target_tx: Arc::new(tx),
sync_target_rx: Arc::new(Mutex::new(rx)),
height_update_tx,
sync_target: Arc::new(AtomicU64::new(0)),
})
}
}
Expand All @@ -86,41 +86,38 @@ fn create_namespace(namespace_hex: &str) -> Result<Namespace> {
#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_latest_height(&self) -> Result<u64> {
match self.sync_target_rx.lock().await.recv().await {
Some(height) => Ok(height),
None => Err(anyhow!(DataAvailabilityError::ChannelReceiveError)),
}
Ok(self.sync_target.load(Ordering::Relaxed))
}

async fn initialize_sync_target(&self) -> Result<u64> {
HeaderClient::header_network_head(&self.client)
let height = HeaderClient::header_network_head(&self.client)
.await
.context("Failed to get network head from DA layer")
.map(|extended_header| extended_header.header.height.value())
.map(|extended_header| extended_header.header.height.value())?;

self.sync_target.store(height, Ordering::Relaxed);
Ok(height)
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);

match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
match FinalizedEpoch::try_from(blob) {
Ok(epoch_json) => epochs.push(epoch_json),
Err(_) => {
GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
));
}
}
}
Ok(epochs)
}
Ok(blobs) => blobs
.into_iter()
.next()
.map(|blob| {
FinalizedEpoch::try_from(&blob).map_err(|_| {
anyhow!(GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
)))
})
})
.transpose(),
Err(err) => {
// todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0
if err.to_string().contains("blob: not found") {
Ok(vec![])
Ok(None)
} else {
Err(anyhow!(DataAvailabilityError::DataRetrievalError(
height,
Expand All @@ -131,38 +128,22 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
if epochs.is_empty() {
bail!("no epochs provided for submission");
}
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
debug!("posting {}th epoch to da layer", epoch.height);

debug!("posting {} epochs to da layer", epochs.len());
let data = bincode::serialize(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;

let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
.iter()
.map(|epoch| {
let data = bincode::serialize(epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}
let blob = Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;

self.client
.blob_submit(&blobs, GasPrice::from(-1.0))
.blob_submit(&[blob], GasPrice::from(-1.0))
.await
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}
Expand Down Expand Up @@ -230,35 +211,29 @@ impl DataAvailabilityLayer for CelestiaConnection {
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}

fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn start(&self) -> Result<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
.context("Failed to subscribe to headers from DA layer")
.map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?;
.context("Failed to subscribe to headers from DA layer")?;

let sync_target = self.sync_target.clone();
let height_update_tx = self.height_update_tx.clone();

let synctarget_buffer = self.sync_target_tx.clone();
spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
match extended_header_result {
Ok(extended_header) => {
let height = extended_header.header.height.value();
match synctarget_buffer.send(height).await {
Ok(_) => {
debug!("sent sync target update for height {}", height);
}
Err(_) => {
DataAvailabilityError::SyncTargetError(format!(
"sending sync target update message for height {}",
height
));
}
}
sync_target.store(height, Ordering::Relaxed);
let _ = height_update_tx.send(height);
debug!("updated sync target for height {}", height);
}
Err(e) => {
DataAvailabilityError::NetworkError(format!(
"retrieving header from da layer: {}",
e
));
error!("Error retrieving header from DA layer: {}", e);
}
}
}
Expand Down
25 changes: 14 additions & 11 deletions crates/prism/src/da/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::da::{DataAvailabilityLayer, FinalizedEpoch};
use anyhow::Result;
use async_trait::async_trait;
use prism_common::operation::Operation;
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use tokio::{
sync::{broadcast, RwLock},
time::{interval, Duration},
Expand All @@ -12,14 +12,14 @@ use tokio::{
pub struct Block {
pub height: u64,
pub operations: Vec<Operation>,
pub epochs: Vec<FinalizedEpoch>,
pub epoch: Option<FinalizedEpoch>,
}

#[derive(Clone)]
pub struct InMemoryDataAvailabilityLayer {
blocks: Arc<RwLock<Vec<Block>>>,
pending_operations: Arc<RwLock<Vec<Operation>>>,
pending_epochs: Arc<RwLock<Vec<FinalizedEpoch>>>,
pending_epochs: Arc<RwLock<VecDeque<FinalizedEpoch>>>,
latest_height: Arc<RwLock<u64>>,
height_update_tx: broadcast::Sender<u64>,
block_update_tx: broadcast::Sender<Block>,
Expand All @@ -34,7 +34,7 @@ impl InMemoryDataAvailabilityLayer {
Self {
blocks: Arc::new(RwLock::new(Vec::new())),
pending_operations: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(VecDeque::new())),
latest_height: Arc::new(RwLock::new(0)),
height_update_tx: height_tx,
block_update_tx: block_tx,
Expand All @@ -58,13 +58,12 @@ impl InMemoryDataAvailabilityLayer {
let new_block = Block {
height: *latest_height,
operations: std::mem::take(&mut *pending_operations),
epochs: std::mem::take(&mut *pending_epochs),
epoch: pending_epochs.pop_front(),
};
debug!(
"new block produced at height {} with {} operations and {} snarks",
"new block produced at height {} with {} operations",
new_block.height,
new_block.operations.len(),
new_block.epochs.len()
);
blocks.push(new_block.clone());

Expand All @@ -81,6 +80,10 @@ impl InMemoryDataAvailabilityLayer {

#[async_trait]
impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn get_latest_height(&self) -> Result<u64> {
Ok(*self.latest_height.read().await)
}
Expand All @@ -89,18 +92,18 @@ impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
self.get_latest_height().await
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
let blocks = self.blocks.read().await;
Ok(blocks
.iter()
.find(|block| block.height == height)
.map(|block| block.epochs.clone())
.map(|block| block.epoch.clone())
.unwrap_or_default())
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64> {
let mut pending_epochs = self.pending_epochs.write().await;
pending_epochs.extend(epochs);
pending_epochs.push_back(epoch);
self.get_latest_height().await
}

Expand Down
6 changes: 4 additions & 2 deletions crates/prism/src/da/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use prism_errors::GeneralError;
use serde::{Deserialize, Serialize};
use sp1_sdk::SP1ProofWithPublicValues;
use std::{self, str::FromStr};
use tokio::sync::broadcast;

pub mod celestia;
pub mod memory;
Expand Down Expand Up @@ -50,9 +51,10 @@ impl SignedContent for FinalizedEpoch {
pub trait DataAvailabilityLayer: Send + Sync {
async fn get_latest_height(&self) -> Result<u64>;
async fn initialize_sync_target(&self) -> Result<u64>;
async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>>;
async fn submit_snarks(&self, epoch: Vec<FinalizedEpoch>) -> Result<u64>;
async fn get_snark(&self, height: u64) -> Result<Option<FinalizedEpoch>>;
async fn submit_snark(&self, epoch: FinalizedEpoch) -> Result<u64>;
async fn get_operations(&self, height: u64) -> Result<Vec<Operation>>;
async fn submit_operations(&self, operations: Vec<Operation>) -> Result<u64>;
async fn start(&self) -> Result<()>;
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64>;
}
Loading

0 comments on commit a1328f7

Please sign in to comment.