From 52c0a0f19988e485b36d2eefdaaf8b1bf7f5f079 Mon Sep 17 00:00:00 2001
From: Dzmitry Kalabuk
Date: Thu, 2 May 2024 12:31:04 +0300
Subject: [PATCH] Add more prometheus metrics

---
 Cargo.lock             |   2 +-
 Cargo.toml             |   2 +-
 src/controller.rs      |  93 ++++++++++++++++-------
 src/main.rs            |   1 +
 src/metrics.rs         | 164 +++++++++++++++++++++++++++++++++++++----
 src/query/result.rs    |   3 +
 src/storage/manager.rs |   9 ++-
 src/storage/state.rs   |  14 +++-
 src/transport/p2p.rs   |  23 +++---
 9 files changed, 248 insertions(+), 63 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ecb809e..420fd79 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7547,7 +7547,7 @@ dependencies = [
 
 [[package]]
 name = "worker-rust"
-version = "0.3.5"
+version = "0.3.6"
 dependencies = [
  "anyhow",
  "async-stream",
diff --git a/Cargo.toml b/Cargo.toml
index f377e54..b0996df 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "worker-rust"
-version = "0.3.5"
+version = "0.3.6"
 edition = "2021"
 
 [[bin]]
diff --git a/src/controller.rs b/src/controller.rs
index 83150ea..a8ce9e1 100644
--- a/src/controller.rs
+++ b/src/controller.rs
@@ -1,5 +1,7 @@
+use crate::types::state::{ChunkRef, ChunkSet};
 use crate::{
     gateway_allocations::{self, allocations_checker::AllocationsChecker},
+    metrics,
     query::{self, error::QueryError, eth::BatchRequest, result::QueryResult},
     storage::manager::StateManager,
     transport::Transport,
@@ -12,7 +14,6 @@ use subsquid_messages::{DatasetChunks, WorkerAssignment};
 use tokio::{task::JoinError, time::MissedTickBehavior};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, instrument, warn};
-use crate::types::state::{ChunkRef, ChunkSet};
 
 lazy_static::lazy_static! {
     static ref PARALLEL_QUERIES: usize = std::env::var("PARALLEL_QUERIES")
@@ -209,6 +210,7 @@
                 }
                 Err(e) => panic!("Couldn't check CU allocations: {e:?}"),
             };
+            report_metrics(&result);
             if query_task.response_sender.send(result).is_err() {
                 tracing::error!("Query result couldn't be sent");
             }
@@ -237,39 +239,69 @@ pub async fn run_query(
 
 #[inline(always)]
 fn assignment_to_chunk_set(assignment: WorkerAssignment) -> anyhow::Result<ChunkSet> {
-    assignment.dataset_chunks.into_iter().flat_map(|DatasetChunks { dataset_url, chunks }| {
-        let dataset = Arc::new(dataset_url);
-        chunks.into_iter().map(move |chunk_str| Ok(ChunkRef {
-            dataset: dataset.clone(),
-            chunk: chunk_str.parse()?,
-        }))
-    }).collect()
+    assignment
+        .dataset_chunks
+        .into_iter()
+        .flat_map(
+            |DatasetChunks {
+                 dataset_url,
+                 chunks,
+             }| {
+                let dataset = Arc::new(dataset_url);
+                chunks.into_iter().map(move |chunk_str| {
+                    Ok(ChunkRef {
+                        dataset: dataset.clone(),
+                        chunk: chunk_str.parse()?,
+                    })
+                })
+            },
+        )
+        .collect()
+}
+
+fn report_metrics(result: &std::result::Result<QueryResult, QueryError>) {
+    let (status, query_result) = match result {
+        Ok(result) => (metrics::QueryStatus::Ok, Some(result)),
+        Err(QueryError::NoAllocation) => (metrics::QueryStatus::NoAllocation, None),
+        Err(QueryError::NotFound | QueryError::BadRequest(_)) => {
+            (metrics::QueryStatus::BadRequest, None)
+        }
+        Err(QueryError::Other(_) | QueryError::ServiceOverloaded) => {
+            (metrics::QueryStatus::ServerError, None)
+        }
+    };
+    metrics::query_executed(status, query_result);
 }
 
 #[cfg(test)]
 mod tests {
+    use super::{
+        assignment_to_chunk_set, Arc, ChunkRef, ChunkSet, DatasetChunks, WorkerAssignment,
+    };
     use crate::storage::layout::DataChunk;
-    use super::{Arc, assignment_to_chunk_set, ChunkRef, ChunkSet, DatasetChunks, WorkerAssignment};
 
     #[test]
     fn test_assignment_to_chunk_set() {
        let dataset_1 = "s3://ethereum-mainnet".to_string();
"s3://ethereum-mainnet".to_string(); let dataset_2 = "s3://moonbeam-evm".to_string(); let assignment = WorkerAssignment { - dataset_chunks: vec![DatasetChunks { - dataset_url: dataset_1.clone(), - chunks: vec![ - "0000000000/0000000000-0000697499-f6275b81".to_string(), - "0000000000/0000697500-0000962739-4eff9837".to_string(), - "0007293060/0007300860-0007308199-6aeb1a56".to_string(), - ], - }, DatasetChunks { - dataset_url: dataset_2.clone(), - chunks: vec![ - "0000000000/0000190280-0000192679-0999c6ce".to_string(), - "0003510340/0003515280-0003518379-b8613f00".to_string(), - ], - }], + dataset_chunks: vec![ + DatasetChunks { + dataset_url: dataset_1.clone(), + chunks: vec![ + "0000000000/0000000000-0000697499-f6275b81".to_string(), + "0000000000/0000697500-0000962739-4eff9837".to_string(), + "0007293060/0007300860-0007308199-6aeb1a56".to_string(), + ], + }, + DatasetChunks { + dataset_url: dataset_2.clone(), + chunks: vec![ + "0000000000/0000190280-0000192679-0999c6ce".to_string(), + "0003510340/0003515280-0003518379-b8613f00".to_string(), + ], + }, + ], }; let dataset_1 = Arc::new(dataset_1); let dataset_2 = Arc::new(dataset_2); @@ -282,7 +314,8 @@ mod tests { last_hash: "f6275b81".into(), top: 0.into(), }, - }, ChunkRef { + }, + ChunkRef { dataset: dataset_1.clone(), chunk: DataChunk { last_block: 962739.into(), @@ -290,14 +323,15 @@ mod tests { last_hash: "4eff9837".into(), top: 0.into(), }, - }, ChunkRef { + }, + ChunkRef { dataset: dataset_1.clone(), chunk: DataChunk { last_block: 7308199.into(), first_block: 7300860.into(), last_hash: "6aeb1a56".into(), top: 7293060.into(), - } + }, }, ChunkRef { dataset: dataset_2.clone(), @@ -307,7 +341,8 @@ mod tests { last_hash: "0999c6ce".into(), top: 0.into(), }, - }, ChunkRef { + }, + ChunkRef { dataset: dataset_2.clone(), chunk: DataChunk { last_block: 3518379.into(), @@ -316,7 +351,9 @@ mod tests { top: 3510340.into(), }, }, - ].into_iter().collect(); + ] + .into_iter() + .collect(); let chunk_set = assignment_to_chunk_set(assignment).expect("Valid assignment"); assert_eq!(chunk_set, exp_chunk_set); diff --git a/src/main.rs b/src/main.rs index 4c8a196..e3b93bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -127,6 +127,7 @@ async fn main() -> anyhow::Result<()> { ("peer_id".to_owned(), transport.local_peer_id().to_string()), ]); metrics::register_metrics(&mut metrics_registry, info); + metrics::register_p2p_metrics(&mut metrics_registry); let worker = Worker::new( state_manager.clone(), diff --git a/src/metrics.rs b/src/metrics.rs index 83465d0..d0d6917 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,28 +1,164 @@ +use std::fmt::Write; + +use prometheus_client::encoding::{EncodeLabelSet, LabelValueEncoder}; use prometheus_client::metrics::counter::Counter; -use prometheus_client::metrics::info::Info; -use prometheus_client::registry::Registry; +use prometheus_client::metrics::{family::Family, gauge::Gauge, histogram::Histogram, info::Info}; +use prometheus_client::registry::{Registry, Unit}; + +use crate::query::result::QueryResult; + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub enum WorkerStatus { + Starting, + NotRegistered, + UnsupportedVersion, + Jailed, + Active, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub enum QueryStatus { + Ok, + BadRequest, + NoAllocation, + ServerError, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct StatusLabels { + worker_status: WorkerStatus, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +struct QueryExecutedLabels { + status: QueryStatus, 
+}
 
 lazy_static::lazy_static! {
-    pub static ref QUERY_OK: Counter = Default::default();
-    pub static ref BAD_REQUEST: Counter = Default::default();
-    pub static ref SERVER_ERROR: Counter = Default::default();
+    static ref STATUS: Family<StatusLabels, Gauge> = Default::default();
+    pub static ref CHUNKS_AVAILABLE: Gauge = Default::default();
+    pub static ref CHUNKS_DOWNLOADING: Gauge = Default::default();
+    pub static ref CHUNKS_PENDING: Gauge = Default::default();
+    pub static ref CHUNKS_DOWNLOADED: Counter = Default::default();
+    pub static ref CHUNKS_FAILED_DOWNLOAD: Counter = Default::default();
+    pub static ref CHUNKS_REMOVED: Counter = Default::default();
+    pub static ref STORED_BYTES: Gauge = Default::default();
+
+    static ref QUERY_EXECUTED: Family<QueryExecutedLabels, Counter> = Default::default();
+    static ref QUERY_RESULT_SIZE: Histogram = Histogram::new(std::iter::empty());
+    static ref READ_CHUNKS: Histogram = Histogram::new(std::iter::empty());
+    pub static ref PENDING_QUERIES: Gauge = Default::default();
 }
 
+pub fn set_status(status: WorkerStatus) {
+    STATUS.clear();
+    STATUS
+        .get_or_create(&StatusLabels {
+            worker_status: status,
+        })
+        .set(1);
+}
+
+pub fn query_executed(status: QueryStatus, result: Option<&QueryResult>) {
+    QUERY_EXECUTED
+        .get_or_create(&QueryExecutedLabels { status })
+        .inc();
+    if let Some(result) = result {
+        QUERY_RESULT_SIZE.observe(result.compressed_size as f64);
+        READ_CHUNKS.observe(result.num_read_chunks as f64);
+    }
+}
+
 pub fn register_metrics(registry: &mut Registry, info: Info<Vec<(String, String)>>) {
     registry.register("worker_info", "Worker info", info);
     registry.register(
-        "num_successful_queries",
-        "Number of queries which executed successfully",
-        QUERY_OK.clone(),
+        "chunks_available",
+        "Number of available chunks",
+        CHUNKS_AVAILABLE.clone(),
+    );
+    registry.register(
+        "chunks_downloading",
+        "Number of chunks being downloaded",
+        CHUNKS_DOWNLOADING.clone(),
+    );
+    registry.register(
+        "chunks_pending",
+        "Number of chunks pending download",
+        CHUNKS_PENDING.clone(),
+    );
+    registry.register(
+        "chunks_downloaded",
+        "Number of chunks downloaded",
+        CHUNKS_DOWNLOADED.clone(),
+    );
+    registry.register(
+        "chunks_failed_download",
+        "Number of chunks failed to download",
+        CHUNKS_FAILED_DOWNLOAD.clone(),
+    );
+    registry.register(
+        "chunks_removed",
+        "Number of removed chunks",
+        CHUNKS_REMOVED.clone(),
+    );
+    registry.register_with_unit(
+        "used_storage",
+        "Total bytes stored in the data directory",
+        Unit::Bytes,
+        STORED_BYTES.clone(),
+    );
+
+    registry.register(
+        "num_queries_executed",
+        "Number of executed queries",
+        QUERY_EXECUTED.clone(),
+    );
+    registry.register_with_unit(
+        "query_result_size",
+        "(Gzipped) result size of an executed query (bytes)",
+        Unit::Bytes,
+        QUERY_RESULT_SIZE.clone(),
     );
     registry.register(
-        "num_bad_requests",
-        "Number of received invalid queries",
-        BAD_REQUEST.clone(),
+        "num_read_chunks",
+        "Number of chunks read during query execution",
+        READ_CHUNKS.clone(),
     );
+}
+
+pub fn register_p2p_metrics(registry: &mut Registry) {
+    registry.register("worker_status", "Status of the worker", STATUS.clone());
+    set_status(WorkerStatus::Starting);
     registry.register(
-        "num_server_errors",
-        "Number of queries which resulted in server error",
-        SERVER_ERROR.clone(),
+        "pending_queries",
+        "Current size of the queries queue",
+        PENDING_QUERIES.clone(),
     );
 }
+
+impl prometheus_client::encoding::EncodeLabelValue for WorkerStatus {
+    fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
+        let status = match self {
+            WorkerStatus::Starting => "starting",
"starting", + WorkerStatus::NotRegistered => "not_registered", + WorkerStatus::UnsupportedVersion => "unsupported_version", + WorkerStatus::Jailed => "jailed", + WorkerStatus::Active => "active", + }; + encoder.write_str(status)?; + Ok(()) + } +} + +impl prometheus_client::encoding::EncodeLabelValue for QueryStatus { + fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> { + let status = match self { + QueryStatus::Ok => "ok", + QueryStatus::BadRequest => "bad_request", + QueryStatus::NoAllocation => "no_allocation", + QueryStatus::ServerError => "server_error", + }; + encoder.write_str(status)?; + Ok(()) + } +} diff --git a/src/query/result.rs b/src/query/result.rs index 0f95273..b7adaa1 100644 --- a/src/query/result.rs +++ b/src/query/result.rs @@ -9,6 +9,7 @@ pub struct QueryResult { pub raw_data: Vec, pub compressed_data: Vec, pub data_size: usize, + pub compressed_size: usize, pub data_sha3_256: Vec, pub num_read_chunks: usize, } @@ -24,6 +25,7 @@ impl QueryResult { let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default()); encoder.write_all(&data)?; let compressed_data = encoder.finish()?; + let compressed_size = compressed_data.len(); let hash = sha3_256(&data); @@ -31,6 +33,7 @@ impl QueryResult { raw_data: data, compressed_data, data_size, + compressed_size, data_sha3_256: hash, num_read_chunks, }) diff --git a/src/storage/manager.rs b/src/storage/manager.rs index dc15c2a..e3e0276 100644 --- a/src/storage/manager.rs +++ b/src/storage/manager.rs @@ -6,10 +6,10 @@ use parking_lot::Mutex; use tokio_util::sync::CancellationToken; use tracing::{debug, info, instrument, warn}; -use crate::types::{ +use crate::{metrics, types::{ dataset, state::{to_ranges, ChunkRef, ChunkSet, Ranges}, -}; +}}; use super::{ downloader::ChunkDownloader, @@ -50,6 +50,8 @@ impl StateManager { let mut downloader = ChunkDownloader::default(); loop { self.state.lock().report_status(); + let stored_bytes = get_directory_size(&self.fs.root); + metrics::STORED_BYTES.set(stored_bytes as i64); tokio::select! 
                 _ = self.notify.notified() => {}
@@ -57,11 +59,13 @@
                     match result {
                         Ok(()) => {
                             self.state.lock().complete_download(&chunk, true);
+                            metrics::CHUNKS_DOWNLOADED.inc();
                         }
                         Err(e) => {
                             // TODO: skip logging if the download was cancelled
                             warn!("Failed to download chunk '{chunk}':\n{e:?}");
                             self.state.lock().complete_download(&chunk, false);
+                            metrics::CHUNKS_FAILED_DOWNLOAD.inc();
                         }
                     }
                 }
@@ -76,6 +80,7 @@
                 info!("Removing chunk {chunk}");
                 self.drop_chunk(&chunk)
                     .unwrap_or_else(|_| panic!("Couldn't remove chunk {chunk}"));
+                metrics::CHUNKS_REMOVED.inc();
             }
 
             while downloader.download_count() < concurrency {
diff --git a/src/storage/state.rs b/src/storage/state.rs
index 20e994d..3f774b3 100644
--- a/src/storage/state.rs
+++ b/src/storage/state.rs
@@ -3,9 +3,12 @@ use std::{collections::BTreeMap, sync::Arc};
 
 use tracing::{info, instrument};
 
 use super::layout::{BlockNumber, DataChunk};
-use crate::types::{
-    dataset::Dataset,
-    state::{ChunkRef, ChunkSet},
+use crate::{
+    metrics,
+    types::{
+        dataset::Dataset,
+        state::{ChunkRef, ChunkSet},
+    },
 };
 
@@ -190,7 +193,10 @@ impl State {
             self.available.len(),
             self.downloading.len(),
             self.to_download.len()
-        )
+        );
+        metrics::CHUNKS_AVAILABLE.set(self.available.len() as i64);
+        metrics::CHUNKS_DOWNLOADING.set(self.downloading.len() as i64);
+        metrics::CHUNKS_PENDING.set(self.to_download.len() as i64);
     }
 }
 
diff --git a/src/transport/p2p.rs b/src/transport/p2p.rs
index f59a3fe..7e47654 100644
--- a/src/transport/p2p.rs
+++ b/src/transport/p2p.rs
@@ -167,12 +167,15 @@ impl<MsgStream: Stream<Item = Message>> P2PTransport<MsgStream> {
         match pong.status {
             Some(Status::NotRegistered(())) => {
                 error!("Worker not registered on chain");
+                metrics::set_status(metrics::WorkerStatus::NotRegistered);
             }
             Some(Status::UnsupportedVersion(())) => {
                 error!("Worker version not supported by the scheduler");
+                metrics::set_status(metrics::WorkerStatus::UnsupportedVersion);
             }
             Some(Status::Jailed(reason)) => {
                 warn!("Worker jailed until the end of epoch: {reason}");
+                metrics::set_status(metrics::WorkerStatus::Jailed);
             }
             Some(Status::Active(_)) => {
                 error!("Deprecated pong message format");
@@ -182,6 +185,7 @@
                 self.assignments_tx
                     .send(assignment)
                     .expect("Assignment subscriber dropped");
+                metrics::set_status(metrics::WorkerStatus::Active);
             }
             None => {
                 warn!("Invalid pong message: no status field");
@@ -237,7 +241,6 @@
         } else {
             Some(self.generate_log(&result, query, peer_id))
         };
-        report_metrics(&result);
         self.send_query_result(query_id, peer_id, result).await;
         if let Some(log) = log {
             let result = self.logs_storage.save_log(log).await;
@@ -268,7 +271,9 @@
                 Err(mpsc::error::TrySendError::Closed(_)) => {
                     panic!("Query subscriber dropped");
                 }
-                _ => {}
+                Ok(_) => {
+                    metrics::PENDING_QUERIES.inc();
+                }
             }
         } else {
             Err(QueryError::BadRequest(
@@ -420,7 +425,9 @@ impl<MsgStream: Stream<Item = Message> + Send> super::Transport for P2PTransport<MsgStream>
 
     fn stream_queries(&self) -> impl futures::Stream<Item = QueryTask> + 'static {
         let rx = self.queries_rx.take().unwrap();
-        ReceiverStream::new(rx)
+        ReceiverStream::new(rx).inspect(|_| {
+            metrics::PENDING_QUERIES.dec();
+        })
     }
 
     async fn run(&self, cancellation_token: CancellationToken) {
@@ -454,16 +461,6 @@ fn bundle_messages(
     bundles
 }
 
-fn report_metrics(result: &std::result::Result<QueryResult, QueryError>) {
-    match result {
-        Ok(_) => metrics::QUERY_OK.inc(),
-        Err(QueryError::NotFound | QueryError::NoAllocation | QueryError::BadRequest(_)) => {
-            metrics::BAD_REQUEST.inc()
-        }
-        Err(QueryError::Other(_) | QueryError::ServiceOverloaded) => metrics::SERVER_ERROR.inc(),
-    };
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
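
Context for reviewers, not part of the patch: everything registered above lands in the `Registry` built in `main.rs`, which `prometheus_client` can serialize to the OpenMetrics text format for scraping. A minimal sketch of a render helper (the `render_metrics` name and its use behind a `/metrics` endpoint are assumptions, not code from this repository):

```rust
use prometheus_client::{encoding::text::encode, registry::Registry};

/// Hypothetical helper: renders all registered metrics as OpenMetrics text,
/// suitable as the body of a /metrics HTTP response.
fn render_metrics(registry: &Registry) -> Result<String, std::fmt::Error> {
    let mut buffer = String::new();
    encode(&mut buffer, registry)?;
    Ok(buffer)
}
```

Counters gain a `_total` suffix in this encoding, so the new `num_queries_executed` family is scraped as `num_queries_executed_total{status="ok"}`, and `set_status` clearing the `STATUS` family first means exactly one `worker_status{worker_status="..."}` series is exposed at a time.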
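One design note on the histograms: `QUERY_RESULT_SIZE` and `READ_CHUNKS` are built with `Histogram::new(std::iter::empty())`, i.e. with no buckets, so only their `_sum` and `_count` series carry information. If bucketed percentiles are wanted later, buckets can be supplied at construction; a sketch with arbitrarily chosen bounds (an illustration, not part of the patch):

```rust
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

// Ten exponential buckets covering 1 KiB up to ~256 MiB (1024 * 4^9).
// The range and factor here are assumptions, not tuned values.
fn result_size_histogram() -> Histogram {
    Histogram::new(exponential_buckets(1024.0, 4.0, 10))
}
```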