Skip to content

Commit

Permalink
refactor(node): remove test code from production code (#876)
Browse files Browse the repository at this point in the history
Remove the test config and test setup from production code.
This causes us to create a new run_consensus binary for the test flow.
  • Loading branch information
matan-starkware committed Sep 22, 2024
1 parent e428a6b commit 602ec6f
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 218 deletions.
30 changes: 0 additions & 30 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,36 +99,6 @@
"privacy": "Public",
"value": 0
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"privacy": "Public",
"value": 1000
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"privacy": "Public",
"value": 0
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"privacy": "Public",
"value": "consensus_test_sync"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"privacy": "Public",
Expand Down
6 changes: 6 additions & 0 deletions crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ normal = ["clap", "papyrus_base_layer", "reqwest", "tokio"]
[features]
default = ["rpc"]
rpc = ["papyrus_rpc"]
testing = []

[[bin]]
name = "central_source_integration_test"
path = "src/bin/central_source_integration_test.rs"
required-features = ["futures-util", "tokio-stream"]

[[bin]]
name = "run_consensus"
path = "src/bin/run_consensus.rs"
required-features = ["testing"]

[dependencies]
anyhow.workspace = true
clap = { workspace = true }
Expand Down
113 changes: 113 additions & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Run a papyrus node with consensus enabled and the ability to simulate network issues for
//! consensus.
use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_node::bin_utils::build_configs;
use papyrus_node::run::{run, PapyrusResources, PapyrusTaskHandles};
use papyrus_p2p_sync::BUFFER_SIZE;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
use tokio::task::JoinHandle;

/// Test configuration for consensus.
#[derive(Parser, Debug, Clone, PartialEq)]
pub struct TestConfig {
#[arg(long = "cache_size", help = "The cache size for the test network receiver.")]
pub cache_size: usize,
#[arg(
long = "random_seed",
help = "The random seed for the test simulation to ensure repeatable test results."
)]
pub random_seed: u64,
#[arg(long = "drop_probability", help = "The probability of dropping a message.")]
pub drop_probability: f64,
#[arg(long = "invalid_probability", help = "The probability of sending an invalid message.")]
pub invalid_probability: f64,
#[arg(long = "sync_topic", help = "The network topic for sync messages.")]
pub sync_topic: String,
}

impl Default for TestConfig {
fn default() -> Self {
Self {
cache_size: 1000,
random_seed: 0,
drop_probability: 0.0,
invalid_probability: 0.0,
sync_topic: "consensus_test_sync".to_string(),
}
}
}

fn build_consensus(
consensus_config: ConsensusConfig,
test_config: TestConfig,
storage_reader: StorageReader,
network_manager: &mut NetworkManager,
) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
let network_channels = network_manager.register_broadcast_topic(
Topic::new(consensus_config.network_topic.clone()),
BUFFER_SIZE,
)?;
// TODO(matan): connect this to an actual channel.
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
consensus_config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};

Ok(Some(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
consensus_config.start_height,
consensus_config.validator_id,
consensus_config.consensus_delay,
consensus_config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
})))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (test_config, node_config) = build_configs::<TestConfig>()?;

let mut resources = PapyrusResources::new(&node_config)?;

let consensus_handle = build_consensus(
node_config.consensus.clone().unwrap(),
test_config,
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut().unwrap(),
)?;
let tasks = PapyrusTaskHandles { consensus_handle, ..Default::default() };

run(node_config, resources, tasks).await
}
48 changes: 48 additions & 0 deletions crates/papyrus_node/src/bin_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::env::args;

use clap::Parser;
use papyrus_config::ConfigError;

use crate::config::NodeConfig;

// Test arguments passed on the command line are prefixed with `test.<ARG_NAME>`.
const TEST_ARG_PREFIX: &str = "--test.";

/// Split the elements of `input_args` into 2 groups:
/// 1. Those prefixed with "--test."
/// 2. Other.
///
/// Presumes input is: program_name (--flag_name value)*
pub fn split_args(input_args: Vec<String>) -> (Vec<String>, Vec<String>) {
input_args[1..].chunks(2).fold(
(vec![input_args[0].clone()], vec![input_args[0].clone()]),
|(mut matching_args, mut mismatched_args), input_arg| {
let (name, value) = (&input_arg[0], &input_arg[1]);
// String leading `--` for comparison.
if &name[..TEST_ARG_PREFIX.len()] == TEST_ARG_PREFIX {
matching_args.push(format!("--{}", &name[TEST_ARG_PREFIX.len()..]));
matching_args.push(value.clone());
} else {
mismatched_args.push(name.clone());
mismatched_args.push(value.clone());
}
(matching_args, mismatched_args)
},
)
}

/// Build both the node and test configs from the command line arguments.
pub fn build_configs<T: Parser + Default>() -> Result<(T, NodeConfig), ConfigError> {
let input_args = args().collect::<Vec<_>>();
let (test_input_args, node_input_args) = split_args(input_args);

let mut test_config = T::default();
test_config.update_from(test_input_args.iter());

let node_config = NodeConfig::load_and_process(node_input_args);
if let Err(ConfigError::CommandInput(clap_err)) = node_config {
clap_err.exit();
}
let node_config = node_config?;
Ok((test_config, node_config))
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,44 +115,6 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"value": true,
"privacy": "TemporaryValue"
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"value": {
"$serde_json::private::Number": "1000"
},
"privacy": "Public"
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"value": {
"$serde_json::private::Number": "0"
},
"privacy": "Public"
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"value": "consensus_test_sync",
"privacy": "Public"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"value": {
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// within this crate
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

#[cfg(any(test, feature = "testing"))]
pub mod bin_utils;
#[allow(unused_imports)]
pub mod config;
#[cfg(test)]
Expand Down
83 changes: 20 additions & 63 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
use papyrus_common::BlockHashAndNumber;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand All @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, SyncConfig};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
Expand Down Expand Up @@ -188,65 +186,24 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}

async fn run_sync(
Expand Down
Loading

0 comments on commit 602ec6f

Please sign in to comment.