Skip to content

Commit

Permalink
progress on JMT migration: removing lifetime annotations to own TreeR…
Browse files Browse the repository at this point in the history
…eader
  • Loading branch information
distractedm1nd committed Aug 11, 2024
1 parent fbb8aa9 commit 8ee6bd4
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 60 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ bellpepper = { version = "0.4.0", default-features = false }
arecibo = { version = "0.1.1", default-features = false }
sha2 = "0.10.8"
proptest = "1.5.0"
auto_impl = "1.2.0"

[dev-dependencies]
serial_test = "3.1.1"
Expand Down
33 changes: 18 additions & 15 deletions src/node_types/sequencer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::tree::{hash, Digest, KeyDirectoryTree, Proof, RedisKDTree};
use crate::{
storage::RedisConnection,
tree::{hash, Digest, KeyDirectory, KeyDirectoryTree, Proof},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use ed25519::Signature;
Expand Down Expand Up @@ -28,7 +31,7 @@ use crate::{
webserver::{OperationInput, WebServer},
};

pub struct Sequencer<'a> {
pub struct Sequencer {
pub db: Arc<dyn Database>,
pub da: Arc<dyn DataAvailabilityLayer>,
pub ws: WebServer,
Expand All @@ -43,14 +46,14 @@ pub struct Sequencer<'a> {
// [`pending_operations`] is a buffer for operations that have not yet been
// posted to the DA layer.
pending_operations: Arc<Mutex<Vec<Operation>>>,
tree: Arc<Mutex<RedisKDTree<'a>>>,
tree: Arc<Mutex<KeyDirectoryTree<Box<dyn Database>>>>,

epoch_buffer_tx: Arc<Sender<FinalizedEpoch>>,
epoch_buffer_rx: Arc<Mutex<Receiver<FinalizedEpoch>>>,
}

#[async_trait]
impl<'a> NodeType for Sequencer<'a> {
impl NodeType for Sequencer {
async fn start(self: Arc<Self>) -> Result<()> {
self.da.start().await.context("Failed to start DA layer")?;

Expand All @@ -72,21 +75,20 @@ impl<'a> NodeType for Sequencer<'a> {

impl Sequencer {
pub fn new(
db: Arc<dyn Database>,
db: Arc<Box<dyn Database>>,
da: Arc<dyn DataAvailabilityLayer>,
cfg: Config,
key: SigningKey,
) -> Result<Sequencer> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let ws = cfg.webserver.context("Missing webserver configuration")?;

let start_height = cfg.celestia_config.unwrap_or_default().start_height;

let tree = KeyDirectoryTree::new(&db.clone());
// Create the KeyDirectory
let tree = Arc::new(Mutex::new(KeyDirectoryTree::new(db.clone())));

Ok(Sequencer {
db,
db: db.clone(),
da,
ws: WebServer::new(ws),
key,
Expand All @@ -100,13 +102,14 @@ impl Sequencer {

// sync_loop is responsible for downloading operations from the DA layer
async fn sync_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
let self_clone = self.clone();
info!("starting operation sync loop");
let epoch_buffer = self.epoch_buffer_tx.clone();
spawn(async move {
let mut current_position = self.start_height;
let mut current_position = self_clone.start_height;
loop {
// target is updated when a new header is received
let target = match self.da.get_latest_height().await {
let target = match self_clone.da.get_latest_height().await {
Ok(target) => target,
Err(e) => {
error!("failed to update sync target, retrying: {:?}", e);
Expand All @@ -121,7 +124,7 @@ impl Sequencer {
debug!("updated sync target to height {}", target);
while current_position < target {
trace!("processing height: {}", current_position);
match self.da.get_operations(current_position + 1).await {
match self_clone.da.get_operations(current_position + 1).await {
Ok(operations) => {
if !operations.is_empty() {
debug!(
Expand All @@ -130,7 +133,7 @@ impl Sequencer {
);
}

let epoch = match self.finalize_epoch(operations).await {
let epoch = match self_clone.finalize_epoch(operations).await {
Ok(e) => e,
Err(e) => {
error!("sequencer_loop: finalizing epoch: {}", e);
Expand Down Expand Up @@ -452,7 +455,7 @@ mod tests {
use serial_test::serial;

// set up redis connection and flush database before each test
fn setup_db() -> RedisConnection {
fn setup_db<'a>() -> RedisConnection {
let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap();
redis_connection.flush_database().unwrap();
redis_connection
Expand All @@ -464,7 +467,7 @@ mod tests {
}

// Helper function to create a test Sequencer instance
async fn create_test_sequencer() -> Arc<Sequencer<'static>> {
async fn create_test_sequencer() -> Arc<Sequencer> {
let (da_layer, _rx, _brx) = InMemoryDataAvailabilityLayer::new(1);
let da_layer = Arc::new(da_layer);
let db = Arc::new(setup_db());
Expand Down
16 changes: 7 additions & 9 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use anyhow::{anyhow, Result};
use auto_impl::auto_impl;
use indexed_merkle_tree::Hash;
use jmt::{
storage::{LeafNode, Node, NodeBatch, NodeKey, TreeReader, TreeWriter},
KeyHash, OwnedValue, Version,
};
use mockall::{predicate::*, *};
use mockall::predicate::*;
use redis::{Client, Commands, Connection};
use std::{
self,
process::Command,
sync::{Mutex, MutexGuard},
sync::{Arc, Mutex, MutexGuard},
thread::sleep,
time::Duration,
};
Expand All @@ -27,8 +28,8 @@ pub struct RedisConnection {
connection: Mutex<Connection>,
}

#[automock]
pub trait Database: Send + Sync {
#[auto_impl(&, Box, Arc)]
pub trait Database: Send + Sync + TreeReader + TreeWriter {
fn get_hashchain(&self, key: &str) -> Result<Hashchain>;
fn update_hashchain(
&self,
Expand All @@ -39,10 +40,6 @@ pub trait Database: Send + Sync {
fn get_commitment(&self, epoch: &u64) -> Result<String>;
fn set_commitment(&self, epoch: &u64, commitment: &Hash) -> Result<()>;

// fn get_node_option(&self, node_key: &NodeKey) -> Result<Option<Node>>;
// fn get_value_option(&self, max_epoch: u64, key_hash: KeyHash) -> Result<Option<OwnedValue>>;
// fn write_node_batch(&self, node_batch: &NodeBatch) -> Result<()>;

fn get_epoch(&self) -> Result<u64>;
fn set_epoch(&self, epoch: &u64) -> Result<()>;

Expand Down Expand Up @@ -252,14 +249,15 @@ impl Database for RedisConnection {
mod tests {
use super::*;
use crate::common::Operation;
use crate::storage::Database;
use crate::tree::hash;
use serde::{Deserialize, Serialize};
use serial_test::serial;

// Helper functions

// set up redis connection and flush database before each test
fn setup() -> RedisConnection {
fn setup<'a>() -> RedisConnection<'a> {
let redis_connection = RedisConnection::new(&RedisConfig::default()).unwrap();
redis_connection.flush_database().unwrap();
redis_connection
Expand Down
Loading

0 comments on commit 8ee6bd4

Please sign in to comment.