Skip to content

Commit

Permalink
feat:rearchive range
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Kai <[email protected]>
  • Loading branch information
GrapeBaBa committed Aug 6, 2024
1 parent 82ea105 commit 47b7b17
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 83 deletions.
104 changes: 84 additions & 20 deletions bin/archiver/src/archiver.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use again::RetryPolicy;
use blob_archiver_beacon::beacon_client::BeaconClient;
use blob_archiver_storage::{
BackfillProcess, BackfillProcesses, BlobData, BlobSidecars, Header, LockFile, Storage,
};
use eth2::types::Slot;
use eth2::types::{BlockHeaderData, BlockId, Hash256};
use eyre::Result;
use eth2::Error;
use eyre::{eyre, Result};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::{interval, sleep};
use tracing::log::{debug, error, info, trace};
use blob_archiver_beacon::beacon_client::BeaconClient;

#[allow(dead_code)]
const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10;
#[allow(dead_code)]
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: i32 = 3;
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3;
#[allow(dead_code)]
const REARCHIVE_MAXIMUM_RETRIES: i32 = 3;
const REARCHIVE_MAXIMUM_RETRIES: usize = 3;
#[allow(dead_code)]
const BACKFILL_ERROR_RETRY_INTERVAL: Duration = Duration::from_secs(5);
#[allow(dead_code)]
Expand All @@ -30,6 +32,13 @@ const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10;
#[allow(dead_code)]
static OBTAIN_LOCK_RETRY_INTERVAL: AtomicU64 = AtomicU64::new(OBTAIN_LOCK_RETRY_INTERVAL_SECS);

#[derive(Debug, Serialize, Deserialize)]
pub struct RearchiveResp {
pub from: u64,
pub to: u64,
pub error: Option<String>,
}

