From 4c3e9228da5e07ef3d0570143470d1b30fff017a Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 14 May 2024 14:43:18 -0400 Subject: [PATCH] Add Tokio runtimes metrics --- .cargo/config.toml | 4 + .github/workflows/cbench.yml | 4 +- .github/workflows/ci.yml | 2 +- .github/workflows/coverage.yml | 1 + Dockerfile | 3 +- quickwit/Cargo.lock | 13 +++ quickwit/Cargo.toml | 1 + quickwit/quickwit-cli/src/main.rs | 11 ++- quickwit/quickwit-cluster/src/metrics.rs | 5 ++ quickwit/quickwit-common/Cargo.toml | 1 + quickwit/quickwit-common/src/metrics.rs | 35 +++++++- quickwit/quickwit-common/src/runtimes.rs | 79 ++++++++++++++++++- quickwit/quickwit-common/src/thread_pool.rs | 2 +- .../quickwit-control-plane/src/metrics.rs | 4 + quickwit/quickwit-ingest/src/metrics.rs | 4 + quickwit/quickwit-search/src/metrics.rs | 1 + quickwit/quickwit-storage/src/metrics.rs | 8 ++ 17 files changed, 166 insertions(+), 12 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 567fba3b269..fef53742d4a 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,7 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] +rustdocflags = ["--cfg", "tokio_unstable"] + [target.x86_64-unknown-linux-gnu] # Targetting x86-64-v2 gives a ~2% performance boost while only # disallowing Intel CPUs older than 2008 and AMD CPUs older than 2011. diff --git a/.github/workflows/cbench.yml b/.github/workflows/cbench.yml index bbe8bb19da5..87ced8e41aa 100644 --- a/.github/workflows/cbench.yml +++ b/.github/workflows/cbench.yml @@ -13,11 +13,13 @@ on: # pull request. 
pull_request_target: +env: + RUSTFLAGS: "--cfg tokio_unstable" + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true - jobs: tests: name: Benchmark diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d667aa84643..bbf084078da 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ env: QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev RUST_BACKTRACE: 1 RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links - RUSTFLAGS: -Dwarnings + RUSTFLAGS: --cfg tokio_unstable -Dwarnings # Ensures that we cancel running jobs for the same PR / same workflow. concurrency: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 49ac1d517fc..15294ed9741 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -21,6 +21,7 @@ env: QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container. 
QW_S3_FORCE_PATH_STYLE_ACCESS: 1 QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev + RUSTFLAGS: --cfg tokio_unstable jobs: test: diff --git a/Dockerfile b/Dockerfile index 796c6a421a1..87ff176f9e1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -39,7 +39,8 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build WORKDIR /quickwit RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \ - && cargo build \ + && RUSTFLAGS="--cfg tokio_unstable" \ + cargo build \ -p quickwit-cli \ --features $CARGO_FEATURES \ --bin quickwit \ diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index eb6fd30ea7c..da8d17e0b2a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5706,6 +5706,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-metrics", "tokio-stream", "tonic", "tower", @@ -8380,6 +8381,18 @@ dependencies = [ "syn 2.0.59", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 737d6e957b1..a207e2a7521 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -231,6 +231,7 @@ tikv-jemalloc-ctl = "0.5" tikv-jemallocator = "0.5" time = { version = "0.3", features = ["std", "formatting", "macros"] } tokio = { version = "1.37", features = ["full"] } +tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["full"] } toml = "0.7.6" diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 8ba975529a3..ad4f889c6ce 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ 
b/quickwit/quickwit-cli/src/main.rs @@ -21,6 +21,7 @@ use std::collections::BTreeMap; +use anyhow::Context; use colored::Colorize; use opentelemetry::global; use quickwit_cli::busy_detector; @@ -29,17 +30,21 @@ use quickwit_cli::cli::{build_cli, CliCommand}; #[cfg(feature = "jemalloc")] use quickwit_cli::jemalloc::start_jemalloc_metrics_loop; use quickwit_cli::logger::setup_logging_and_tracing; +use quickwit_common::runtimes::scrape_tokio_runtime_metrics; use quickwit_serve::BuildInfo; use tracing::error; fn main() -> anyhow::Result<()> { - tokio::runtime::Builder::new_multi_thread() + let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .on_thread_unpark(busy_detector::thread_unpark) .on_thread_park(busy_detector::thread_park) .build() - .unwrap() - .block_on(main_impl()) + .context("failed to start main Tokio runtime")?; + + scrape_tokio_runtime_metrics(rt.handle(), "main"); + + rt.block_on(main_impl()) } fn register_build_info_metric() { diff --git a/quickwit/quickwit-cluster/src/metrics.rs b/quickwit/quickwit-cluster/src/metrics.rs index 8e5534f503a..a4ddf5506b7 100644 --- a/quickwit/quickwit-cluster/src/metrics.rs +++ b/quickwit/quickwit-cluster/src/metrics.rs @@ -93,26 +93,31 @@ impl Default for ClusterMetrics { "gossip_recv_messages_total", "Total number of gossip messages received.", "cluster", + &[], ), gossip_recv_bytes_total: new_counter( "gossip_recv_bytes_total", "Total amount of gossip data received in bytes.", "cluster", + &[], ), gossip_sent_messages_total: new_counter( "gossip_sent_messages_total", "Total number of gossip messages sent.", "cluster", + &[], ), gossip_sent_bytes_total: new_counter( "gossip_sent_bytes_total", "Total amount of gossip data sent in bytes.", "cluster", + &[], ), grpc_gossip_rounds_total: new_counter( "grpc_gossip_rounds_total", "Total number of gRPC gossip rounds performed with peer nodes.", "cluster", + &[], ), } } diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 
9b5609b14e1..675522c7d9f 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -37,6 +37,7 @@ siphasher = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-metrics ={ workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } tower = { workspace = true } diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 1e8c98f9157..00a600eea43 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -26,7 +26,7 @@ pub use prometheus::{ IntCounter, IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec, }; -use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; +use prometheus::{Encoder, Gauge, HistogramOpts, Opts, TextEncoder}; #[derive(Clone)] pub struct HistogramVec { @@ -71,10 +71,20 @@ pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'sta prometheus::register(Box::new(counter)).expect("failed to register counter"); } -pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter { +pub fn new_counter( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> IntCounter { + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); let counter_opts = Opts::new(name, help) .namespace("quickwit") - .subsystem(subsystem); + .subsystem(subsystem) + .const_labels(owned_const_labels); let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); counter @@ -104,6 +114,25 @@ pub fn new_counter_vec( IntCounterVec { underlying } } +pub fn new_float_gauge( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> Gauge { + 
let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); + let gauge_opts = Opts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem) + .const_labels(owned_const_labels); + let gauge = Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); + prometheus::register(Box::new(gauge.clone())).expect("failed to register float gauge"); + gauge +} + pub fn new_gauge( name: &str, help: &str, diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index 42435ce6662..9f03fb1b4dc 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -19,9 +19,14 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; use once_cell::sync::OnceCell; +use prometheus::{Gauge, IntCounter, IntGauge}; use tokio::runtime::Runtime; +use tokio_metrics::{RuntimeMetrics, RuntimeMonitor}; + +use crate::metrics::{new_counter, new_float_gauge, new_gauge}; static RUNTIMES: OnceCell> = OnceCell::new(); @@ -63,7 +68,7 @@ impl RuntimesConfig { } pub fn with_num_cpus(num_cpus: usize) -> Self { - // Non blocking task are supposed to be io intensive, and not require many threads... + // Non blocking task are supposed to be io intensive, and not require many threads... let num_threads_non_blocking = if num_cpus > 6 { 2 } else { 1 }; // On the other hand the blocking actors are cpu intensive. We allocate // almost all of the threads to them. 
@@ -83,7 +88,8 @@ impl Default for RuntimesConfig { } fn start_runtimes(config: RuntimesConfig) -> HashMap { - let mut runtimes = HashMap::default(); + let mut runtimes = HashMap::with_capacity(2); + let blocking_runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(config.num_threads_blocking) .thread_name_fn(|| { @@ -94,7 +100,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap { .enable_all() .build() .unwrap(); + + scrape_tokio_runtime_metrics(blocking_runtime.handle(), "blocking"); runtimes.insert(RuntimeType::Blocking, blocking_runtime); + let non_blocking_runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(config.num_threads_non_blocking) .thread_name_fn(|| { @@ -105,7 +114,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap { .enable_all() .build() .unwrap(); + + scrape_tokio_runtime_metrics(non_blocking_runtime.handle(), "non_blocking"); runtimes.insert(RuntimeType::NonBlocking, non_blocking_runtime); + runtimes } @@ -135,6 +147,69 @@ impl RuntimeType { } } +/// Spawns a background task onto the runtime behind `handle` that scrapes the runtime's metrics every second and publishes them as Prometheus metrics. +pub fn scrape_tokio_runtime_metrics(handle: &tokio::runtime::Handle, label: &'static str) { + let runtime_monitor = RuntimeMonitor::new(handle); + handle.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let prometheus_runtime_metrics = PrometheusRuntimeMetrics::new(label); + + for tokio_runtime_metrics in runtime_monitor.intervals() { + interval.tick().await; + prometheus_runtime_metrics.update(&tokio_runtime_metrics); + } + }); +} + +struct PrometheusRuntimeMetrics { + scheduled_tasks: IntGauge, + worker_busy_duration_milliseconds_total: IntCounter, + worker_busy_ratio: Gauge, + worker_threads: IntGauge, +} + +impl PrometheusRuntimeMetrics { + pub fn new(label: &'static str) -> Self { + Self { + scheduled_tasks: new_gauge( + "tokio_scheduled_tasks", + "The total number of tasks currently scheduled in workers' local queues.", + "runtime", + &[("runtime_type", label)], + ), + 
worker_busy_duration_milliseconds_total: new_counter( + "tokio_worker_busy_duration_milliseconds_total", + "The amount of time worker threads were busy.", + "runtime", + &[("runtime_type", label)], + ), + worker_busy_ratio: new_float_gauge( + "tokio_worker_busy_ratio", + "The ratio of time worker threads were busy since the last time runtime metrics \ + were collected.", + "runtime", + &[("runtime_type", label)], + ), + worker_threads: new_gauge( + "tokio_worker_threads", + "The number of worker threads used by the runtime.", + "runtime", + &[("runtime_type", label)], + ), + } + } + + pub fn update(&self, runtime_metrics: &RuntimeMetrics) { + self.scheduled_tasks + .set(runtime_metrics.total_local_queue_depth as i64); + self.worker_busy_duration_milliseconds_total + .inc_by(runtime_metrics.total_busy_duration.as_millis() as u64); + self.worker_busy_ratio.set(runtime_metrics.busy_ratio()); + self.worker_threads + .set(runtime_metrics.workers_count as i64); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index f7469c24374..03d1179e826 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -51,7 +51,7 @@ impl ThreadPool { } let thread_pool = rayon_pool_builder .build() - .expect("failed to spawn the spawning pool"); + .expect("failed to spawn thread pool"); let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]); let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]); ThreadPool { diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index 564576429c3..f2976f093e5 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -65,23 +65,27 @@ impl Default for ControlPlaneMetrics { "restart_total", "Number of control plane restart.", "control_plane", + &[], 
), schedule_total: new_counter( "schedule_total", "Number of control plane `schedule` operations.", "control_plane", + &[], ), metastore_error_aborted: new_counter( "metastore_error_aborted", "Number of aborted metastore transaction (= do not trigger a control plane \ restart)", "control_plane", + &[], ), metastore_error_maybe_executed: new_counter( "metastore_error_maybe_executed", "Number of metastore transaction with an uncertain outcome (= do trigger a \ control plane restart)", "control_plane", + &[], ), open_shards_total: new_gauge_vec( "open_shards_total", diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 4df779a37cb..3fc8fef7863 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -35,21 +35,25 @@ impl Default for IngestMetrics { "ingested_num_bytes", "Total size of the docs ingested in bytes", "ingest", + &[], ), ingested_num_docs: new_counter( "ingested_num_docs", "Number of docs received to be ingested", "ingest", + &[], ), replicated_num_bytes_total: new_counter( "replicated_num_bytes_total", "Total size in bytes of the replicated docs.", "ingest", + &[], ), replicated_num_docs_total: new_counter( "replicated_num_docs_total", "Total number of docs replicated.", "ingest", + &[], ), queue_count: new_gauge( "queue_count", diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 4031230f085..718d7b307b1 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -36,6 +36,7 @@ impl Default for SearchMetrics { "leaf_searches_splits_total", "Number of leaf searches (count of splits) started.", "search", + &[], ), leaf_search_split_duration_secs: new_histogram( "leaf_search_split_duration_secs", diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 6589d3cf133..9fd94de575e 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ 
b/quickwit/quickwit-storage/src/metrics.rs @@ -51,27 +51,32 @@ impl Default for StorageMetrics { "object_storage_gets_total", "Number of objects fetched.", "storage", + &[], ), object_storage_put_total: new_counter( "object_storage_puts_total", "Number of objects uploaded. May differ from object_storage_requests_parts due to \ multipart upload.", "storage", + &[], ), object_storage_put_parts: new_counter( "object_storage_puts_parts", "Number of object parts uploaded.", "", + &[], ), object_storage_download_num_bytes: new_counter( "object_storage_download_num_bytes", "Amount of data downloaded from an object storage.", "storage", + &[], ), object_storage_upload_num_bytes: new_counter( "object_storage_upload_num_bytes", "Amount of data uploaded to an object storage.", "storage", + &[], ), } } @@ -109,16 +114,19 @@ impl CacheMetrics { "cache_hits_total", "Number of {component_name} cache hits", &namespace, + &[], ), hits_num_bytes: new_counter( "cache_hits_bytes", "Number of {component_name} cache hits in bytes", &namespace, + &[], ), misses_num_items: new_counter( "cache_misses_total", "Number of {component_name} cache misses", &namespace, + &[], ), } }