Skip to content

Commit

Permalink
refactor: error types (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd authored Jul 10, 2024
1 parent b5f801d commit c53461a
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 312 deletions.
6 changes: 2 additions & 4 deletions src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use dirs::home_dir;
use serde::{Deserialize, Serialize};
use std::{fs, path::Path, sync::Arc};

use crate::da::{CelestiaConnection, LocalDataAvailabilityLayer};

use crate::da::DataAvailabilityLayer;
use crate::da::{CelestiaConnection, DataAvailabilityLayer, LocalDataAvailabilityLayer};

#[derive(Clone, Debug, Subcommand, Deserialize)]
pub enum Commands {
Expand Down Expand Up @@ -218,7 +216,7 @@ pub async fn initialize_da_layer(config: &Config) -> Arc<dyn DataAvailabilityLay
{
Ok(da) => Arc::new(da) as Arc<dyn DataAvailabilityLayer + 'static>,
Err(e) => {
panic!("Failed to connect to Celestia: {}", e);
panic!("connecting to celestia: {}", e);
}
}
}
Expand Down
157 changes: 79 additions & 78 deletions src/da.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
consts::CHANNEL_BUFFER_SIZE,
error::{DataAvailabilityError, DatabaseError, DeimosError, GeneralError},
error::{DAResult, DataAvailabilityError, DeimosResult, GeneralError},
utils::Signable,
zk_snark::{Bls12Proof, VerifyingKey},
};
Expand Down Expand Up @@ -42,54 +42,45 @@ impl TryFrom<&Blob> for EpochJson {
fn try_from(value: &Blob) -> Result<Self, GeneralError> {
// convert blob data to utf8 string
let data_str = String::from_utf8(value.data.clone()).map_err(|e| {
GeneralError::ParsingError(format!("Could not convert blob data to utf8 string: {}", e))
GeneralError::EncodingError(format!("encoding blob data to utf8 string: {}", e))
})?;

// convert utf8 string to EpochJson
serde_json::from_str(&data_str)
.map_err(|e| GeneralError::ParsingError(format!("Could not parse epoch json: {}", e)))
.map_err(|e| GeneralError::DecodingError(format!("epoch json: {}", e)))
}
}

