Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into gprusak-snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Mar 15, 2024
2 parents 28e456f + 2ec7e40 commit 2afdb72
Show file tree
Hide file tree
Showing 13 changed files with 255 additions and 208 deletions.
12 changes: 0 additions & 12 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ test-casing = "0.1.0"
thiserror = "1.0.40"
time = "0.3.23"
tokio = { version = "1.34.0", features = ["full"] }
tokio-retry = "0.3.0"
tracing = { version = "0.1.37", features = ["attributes"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
kube = { version = "0.88.1", features = ["runtime", "derive"] }
Expand Down
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ async fn leader_prepare_reproposal_invalid_block() {
.unwrap();
}

/// Check that replica provides expecte high_vote and high_qc after finalizing a block.
/// Check that replica provides expected high_vote and high_qc after finalizing a block.
#[tokio::test]
async fn leader_commit_sanity_yield_replica_prepare() {
zksync_concurrency::testonly::abort_on_panic();
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/mux/reusable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub(crate) struct StreamQueue {
}

impl StreamQueue {
/// Constructs a new StreamQueue with the specificied number of reusable streams.
/// Constructs a new StreamQueue with the specified number of reusable streams.
/// During multiplexer handshake, peers exchange information about
/// how many reusable streams they support per capability.
pub(crate) fn new(max_streams: u32) -> Arc<Self> {
Expand Down Expand Up @@ -289,7 +289,7 @@ impl ReusableStream {
read_receiver = new_read_receiver;
let (write_lock, new_write_receiver) = sync::ExclusiveLock::new(write);
write_receiver = new_write_receiver;
// Sending may fail because the requestor is not interested in the stream any more.
// Sending may fail because the requester is not interested in the stream any more.
// In this case we just close the transient stream immediately.
let _ = reservation.send(Stream {
read: ReadStream(read_lock),
Expand Down
2 changes: 1 addition & 1 deletion node/libs/concurrency/src/ctx/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
};
use tokio::sync::watch;

// Instant doesn't have a deterministic contructor.
// Instant doesn't have a deterministic constructor.
// However since Instant is not convertible to an unix timestamp,
// we can snapshot Instant::now() once and treat it as a constant.
// All observable effects will be then deterministic.
Expand Down
2 changes: 1 addition & 1 deletion node/libs/concurrency/src/limiter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Limiter {
}

/// Acquires reservation for `permits` permits from the rate limiter.
/// It blocks until enought permits are available.
/// It blocks until enough permits are available.
/// It is fair in a sense that in case a later acquire() call is
/// executed, but for a smaller number of permits, it has to wait
/// until the previous call (for a larger number of permits) completes.
Expand Down
2 changes: 1 addition & 1 deletion node/libs/concurrency/src/limiter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn immediate_permit_consumption() {
}

#[tokio::test]
async fn inifinite_refresh_rate() {
async fn infinite_refresh_rate() {
testonly::abort_on_panic();
let clock = &ctx::ManualClock::new();
let ctx = &ctx::test_root(clock);
Expand Down
4 changes: 2 additions & 2 deletions node/libs/concurrency/src/scope/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//! Task can be either async or blocking:
//! * Async tasks are Futures executed via `Task::run`. They MUSN'T call blocking operations,
//! because they are executed on a shared thread pool.
//! * Blocking tasks are `FnOnce()` functions/closures exeucted via `Task::run_blocking`. Blocking
//! * Blocking tasks are `FnOnce()` functions/closures executed via `Task::run_blocking`. Blocking
//! task MUST be executed on a dedicated thread rather than a shared thread pool.
//! * All functions which perform blocking calls should be documented as blocking.
//! If a function has multiple versions and the async version is called `<f>`, then the sync
Expand Down Expand Up @@ -117,7 +117,7 @@ impl<E: 'static + Send> Task<E> {
}

/// Runs an sync blocking task in the scope. MUST be executed on a dedicated thread.
/// See `Task::run` for behavior. See module docs for desciption of blocking tasks.
/// See `Task::run` for behavior. See module docs for description of blocking tasks.
pub(super) fn run_blocking<T>(self, f: impl FnOnce() -> Result<T, E>) -> Result<T, Terminated> {
let panic_reporter = PanicReporter::new(self);
let res = f();
Expand Down
2 changes: 1 addition & 1 deletion node/libs/protobuf/src/proto_fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub(super) fn read_fields(
}

/// Converts an encoded protobuf message to its canonical form, given the descriptor of the message
/// type. Retuns an error if:
/// type. Returns an error if:
/// * an unknown field is detected
/// * the message type doesn't support canonical encoding (implicit presence, map fields)
pub fn canonical_raw(
Expand Down
2 changes: 1 addition & 1 deletion node/libs/roles/src/proto/validator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ message NetAddress {
// Currently the IP of the validator is static, but this scheme might also
// be used to provide dynamic IP support (if validator discovers that its
// own IP has changed - by pinging a trusted STUN server for example - it can
// broadcast a new discovery message), or (mutli)proxy support (a validator
// broadcast a new discovery message), or (multi)proxy support (a validator
// may maintain a dynamic set of trusted proxy servers which forward traffic
// to it - this way validator wouldn't have to have a public IP at all).
optional uint64 version = 2; // required
Expand Down
1 change: 0 additions & 1 deletion node/tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ rocksdb.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-retry.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
vise-exporter.workspace = true
Expand Down
97 changes: 61 additions & 36 deletions node/tools/src/bin/deployer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Deployer for the kubernetes cluster.
use anyhow::Context;
use clap::{Parser, Subcommand};
use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr};
use std::{collections::HashMap, fs, path::PathBuf};
use zksync_consensus_crypto::{Text, TextFmt};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_tools::{k8s, AppConfig, NodeAddr, NODES_PORT};
use zksync_consensus_tools::{k8s, AppConfig};

/// K8s namespace for consensus nodes.
const NAMESPACE: &str = "consensus";
Expand Down Expand Up @@ -91,54 +91,79 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> {
}

/// Deploys the nodes to the kubernetes cluster.
async fn deploy(nodes: usize, seed_nodes: Option<usize>) -> anyhow::Result<()> {
async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow::Result<()> {
let client = k8s::get_client().await?;
k8s::create_or_reuse_namespace(&client, NAMESPACE).await?;

let seed_nodes = seed_nodes.unwrap_or(1);

// deploy seed peer(s)
for i in 0..seed_nodes {
k8s::deploy_node(
&client,
i,
true,
vec![], // Seed peers don't have other peer information
NAMESPACE,
)
.await?;
let seed_nodes_amount = seed_nodes_amount.unwrap_or(1);

let seed_nodes = &mut HashMap::new();
let mut non_seed_nodes = HashMap::new();

// Split the nodes in different hash maps as they will be deployed at different stages
let mut consensus_nodes = from_configs(nodes_amount)?;
for (index, node) in consensus_nodes.iter_mut().enumerate() {
if index < seed_nodes_amount {
node.is_seed = true;
seed_nodes.insert(node.id.to_owned(), node);
} else {
non_seed_nodes.insert(node.id.to_owned(), node);
}
}

// obtain seed peer(s) IP(s)
let peer_ips = k8s::get_seed_node_addrs(&client, seed_nodes, NAMESPACE).await?;

let mut peers = vec![];
// Deploy seed peer(s)
for node in seed_nodes.values_mut() {
node.deploy(&client, NAMESPACE).await?;
}

for i in 0..seed_nodes {
let node_id = &format!("consensus-node-{i:0>2}");
let node_key = read_node_key_from_config(node_id)?;
let address = peer_ips.get(node_id).context("IP address not found")?;
peers.push(NodeAddr {
key: node_key.public(),
addr: SocketAddr::from_str(&format!("{address}:{NODES_PORT}"))?,
});
// Fetch and complete node addrs into seed nodes
for node in seed_nodes.values_mut() {
node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?;
}

// deploy the rest of nodes
for i in seed_nodes..nodes {
k8s::deploy_node(&client, i, false, peers.clone(), NAMESPACE).await?;
// Build a vector of seed peers NodeAddrs to provide as gossip_static_outbound to the rest of the nodes
let peers: Vec<_> = seed_nodes
.values()
.map(|n| {
n.node_addr
.as_ref()
.expect("Seed node address not defined")
.clone()
})
.collect();

// Deploy the rest of the nodes
for node in non_seed_nodes.values_mut() {
node.gossip_static_outbound = peers.clone();
node.deploy(&client, NAMESPACE).await?;
}

Ok(())
}

/// Obtain node key from config file.
fn read_node_key_from_config(node_id: &String) -> anyhow::Result<node::SecretKey> {
/// Build ConsensusNodes representation list from configurations
// TODO once we can provide config via cli args, this will be replaced
// using in-memory config structs
fn from_configs(nodes: usize) -> anyhow::Result<Vec<k8s::ConsensusNode>> {
let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?;
let root = PathBuf::from(manifest_path).join("k8s_configs");
let node_key_path = root.join(node_id).join("node_key");
let key = fs::read_to_string(node_key_path).context("failed reading file")?;
Text::new(&key).decode().context("failed decoding key")
let mut consensus_nodes = vec![];

for i in 0..nodes {
let node_id = format!("consensus-node-{i:0>2}");
let node_key_path = root.join(&node_id).join("node_key");
let key_string = fs::read_to_string(node_key_path).context("failed reading file")?;
let key = Text::new(&key_string)
.decode()
.context("failed decoding key")?;
consensus_nodes.push(k8s::ConsensusNode {
id: node_id,
key,
node_addr: None,
is_seed: false,
gossip_static_outbound: vec![],
});
}
Ok(consensus_nodes)
}

#[tokio::main]
Expand Down
Loading

0 comments on commit 2afdb72

Please sign in to comment.