
Commit

Add more prometheus metrics
kalabukdima committed May 3, 2024
1 parent a6b0ab7 commit 52c0a0f
Showing 9 changed files with 248 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "worker-rust"
version = "0.3.5"
version = "0.3.6"
edition = "2021"

[[bin]]
93 changes: 65 additions & 28 deletions src/controller.rs
@@ -1,5 +1,7 @@
use crate::types::state::{ChunkRef, ChunkSet};
use crate::{
gateway_allocations::{self, allocations_checker::AllocationsChecker},
metrics,
query::{self, error::QueryError, eth::BatchRequest, result::QueryResult},
storage::manager::StateManager,
transport::Transport,
@@ -12,7 +14,6 @@ use subsquid_messages::{DatasetChunks, WorkerAssignment};
use tokio::{task::JoinError, time::MissedTickBehavior};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, warn};
use crate::types::state::{ChunkRef, ChunkSet};

lazy_static::lazy_static! {
static ref PARALLEL_QUERIES: usize = std::env::var("PARALLEL_QUERIES")
@@ -209,6 +210,7 @@ impl<T: Transport + 'static> Worker<T> {
}
Err(e) => panic!("Couldn't check CU allocations: {e:?}"),
};
report_metrics(&result);
if query_task.response_sender.send(result).is_err() {
tracing::error!("Query result couldn't be sent");
}
@@ -237,39 +239,69 @@ pub async fn run_query(

#[inline(always)]
fn assignment_to_chunk_set(assignment: WorkerAssignment) -> anyhow::Result<ChunkSet> {
assignment.dataset_chunks.into_iter().flat_map(|DatasetChunks { dataset_url, chunks }| {
let dataset = Arc::new(dataset_url);
chunks.into_iter().map(move |chunk_str| Ok(ChunkRef {
dataset: dataset.clone(),
chunk: chunk_str.parse()?,
}))
}).collect()
assignment
.dataset_chunks
.into_iter()
.flat_map(
|DatasetChunks {
dataset_url,
chunks,
}| {
let dataset = Arc::new(dataset_url);
chunks.into_iter().map(move |chunk_str| {
Ok(ChunkRef {
dataset: dataset.clone(),
chunk: chunk_str.parse()?,
})
})
},
)
.collect()
}

fn report_metrics(result: &std::result::Result<QueryResult, QueryError>) {
let (status, query_result) = match result {
Ok(result) => (metrics::QueryStatus::Ok, Some(result)),
Err(QueryError::NoAllocation) => (metrics::QueryStatus::NoAllocation, None),
Err(QueryError::NotFound | QueryError::BadRequest(_)) => {
(metrics::QueryStatus::BadRequest, None)
}
Err(QueryError::Other(_) | QueryError::ServiceOverloaded) => {
(metrics::QueryStatus::ServerError, None)
}
};
metrics::query_executed(status, query_result);
}

#[cfg(test)]
mod tests {
use super::{
assignment_to_chunk_set, Arc, ChunkRef, ChunkSet, DatasetChunks, WorkerAssignment,
};
use crate::storage::layout::DataChunk;
use super::{Arc, assignment_to_chunk_set, ChunkRef, ChunkSet, DatasetChunks, WorkerAssignment};

#[test]
fn test_assignment_to_chunk_set() {
let dataset_1 = "s3://ethereum-mainnet".to_string();
let dataset_2 = "s3://moonbeam-evm".to_string();
let assignment = WorkerAssignment {
dataset_chunks: vec![DatasetChunks {
dataset_url: dataset_1.clone(),
chunks: vec![
"0000000000/0000000000-0000697499-f6275b81".to_string(),
"0000000000/0000697500-0000962739-4eff9837".to_string(),
"0007293060/0007300860-0007308199-6aeb1a56".to_string(),
],
}, DatasetChunks {
dataset_url: dataset_2.clone(),
chunks: vec![
"0000000000/0000190280-0000192679-0999c6ce".to_string(),
"0003510340/0003515280-0003518379-b8613f00".to_string(),
],
}],
dataset_chunks: vec![
DatasetChunks {
dataset_url: dataset_1.clone(),
chunks: vec![
"0000000000/0000000000-0000697499-f6275b81".to_string(),
"0000000000/0000697500-0000962739-4eff9837".to_string(),
"0007293060/0007300860-0007308199-6aeb1a56".to_string(),
],
},
DatasetChunks {
dataset_url: dataset_2.clone(),
chunks: vec![
"0000000000/0000190280-0000192679-0999c6ce".to_string(),
"0003510340/0003515280-0003518379-b8613f00".to_string(),
],
},
],
};
let dataset_1 = Arc::new(dataset_1);
let dataset_2 = Arc::new(dataset_2);
@@ -282,22 +314,24 @@ mod tests {
last_hash: "f6275b81".into(),
top: 0.into(),
},
}, ChunkRef {
},
ChunkRef {
dataset: dataset_1.clone(),
chunk: DataChunk {
last_block: 962739.into(),
first_block: 697500.into(),
last_hash: "4eff9837".into(),
top: 0.into(),
},
}, ChunkRef {
},
ChunkRef {
dataset: dataset_1.clone(),
chunk: DataChunk {
last_block: 7308199.into(),
first_block: 7300860.into(),
last_hash: "6aeb1a56".into(),
top: 7293060.into(),
}
},
},
ChunkRef {
dataset: dataset_2.clone(),
@@ -307,7 +341,8 @@ mod tests {
last_hash: "0999c6ce".into(),
top: 0.into(),
},
}, ChunkRef {
},
ChunkRef {
dataset: dataset_2.clone(),
chunk: DataChunk {
last_block: 3518379.into(),
Expand All @@ -316,7 +351,9 @@ mod tests {
top: 3510340.into(),
},
},
].into_iter().collect();
]
.into_iter()
.collect();

let chunk_set = assignment_to_chunk_set(assignment).expect("Valid assignment");
assert_eq!(chunk_set, exp_chunk_set);
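
A note on the reshaped assignment_to_chunk_set: it leans on the standard Rust trick of collecting an iterator of Result values into a Result of a collection, so the first chunk string that fails to parse aborts the whole conversion. A minimal, self-contained sketch of the pattern (the types and names below are illustrative, not from this repository):

use std::collections::BTreeSet;

// Collecting Result items into Result<BTreeSet<_>, _> short-circuits on the
// first Err, just as a malformed chunk string aborts assignment_to_chunk_set.
fn parse_all(inputs: &[&str]) -> Result<BTreeSet<u64>, std::num::ParseIntError> {
    inputs.iter().map(|s| s.parse::<u64>()).collect()
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]), Ok([1u64, 2].into_iter().collect()));
    assert!(parse_all(&["1", "oops"]).is_err());
}
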
1 change: 1 addition & 0 deletions src/main.rs
@@ -127,6 +127,7 @@ async fn main() -> anyhow::Result<()> {
("peer_id".to_owned(), transport.local_peer_id().to_string()),
]);
metrics::register_metrics(&mut metrics_registry, info);
metrics::register_p2p_metrics(&mut metrics_registry);

let worker = Worker::new(
state_manager.clone(),
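
register_p2p_metrics appends to the same Registry that register_metrics already populated, so all of these metrics come out in a single scrape. For readers unfamiliar with the prometheus_client crate, here is a minimal sketch of turning such a registry into the text exposition format; it assumes the crate's current (~0.22) API, and the metric names are illustrative:

use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

fn main() -> Result<(), std::fmt::Error> {
    let mut registry = Registry::default();
    let requests: Counter = Default::default();
    // The text encoder appends the "_total" suffix to counter names itself.
    registry.register("demo_requests", "An illustrative counter", requests.clone());
    requests.inc();

    let mut body = String::new();
    encode(&mut body, &registry)?; // OpenMetrics text format, ready to serve
    print!("{body}");
    Ok(())
}
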
164 changes: 150 additions & 14 deletions src/metrics.rs
@@ -1,28 +1,164 @@
use std::fmt::Write;

use prometheus_client::encoding::{EncodeLabelSet, LabelValueEncoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::info::Info;
use prometheus_client::registry::Registry;
use prometheus_client::metrics::{family::Family, gauge::Gauge, histogram::Histogram, info::Info};
use prometheus_client::registry::{Registry, Unit};

use crate::query::result::QueryResult;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum WorkerStatus {
Starting,
NotRegistered,
UnsupportedVersion,
Jailed,
Active,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum QueryStatus {
Ok,
BadRequest,
NoAllocation,
ServerError,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct StatusLabels {
worker_status: WorkerStatus,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct QueryExecutedLabels {
status: QueryStatus,
}

lazy_static::lazy_static! {
pub static ref QUERY_OK: Counter = Default::default();
pub static ref BAD_REQUEST: Counter = Default::default();
pub static ref SERVER_ERROR: Counter = Default::default();
static ref STATUS: Family<StatusLabels, Gauge> = Default::default();
pub static ref CHUNKS_AVAILABLE: Gauge = Default::default();
pub static ref CHUNKS_DOWNLOADING: Gauge = Default::default();
pub static ref CHUNKS_PENDING: Gauge = Default::default();
pub static ref CHUNKS_DOWNLOADED: Counter = Default::default();
pub static ref CHUNKS_FAILED_DOWNLOAD: Counter = Default::default();
pub static ref CHUNKS_REMOVED: Counter = Default::default();
pub static ref STORED_BYTES: Gauge = Default::default();

static ref QUERY_EXECUTED: Family<QueryExecutedLabels, Counter> = Default::default();
static ref QUERY_RESULT_SIZE: Histogram = Histogram::new(std::iter::empty());
static ref READ_CHUNKS: Histogram = Histogram::new(std::iter::empty());
pub static ref PENDING_QUERIES: Gauge = Default::default();
}

pub fn set_status(status: WorkerStatus) {
STATUS.clear();
STATUS
.get_or_create(&StatusLabels {
worker_status: status,
})
.set(1);
}

pub fn query_executed(status: QueryStatus, result: Option<&QueryResult>) {
QUERY_EXECUTED
.get_or_create(&QueryExecutedLabels { status })
.inc();
if let Some(result) = result {
QUERY_RESULT_SIZE.observe(result.compressed_size as f64);
READ_CHUNKS.observe(result.num_read_chunks as f64);
}
}

pub fn register_metrics(registry: &mut Registry, info: Info<Vec<(String, String)>>) {
registry.register("worker_info", "Worker info", info);
registry.register(
"num_successful_queries",
"Number of queries which executed successfully",
QUERY_OK.clone(),
"chunks_available",
"Number of available chunks",
CHUNKS_AVAILABLE.clone(),
);
registry.register(
"chunks_downloading",
"Number of chunks being downloaded",
CHUNKS_DOWNLOADING.clone(),
);
registry.register(
"chunks_pending",
"Number of chunks pending download",
CHUNKS_PENDING.clone(),
);
registry.register(
"chunks_downloaded",
"Number of chunks downloaded",
CHUNKS_DOWNLOADED.clone(),
);
registry.register(
"chunks_failed_download",
"Number of chunks failed to download",
CHUNKS_FAILED_DOWNLOAD.clone(),
);
registry.register(
"chunks_removed",
"Number of removed chunks",
CHUNKS_REMOVED.clone(),
);
registry.register_with_unit(
"used_storage",
"Total bytes stored in the data directory",
Unit::Bytes,
STORED_BYTES.clone(),
);

registry.register(
"num_queries_executed",
"Number of executed queries",
QUERY_EXECUTED.clone(),
);
registry.register_with_unit(
"query_result_size",
"(Gzipped) result size of an executed query (bytes)",
Unit::Bytes,
QUERY_RESULT_SIZE.clone(),
);
registry.register(
"num_bad_requests",
"Number of received invalid queries",
BAD_REQUEST.clone(),
"num_read_chunks",
"Number of chunks read during query execution",
READ_CHUNKS.clone(),
);
}

pub fn register_p2p_metrics(registry: &mut Registry) {
registry.register("worker_status", "Status of the worker", STATUS.clone());
set_status(WorkerStatus::Starting);
registry.register(
"num_server_errors",
"Number of queries which resulted in server error",
SERVER_ERROR.clone(),
"pending_queries",
"Current size of the queries queue",
PENDING_QUERIES.clone(),
);
}

impl prometheus_client::encoding::EncodeLabelValue for WorkerStatus {
fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
let status = match self {
WorkerStatus::Starting => "starting",
WorkerStatus::NotRegistered => "not_registered",
WorkerStatus::UnsupportedVersion => "unsupported_version",
WorkerStatus::Jailed => "jailed",
WorkerStatus::Active => "active",
};
encoder.write_str(status)?;
Ok(())
}
}

impl prometheus_client::encoding::EncodeLabelValue for QueryStatus {
fn encode(&self, encoder: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
let status = match self {
QueryStatus::Ok => "ok",
QueryStatus::BadRequest => "bad_request",
QueryStatus::NoAllocation => "no_allocation",
QueryStatus::ServerError => "server_error",
};
encoder.write_str(status)?;
Ok(())
}
}
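
Two details here are easy to miss. First, Histogram::new(std::iter::empty()) builds a histogram with no buckets, so only the running _sum and _count are exported for query_result_size and num_read_chunks. If a full distribution is wanted later, buckets can be supplied as in this sketch (the bucket values are illustrative, not part of this commit):

use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};

fn main() {
    // Ten exponential buckets starting at 1 KiB and growing 4x per step.
    let result_size = Histogram::new(exponential_buckets(1024.0, 4.0, 10));
    result_size.observe(300_000.0); // e.g. a ~300 kB compressed result
}

Second, set_status calls STATUS.clear() before setting the new label to 1, which makes worker_status behave as a one-hot enum gauge: at most one status label is exported at any time.
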
3 changes: 3 additions & 0 deletions src/query/result.rs
@@ -9,6 +9,7 @@ pub struct QueryResult {
pub raw_data: Vec<u8>,
pub compressed_data: Vec<u8>,
pub data_size: usize,
pub compressed_size: usize,
pub data_sha3_256: Vec<u8>,
pub num_read_chunks: usize,
}
@@ -24,13 +25,15 @@ impl QueryResult {
let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(&data)?;
let compressed_data = encoder.finish()?;
let compressed_size = compressed_data.len();

let hash = sha3_256(&data);

Ok(Self {
raw_data: data,
compressed_data,
data_size,
compressed_size,
data_sha3_256: hash,
num_read_chunks,
})
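
The new compressed_size field is just the length of the gzip output that was already being produced. A self-contained sketch of the same bookkeeping, using flate2's GzEncoder with a Vec<u8> sink and the default compression level exactly as the diff does (the helper's name is illustrative):

use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;

fn gzip_len(data: &[u8]) -> std::io::Result<(Vec<u8>, usize)> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(data)?;
    let compressed = encoder.finish()?; // flush and recover the Vec sink
    let size = compressed.len(); // this is what feeds QUERY_RESULT_SIZE
    Ok((compressed, size))
}

fn main() -> std::io::Result<()> {
    let payload = b"example payload, example payload, example payload";
    let (gz, size) = gzip_len(payload)?;
    println!("{} raw -> {} gzipped bytes", payload.len(), size);
    assert_eq!(size, gz.len());
    Ok(())
}
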
(Diffs for the remaining 3 changed files are not rendered here.)