impl Signable for EpochJson {
fn get_signature(&self) -> Result<Signature, DeimosError> {
fn get_signature(&self) -> DeimosResult<Signature> {
match &self.signature {
Some(signature) => {
let signature = Signature::from_str(signature).map_err(|_| {
DeimosError::General(GeneralError::ParsingError(
"Cannot parse signature".to_string(),
))
})?;
Ok(signature)
}
None => Err(DeimosError::General(GeneralError::MissingArgumentError)),
Some(signature) => Signature::from_str(signature)
.map_err(|e| GeneralError::ParsingError(format!("signature: {}", e)).into()),
None => Err(GeneralError::MissingArgumentError("signature".to_string()).into()),
}
}

fn get_content_to_sign(&self) -> Result<String, DeimosError> {
fn get_content_to_sign(&self) -> DeimosResult<String> {
let mut copy = self.clone();
copy.signature = None;
serde_json::to_string(&copy).map_err(|_| {
DeimosError::General(GeneralError::ParsingError("Cannot serialize".to_string()))
})
serde_json::to_string(&copy).map_err(|e| GeneralError::EncodingError(e.to_string()).into())
}

fn get_public_key(&self) -> Result<String, DeimosError> {
fn get_public_key(&self) -> DeimosResult<String> {
//TODO(@distractedm1nd): the below comment isn't good enough of an argument to not return the public key, it should be fixed

// for epoch json the public key to verify is the one from the sequencer which should be already be public and known from every light client
// so if we use this function there should be an error
Err(DeimosError::Database(DatabaseError::NotFoundError(
"Public key not found".to_string(),
)))
Err(GeneralError::MissingArgumentError("public key".to_string()).into())
}
}

#[async_trait]
pub trait DataAvailabilityLayer: Send + Sync {
async fn get_message(&self) -> Result<u64, DataAvailabilityError>;
async fn initialize_sync_target(&self) -> Result<u64, DataAvailabilityError>;
async fn get(&self, height: u64) -> Result<Vec<EpochJson>, DataAvailabilityError>;
async fn submit(&self, epoch: &EpochJson) -> Result<u64, DataAvailabilityError>;
async fn start(&self) -> Result<(), DataAvailabilityError>;
async fn get_message(&self) -> DAResult<u64>;
async fn initialize_sync_target(&self) -> DAResult<u64>;
async fn get(&self, height: u64) -> DAResult<Vec<EpochJson>>;
async fn submit(&self, epoch: &EpochJson) -> DAResult<u64>;
async fn start(&self) -> DAResult<()>;
}

pub struct CelestiaConnection {
Expand All @@ -105,23 +96,23 @@ pub struct NoopDataAvailabilityLayer {}

#[async_trait]
impl DataAvailabilityLayer for NoopDataAvailabilityLayer {
async fn get_message(&self) -> Result<u64, DataAvailabilityError> {
async fn get_message(&self) -> DAResult<u64> {
Ok(0)
}

async fn initialize_sync_target(&self) -> Result<u64, DataAvailabilityError> {
async fn initialize_sync_target(&self) -> DAResult<u64> {
Ok(0)
}

async fn get(&self, _: u64) -> Result<Vec<EpochJson>, DataAvailabilityError> {
async fn get(&self, _: u64) -> DAResult<Vec<EpochJson>> {
Ok(vec![])
}

async fn submit(&self, _: &EpochJson) -> Result<u64, DataAvailabilityError> {
async fn submit(&self, _: &EpochJson) -> DAResult<u64> {
Ok(0)
}

async fn start(&self) -> Result<(), DataAvailabilityError> {
async fn start(&self) -> DAResult<()> {
Ok(())
}
}
Expand All @@ -139,30 +130,35 @@ impl CelestiaConnection {
connection_string: &String,
auth_token: Option<&str>,
namespace_hex: &String,
) -> Result<Self, DataAvailabilityError> {
) -> DAResult<Self> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let client = Client::new(&connection_string, auth_token)
.await
.map_err(|e| {
DataAvailabilityError::InitializationError(format!(
"Websocket initialization failed: {}",
DataAvailabilityError::ConnectionError(format!(
"websocket initialization failed: {}",
e
))
})?;

let decoded_hex = match hex::decode(namespace_hex) {
Ok(hex) => hex,
Err(e) => {
return Err(DataAvailabilityError::InitializationError(format!(
"Hex decoding failed: {}",
e
)))
return Err(DataAvailabilityError::GeneralError(
GeneralError::DecodingError(format!(
"decoding namespace '{}': {}",
namespace_hex, e
)),
))
}
};

let namespace_id = Namespace::new_v0(&decoded_hex).map_err(|e| {
DataAvailabilityError::InitializationError(format!("Namespace creation failed: {}", e))
DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!(
"creating namespace '{}': {}",
namespace_hex, e
)))
})?;

Ok(CelestiaConnection {
Expand All @@ -176,80 +172,87 @@ impl CelestiaConnection {

#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_message(&self) -> Result<u64, DataAvailabilityError> {
async fn get_message(&self) -> DAResult<u64> {
match self.synctarget_rx.lock().await.recv().await {
Some(height) => Ok(height),
None => Err(DataAvailabilityError::ChannelReceiveError),
}
}

async fn initialize_sync_target(&self) -> Result<u64, DataAvailabilityError> {
async fn initialize_sync_target(&self) -> DAResult<u64> {
match HeaderClient::header_network_head(&self.client).await {
Ok(extended_header) => Ok(extended_header.header.height.value()),
Err(err) => Err(DataAvailabilityError::NetworkError(format!(
"Could not get network head from DA layer: {}",
"getting network head from da layer: {}",
err
))),
}
}

async fn get(&self, height: u64) -> Result<Vec<EpochJson>, DataAvailabilityError> {
debug! {"Getting epoch {} from DA layer", height};
async fn get(&self, height: u64) -> DAResult<Vec<EpochJson>> {
trace!("searching for epoch on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.namespace_id]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
match EpochJson::try_from(blob) {
Ok(epoch_json) => epochs.push(epoch_json),
Err(_) => {
DataAvailabilityError::DataRetrievalError(
height,
"Could not parse epoch json for blob".to_string(),
);
DataAvailabilityError::GeneralError(GeneralError::ParsingError(
format!(
"marshalling blob from height {} to epoch json: {}",
height,
serde_json::to_string(&blob).unwrap()
),
));
}
}
}
Ok(epochs)
}
Err(err) => Err(DataAvailabilityError::DataRetrievalError(
height,
format!("Could not get epoch from DA layer: {}", err),
format!("getting epoch from da layer: {}", err),
)),
}
}

async fn submit(&self, epoch: &EpochJson) -> Result<u64, DataAvailabilityError> {
debug! {"Posting epoch {} to DA layer", epoch.height};
async fn submit(&self, epoch: &EpochJson) -> DAResult<u64> {
debug!("posting epoch {} to da layer", epoch.height);

let data = serde_json::to_string(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"Could not serialize epoch json: {}",
"serializing epoch json: {}",
e
)))
})?;
let blob = Blob::new(self.namespace_id.clone(), data.into_bytes())
.map_err(|_| DataAvailabilityError::GeneralError(GeneralError::BlobCreationError))?;
debug!("submitted blob with commitment {:?}", serde_json::to_string(&blob.clone().commitment).unwrap());
let blob = Blob::new(self.namespace_id.clone(), data.into_bytes()).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;
debug!(
"submitted blob with commitment {:?}",
serde_json::to_string(&blob.clone().commitment).unwrap()
);
trace!("blob: {:?}", serde_json::to_string(&blob).unwrap());
match self
.client
.blob_submit(&[blob.clone()], GasPrice::from(-1.0))
.await
{
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::NetworkError(format!(
"Could not submit epoch to DA layer: {}",
err
))),
Err(err) => Err(DataAvailabilityError::SubmissionError(
epoch.height,
err.to_string(),
)),
}
}

async fn start(&self) -> Result<(), DataAvailabilityError> {
async fn start(&self) -> DAResult<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
.map_err(|e| {
DataAvailabilityError::NetworkError(format!(
"Could not subscribe to header updates from DA layer: {}",
"subscribing to headers from da layer: {}",
e
))
})?;
Expand All @@ -265,20 +268,18 @@ impl DataAvailabilityLayer for CelestiaConnection {
debug!("sent sync target update for height {}", height);
}
Err(_) => {
DataAvailabilityError::SyncTargetError(
"sending".to_string(),
format!(
"Failed to send sync target update message for height {}",
height
),
);
DataAvailabilityError::SyncTargetError(format!(
"sending sync target update message for height {}",
height
));
}
}
}
Err(_) => {
DataAvailabilityError::NetworkError(
"Could not get header from DA layer".to_string(),
);
Err(e) => {
DataAvailabilityError::NetworkError(format!(
"retrieving header from da layer: {}",
e
));
}
}
}
Expand All @@ -295,15 +296,15 @@ impl LocalDataAvailabilityLayer {

#[async_trait]
impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
async fn get_message(&self) -> Result<u64, DataAvailabilityError> {
async fn get_message(&self) -> DAResult<u64> {
Ok(100)
}

async fn initialize_sync_target(&self) -> Result<u64, DataAvailabilityError> {
async fn initialize_sync_target(&self) -> DAResult<u64> {
Ok(0) // header starts always at zero in test cases
}

async fn get(&self, height: u64) -> Result<Vec<EpochJson>, DataAvailabilityError> {
async fn get(&self, height: u64) -> DAResult<Vec<EpochJson>> {
let mut file = File::open("data.json").expect("Unable to open file");
let mut contents = String::new();
file.lock_exclusive().expect("Unable to lock file");
Expand All @@ -326,7 +327,7 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
}
}

async fn submit(&self, epoch: &EpochJson) -> Result<u64, DataAvailabilityError> {
async fn submit(&self, epoch: &EpochJson) -> DAResult<u64> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
Expand Down Expand Up @@ -369,7 +370,7 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
Ok(epoch.height)
}

async fn start(&self) -> Result<(), DataAvailabilityError> {
async fn start(&self) -> DAResult<()> {
Ok(())
}
}
Expand Down
Loading

0 comments on commit c53461a

Please sign in to comment.