#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
pub struct Config {
pub poll_interval: Duration,
Expand All @@ -43,9 +52,9 @@ pub struct Archiver {
pub beacon_client: Arc<dyn BeaconClient>,

storage: Arc<dyn Storage>,
#[allow(dead_code)]

id: String,
#[allow(dead_code)]

pub config: Config,

shutdown_rx: Receiver<bool>,
Expand All @@ -70,20 +79,20 @@ impl Archiver {
&self,
block_id: BlockId,
overwrite: bool,
) -> Result<(BlockHeaderData, bool)> {
) -> Result<Option<(BlockHeaderData, bool)>> {
let header_resp_opt = self
.beacon_client
.get_beacon_headers_block_id(block_id)
.await
.map_err(|e| eyre::eyre!(e))?;

match header_resp_opt {
None => Err(eyre::eyre!("No header response")),
None => Ok(None),
Some(header) => {
let exists = self.storage.exists(&header.data.root).await;

if exists && !overwrite {
return Ok((header.data, true));
return Ok(Some((header.data, true)));
}

let blobs_resp_opt = self
Expand All @@ -103,9 +112,9 @@ impl Archiver {
);
self.storage.write_blob_data(blob_data).await?;
trace!("Persisting blobs for block: {:?}", blob_data);
return Ok((header.data, exists));
return Ok(Some((header.data, exists)));
}
Ok((header.data, exists))
Ok(Some((header.data, exists)))
}
}
}
Expand Down Expand Up @@ -228,7 +237,7 @@ impl Archiver {
&process.current_block,
&mut processes,
)
.await;
.await;
}
}
Err(e) => {
Expand All @@ -248,7 +257,7 @@ impl Archiver {
let mut curr = current.clone();
let mut already_exists = false;
let mut count = 0;
let mut res: Result<(BlockHeaderData, bool)>;
let mut res: Result<Option<(BlockHeaderData, bool)>>;
let shutdown_rx = self.shutdown_rx.clone();
info!("backfill process initiated, curr_hash: {:#?}, curr_slot: {:#?}, start_hash: {:#?},start_slot: {:#?}", curr.root, curr.header.message.slot.clone(), start.root, start.header.message.slot.clone());

Expand Down Expand Up @@ -276,7 +285,14 @@ impl Archiver {
continue;
};

let (parent, parent_exists) = res.unwrap();
let Some((parent, parent_exists)) = res.unwrap() else {
error!(
"failed to persist blobs for block, will retry, hash: {:#?}",
curr.header.message.parent_root
);
sleep(BACKFILL_ERROR_RETRY_INTERVAL).await;
continue;
};
curr = parent;
already_exists = parent_exists;

Expand Down Expand Up @@ -322,7 +338,7 @@ impl Archiver {
let mut current_block_id = BlockId::Head;

loop {
let retry_policy = RetryPolicy::exponential(Duration::from_secs(1))
let retry_policy = RetryPolicy::exponential(Duration::from_millis(250))
.with_jitter(true)
.with_max_delay(Duration::from_secs(10))
.with_max_retries(LIVE_FETCH_BLOB_MAXIMUM_RETRIES);
Expand All @@ -335,7 +351,11 @@ impl Archiver {
return;
}

let (curr, already_exists) = res.unwrap();
let Some((curr, already_exists)) = res.unwrap() else {
error!("Error fetching blobs for block");
return;
};

if start.is_none() {
start = Some(curr.clone());
}
Expand Down Expand Up @@ -376,6 +396,52 @@ impl Archiver {

#[allow(dead_code)]
async fn start(&self) {}

#[allow(dead_code)]
async fn rearchive_range(&self, from: u64, to: u64) -> RearchiveResp {
for i in from..=to {
info!("rearchiving block: {}", i);
let retry_policy = RetryPolicy::exponential(Duration::from_millis(250))
.with_jitter(true)
.with_max_delay(Duration::from_secs(10))
.with_max_retries(REARCHIVE_MAXIMUM_RETRIES);
let r = retry_policy.retry(|| self.rearchive(i)).await;

match r {
Err(e) => {
error!("Error fetching blobs for block: {:#?}", e);
return RearchiveResp {
from,
to,
error: Some(e.downcast::<Error>().unwrap().to_string()),
};
}
Ok(false) => {
info!("block not found, skipping");
}
Ok(true) => {
info!("block rearchived successfully")
}
}
}
RearchiveResp {
from,
to,
error: None,
}
}

async fn rearchive(&self, i: u64) -> Result<bool> {
let res = self
.persist_blobs_for_block(BlockId::Slot(Slot::new(i)), true)
.await;

match res {
Err(e) => Err(eyre!(e)),
Ok(None) => Ok(false),
Ok(Some(_)) => Ok(true),
}
}
}

#[cfg(test)]
Expand All @@ -385,9 +451,9 @@ mod tests {
use std::time::Duration;

use super::*;
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
use blob_archiver_storage::fs::FSStorage;
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use blob_archiver_beacon::beacon_client::BeaconClientEth2;

#[tokio::test]
async fn test_persist_blobs_for_block() {
Expand All @@ -399,9 +465,7 @@ mod tests {
let storage = FSStorage::new(dir.clone()).await.unwrap();
tokio::fs::create_dir_all(dir).await.unwrap();
let (_, rx) = tokio::sync::watch::channel(false);
let beacon_client_eth2 = BeaconClientEth2 {
beacon_client,
};
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
let archiver = Archiver::new(Arc::new(beacon_client_eth2), Arc::new(storage), rx);

let block_id = BlockId::Head;
Expand Down
6 changes: 2 additions & 4 deletions bin/archiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::sync::Arc;
use std::time::Duration;

use crate::archiver::Archiver;
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
use blob_archiver_storage::fs::FSStorage;
use eth2::types::BlockId;
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
use blob_archiver_beacon::beacon_client::BeaconClientEth2;

mod archiver;

Expand All @@ -19,9 +19,7 @@ async fn main() {
);
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
let (_, shutdown_rx) = tokio::sync::watch::channel(false);
let beacon_client_eth2 = BeaconClientEth2 {
beacon_client,
};
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
let archiver = Archiver::new(Arc::new(beacon_client_eth2), Arc::new(storage), shutdown_rx);

let block_id = BlockId::Head;
Expand Down
105 changes: 50 additions & 55 deletions crates/beacon/src/beacon_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use std::collections::HashMap;

use async_trait::async_trait;
use eth2::types::{
BeaconBlockHeader, BlobSidecarList, BlockHeaderAndSignature, BlockHeaderData, BlockId, EthSpec,
ExecutionOptimisticFinalizedResponse, GenericResponse, Hash256, MainnetEthSpec, SignatureBytes,
Slot,
};
use eth2::{BeaconNodeHttpClient, Error};
use eth2::types::{BeaconBlockHeader, BlobSidecarList, BlockHeaderAndSignature, BlockHeaderData, BlockId, EthSpec, ExecutionOptimisticFinalizedResponse, GenericResponse, Hash256, MainnetEthSpec, SignatureBytes, Slot};

use crate::blob_test_helper::{FIVE, FOUR, new_blob_sidecars, ONE, ORIGIN_BLOCK, START_SLOT, THREE, TWO};
use crate::blob_test_helper::{
new_blob_sidecars, FIVE, FOUR, ONE, ORIGIN_BLOCK, START_SLOT, THREE, TWO,
};

#[async_trait]
pub trait BeaconClient {
Expand All @@ -27,7 +33,10 @@ pub struct BeaconClientStub<E: EthSpec> {

#[async_trait]
impl BeaconClient for BeaconClientStub<MainnetEthSpec> {
async fn get_beacon_headers_block_id(&self, block_id: BlockId) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
async fn get_beacon_headers_block_id(
&self,
block_id: BlockId,
) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
let header = self.headers.get(block_id.to_string().as_str());

Ok(header.map(|h| ExecutionOptimisticFinalizedResponse {
Expand All @@ -37,12 +46,14 @@ impl BeaconClient for BeaconClientStub<MainnetEthSpec> {
}))
}

async fn get_blobs(&self, block_id: BlockId, _indices: Option<&[u64]>) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
async fn get_blobs(
&self,
block_id: BlockId,
_indices: Option<&[u64]>,
) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
let blobs = self.blobs.get(block_id.to_string().as_str());

Ok(blobs.map(|b| GenericResponse {
data: b.clone(),
}))
Ok(blobs.map(|b| GenericResponse { data: b.clone() }))
}
}

Expand Down Expand Up @@ -83,22 +94,10 @@ impl Default for BeaconClientStub<MainnetEthSpec> {
ONE.to_string(),
make_header(start_slot + 1, *ONE, *ORIGIN_BLOCK),
),
(
TWO.to_string(),
make_header(start_slot + 2, *TWO, *ONE),
),
(
THREE.to_string(),
make_header(start_slot + 3, *THREE, *TWO),
),
(
FOUR.to_string(),
make_header(start_slot + 4, *FOUR, *THREE),
),
(
FIVE.to_string(),
make_header(start_slot + 5, *FIVE, *FOUR),
),
(TWO.to_string(), make_header(start_slot + 2, *TWO, *ONE)),
(THREE.to_string(), make_header(start_slot + 3, *THREE, *TWO)),
(FOUR.to_string(), make_header(start_slot + 4, *FOUR, *THREE)),
(FIVE.to_string(), make_header(start_slot + 5, *FIVE, *FOUR)),
(
"head".to_string(),
make_header(start_slot + 5, *FIVE, *FOUR),
Expand Down Expand Up @@ -134,34 +133,20 @@ impl Default for BeaconClientStub<MainnetEthSpec> {
]),

blobs: HashMap::from([
(ORIGIN_BLOCK.to_string(),
origin_blobs.clone()),
(ONE.to_string(),
one_blobs.clone()),
(TWO.to_string(),
two_blobs.clone()),
(THREE.to_string(),
three_blobs.clone()),
(FOUR.to_string(),
four_blobs.clone()),
(FIVE.to_string(),
five_blobs.clone()),
("head".to_string(),
five_blobs.clone()),
("finalized".to_string(),
three_blobs.clone()),
(start_slot.as_u64().to_string(),
origin_blobs.clone()),
((start_slot + 1).as_u64().to_string(),
one_blobs.clone()),
((start_slot + 2).as_u64().to_string(),
two_blobs.clone()),
((start_slot + 3).as_u64().to_string(),
three_blobs.clone()),
((start_slot + 4).as_u64().to_string(),
four_blobs.clone()),
((start_slot + 5).as_u64().to_string(),
five_blobs.clone()),
(ORIGIN_BLOCK.to_string(), origin_blobs.clone()),
(ONE.to_string(), one_blobs.clone()),
(TWO.to_string(), two_blobs.clone()),
(THREE.to_string(), three_blobs.clone()),
(FOUR.to_string(), four_blobs.clone()),
(FIVE.to_string(), five_blobs.clone()),
("head".to_string(), five_blobs.clone()),
("finalized".to_string(), three_blobs.clone()),
(start_slot.as_u64().to_string(), origin_blobs.clone()),
((start_slot + 1).as_u64().to_string(), one_blobs.clone()),
((start_slot + 2).as_u64().to_string(), two_blobs.clone()),
((start_slot + 3).as_u64().to_string(), three_blobs.clone()),
((start_slot + 4).as_u64().to_string(), four_blobs.clone()),
((start_slot + 5).as_u64().to_string(), five_blobs.clone()),
]),
}
}
Expand All @@ -182,11 +167,21 @@ pub struct BeaconClientEth2 {

#[async_trait]
impl BeaconClient for BeaconClientEth2 {
async fn get_beacon_headers_block_id(&self, block_id: BlockId) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
return self.beacon_client.get_beacon_headers_block_id(block_id).await;
async fn get_beacon_headers_block_id(
&self,
block_id: BlockId,
) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
return self
.beacon_client
.get_beacon_headers_block_id(block_id)
.await;
}

async fn get_blobs(&self, block_id: BlockId, indices: Option<&[u64]>) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
async fn get_blobs(
&self,
block_id: BlockId,
indices: Option<&[u64]>,
) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
return self.beacon_client.get_blobs(block_id, indices).await;
}
}
}
Loading

0 comments on commit 47b7b17

Please sign in to comment.