Skip to content

Commit

Permalink
removing epoch_duration and main_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 30, 2024
1 parent 178fa5a commit 5d7d2f2
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 69 deletions.
12 changes: 0 additions & 12 deletions src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ pub struct CommandLineArgs {
#[arg(short = 's', long)]
celestia_start_height: Option<u64>,

/// Duration between epochs in seconds
#[arg(short, long)]
epoch_time: Option<u64>,

/// IP address for the webserver to listen on
#[arg(long)]
host: Option<String>,
Expand All @@ -73,7 +69,6 @@ pub struct Config {
pub celestia_config: Option<CelestiaConfig>,
pub da_layer: Option<DALayerOption>,
pub redis_config: Option<RedisConfig>,
pub epoch_time: Option<u64>,
pub verifying_key: Option<String>,
}

Expand Down Expand Up @@ -139,7 +134,6 @@ impl Default for Config {
da_layer: Some(DALayerOption::default()),
celestia_config: Some(CelestiaConfig::default()),
redis_config: Some(RedisConfig::default()),
epoch_time: Some(60),
verifying_key: None,
}
}
Expand Down Expand Up @@ -218,7 +212,6 @@ fn merge_configs(loaded: Config, default: Config) -> Config {
redis_config: loaded.redis_config.or(default.redis_config),
celestia_config: loaded.celestia_config.or(default.celestia_config),
da_layer: loaded.da_layer.or(default.da_layer),
epoch_time: loaded.epoch_time.or(default.epoch_time),
verifying_key: loaded.verifying_key.or(default.verifying_key),
}
}
Expand Down Expand Up @@ -282,11 +275,6 @@ fn apply_command_line_args(config: Config, args: CommandLineArgs) -> Config {
})),
}),
da_layer: config.da_layer,
epoch_time: Some(args.epoch_time.unwrap_or_else(|| {
config
.epoch_time
.unwrap_or_else(|| Config::default().epoch_time.unwrap())
})),
verifying_key: args.verifying_key.or(config.verifying_key),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/da/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
let data = self.read_file(false).await?;

if let Some(operations) = data.get(&height.to_string()) {
if let Some(operations) = data.get(height.to_string()) {
let operations_hex = operations.as_str().ok_or_else(|| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(
"Operations value is not a string".to_string(),
Expand Down Expand Up @@ -229,7 +229,7 @@ impl DataAvailabilityLayer for LocalDataAvailabilityLayer {
async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
let data = self.read_file(true).await?;

if let Some(epoch) = data.get(&height.to_string()) {
if let Some(epoch) = data.get(height.to_string()) {
let epoch_hex = epoch.as_str().ok_or_else(|| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(
"Epoch value is not a string".to_string(),
Expand Down
59 changes: 4 additions & 55 deletions src/node_types/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
Mutex,
},
task::spawn,
time::{self, interval, Duration},
time::interval,
};

use crate::{
Expand All @@ -34,7 +34,6 @@ pub struct Sequencer {
pub db: Arc<dyn Database>,
pub da: Arc<dyn DataAvailabilityLayer>,

pub epoch_duration: Duration,
pub ws: WebServer,
// [`key`] is the [`SigningKey`] used to sign [`Operation::CreateAccount`]s
// (specifically, [`AccountSource::SignedBySequencer`]), as well as [`FinalizedEpoch`]s.
Expand All @@ -58,14 +57,12 @@ impl NodeType for Sequencer {

let sync_loop = self.clone().sync_loop();
let da_loop = self.clone().da_loop();
let main_loop = self.clone().main_loop();

let ws_self = self.clone();
let ws = ws_self.ws.start(self.clone());

tokio::select! {
_ = sync_loop => Ok(()),
_ = main_loop => Ok(()),
_ = da_loop => Ok(()),
_ = ws => Ok(()),
}
Expand All @@ -81,13 +78,6 @@ impl Sequencer {
) -> PrismResult<Sequencer> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let epoch_duration = match cfg.epoch_time {
Some(epoch_time) => epoch_time,
None => {
return Err(GeneralError::MissingArgumentError("epoch_time".to_string()).into());
}
};

let ws = match cfg.webserver {
Some(webserver) => WebServer::new(webserver),
None => {
Expand All @@ -100,7 +90,6 @@ impl Sequencer {
Ok(Sequencer {
db,
da,
epoch_duration: Duration::from_secs(epoch_duration),
ws,
key,
tree: Arc::new(Mutex::new(IndexedMerkleTree::new_with_size(1024).unwrap())),
Expand All @@ -110,44 +99,6 @@ impl Sequencer {
})
}

// main_loop is responsible for finalizing epochs and writing them to the buffer.
async fn main_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
spawn(async move {
let mut ticker = time::interval(self.epoch_duration);
loop {
ticker.tick().await;
let pending = self.pending_operations.lock().await;
let epoch = match self.finalize_epoch(pending.clone()).await {
Ok(epoch) => epoch,
Err(e) => {
error!("exiting loop due to finalize_epoch error: {}", e);
return;
}
};

let mut success = false;
// TODO: make retry count a constant
for _ in 0..5 {
match self.epoch_buffer_tx.send(epoch.clone()).await {
Ok(_) => {
success = true;
break;
}
Err(e) => {
error!("Failed to send epoch to buffer: {}", e);
continue;
}
};
}
if !success {
error!("Failed to send epoch to buffer after 5 retries");
return;
}
}
})
.await
}

// sync_loop is responsible for downloading operations from the DA layer
async fn sync_loop(self: Arc<Self>) -> Result<(), tokio::task::JoinError> {
info!("starting operation sync loop");
Expand Down Expand Up @@ -480,11 +431,9 @@ mod tests {
let da_layer = Arc::new(LocalDataAvailabilityLayer::new());
let db = Arc::new(setup_db());
let signing_key = create_signing_key();
let cfg = Config {
epoch_time: Some(1),
..Default::default()
};
Arc::new(Sequencer::new(db.clone(), da_layer, cfg, signing_key.clone()).unwrap())
Arc::new(
Sequencer::new(db.clone(), da_layer, Config::default(), signing_key.clone()).unwrap(),
)
}

fn create_new_account_operation(id: String, value: String, key: SigningKey) -> OperationInput {
Expand Down

0 comments on commit 5d7d2f2

Please sign in to comment.