Skip to content

Commit

Permalink
feat:finish archiver
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Kai <[email protected]>
  • Loading branch information
GrapeBaBa committed Aug 28, 2024
1 parent e584c6f commit 8963c73
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ tracing-appender = "0.2.3"
uuid = { version = "1.10.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
warp = "0.3.2"
ssz_rs = "0.9.0"
ctrlc = { version = "3.2.3", features = ["termination"] }
Empty file added Dockerfile
Empty file.
1 change: 1 addition & 0 deletions bin/archiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tracing-subscriber.workspace = true
uuid.workspace = true
warp.workspace = true
tracing-appender.workspace = true
ctrlc.workspace = true
blob-archiver-storage = { path = "../../crates/storage" }
blob-archiver-beacon = { path = "../../crates/beacon" }

Expand Down
59 changes: 40 additions & 19 deletions bin/archiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;

use again::RetryPolicy;
use clap::Parser;
use ctrlc::set_handler;
use eth2::types::{BlockId, Hash256};
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use serde::{Deserialize, Serialize};
Expand All @@ -16,12 +17,12 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};

use blob_archiver_beacon::beacon_client;
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
use blob_archiver_beacon::{beacon_client, blob_test_helper};
use blob_archiver_storage::fs::FSStorage;
use blob_archiver_storage::s3::S3Config;
use blob_archiver_storage::s3::{S3Config, S3Storage};
use blob_archiver_storage::storage;
use blob_archiver_storage::storage::StorageType;
use blob_archiver_storage::storage::{Storage, StorageType};

use crate::archiver::{Archiver, Config, STARTUP_FETCH_BLOB_MAXIMUM_RETRIES};

Expand All @@ -34,27 +35,47 @@ static INIT: std::sync::Once = std::sync::Once::new();
async fn main() {
let args = CliArgs::parse();

let config = args.to_config();
println!("{:#?}", config);
init_logging(0, None, None);
let config: Config = args.to_config();
init_logging(
config.log_config.verbose,
config.log_config.log_dir.clone(),
config
.log_config
.log_rotation
.clone()
.map(|s| to_rotation(s.as_str())),
);
let beacon_client = BeaconNodeHttpClient::new(
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
Timeouts::set_all(Duration::from_secs(30)),
SensitiveUrl::from_str(config.beacon_config.beacon_endpoint.as_str()).unwrap(),
Timeouts::set_all(config.beacon_config.beacon_client_timeout),
);
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
let config = Config {
poll_interval: Duration::from_secs(5),
listen_addr: "".to_string(),
origin_block: *blob_test_helper::ORIGIN_BLOCK,
beacon_config: Default::default(),
storage_config: Default::default(),
log_config: Default::default(),
let storage: Arc<Mutex<dyn Storage>> = if config.storage_config.storage_type == StorageType::FS
{
Arc::new(Mutex::new(
FSStorage::new(config.storage_config.fs_dir.clone().unwrap())
.await
.unwrap(),
))
} else {
Arc::new(Mutex::new(
S3Storage::new(config.storage_config.s3_config.clone().unwrap())
.await
.unwrap(),
))
};
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
set_handler(move || {
tracing::info!("shutting down");
shutdown_tx
.send(true)
.expect("could not send shutdown signal");
})
.expect("could not register shutdown handler");

let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
let archiver = Archiver::new(
Arc::new(Mutex::new(beacon_client_eth2)),
Arc::new(Mutex::new(storage)),
storage,
config,
shutdown_rx,
);
Expand Down
8 changes: 6 additions & 2 deletions crates/storage/src/s3.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::path::Path;
use std::time::Duration;

use crate::storage::{
BackfillProcesses, BlobData, LockFile, Storage, StorageReader, StorageWriter,
};
use async_trait::async_trait;
use aws_sdk_s3::config::retry::RetryConfig;
use aws_sdk_s3::config::timeout::TimeoutConfig;
Expand All @@ -14,8 +17,6 @@ use serde::{Deserialize, Serialize};
use tracing::info;
use tracing::log::trace;

use crate::storage::{BackfillProcesses, BlobData, LockFile, StorageReader, StorageWriter};

use crate::storage::BACKFILL_LOCK;

#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -89,6 +90,9 @@ impl S3Storage {
}
}

#[async_trait]
impl Storage for S3Storage {}

#[async_trait]
impl StorageReader for S3Storage {
async fn read_blob_data(&self, hash: &Hash256) -> Result<BlobData> {
Expand Down

0 comments on commit 8963c73

Please sign in to comment.