diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs
index bec5a55ced1..d86f79ba77a 100644
--- a/core/lib/basic_types/src/prover_dal.rs
+++ b/core/lib/basic_types/src/prover_dal.rs
@@ -28,12 +28,6 @@ pub struct ExtendedJobCountStatistics {
     pub successful: usize,
 }
 
-#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
-pub struct JobCountStatistics {
-    pub queued: usize,
-    pub in_progress: usize,
-}
-
 impl Add for ExtendedJobCountStatistics {
     type Output = ExtendedJobCountStatistics;
 
@@ -47,6 +41,19 @@ impl Add for ExtendedJobCountStatistics {
     }
 }
 
+#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
+pub struct JobCountStatistics {
+    pub queued: usize,
+    pub in_progress: usize,
+}
+
+impl JobCountStatistics {
+    /// Returns the sum of queued and in_progress jobs.
+    pub fn all(&self) -> usize {
+        self.queued + self.in_progress
+    }
+}
+
 #[derive(Debug)]
 pub struct StuckJobs {
     pub id: u64,
diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs
index b24a1a26651..d345b53e6f3 100644
--- a/core/lib/config/src/configs/prover_autoscaler.rs
+++ b/core/lib/config/src/configs/prover_autoscaler.rs
@@ -61,6 +61,8 @@ pub struct ProverAutoscalerScalerConfig {
     /// Duration after which pending pod considered long pending.
     #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
     pub long_pending_duration: Duration,
+    /// List of simple autoscaler targets.
+    pub scaler_targets: Vec<ScalerTarget>,
 }
 
 #[derive(
@@ -93,6 +95,41 @@ pub enum Gpu {
     A100,
 }
 
+// TODO: generate this enum by QueueReport from https://github.com/matter-labs/zksync-era/blob/main/prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs#L23
+// and remove allowing of non_camel_case_types by generating a field name parser.
+#[derive(Debug, Display, PartialEq, Eq, Hash, Clone, Deserialize, EnumString, Default)]
+#[allow(non_camel_case_types)]
+pub enum QueueReportFields {
+    #[strum(ascii_case_insensitive)]
+    basic_witness_jobs,
+    #[strum(ascii_case_insensitive)]
+    leaf_witness_jobs,
+    #[strum(ascii_case_insensitive)]
+    node_witness_jobs,
+    #[strum(ascii_case_insensitive)]
+    recursion_tip_witness_jobs,
+    #[strum(ascii_case_insensitive)]
+    scheduler_witness_jobs,
+    #[strum(ascii_case_insensitive)]
+    proof_compressor_jobs,
+    #[default]
+    #[strum(ascii_case_insensitive)]
+    prover_jobs,
+}
+
+/// ScalerTarget can be configured to autoscale any service whose queue is reported by
+/// prover-job-monitor, except provers. Provers need special treatment due to the GPU requirement.
+#[derive(Debug, Clone, PartialEq, Deserialize, Default)]
+pub struct ScalerTarget {
+    pub queue_report_field: QueueReportFields,
+    pub pod_name_prefix: String,
+    /// Max replicas per cluster.
+    pub max_replicas: HashMap<String, usize>,
+    /// The queue will be divided by the speed and rounded up to get the number of replicas.
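+    /// E.g. with speed = 4, a queue of 10 is rounded up to 12, which yields 12 / 4 = 3 replicas.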
+    #[serde(default = "ScalerTarget::default_speed")]
+    pub speed: usize,
+}
+
 impl ProverAutoscalerConfig {
     /// Default graceful shutdown timeout -- 5 seconds
     pub fn default_graceful_shutdown_timeout() -> Duration {
@@ -126,3 +163,9 @@ impl ProverAutoscalerScalerConfig {
         Duration::minutes(10)
     }
 }
+
+impl ScalerTarget {
+    pub fn default_speed() -> usize {
+        1
+    }
+}
diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
index 9b7f201e9b7..0f723e22a93 100644
--- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
+++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
@@ -45,15 +45,28 @@ message MinProver {
   optional uint32 min = 2; // required
 }
 
+message MaxReplica {
+  optional string cluster = 1; // required
+  optional uint64 max = 2; // required
+}
+
+message ScalerTarget {
+  optional string queue_report_field = 1; // required
+  optional string pod_name_prefix = 2; // required
+  repeated MaxReplica max_replicas = 3; // required at least one
+  optional uint64 speed = 4; // optional
+}
+
 message ProverAutoscalerScalerConfig {
   optional uint32 prometheus_port = 1; // required
   optional std.Duration scaler_run_interval = 2; // optional
   optional string prover_job_monitor_url = 3; // required
   repeated string agents = 4; // required at least one
-  repeated ProtocolVersion protocol_versions = 5; // repeated at least one
+  repeated ProtocolVersion protocol_versions = 5; // required at least one
   repeated ClusterPriority cluster_priorities = 6; // optional
   repeated ProverSpeed prover_speed = 7; // optional
   optional uint32 long_pending_duration_s = 8; // optional
   repeated MaxProver max_provers = 9; // optional
   repeated MinProver min_provers = 10; // optional
+  repeated ScalerTarget scaler_targets = 11; // optional
 }
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
index 51f1b162d4c..c3e7c9719f1 100644
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ b/core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -112,6 +112,12 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 .map(|(i, e)| e.read().context(i))
                 .collect::<Result<_, _>>()
                 .context("min_provers")?,
+            scaler_targets: self
+                .scaler_targets
+                .iter()
+                .enumerate()
+                .map(|(i, x)| x.read().context(i).unwrap())
+                .collect::<Vec<_>>(),
         })
     }
 
@@ -151,6 +157,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 .iter()
                 .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
                 .collect(),
+            scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(),
         }
     }
 }
@@ -238,3 +245,57 @@ impl ProtoRepr for proto::MinProver {
         }
     }
 }
+
+impl ProtoRepr for proto::MaxReplica {
+    type Type = (String, usize);
+    fn read(&self) -> anyhow::Result<Self::Type> {
+        Ok((
+            required(&self.cluster).context("cluster")?.parse()?,
+            *required(&self.max).context("max")? as usize,
+        ))
+    }
+    fn build(this: &Self::Type) -> Self {
+        Self {
+            cluster: Some(this.0.to_string()),
+            max: Some(this.1 as u64),
+        }
+    }
+}
+
+impl ProtoRepr for proto::ScalerTarget {
+    type Type = configs::prover_autoscaler::ScalerTarget;
+    fn read(&self) -> anyhow::Result<Self::Type> {
+        Ok(Self::Type {
+            queue_report_field: required(&self.queue_report_field)
+                .and_then(|x| Ok((*x).parse()?))
+                .context("queue_report_field")?,
+            pod_name_prefix: required(&self.pod_name_prefix)
+                .context("pod_name_prefix")?
+                .clone(),
+            max_replicas: self
+                .max_replicas
+                .iter()
+                .enumerate()
+                .map(|(i, e)| e.read().context(i))
+                .collect::<Result<_, _>>()
+                .context("max_replicas")?,
+            speed: match self.speed {
+                Some(x) => x as usize,
+                None => Self::Type::default_speed(),
+            },
+        })
+    }
+
+    fn build(this: &Self::Type) -> Self {
+        Self {
+            queue_report_field: Some(this.queue_report_field.to_string()),
+            pod_name_prefix: Some(this.pod_name_prefix.clone()),
+            max_replicas: this
+                .max_replicas
+                .iter()
+                .map(|(k, v)| proto::MaxReplica::build(&(k.clone(), *v)))
+                .collect(),
+            speed: Some(this.speed as u64),
+        }
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
index e3e4c9b4df0..db215e570ef 100644
--- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
+++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -40,6 +40,7 @@ pub struct Namespace {
     #[serde(serialize_with = "ordered_map")]
     pub deployments: HashMap<String, Deployment>,
     pub pods: HashMap<String, Pod>,
+    #[serde(default)]
     pub scale_errors: Vec<ScaleEvent>,
 }
 
@@ -64,4 +65,5 @@ pub enum PodStatus {
     Pending,
     LongPending,
     NeedToMove,
+    Failed,
 }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index 32610ebf3c3..e2cd1c6a4fb 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -1,17 +1,22 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, ops::Deref};
 
 use anyhow::{Context, Ok};
 use reqwest::Method;
-use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport;
+use zksync_config::configs::prover_autoscaler::QueueReportFields;
+use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport};
 use zksync_utils::http_with_retries::send_request_with_retries;
 
 use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE};
 
 const MAX_RETRIES: usize = 5;
 
-#[derive(Debug)]
-pub struct Queue {
-    pub queue: HashMap<String, u64>,
+pub struct Queue(HashMap<(String, QueueReportFields), u64>);
+
+impl Deref for Queue {
+    type Target = HashMap<(String, QueueReportFields), u64>;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
 }
 
 #[derive(Default)]
@@ -19,6 +24,19 @@ pub struct Queuer {
     pub prover_job_monitor_url: String,
 }
 
+fn target_to_queue(target: &QueueReportFields, report: &QueueReport) -> u64 {
+    let res = match target {
+        QueueReportFields::basic_witness_jobs => report.basic_witness_jobs.all(),
+        QueueReportFields::leaf_witness_jobs => report.leaf_witness_jobs.all(),
+        QueueReportFields::node_witness_jobs => report.node_witness_jobs.all(),
+        QueueReportFields::recursion_tip_witness_jobs => report.recursion_tip_witness_jobs.all(),
+        QueueReportFields::scheduler_witness_jobs => report.scheduler_witness_jobs.all(),
+        QueueReportFields::proof_compressor_jobs => report.proof_compressor_jobs.all(),
+        QueueReportFields::prover_jobs => report.prover_jobs.all(),
+    };
+    res as u64
+}
+
 impl Queuer {
     pub fn new(pjm_url: String) -> Self {
         Self {
@@ -26,12 +44,14 @@ impl Queuer {
         }
     }
 
-    pub async fn get_queue(&self) -> anyhow::Result<Queue> {
+    /// Requests the queue report from prover-job-monitor and parses it into the Queue map for the
+    /// provided list of jobs.
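+    /// E.g. `get_queue(&[QueueReportFields::prover_jobs])` yields a map keyed by
+    /// `(protocol version, prover_jobs)`, with one entry per reported version.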
+    pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result<Queue> {
         let url = &self.prover_job_monitor_url;
         let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
         let response = response.map_err(|err| {
             AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
-            anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
+            anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}")
         })?;
 
         AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
@@ -39,11 +59,18 @@ impl Queuer {
             .json::<Vec<VersionedQueueReport>>()
             .await
             .context("Failed to read response as json")?;
-        Ok(Queue {
-            queue: response
+        Ok(Queue(
+            response
                 .iter()
-                .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64))
+                .flat_map(|versioned_report| {
+                    jobs.iter().map(move |j| {
+                        (
+                            (versioned_report.version.to_string(), j.clone()),
+                            target_to_queue(j, &versioned_report.report),
+                        )
+                    })
+                })
                 .collect::<HashMap<_, _>>(),
-        })
+        ))
     }
 }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index eb4249d071f..1bdd2b25104 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -4,7 +4,9 @@ use chrono::Utc;
 use debug_map_sorted::SortedOutputExt;
 use once_cell::sync::Lazy;
 use regex::Regex;
-use zksync_config::configs::prover_autoscaler::{Gpu, ProverAutoscalerScalerConfig};
+use zksync_config::configs::prover_autoscaler::{
+    Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget,
+};
 
 use super::{queuer, watcher};
 use crate::{
@@ -65,6 +67,12 @@ pub struct Scaler {
     watcher: watcher::Watcher,
     queuer: queuer::Queuer,
 
+    jobs: Vec<QueueReportFields>,
+    prover_scaler: GpuScaler,
+    simple_scalers: Vec<SimpleScaler>,
+}
+
+pub struct GpuScaler {
     /// Which cluster to use first.
     cluster_priorities: HashMap<String, u32>,
     min_provers: HashMap<String, u32>,
@@ -73,6 +81,16 @@ pub struct Scaler {
     long_pending_duration: chrono::Duration,
 }
 
+pub struct SimpleScaler {
+    queue_report_field: QueueReportFields,
+    pod_name_prefix: String,
+    /// Which cluster to use first.
+    cluster_priorities: HashMap<String, u32>,
+    max_replicas: HashMap<String, usize>,
+    speed: usize,
+    long_pending_duration: chrono::Duration,
+}
+
 struct ProverPodGpu<'a> {
     name: &'a str,
     pod: &'a Pod,
@@ -102,10 +120,31 @@ impl Scaler {
             AUTOSCALER_METRICS.prover_protocol_version[&(namespace.clone(), version.clone())]
                 .set(1);
         });
+
+        let mut simple_scalers = Vec::default();
+        let mut jobs = vec![QueueReportFields::prover_jobs];
+        for c in &config.scaler_targets {
+            jobs.push(c.queue_report_field.clone());
+            simple_scalers.push(SimpleScaler::new(
+                c,
+                config.cluster_priorities.clone(),
+                chrono::Duration::seconds(config.long_pending_duration.whole_seconds()),
+            ))
+        }
         Self {
-            namespaces: config.protocol_versions,
+            namespaces: config.protocol_versions.clone(),
             watcher,
             queuer,
+            jobs,
+            prover_scaler: GpuScaler::new(config),
+            simple_scalers,
+        }
+    }
+}
+
+impl GpuScaler {
+    pub fn new(config: ProverAutoscalerScalerConfig) -> Self {
+        Self {
             cluster_priorities: config.cluster_priorities,
             min_provers: config.min_provers,
             max_provers: config.max_provers,
@@ -116,6 +155,7 @@ impl Scaler {
         }
     }
 
+    /// Converts a single cluster into a Vec of GPUPools, one per GPU type.
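+    /// E.g. (illustrative) a cluster running prover pods on both L4 and A100 GPUs yields two
+    /// GPUPools.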
     fn convert_to_gpu_pool(&self, namespace: &String, cluster: &Cluster) -> Vec<GPUPool> {
         let mut gp_map = HashMap::new(); // <Gpu, GPUPool>
         let Some(namespace_value) = &cluster.namespaces.get(namespace) else {
@@ -218,6 +258,10 @@ impl Scaler {
                 .then(b.max_pool_size.cmp(&a.max_pool_size)) // Reverse sort by cluster size.
         });
 
+        gpu_pools.iter().for_each(|p| {
+            AUTOSCALER_METRICS.scale_errors[&p.name.clone()].set(p.scale_errors as u64);
+        });
+
         gpu_pools
     }
 
@@ -323,6 +367,192 @@ impl Scaler {
     }
 }
 
+#[derive(Default, Debug, PartialEq, Eq)]
+struct Pool {
+    name: String,
+    pods: HashMap<PodStatus, usize>,
+    scale_errors: usize,
+    max_pool_size: usize,
+}
+
+impl Pool {
+    fn sum_by_pod_status(&self, ps: PodStatus) -> usize {
+        self.pods.get(&ps).cloned().unwrap_or(0)
+    }
+}
+
+impl SimpleScaler {
+    pub fn new(
+        config: &ScalerTarget,
+        cluster_priorities: HashMap<String, u32>,
+        long_pending_duration: chrono::Duration,
+    ) -> Self {
+        Self {
+            queue_report_field: config.queue_report_field.clone(),
+            pod_name_prefix: config.pod_name_prefix.clone(),
+            cluster_priorities,
+            max_replicas: config.max_replicas.clone(),
+            speed: config.speed,
+            long_pending_duration,
+        }
+    }
+
+    fn convert_to_pool(&self, namespace: &String, cluster: &Cluster) -> Option<Pool> {
+        let Some(namespace_value) = &cluster.namespaces.get(namespace) else {
+            // No namespace in config, ignoring.
+            return None;
+        };
+
+        // TODO: Check if the related deployment exists.
+        let mut pool = Pool {
+            name: cluster.name.clone(),
+            max_pool_size: self.max_replicas.get(&cluster.name).copied().unwrap_or(0),
+            scale_errors: namespace_value
+                .scale_errors
+                .iter()
+                .filter(|v| v.time < Utc::now() - chrono::Duration::hours(1)) // TODO: Move the duration into config.
+                .count(),
+            ..Default::default()
+        };
+
+        // Initialize the pool only if we have ready deployments.
+        pool.pods.insert(PodStatus::Running, 0);
+
+        let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap();
+        for (_, pod) in namespace_value
+            .pods
+            .iter()
+            .filter(|(name, _)| pod_re.is_match(name))
+        {
+            let mut status = PodStatus::from_str(&pod.status).unwrap_or_default();
+            if status == PodStatus::Pending && pod.changed < Utc::now() - self.long_pending_duration
+            {
+                status = PodStatus::LongPending;
+            }
+            pool.pods.entry(status).and_modify(|n| *n += 1).or_insert(1);
+        }
+
+        tracing::debug!("Pool pods {:?}", pool);
+
+        Some(pool)
+    }
+
+    fn sorted_clusters(&self, namespace: &String, clusters: &Clusters) -> Vec<Pool> {
+        let mut pools: Vec<Pool> = clusters
+            .clusters
+            .values()
+            .flat_map(|c| self.convert_to_pool(namespace, c))
+            .collect();
+
+        pools.sort_by(|a, b| {
+            a.sum_by_pod_status(PodStatus::NeedToMove)
+                .cmp(&b.sum_by_pod_status(PodStatus::NeedToMove)) // Sort by need to evict.
+                .then(
+                    a.sum_by_pod_status(PodStatus::LongPending)
+                        .cmp(&b.sum_by_pod_status(PodStatus::LongPending)),
+                ) // Sort by long Pending pods.
+                .then(a.scale_errors.cmp(&b.scale_errors)) // Sort by scale_errors in the cluster.
+                .then(
+                    self.cluster_priorities
+                        .get(&a.name)
+                        .unwrap_or(&1000)
+                        .cmp(self.cluster_priorities.get(&b.name).unwrap_or(&1000)),
+                ) // Sort by priority.
+                .then(b.max_pool_size.cmp(&a.max_pool_size)) // Reverse sort by cluster size.
+        });
+
+        pools
+    }
+
+    fn pods_to_speed(&self, n: usize) -> u64 {
+        (self.speed * n) as u64
+    }
+
+    fn normalize_queue(&self, queue: u64) -> u64 {
+        let speed = self.speed as u64;
+        // Divide and round up if there's any remainder.
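+        // E.g. queue = 10, speed = 4: (10 + 4 - 1) / 4 * 4 = 12.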
+        (queue + speed - 1) / speed * speed
+    }
+
+    fn run(&self, namespace: &String, queue: u64, clusters: &Clusters) -> HashMap<String, usize> {
+        let sorted_clusters = self.sorted_clusters(namespace, clusters);
+        tracing::debug!(
+            "Sorted clusters for namespace {}: {:?}",
+            namespace,
+            &sorted_clusters
+        );
+
+        let mut total: i64 = 0;
+        let mut pods: HashMap<String, usize> = HashMap::new();
+        for cluster in &sorted_clusters {
+            for (status, replicas) in &cluster.pods {
+                match status {
+                    PodStatus::Running | PodStatus::Pending => {
+                        total += self.pods_to_speed(*replicas) as i64;
+                        pods.entry(cluster.name.clone())
+                            .and_modify(|x| *x += replicas)
+                            .or_insert(*replicas);
+                    }
+                    _ => (), // Ignore LongPending as not running here.
+                }
+            }
+        }
+
+        // Remove unneeded pods.
+        if (total as u64) > self.normalize_queue(queue) {
+            for cluster in sorted_clusters.iter().rev() {
+                let mut excess_queue = total as u64 - self.normalize_queue(queue);
+                let mut excess_pods = excess_queue as usize / self.speed;
+                let replicas = pods.entry(cluster.name.clone()).or_default();
+                if *replicas < excess_pods {
+                    excess_pods = *replicas;
+                    excess_queue = *replicas as u64 * self.speed as u64;
+                }
+                *replicas -= excess_pods;
+                total -= excess_queue as i64;
+                if total <= 0 {
+                    break;
+                };
+            }
+        }
+
+        // Reduce load in over-capacity pools.
+        for cluster in &sorted_clusters {
+            let replicas = pods.entry(cluster.name.clone()).or_default();
+            if cluster.max_pool_size < *replicas {
+                let excess = *replicas - cluster.max_pool_size;
+                total -= (excess * self.speed) as i64;
+                *replicas -= excess;
+            }
+        }
+
+        tracing::debug!("Queue covered with provers: {}", total);
+        // Add required pods.
+        if (total as u64) < queue {
+            for cluster in &sorted_clusters {
+                let mut required_queue = queue - total as u64;
+                let mut required_pods = self.normalize_queue(required_queue) as usize / self.speed;
+                let replicas = pods.entry(cluster.name.clone()).or_default();
+                if *replicas + required_pods > cluster.max_pool_size {
+                    required_pods = cluster.max_pool_size - *replicas;
+                    required_queue = (required_pods * self.speed) as u64;
+                }
+                *replicas += required_pods;
+                total += required_queue as i64;
+            }
+        }
+
+        tracing::debug!(
+            "run result for namespace {}: provers {:?}, total: {}",
+            namespace,
+            &pods,
+            total
+        );
+
+        pods
+    }
+}
+
 fn diff(
     namespace: &str,
     provers: HashMap<GPUPoolKey, u32>,
@@ -383,7 +613,7 @@ fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
 #[async_trait::async_trait]
 impl Task for Scaler {
     async fn invoke(&self) -> anyhow::Result<()> {
-        let queue = self.queuer.get_queue().await.unwrap();
+        let queue = self.queuer.get_queue(&self.jobs).await.unwrap();
 
         let mut scale_requests: HashMap<String, ScaleRequest> = HashMap::new();
         {
@@ -396,16 +626,38 @@ impl Task for Scaler {
             }
 
             for (ns, ppv) in &self.namespaces {
-                let q = queue.queue.get(ppv).cloned().unwrap_or(0);
+                // Prover
+                let q = queue
+                    .get(&(ppv.to_string(), QueueReportFields::prover_jobs))
+                    .cloned()
+                    .unwrap_or(0);
                 tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
                 if q > 0 || is_namespace_running(ns, &guard.clusters) {
-                    let provers = self.run(ns, q, &guard.clusters);
+                    let provers = self.prover_scaler.run(ns, q, &guard.clusters);
                     for (k, num) in &provers {
                        AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
                            .set(*num as u64);
                    }
                    diff(ns, provers, &guard.clusters, &mut scale_requests);
                }
+
+                // Simple Scalers.
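+                // Each simple scaler reads its own (PPV, queue_report_field) entry; e.g. with
+                // speed = 100, a queue of 250 normalizes to 300, i.e. 3 pods.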
+                for scaler in &self.simple_scalers {
+                    let q = queue
+                        .get(&(ppv.to_string(), scaler.queue_report_field.clone()))
+                        .cloned()
+                        .unwrap_or(0);
+                    tracing::debug!(
+                        "Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}",
+                        scaler.pod_name_prefix
+                    );
+                    if q > 0 || is_namespace_running(ns, &guard.clusters) {
+                        let pods = scaler.run(ns, q, &guard.clusters);
+                        for (k, num) in &pods {
+                            AUTOSCALER_METRICS.jobs
+                                [&(scaler.pod_name_prefix.clone(), k.clone(), ns.clone())]
+                                .set(*num as u64);
+                        }
+                        // TODO: diff and add into scale_requests.
+                    }
+                }
             }
         } // Unlock self.watcher.data.
@@ -420,28 +672,21 @@ impl Task for Scaler {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{
-        cluster_types::{Deployment, Namespace, Pod},
-        global::{queuer, watcher},
-    };
+    use crate::cluster_types::{Deployment, Namespace, Pod, ScaleEvent};
 
     #[tracing_test::traced_test]
     #[test]
     fn test_run() {
-        let scaler = Scaler::new(
-            watcher::Watcher::default(),
-            queuer::Queuer::default(),
-            ProverAutoscalerScalerConfig {
-                cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(),
-                min_provers: [("prover-other".into(), 2)].into(),
-                max_provers: [
-                    ("foo".into(), [(Gpu::L4, 100)].into()),
-                    ("bar".into(), [(Gpu::L4, 100)].into()),
-                ]
-                .into(),
-                ..Default::default()
-            },
-        );
+        let scaler = GpuScaler::new(ProverAutoscalerScalerConfig {
+            cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(),
+            min_provers: [("prover-other".into(), 2)].into(),
+            max_provers: [
+                ("foo".into(), [(Gpu::L4, 100)].into()),
+                ("bar".into(), [(Gpu::L4, 100)].into()),
+            ]
+            .into(),
+            ..Default::default()
+        });
 
         assert_eq!(
             scaler.run(
@@ -570,20 +815,16 @@ mod tests {
     #[tracing_test::traced_test]
     #[test]
     fn test_run_min_provers() {
-        let scaler = Scaler::new(
-            watcher::Watcher::default(),
-            queuer::Queuer::default(),
-            ProverAutoscalerScalerConfig {
-                cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(),
-                min_provers: [("prover".into(), 2)].into(),
-                max_provers: [
-                    ("foo".into(), [(Gpu::L4, 100)].into()),
-                    ("bar".into(), [(Gpu::L4, 100)].into()),
-                ]
-                .into(),
-                ..Default::default()
-            },
-        );
+        let scaler = GpuScaler::new(ProverAutoscalerScalerConfig {
+            cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(),
+            min_provers: [("prover".into(), 2)].into(),
+            max_provers: [
+                ("foo".into(), [(Gpu::L4, 100)].into()),
+                ("bar".into(), [(Gpu::L4, 100)].into()),
+            ]
+            .into(),
+            ..Default::default()
+        });
 
         assert_eq!(
             scaler.run(
@@ -765,4 +1006,121 @@ mod tests {
             "Min 2 provers, 5 running"
         );
     }
+
+    #[tracing_test::traced_test]
+    #[test]
+    fn test_run_need_move() {
+        let scaler = GpuScaler::new(ProverAutoscalerScalerConfig {
+            cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(),
+            min_provers: [("prover".into(), 2)].into(),
+            max_provers: [
+                ("foo".into(), [(Gpu::L4, 100)].into()),
+                ("bar".into(), [(Gpu::L4, 100)].into()),
+            ]
+            .into(),
+            long_pending_duration: ProverAutoscalerScalerConfig::default_long_pending_duration(),
+            ..Default::default()
+        });
+
+        assert_eq!(
+            scaler.run(
+                &"prover".into(),
+                1400,
+                &Clusters {
+                    clusters: [
+                        (
+                            "foo".into(),
+                            Cluster {
+                                name: "foo".into(),
+                                namespaces: [(
+                                    "prover".into(),
+                                    Namespace {
+                                        deployments: [(
+                                            "circuit-prover-gpu".into(),
+                                            Deployment {
+                                                running: 3,
+                                                desired: 3,
+                                            },
+                                        )]
+                                        .into(),
+                                        pods: [
+                                            (
+                                                "circuit-prover-gpu-7c5f8fc747-gmtcr".into(),
+                                                Pod {
+                                                    status: "Running".into(),
+                                                    changed: Utc::now(),
+                                                    ..Default::default()
+                                                },
+                                            ),
+                                            (
+                                                "circuit-prover-gpu-7c5f8fc747-gmtc2".into(),
+                                                Pod {
status: "Pending".into(), + changed: Utc::now(), + ..Default::default() + }, + ), + ( + "circuit-prover-gpu-7c5f8fc747-gmtc3".into(), + Pod { + status: "Running".into(), + changed: Utc::now(), + ..Default::default() + }, + ) + ] + .into(), + scale_errors: vec![ScaleEvent { + name: "circuit-prover-gpu-7c5f8fc747-gmtc2.123456" + .into(), + time: Utc::now() - chrono::Duration::hours(1) + }], + }, + )] + .into(), + }, + ), + ( + "bar".into(), + Cluster { + name: "bar".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment::default(), + )] + .into(), + ..Default::default() + }, + )] + .into(), + }, + ) + ] + .into(), + ..Default::default() + }, + ), + [ + ( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, + }, + 2, + ), + ( + GPUPoolKey { + cluster: "bar".into(), + gpu: Gpu::L4, + }, + 1, + ) + ] + .into(), + "Move 1 prover to bar" + ); + } } diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs index 5384db082bc..707ff04f183 100644 --- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs @@ -134,6 +134,11 @@ impl Watcher { } pod.status = phase; + if pod.status == "Succeeded" || pod.status == "Failed" { + // Cleaning up list of pods. + v.pods.remove(&p.name_any()); + } + tracing::info!("Got pod: {}", p.name_any()) } Watched::Event(e) => { diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs index d94ac8b97e9..853e3db000f 100644 --- a/prover/crates/bin/prover_autoscaler/src/metrics.rs +++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs @@ -10,10 +10,13 @@ pub(crate) struct AutoscalerMetrics { pub prover_protocol_version: LabeledFamily<(String, String), Gauge, 2>, #[metrics(labels = ["target_cluster", "target_namespace", "gpu"])] pub provers: LabeledFamily<(String, String, Gpu), Gauge, 3>, + #[metrics(labels = ["job", "target_cluster", "target_namespace"])] + pub jobs: LabeledFamily<(String, String, String), Gauge, 3>, pub clusters_not_ready: Counter, #[metrics(labels = ["target", "status"])] pub calls: LabeledFamily<(String, u16), Counter, 2>, - // TODO: count of command send succes/fail + #[metrics(labels = ["target_cluster"])] + pub scale_errors: LabeledFamily, 1>, } #[vise::register]