Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wip): based sequencing #101

Merged
merged 9 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions src/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::consts::{DA_RETRY_COUNT, DA_RETRY_INTERVAL};
use crate::error::{DataAvailabilityError, PrismError, PrismResult, GeneralError};
use crate::error::{DataAvailabilityError, GeneralError, PrismError, PrismResult};
use clap::{Parser, Subcommand};
use config::{builder::DefaultState, ConfigBuilder, File};
use dirs::home_dir;
Expand Down Expand Up @@ -31,9 +31,13 @@ pub struct CommandLineArgs {
#[arg(short = 'r', long)]
redis_client: Option<String>,

/// Celestia Namespace ID
#[arg(short = 'n', long)]
celestia_namespace_id: Option<String>,
/// Celestia Snark Namespace ID
#[arg(long)]
snark_namespace_id: Option<String>,

/// Celestia Operation Namespace ID
#[arg(long)]
operation_namespace_id: Option<String>,

// Height to start searching the DA layer for SNARKs on
#[arg(short = 's', long)]
Expand Down Expand Up @@ -113,15 +117,17 @@ impl Default for RedisConfig {
pub struct CelestiaConfig {
pub connection_string: String,
pub start_height: u64,
pub namespace_id: String,
pub snark_namespace_id: String,
pub operation_namespace_id: Option<String>,
}

impl Default for CelestiaConfig {
fn default() -> Self {
CelestiaConfig {
connection_string: "ws://localhost:26658".to_string(),
start_height: 0,
namespace_id: "00000000000000de1008".to_string(),
snark_namespace_id: "00000000000000de1008".to_string(),
operation_namespace_id: Some("00000000000000de1009".to_string()),
}
}
}
Expand Down Expand Up @@ -174,9 +180,7 @@ pub fn load_config(args: CommandLineArgs) -> PrismResult<Config> {
fn get_config_path(args: &CommandLineArgs) -> PrismResult<String> {
args.config_path
.clone()
.or_else(|| {
home_dir().map(|path| format!("{}/.prism/config.toml", path.to_string_lossy()))
})
.or_else(|| home_dir().map(|path| format!("{}/.prism/config.toml", path.to_string_lossy())))
.ok_or_else(|| {
GeneralError::MissingArgumentError("could not determine config path".to_string()).into()
})
Expand Down Expand Up @@ -261,13 +265,21 @@ fn apply_command_line_args(config: Config, args: CommandLineArgs) -> Config {
.map(|c| c.start_height)
.unwrap_or_else(|| CelestiaConfig::default().start_height)
}),
namespace_id: args.celestia_namespace_id.unwrap_or_else(|| {
snark_namespace_id: args.snark_namespace_id.unwrap_or_else(|| {
config
.celestia_config
.as_ref()
.map(|c| c.namespace_id.clone())
.unwrap_or_else(|| CelestiaConfig::default().namespace_id)
.map(|c| c.snark_namespace_id.clone())
.unwrap_or_else(|| CelestiaConfig::default().snark_namespace_id)
}),
operation_namespace_id: Some(args.operation_namespace_id.unwrap_or_else(|| {
config
.celestia_config
.as_ref()
.map(|c| c.operation_namespace_id.clone())
.unwrap_or_else(|| CelestiaConfig::default().operation_namespace_id)
.unwrap()
})),
}),
da_layer: config.da_layer,
epoch_time: Some(args.epoch_time.unwrap_or_else(|| {
Expand Down Expand Up @@ -296,13 +308,7 @@ pub async fn initialize_da_layer(
))?;

for attempt in 1..=DA_RETRY_COUNT {
match CelestiaConnection::new(
&celestia_conf.connection_string,
None,
&celestia_conf.namespace_id,
)
.await
{
match CelestiaConnection::new(&celestia_conf, None).await {
Ok(da) => return Ok(Arc::new(da) as Arc<dyn DataAvailabilityLayer + 'static>),
Err(e) => {
if attempt == DA_RETRY_COUNT {
Expand All @@ -323,8 +329,6 @@ pub async fn initialize_da_layer(
Ok(Arc::new(LocalDataAvailabilityLayer::new())
as Arc<dyn DataAvailabilityLayer + 'static>)
}
DALayerOption::None => Err(PrismError::ConfigError(
"No DA Layer specified".to_string(),
)),
DALayerOption::None => Err(PrismError::ConfigError("No DA Layer specified".to_string())),
}
}
5 changes: 3 additions & 2 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use indexed_merkle_tree::{sha256_mod, Hash};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`Operation`] represents a state transition in the system.
// In a blockchain analogy, this would be the full set of our transaction types.
pub enum Operation {
Expand All @@ -24,7 +25,7 @@ pub enum Operation {
},
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Clone, BorshSerialize, BorshDeserialize, Serialize, Deserialize, Debug, PartialEq)]
// An [`AccountSource`] represents the source of an account. See adr-002 for more information.
pub enum AccountSource {
SignedBySequencer { signature: String },
Expand Down
196 changes: 143 additions & 53 deletions src/da/celestia.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{
cfg::CelestiaConfig,
common::Operation,
consts::CHANNEL_BUFFER_SIZE,
da::{DataAvailabilityLayer, FinalizedEpoch},
error::{DAResult, DataAvailabilityError, GeneralError},
Expand All @@ -25,24 +27,29 @@ impl TryFrom<&Blob> for FinalizedEpoch {
}
}

impl TryFrom<&Blob> for Operation {
type Error = GeneralError;

fn try_from(value: &Blob) -> Result<Self, GeneralError> {
from_slice::<Self>(&value.data)
.map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e)))
}
}

pub struct CelestiaConnection {
pub client: celestia_rpc::Client,
pub namespace_id: Namespace,
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

synctarget_tx: Arc<Sender<u64>>,
synctarget_rx: Arc<Mutex<Receiver<u64>>>,
sync_target_tx: Arc<Sender<u64>>,
sync_target_rx: Arc<Mutex<Receiver<u64>>>,
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
}

impl CelestiaConnection {
// TODO: Should take config
pub async fn new(
connection_string: &str,
auth_token: Option<&str>,
namespace_hex: &String,
) -> DAResult<Self> {
pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> DAResult<Self> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let client = Client::new(connection_string, auth_token)
let client = Client::new(&config.connection_string, auth_token)
.await
.map_err(|e| {
DataAvailabilityError::ConnectionError(format!(
Expand All @@ -51,38 +58,42 @@ impl CelestiaConnection {
))
})?;

let decoded_hex = match hex::decode(namespace_hex) {
Ok(hex) => hex,
Err(e) => {
return Err(DataAvailabilityError::GeneralError(
GeneralError::DecodingError(format!(
"decoding namespace '{}': {}",
namespace_hex, e
)),
))
}
let snark_namespace = create_namespace(&config.snark_namespace_id)?;
let operation_namespace = match &config.operation_namespace_id {
Some(id) => create_namespace(id)?,
None => snark_namespace.clone(),
};

let namespace_id = Namespace::new_v0(&decoded_hex).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!(
"creating namespace '{}': {}",
namespace_hex, e
)))
})?;

Ok(CelestiaConnection {
client,
namespace_id,
synctarget_tx: Arc::new(tx),
synctarget_rx: Arc::new(Mutex::new(rx)),
snark_namespace,
operation_namespace,
sync_target_tx: Arc::new(tx),
sync_target_rx: Arc::new(Mutex::new(rx)),
})
}
}

fn create_namespace(namespace_hex: &str) -> DAResult<Namespace> {
let decoded_hex = hex::decode(namespace_hex).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::DecodingError(format!(
"decoding namespace '{}': {}",
namespace_hex, e
)))
})?;

Namespace::new_v0(&decoded_hex).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!(
"creating namespace '{}': {}",
namespace_hex, e
)))
})
}
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_latest_height(&self) -> DAResult<u64> {
match self.synctarget_rx.lock().await.recv().await {
match self.sync_target_rx.lock().await.recv().await {
Some(height) => Ok(height),
None => Err(DataAvailabilityError::ChannelReceiveError),
}
Expand All @@ -98,9 +109,9 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn get(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await {
match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
Expand Down Expand Up @@ -130,27 +141,106 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn submit(&self, epoch: &FinalizedEpoch) -> DAResult<u64> {
debug!("posting epoch {} to da layer", epoch.height);

let data = borsh::to_vec(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch: {}",
e
)))
})?;
let blob = Blob::new(self.namespace_id, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;
trace!("blob: {:?}", &blob);
match self
.client
.blob_submit(&[blob.clone()], GasPrice::from(-1.0))
.await
{
async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> DAResult<u64> {
if epochs.is_empty() {
return Err(DataAvailabilityError::GeneralError(
GeneralError::MissingArgumentError("No epochs provided for submission".to_string()),
));
}

debug!("posting {} epochs to da layer", epochs.len());

let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
.iter()
.map(|epoch| {
let data = borsh::to_vec(epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}

let last_epoch_height = epochs.last().map(|e| e.height).unwrap_or(0);

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
last_epoch_height,
err.to_string(),
)),
}
}
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
trace!("searching for operations on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace]).await {
Ok(blobs) => {
let mut operations = Vec::new();
for blob in blobs.iter() {
match Operation::try_from(blob) {
Ok(operation) => operations.push(operation),
Err(_) => {
debug!(
"marshalling blob from height {} to operation failed: {:?}",
height, &blob
)
}
}
}
Ok(operations)
}
// TODO: replace this error
Err(err) => Err(DataAvailabilityError::DataRetrievalError(
height,
format!("getting operations from da layer: {}", err),
)
.into()),
}
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
debug!("posting {} operations to DA layer", operations.len());
let blobs: Result<Vec<Blob>, DataAvailabilityError> = operations
.iter()
.map(|operation| {
let data = borsh::to_vec(operation).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing operation {}: {}",
operation, e
)))
})?;
Blob::new(self.operation_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
epoch.height,
// todo: fucking submission error is yikes, we need anyhow
0,
err.to_string(),
)),
}
Expand All @@ -166,7 +256,7 @@ impl DataAvailabilityLayer for CelestiaConnection {
))
})?;

let synctarget_buffer = self.synctarget_tx.clone();
let synctarget_buffer = self.sync_target_tx.clone();
spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
match extended_header_result {
Expand Down
Loading
Loading