diff --git a/node/Cargo.lock b/node/Cargo.lock
index 8e33dab2..8f242b32 100644
--- a/node/Cargo.lock
+++ b/node/Cargo.lock
@@ -3060,17 +3060,6 @@ dependencies = [
  "syn 2.0.51",
 ]
 
-[[package]]
-name = "tokio-retry"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
-dependencies = [
- "pin-project",
- "rand 0.8.5",
- "tokio",
-]
-
 [[package]]
 name = "tokio-rustls"
 version = "0.24.1"
@@ -3870,7 +3859,6 @@ dependencies = [
  "serde_json",
  "tempfile",
  "tokio",
- "tokio-retry",
  "tower",
  "tracing",
  "tracing-subscriber",
diff --git a/node/Cargo.toml b/node/Cargo.toml
index d08616dc..0d2e3bcd 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -81,7 +81,6 @@ test-casing = "0.1.0"
 thiserror = "1.0.40"
 time = "0.3.23"
 tokio = { version = "1.34.0", features = ["full"] }
-tokio-retry = "0.3.0"
 tracing = { version = "0.1.37", features = ["attributes"] }
 tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
 kube = { version = "0.88.1", features = ["runtime", "derive"] }
diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml
index 93e80fa4..3546e97c 100644
--- a/node/tools/Cargo.toml
+++ b/node/tools/Cargo.toml
@@ -26,7 +26,6 @@ rocksdb.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 tokio.workspace = true
-tokio-retry.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
 vise-exporter.workspace = true
diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs
index ccfa2310..78e6bbf8 100644
--- a/node/tools/src/bin/deployer.rs
+++ b/node/tools/src/bin/deployer.rs
@@ -1,13 +1,12 @@
 //! Deployer for the kubernetes cluster.
-use std::net::SocketAddr;
-use std::str::FromStr;
+use std::collections::HashMap;
 use std::{fs, path::PathBuf};
 
 use anyhow::Context;
 use clap::{Parser, Subcommand};
 use zksync_consensus_crypto::{Text, TextFmt};
 use zksync_consensus_roles::{node, validator};
-use zksync_consensus_tools::{k8s, AppConfig, NodeAddr, NODES_PORT};
+use zksync_consensus_tools::{k8s, AppConfig};
 
 /// K8s namespace for consensus nodes.
 const NAMESPACE: &str = "consensus";
@@ -94,54 +93,79 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> {
 }
 
 /// Deploys the nodes to the kubernetes cluster.
-async fn deploy(nodes: usize, seed_nodes: Option<usize>) -> anyhow::Result<()> {
+async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow::Result<()> {
     let client = k8s::get_client().await?;
     k8s::create_or_reuse_namespace(&client, NAMESPACE).await?;
-
-    let seed_nodes = seed_nodes.unwrap_or(1);
-
-    // deploy seed peer(s)
-    for i in 0..seed_nodes {
-        k8s::deploy_node(
-            &client,
-            i,
-            true,
-            vec![], // Seed peers don't have other peer information
-            NAMESPACE,
-        )
-        .await?;
+    let seed_nodes_amount = seed_nodes_amount.unwrap_or(1);
+
+    let seed_nodes = &mut HashMap::new();
+    let mut non_seed_nodes = HashMap::new();
+
+    // Split the nodes into different hash maps, as they will be deployed at different stages
+    let mut consensus_nodes = from_configs(nodes_amount)?;
+    for (index, node) in consensus_nodes.iter_mut().enumerate() {
+        if index < seed_nodes_amount {
+            node.is_seed = true;
+            seed_nodes.insert(node.id.to_owned(), node);
+        } else {
+            non_seed_nodes.insert(node.id.to_owned(), node);
+        }
     }
 
-    // obtain seed peer(s) IP(s)
-    let peer_ips = k8s::get_seed_node_addrs(&client, seed_nodes, NAMESPACE).await?;
-
-    let mut peers = vec![];
+    // Deploy seed peer(s)
+    for node in seed_nodes.values_mut() {
+        node.deploy(&client, NAMESPACE).await?;
+    }
 
-    for i in 0..seed_nodes {
-        let node_id = &format!("consensus-node-{i:0>2}");
-        let node_key = read_node_key_from_config(node_id)?;
-        let address = peer_ips.get(node_id).context("IP address not found")?;
-        peers.push(NodeAddr {
-            key: node_key.public(),
-            addr: SocketAddr::from_str(&format!("{address}:{NODES_PORT}"))?,
-        });
+    // Fetch the seed nodes' pod IPs and fill in their node addresses
+    for node in seed_nodes.values_mut() {
+        node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?;
     }
 
-    // deploy the rest of nodes
-    for i in seed_nodes..nodes {
-        k8s::deploy_node(&client, i, false, peers.clone(), NAMESPACE).await?;
+    // Build a vector of the seed peers' NodeAddrs to provide as gossip_static_outbound to the rest of the nodes
+    let peers: Vec<_> = seed_nodes
+        .values()
+        .map(|n| {
+            n.node_addr
+                .as_ref()
+                .expect("Seed node address not defined")
+                .clone()
+        })
+        .collect();
+
+    // Deploy the rest of the nodes
+    for node in non_seed_nodes.values_mut() {
+        node.gossip_static_outbound = peers.clone();
+        node.deploy(&client, NAMESPACE).await?;
     }
 
     Ok(())
 }
 
-/// Obtain node key from config file.
-fn read_node_key_from_config(node_id: &String) -> anyhow::Result<node::SecretKey> {
+/// Build a list of ConsensusNode representations from the generated configurations
+// TODO: once we can provide the config via CLI args, this will be replaced
+// by in-memory config structs
+fn from_configs(nodes: usize) -> anyhow::Result<Vec<k8s::ConsensusNode>> {
     let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?;
     let root = PathBuf::from(manifest_path).join("k8s_configs");
-    let node_key_path = root.join(node_id).join("node_key");
-    let key = fs::read_to_string(node_key_path).context("failed reading file")?;
-    Text::new(&key).decode().context("failed decoding key")
+    let mut consensus_nodes = vec![];
+
+    for i in 0..nodes {
+        let node_id = format!("consensus-node-{i:0>2}");
+        let node_key_path = root.join(&node_id).join("node_key");
+        let key_string = fs::read_to_string(node_key_path).context("failed reading file")?;
+        let key = Text::new(&key_string)
+            .decode()
+            .context("failed decoding key")?;
+        consensus_nodes.push(k8s::ConsensusNode {
+            id: node_id,
+            key,
+            node_addr: None,
+            is_seed: false,
+            gossip_static_outbound: vec![],
+        });
+    }
+    Ok(consensus_nodes)
 }
 
 #[tokio::main]
diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs
index 89735d03..f53bfa12 100644
--- a/node/tools/src/k8s.rs
+++ b/node/tools/src/k8s.rs
@@ -1,5 +1,5 @@
 use crate::{config, NodeAddr};
-use anyhow::{anyhow, ensure, Context};
+use anyhow::{ensure, Context};
 use k8s_openapi::{
     api::{
         apps::v1::{Deployment, DeploymentSpec},
@@ -12,16 +12,13 @@ use k8s_openapi::{
 };
 use kube::{
     api::{ListParams, PostParams},
-    core::{ObjectList, ObjectMeta},
-    Api, Client, ResourceExt,
+    core::ObjectMeta,
+    Api, Client,
 };
-use std::{
-    collections::{BTreeMap, HashMap},
-    net::SocketAddr,
-};
-use tokio_retry::strategy::FixedInterval;
-use tokio_retry::Retry;
+use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
+use tokio::time;
 use tracing::log::info;
+use zksync_consensus_roles::node;
 use zksync_protobuf::serde::Serde;
 
 /// Docker image name for consensus nodes.
@@ -30,6 +27,156 @@ const DOCKER_IMAGE_NAME: &str = "consensus-node";
 /// K8s namespace for consensus nodes.
 pub const DEFAULT_NAMESPACE: &str = "consensus";
 
+/// Consensus node representation
+#[derive(Debug)]
+pub struct ConsensusNode {
+    /// Node identifier
+    pub id: String,
+    /// Node key
+    pub key: node::SecretKey,
+    /// Full NodeAddr
+    pub node_addr: Option<NodeAddr>,
+    /// Whether this is a seed node (i.e. it has no gossipStaticOutbound configuration)
+    pub is_seed: bool,
+    /// Known gossipStaticOutbound peers
+    pub gossip_static_outbound: Vec<NodeAddr>,
+}
+
+impl ConsensusNode {
+    /// Wait for a deployed consensus node to be ready and have an IP address
+    pub async fn await_running_pod(
+        &mut self,
+        client: &Client,
+        namespace: &str,
+    ) -> anyhow::Result<Pod> {
+        let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
+        // Wait until the pod is running, otherwise we get an error.
+        retry(15, Duration::from_millis(1000), || async {
+            get_running_pod(&pods, &self.id).await
+        })
+        .await
+    }
+
+    /// Fetches the pod's IP address and assigns it to self.node_addr
+    pub async fn fetch_and_assign_pod_ip(
+        &mut self,
+        client: &Client,
+        namespace: &str,
+    ) -> anyhow::Result<()> {
+        let ip = self
+            .await_running_pod(client, namespace)
+            .await?
+            .status
+            .context("Status not present")?
+            .pod_ip
+            .context("Pod IP address not present")?;
+        self.node_addr = Some(NodeAddr {
+            key: self.key.public(),
+            addr: SocketAddr::new(ip.parse()?, config::NODES_PORT),
+        });
+        Ok(())
+    }
+
+    /// Creates a deployment for this node
+    pub async fn deploy(&self, client: &Client, namespace: &str) -> anyhow::Result<()> {
+        let cli_args = get_cli_args(&self.gossip_static_outbound);
+        let deployment = Deployment {
+            metadata: ObjectMeta {
+                name: Some(self.id.to_owned()),
+                namespace: Some(namespace.to_owned()),
+                ..Default::default()
+            },
+            spec: Some(DeploymentSpec {
+                selector: LabelSelector {
+                    match_labels: Some(BTreeMap::from([("app".to_owned(), self.id.to_owned())])),
+                    ..Default::default()
+                },
+                replicas: Some(1),
+                template: PodTemplateSpec {
+                    metadata: Some(ObjectMeta {
+                        labels: Some(BTreeMap::from([
+                            ("app".to_owned(), self.id.to_owned()),
+                            ("id".to_owned(), self.id.to_owned()),
+                            ("seed".to_owned(), self.is_seed.to_string()),
+                        ])),
+                        ..Default::default()
+                    }),
+                    spec: Some(PodSpec {
+                        containers: vec![Container {
+                            name: self.id.to_owned(),
+                            image: Some(DOCKER_IMAGE_NAME.to_owned()),
+                            env: Some(vec![
+                                EnvVar {
+                                    name: "NODE_ID".to_owned(),
+                                    value: Some(self.id.to_owned()),
+                                    ..Default::default()
+                                },
+                                EnvVar {
+                                    name: "PUBLIC_ADDR".to_owned(),
+                                    value_from: Some(EnvVarSource {
+                                        field_ref: Some(ObjectFieldSelector {
+                                            field_path: "status.podIP".to_owned(),
+                                            ..Default::default()
+                                        }),
+                                        ..Default::default()
+                                    }),
+                                    ..Default::default()
+                                },
+                            ]),
+                            command: Some(vec!["./k8s_entrypoint.sh".to_owned()]),
+                            args: Some(cli_args),
+                            image_pull_policy: Some("Never".to_owned()),
+                            ports: Some(vec![
+                                ContainerPort {
+                                    container_port: i32::from(config::NODES_PORT),
+                                    ..Default::default()
+                                },
+                                ContainerPort {
+                                    container_port: 3154,
+                                    ..Default::default()
+                                },
+                            ]),
+                            liveness_probe: Some(Probe {
+                                http_get: Some(HTTPGetAction {
+                                    path: Some("/health".to_owned()),
+                                    port: Int(3154),
+                                    ..Default::default()
+                                }),
+                                ..Default::default()
+                            }),
+                            readiness_probe: Some(Probe {
+                                http_get: Some(HTTPGetAction {
+                                    path: Some("/health".to_owned()),
+                                    port: Int(3154),
+                                    ..Default::default()
+                                }),
+                                ..Default::default()
+                            }),
+                            ..Default::default()
+                        }],
+                        ..Default::default()
+                    }),
+                },
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+
+        let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
+        let post_params = PostParams::default();
+        let result = deployments.create(&post_params, &deployment).await?;
+
+        info!(
+            "Deployment: {} created",
+            result
+                .metadata
+                .name
+                .context("Name not defined in metadata")?
+        );
+        Ok(())
+    }
+}
+
 /// Get a kube client
 pub async fn get_client() -> anyhow::Result<Client> {
     Ok(Client::try_default().await?)
@@ -167,146 +314,18 @@ pub async fn create_tests_deployment(client: &Client) -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Creates a deployment
-pub async fn deploy_node(
-    client: &Client,
-    node_index: usize,
-    is_seed: bool,
-    peers: Vec<NodeAddr>,
-    namespace: &str,
-) -> anyhow::Result<()> {
-    let cli_args = get_cli_args(peers);
-    let node_name = format!("consensus-node-{node_index:0>2}");
-    let deployment = Deployment {
-        metadata: ObjectMeta {
-            name: Some(node_name.to_owned()),
-            namespace: Some(namespace.to_owned()),
-            ..Default::default()
-        },
-        spec: Some(DeploymentSpec {
-            selector: LabelSelector {
-                match_labels: Some(BTreeMap::from([("app".to_owned(), node_name.to_owned())])),
-                ..Default::default()
-            },
-            replicas: Some(1),
-            template: PodTemplateSpec {
-                metadata: Some(ObjectMeta {
-                    labels: Some(BTreeMap::from([
-                        ("app".to_owned(), node_name.to_owned()),
-                        ("id".to_owned(), node_name.to_owned()),
-                        ("seed".to_owned(), is_seed.to_string()),
-                    ])),
-                    ..Default::default()
-                }),
-                spec: Some(PodSpec {
-                    containers: vec![Container {
-                        name: node_name.to_owned(),
-                        image: Some("consensus-node".to_owned()),
-                        env: Some(vec![
-                            EnvVar {
-                                name: "NODE_ID".to_owned(),
-                                value: Some(node_name.to_owned()),
-                                ..Default::default()
-                            },
-                            EnvVar {
-                                name: "PUBLIC_ADDR".to_owned(),
-                                value_from: Some(EnvVarSource {
-                                    field_ref: Some(ObjectFieldSelector {
-                                        field_path: "status.podIP".to_owned(),
-                                        ..Default::default()
-                                    }),
-                                    ..Default::default()
-                                }),
-                                ..Default::default()
-                            },
-                        ]),
-                        command: Some(vec!["./k8s_entrypoint.sh".to_owned()]),
-                        args: Some(cli_args),
-                        image_pull_policy: Some("Never".to_owned()),
-                        ports: Some(vec![
-                            ContainerPort {
-                                container_port: i32::from(config::NODES_PORT),
-                                ..Default::default()
-                            },
-                            ContainerPort {
-                                container_port: 3154,
-                                ..Default::default()
-                            },
-                        ]),
-                        liveness_probe: Some(Probe {
-                            http_get: Some(HTTPGetAction {
-                                path: Some("/health".to_owned()),
-                                port: Int(3154),
-                                ..Default::default()
-                            }),
-                            ..Default::default()
-                        }),
-                        readiness_probe: Some(Probe {
-                            http_get: Some(HTTPGetAction {
-                                path: Some("/health".to_owned()),
-                                port: Int(3154),
-                                ..Default::default()
-                            }),
-                            ..Default::default()
-                        }),
-                        ..Default::default()
-                    }],
-                    ..Default::default()
-                }),
-            },
-            ..Default::default()
-        }),
-        ..Default::default()
-    };
-
-    let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);
-    let post_params = PostParams::default();
-    let result = deployments.create(&post_params, &deployment).await?;
-
-    info!(
-        "Deployment: {} , created",
-        result
-            .metadata
-            .name
-            .context("Name not defined in metadata")?
-    );
-    Ok(())
-}
-
-/// Returns a HashMap with mapping: node_id -> IP address
-pub async fn get_seed_node_addrs(
-    client: &Client,
-    amount: usize,
-    namespace: &str,
-) -> anyhow::Result<HashMap<String, String>> {
-    let mut seed_nodes = HashMap::new();
-    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
-
-    // Will retry 15 times during 15 seconds to allow pods to start and obtain an IP
-    let retry_strategy = FixedInterval::from_millis(1000).take(15);
-    let pod_list = Retry::spawn(retry_strategy, || get_seed_pods(&pods, amount)).await?;
-
-    for p in pod_list {
-        let node_id = p.labels()["id"].to_owned();
-        seed_nodes.insert(
-            node_id,
-            p.status
-                .context("Status not present")?
-                .pod_ip
-                .context("Pod IP address not present")?,
-        );
-    }
-    Ok(seed_nodes)
-}
-
-async fn get_seed_pods(pods: &Api<Pod>, amount: usize) -> anyhow::Result<ObjectList<Pod>> {
-    let lp = ListParams::default().labels("seed=true");
-    let p = pods.list(&lp).await?;
-    if p.items.len() == amount && p.iter().all(is_pod_running) {
-        Ok(p)
-    } else {
-        Err(anyhow!("Pods are not ready"))
+async fn get_running_pod(pods: &Api<Pod>, label: &str) -> anyhow::Result<Pod> {
+    let lp = ListParams::default().labels(&format!("app={label}"));
+    let pod = pods
+        .list(&lp)
+        .await?
+        .items
+        .pop()
+        .with_context(|| format!("Pod not found: {label}"))?;
+    if !is_pod_running(&pod) {
+        anyhow::bail!("Pod is not running");
     }
+    Ok(pod)
 }
 
 fn is_pod_running(pod: &Pod) -> bool {
@@ -318,7 +337,7 @@ fn is_pod_running(pod: &Pod) -> bool {
     false
 }
 
-fn get_cli_args(peers: Vec<NodeAddr>) -> Vec<String> {
+fn get_cli_args(peers: &[NodeAddr]) -> Vec<String> {
     if peers.is_empty() {
         [].to_vec()
     } else {
@@ -335,3 +354,19 @@ fn get_cli_args(peers: Vec<NodeAddr>) -> Vec<String> {
         .to_vec()
     }
 }
+
+async fn retry<T, Fut, F>(retries: usize, delay: Duration, mut f: F) -> anyhow::Result<T>
+where
+    F: FnMut() -> Fut,
+    Fut: std::future::Future<Output = anyhow::Result<T>>,
+{
+    let mut interval = time::interval(delay);
+    for count in 0.. {
+        interval.tick().await;
+        let result = f().await;
+        if result.is_ok() || count > retries {
+            return result;
+        }
+    }
+    unreachable!("Loop should always return")
+}
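
Note on the new `retry` helper: it replaces the dropped `tokio-retry` dependency with a hand-rolled fixed-interval poll. Below is a minimal, self-contained sketch (not part of this diff) of how it behaves; the flaky `attempts` counter and the success threshold are illustrative assumptions standing in for `get_running_pod` while a pod is still starting up.

use std::time::Duration;
use tokio::time;

// Same shape as the helper introduced in k8s.rs: poll `f` on a fixed
// interval until it succeeds or the retry budget is exhausted.
async fn retry<T, Fut, F>(retries: usize, delay: Duration, mut f: F) -> anyhow::Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<T>>,
{
    let mut interval = time::interval(delay);
    for count in 0.. {
        // The first tick completes immediately, so attempt #1 is not delayed.
        interval.tick().await;
        let result = f().await;
        if result.is_ok() || count > retries {
            return result;
        }
    }
    unreachable!("Loop should always return")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical flaky operation: fails until the third call.
    let mut attempts = 0u32;
    let n = retry(15, Duration::from_millis(100), || {
        attempts += 1;
        let current = attempts;
        async move {
            if current >= 3 {
                Ok(current)
            } else {
                anyhow::bail!("not ready yet")
            }
        }
    })
    .await?;
    println!("succeeded on attempt {n}");
    Ok(())
}

One behavioral detail worth flagging in review: because the loop exits only once `count > retries`, the helper makes up to `retries + 2` calls before giving up, so `retry(15, ...)` can attempt 17 times rather than 15.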