Skip to content

Commit

Permalink
refactor(consensus): handle stagnation when consensus didn't start
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Sep 5, 2024
1 parent 9745223 commit 5f2a0ff
Showing 1 changed file with 67 additions and 64 deletions.
131 changes: 67 additions & 64 deletions crates/sequencing/papyrus_consensus/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,61 @@ impl Node {
}
}

#[derive(Parser)]
#[command(name = "Papyrus CLI")]
struct Cli {
#[command(flatten)]
papyrus_args: PapyrusArgs,
#[command(flatten)]
run_consensus_args: RunConsensusArgs,
}

#[derive(Parser)]
// Args passed to the test script that are forwarded to the node.
struct PapyrusArgs {
#[arg(long = "base_layer_node_url")]
base_layer_node_url: String,
#[arg(long = "num_validators")]
num_validators: usize,
#[arg(long = "db_dir", help = "Directory with existing DBs that this simulation can reuse.")]
db_dir: Option<String>,
#[arg(long = "proposal_timeout", help = "The timeout (seconds) for a proposal.")]
proposal_timeout: Option<f64>,
#[arg(long = "prevote_timeout", help = "The timeout (seconds) for a prevote.")]
prevote_timeout: Option<f64>,
#[arg(long = "precommit_timeout", help = "The timeout (seconds) for a precommit.")]
precommit_timeout: Option<f64>,
#[arg(long = "cache_size", help = "Cache size for the test simulation.")]
cache_size: Option<usize>,
#[arg(long = "random_seed", help = "Random seed for test simulation.")]
random_seed: Option<u64>,
#[arg(
long = "drop_probability",
help = "Probability of dropping a message for test simulation."
)]
drop_probability: Option<f64>,
#[arg(
long = "invalid_probability",
help = "Probability of sending an invalid message for test simulation."
)]
invalid_probability: Option<f64>,
}

#[derive(Parser)]
// Args passed to the script that are not forwarded to the node.
struct RunConsensusArgs {
#[arg(
long = "stagnation_threshold",
help = "Time in seconds to check for height stagnation.",
default_value = "60", value_parser = parse_duration
)]
stagnation_threshold: Duration,
#[arg(long = "duration", help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration)]
max_test_duration: Duration,
}

struct LockDir {
file: File,
}
Expand All @@ -115,6 +170,11 @@ impl Drop for LockDir {
}
}

fn parse_duration(s: &str) -> Result<Duration, std::num::ParseIntError> {
let secs = u64::from_str(s)?;
Ok(Duration::from_secs(secs))
}

fn find_free_port() -> u16 {
// The socket is automatically closed when the function exits.
// The port may still be available when accessed, but this is not guaranteed.
Expand All @@ -141,10 +201,13 @@ async fn monitor_simulation(
"Node: {}, height: {:?}, sync_count: {:?}",
node.validator_id, height, node.sync_count
);
if let (Some(_), Some(last_update)) = (height, last_update) {
if last_update.elapsed() > stagnation_timeout {
stagnated_nodes.push(node.validator_id);
}
// height is None when consensus has not been started yet.
let elapsed = match height {
Some(_) => last_update.expect("Must be set if height is set").elapsed(),
None => start_time.elapsed(),
};
if elapsed > stagnation_timeout {
stagnated_nodes.push(node.validator_id);
}
}

Expand Down Expand Up @@ -267,66 +330,6 @@ async fn build_all_nodes(data_dir: &str, logs_dir: &str, papyrus_args: &PapyrusA
nodes
}

#[derive(Parser)]
#[command(name = "Papyrus CLI")]
struct Cli {
#[command(flatten)]
papyrus_args: PapyrusArgs,
#[command(flatten)]
run_consensus_args: RunConsensusArgs,
}

#[derive(Parser)]
// Args passed to the test script that are forwarded to the node.
struct PapyrusArgs {
#[arg(long = "base_layer_node_url")]
base_layer_node_url: String,
#[arg(long = "num_validators")]
num_validators: usize,
#[arg(long = "db_dir", help = "Directory with existing DBs that this simulation can reuse.")]
db_dir: Option<String>,
#[arg(long = "proposal_timeout", help = "The timeout (seconds) for a proposal.")]
proposal_timeout: Option<f64>,
#[arg(long = "prevote_timeout", help = "The timeout (seconds) for a prevote.")]
prevote_timeout: Option<f64>,
#[arg(long = "precommit_timeout", help = "The timeout (seconds) for a precommit.")]
precommit_timeout: Option<f64>,
#[arg(long = "cache_size", help = "Cache size for the test simulation.")]
cache_size: Option<usize>,
#[arg(long = "random_seed", help = "Random seed for test simulation.")]
random_seed: Option<u64>,
#[arg(
long = "drop_probability",
help = "Probability of dropping a message for test simulation."
)]
drop_probability: Option<f64>,
#[arg(
long = "invalid_probability",
help = "Probability of sending an invalid message for test simulation."
)]
invalid_probability: Option<f64>,
}

#[derive(Parser)]
// Args passed to the script that are not forwarded to the node.
struct RunConsensusArgs {
#[arg(
long = "stagnation_threshold",
help = "Time in seconds to check for height stagnation.",
default_value = "60", value_parser = parse_duration
)]
stagnation_threshold: Duration,
#[arg(long = "duration", help = "Maximum test duration in seconds.",
default_value = "123456789123456789",
value_parser = parse_duration)]
max_test_duration: Duration,
}

fn parse_duration(s: &str) -> Result<Duration, std::num::ParseIntError> {
let secs = u64::from_str(s)?;
Ok(Duration::from_secs(secs))
}

#[tokio::main]
async fn main() {
let Cli { papyrus_args, run_consensus_args } = Cli::parse();
Expand Down

0 comments on commit 5f2a0ff

Please sign in to comment.