Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Nov 8, 2024
1 parent 139f55a commit 686c5cd
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
20 changes: 19 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod storage;
use clap::Parser;
use snapchain::consensus::consensus::Decision;
use futures::stream::StreamExt;
use libp2p::identity::ed25519::Keypair;
use malachite_config::TimeoutConfig;
Expand All @@ -8,6 +9,7 @@ use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use storage::store::put_block;
use tokio::signal::ctrl_c;
use tokio::sync::mpsc;
use tokio::time::sleep;
Expand All @@ -27,6 +29,7 @@ use snapchain::network::gossip::GossipEvent;
use snapchain::network::gossip::SnapchainGossip;
use snapchain::network::server::MySnapchainService;
use snapchain::proto::rpc::snapchain_service_server::SnapchainServiceServer;
use crate::storage::db::RocksDB;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -75,6 +78,8 @@ async fn main() -> Result<(), Box<dyn Error>> {

let (system_tx, mut system_rx) = mpsc::channel::<SystemMessage>(100);

let (decision_tx, mut decision_rx) = mpsc::channel::<Decision<SnapchainValidatorContext>>(100);

let gossip_result = SnapchainGossip::create(keypair.clone(), addr, system_tx.clone());
if let Err(e) = gossip_result {
error!(error = ?e, "Failed to create SnapchainGossip");
Expand Down Expand Up @@ -156,13 +161,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
consensus_params,
TimeoutConfig::default(),
metrics.clone(),
None,
Some(decision_tx),
gossip_tx.clone(),
shard_validator,
)
.await
.unwrap();

let rocksdb = RocksDB::new("./rocks");
tokio::spawn(async move {
while let Some((height, round, shard_hash)) = decision_rx.recv().await {
let block = shard
match put_block(&rocksdb, block) {
Err(err) => {
error!("Error writing block to db {:#?}", err)
}
Ok(()) => {}
}
}
});

// Create a timer for block creation
let mut block_interval = time::interval(Duration::from_secs(2));

Expand Down
2 changes: 1 addition & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mod db;
pub mod db;
pub mod store;
6 changes: 2 additions & 4 deletions src/storage/store/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,8 @@ pub fn get_blocks_in_range(
get_block_page_by_prefix(db, page_options, Some(start_primary_key), stop_prefix)
}

pub fn put_block(
txn: &mut RocksDbTransactionBatch,
block: &Block,
) -> Result<(), BlockStorageError> {
pub fn put_block(db: &RocksDB, block: &Block) -> Result<(), BlockStorageError> {
let mut txn = db.txn();
let header = block
.header
.as_ref()
Expand Down

0 comments on commit 686c5cd

Please sign in to comment.