Skip to content

Commit

Permalink
feat(prover): Add cluster name autodetection (#3227)
Browse files Browse the repository at this point in the history
## What ❔

If `cluster_name` isn't provided via a flag, fetch it from the GCP metadata
server. Removes the unused `cluster_name` config option.

<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔

To reduce the possibility of errors in configs.
<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.

ref ZKD-1855
  • Loading branch information
yorik authored Nov 5, 2024
1 parent 1b33b5e commit bd32aec
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
2 changes: 0 additions & 2 deletions prover/crates/bin/prover_autoscaler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub struct ProverAutoscalerAgentConfig {
/// List of namespaces to watch.
#[serde(default = "ProverAutoscalerAgentConfig::default_namespaces")]
pub namespaces: Vec<String>,
/// Watched cluster name. Also can be set via flag.
pub cluster_name: Option<String>,
/// If dry-run enabled don't do any k8s updates, just report success.
#[serde(default = "ProverAutoscalerAgentConfig::default_dry_run")]
pub dry_run: bool,
Expand Down
32 changes: 31 additions & 1 deletion prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::{collections::HashMap, sync::Arc};

use anyhow::Context;
use chrono::{DateTime, Utc};
use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::api;
use kube::{
api::{Api, ResourceExt},
runtime::{watcher, WatchStreamExt},
};
use reqwest::{
header::{HeaderMap, HeaderValue},
Method,
};
use tokio::sync::Mutex;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent};

Expand All @@ -17,13 +23,37 @@ pub struct Watcher {
pub cluster: Arc<Mutex<Cluster>>,
}

/// Looks up the name of the cluster this agent runs in by querying the GCP
/// instance metadata endpoint for the `cluster-name` attribute.
///
/// The request is a `GET` carrying the `Metadata-Flavor: Google` header and is
/// issued through `send_request_with_retries`, with `5` passed as its retry
/// argument.
///
/// # Errors
///
/// Returns an error if the helper fails to produce a response, or if the
/// response body cannot be read as text.
async fn get_cluster_name() -> anyhow::Result<String> {
    let url = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name";

    let mut headers = HeaderMap::new();
    headers.insert("Metadata-Flavor", HeaderValue::from_static("Google"));

    let response = match send_request_with_retries(url, 5, Method::GET, Some(headers), None).await
    {
        Ok(resp) => resp,
        Err(err) => {
            return Err(anyhow::anyhow!(
                "Failed fetching response from url: {url}: {err:?}"
            ));
        }
    };

    let body = response
        .text()
        .await
        .context("Failed to read response as text")?;
    Ok(body)
}

impl Watcher {
pub fn new(client: kube::Client, cluster_name: String, namespaces: Vec<String>) -> Self {
pub async fn new(
client: kube::Client,
cluster_name: Option<String>,
namespaces: Vec<String>,
) -> Self {
let mut ns = HashMap::new();
namespaces.into_iter().for_each(|n| {
ns.insert(n, Namespace::default());
});

let cluster_name = match cluster_name {
Some(c) => c,
None => get_cluster_name()
.await
.expect("Load cluster_name from GCP"),
};
tracing::info!("Agent cluster name is {cluster_name}");

Self {
client,
cluster: Arc::new(Mutex::new(Cluster {
Expand Down
10 changes: 3 additions & 7 deletions prover/crates/bin/prover_autoscaler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,16 @@ async fn main() -> anyhow::Result<()> {

match opt.job {
AutoscalerType::Agent => {
let cluster = opt
.cluster_name
.context("cluster_name is required for Agent")?;
tracing::info!("Starting ProverAutoscaler Agent for cluster {}", cluster);
tracing::info!("Starting ProverAutoscaler Agent");
let agent_config = general_config.agent_config.context("agent_config")?;
let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));

let _ = rustls::crypto::ring::default_provider().install_default();
let client = kube::Client::try_default().await?;

// TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
// http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces);
let watcher =
Watcher::new(client.clone(), opt.cluster_name, agent_config.namespaces).await;
let scaler = Scaler::new(client, agent_config.dry_run);
tasks.push(tokio::spawn(watcher.clone().run()));
tasks.push(tokio::spawn(agent::run_server(
Expand Down

0 comments on commit bd32aec

Please sign in to comment.