Skip to content

Commit

Permalink
Use of filter_map and add context to errors
Browse files Browse the repository at this point in the history
  • Loading branch information
IAvecilla committed Feb 22, 2024
1 parent 573d238 commit abab39a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
8 changes: 6 additions & 2 deletions node/tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ fn get_config_path() -> PathBuf {
/// Generate a config file with the IPs of the consensus nodes in the kubernetes cluster.
pub async fn generate_config() -> anyhow::Result<()> {
let client = k8s::get_client().await?;
let pods_ip = k8s::get_consensus_node_ips(&client).await?;
let pods_ip = k8s::get_consensus_nodes_address(&client)
.await
.context("Failed to get consensus pods address")?;
let config_file_path = get_config_path();
for addr in pods_ip {
let mut config_file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&config_file_path)?;
config_file.write_all(addr.to_string().as_bytes())?;
config_file
.write_all(addr.to_string().as_bytes())
.with_context(|| "Failed to write to config file")?;
}
Ok(())
}
Expand Down
63 changes: 36 additions & 27 deletions node/tools/src/k8s.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{config, NodeAddr};
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, ensure, Context};
use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{Container, Namespace, Pod, PodSpec, PodStatus, PodTemplateSpec},
core::v1::{Container, Namespace, Pod, PodSpec, PodTemplateSpec},
},
apimachinery::pkg::apis::meta::v1::LabelSelector,
};
Expand Down Expand Up @@ -31,43 +31,52 @@ pub async fn get_client() -> anyhow::Result<Client> {
}

/// Get the IP addresses and the exposed port of the RPC server of the consensus nodes in the kubernetes cluster.
pub async fn get_consensus_node_ips(client: &Client) -> anyhow::Result<Vec<SocketAddr>> {
pub async fn get_consensus_nodes_address(client: &Client) -> anyhow::Result<Vec<SocketAddr>> {
let pods: Api<Pod> = Api::namespaced(client.clone(), DEFAULT_NAMESPACE);
let lp = ListParams::default();
let pods = pods.list(&lp).await?;
ensure!(
!pods.items.is_empty(),
"No consensus pods found in the k8s cluster"
);
let pods_addresses: Result<Vec<SocketAddr>, _> = pods
let pod_addresses: Vec<SocketAddr> = pods
.into_iter()
.filter(|pod| {
let docker_image = pod
.spec
.clone()
.and_then(|spec| spec.containers[0].clone().image);
if let Some(docker_image) = docker_image {
docker_image.contains(DOCKER_IMAGE_NAME)
.filter_map(|pod| {
let pod_spec = pod.spec.clone().context("Failed to get pod spec").ok()?;
let pod_running_container = pod_spec
.containers
.first()
.context("Failed to get pod container")
.ok()?
.to_owned();
let docker_image = pod_running_container
.image
.context("Failed to get pod docker image")
.ok()?;

if docker_image.contains(DOCKER_IMAGE_NAME) {
let pod_ip = pod
.status
.context("Failed to get pod status")
.ok()?
.pod_ip
.context("Failed to get pod ip")
.ok()?;
let port = pod_running_container.ports?.iter().find_map(|port| {
let port = port.container_port.try_into().ok()?;
(port != config::NODES_PORT).then_some(port)
});
Some(SocketAddr::new(pod_ip.parse().ok()?, port?))
} else {
false
None
}
})
.map(|pod| {
let pod_ip = pod.status.and_then(|status| status.pod_ip);
let port = pod.spec.and_then(|spec| {
spec.containers[0].clone().ports.and_then(|ports| {
ports
.iter()
.find(|port| port.container_port != config::NODES_PORT as i32)
.map(|port| port.container_port)
})
});
let pod_ip = pod_ip.context("pod_ip")?;
let port: u16 = port.context("port")?.try_into().context("port")?;
Ok(SocketAddr::new(pod_ip.parse()?, port))
})
.collect();
pods_addresses
ensure!(
!pod_addresses.is_empty(),
"No consensus pods found in the k8s cluster"
);
Ok(pod_addresses)
}

/// Creates a namespace in k8s cluster
Expand Down

0 comments on commit abab39a

Please sign in to comment.