diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs
index c6b1dd8c18a..96b42186f17 100644
--- a/quickwit/quickwit-common/src/metrics.rs
+++ b/quickwit/quickwit-common/src/metrics.rs
@@ -216,6 +216,43 @@ impl<'a> Drop for GaugeGuard<'a> {
     }
 }
 
+pub struct OwnedGaugeGuard {
+    gauge: IntGauge,
+    delta: i64,
+}
+
+impl std::fmt::Debug for OwnedGaugeGuard {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.delta.fmt(f)
+    }
+}
+
+impl OwnedGaugeGuard {
+    pub fn from_gauge(gauge: IntGauge) -> Self {
+        Self { gauge, delta: 0i64 }
+    }
+
+    pub fn get(&self) -> i64 {
+        self.delta
+    }
+
+    pub fn add(&mut self, delta: i64) {
+        self.gauge.add(delta);
+        self.delta += delta;
+    }
+
+    pub fn sub(&mut self, delta: i64) {
+        self.gauge.sub(delta);
+        self.delta -= delta;
+    }
+}
+
+impl Drop for OwnedGaugeGuard {
+    fn drop(&mut self) {
+        self.gauge.sub(self.delta)
+    }
+}
+
 pub fn metrics_text_payload() -> String {
     let metric_families = prometheus::gather();
     let mut buffer = Vec::new();
diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs
index a1859ff9e9d..f7469c24374 100644
--- a/quickwit/quickwit-common/src/thread_pool.rs
+++ b/quickwit/quickwit-common/src/thread_pool.rs
@@ -26,35 +26,38 @@ use prometheus::IntGauge;
 use tokio::sync::oneshot;
 use tracing::error;
 
-use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec};
+use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard};
 
-/// An executor backed by a thread pool to run CPU intensive tasks.
+/// An executor backed by a thread pool to run CPU-intensive tasks.
 ///
 /// tokio::spawn_blocking should only used for IO-bound tasks, as it has not limit on its
 /// thread count.
 #[derive(Clone)]
 pub struct ThreadPool {
     thread_pool: Arc<rayon::ThreadPool>,
-    active_threads_gauge: IntGauge,
+    ongoing_tasks: IntGauge,
+    pending_tasks: IntGauge,
 }
 
 impl ThreadPool {
     pub fn new(name: &'static str, num_threads_opt: Option<usize>) -> ThreadPool {
         let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new()
             .thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}"))
-            .panic_handler(|_my_panic| {
-                error!("task running in the quickwit search pool panicked");
+            .panic_handler(move |_my_panic| {
+                error!("task running in the quickwit {name} thread pool panicked");
             });
         if let Some(num_threads) = num_threads_opt {
            rayon_pool_builder = rayon_pool_builder.num_threads(num_threads);
         }
         let thread_pool = rayon_pool_builder
             .build()
-            .expect("Failed to spawn the spawning pool");
-        let active_threads_gauge = ACTIVE_THREAD_COUNT.with_label_values([name]);
+            .expect("failed to spawn the spawning pool");
+        let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]);
+        let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]);
         ThreadPool {
             thread_pool: Arc::new(thread_pool),
-            active_threads_gauge,
+            ongoing_tasks,
+            pending_tasks,
         }
     }
 
@@ -66,17 +69,17 @@ impl ThreadPool {
     ///
     /// Here are two important differences however:
     ///
-    /// 1) The task is running on a rayon thread pool managed by quickwit.
-    /// This pool is specifically used only to run CPU intensive work
+    /// 1) The task runs on a rayon thread pool managed by Quickwit.
+    /// This pool is specifically used only to run CPU-intensive work
     /// and is configured to contain `num_cpus` cores.
     ///
     /// 2) Before the task is effectively scheduled, we check that
-    /// the spawner is still interested by its result.
+    /// the spawner is still interested in its result.
     ///
     /// It is therefore required to `await` the result of this
-    /// function to get anywork done.
+    /// function to get any work done.
     ///
-    /// This is nice, because it makes work that has been scheduled
+    /// This is nice because it makes work that has been scheduled
     /// but is not running yet "cancellable".
     pub fn run_cpu_intensive<F, R>(
         &self,
         cpu_heavy_task: F,
     ) -> impl Future<Output = Result<R, Panicked>>
     where
         F: FnOnce() -> R + Send + 'static,
         R: Send + 'static,
     {
         let span = tracing::Span::current();
-        let gauge = self.active_threads_gauge.clone();
+        let ongoing_tasks = self.ongoing_tasks.clone();
+        let mut pending_tasks_guard: OwnedGaugeGuard =
+            OwnedGaugeGuard::from_gauge(self.pending_tasks.clone());
+        pending_tasks_guard.add(1i64);
         let (tx, rx) = oneshot::channel();
         self.thread_pool.spawn(move || {
+            drop(pending_tasks_guard);
             if tx.is_closed() {
                 return;
             }
             let _guard = span.enter();
-            let mut active_thread_guard = GaugeGuard::from_gauge(&gauge);
-            active_thread_guard.add(1i64);
+            let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
+            ongoing_task_guard.add(1i64);
             let result = cpu_heavy_task();
             let _ = tx.send(result);
         });
@@ -103,10 +110,10 @@ impl ThreadPool {
     }
 }
 
-/// Run a small (<200ms) CPU intensive task on a dedicated thread pool with a few threads.
+/// Run a small (<200ms) CPU-intensive task on a dedicated thread pool with a few threads.
 ///
-/// When running blocking io (or side-effects in general), prefer use `tokio::spawn_blocking`
-/// instead. When running long tasks or a set of tasks that you expect should take more than 33% of
+/// When running blocking io (or side-effects in general), prefer using `tokio::spawn_blocking`
+/// instead. When running long tasks or a set of tasks that you expect to take more than 33% of
 /// your vCPUs, use a dedicated thread/runtime or executor instead.
 ///
 /// Disclaimer: The function will no be executed if the Future is dropped.
@@ -130,21 +137,39 @@ pub struct Panicked;
 
 impl fmt::Display for Panicked {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Scheduled job panicked")
+        write!(f, "scheduled task panicked")
     }
 }
 
 impl std::error::Error for Panicked {}
 
-pub static ACTIVE_THREAD_COUNT: Lazy<IntGaugeVec<1>> = Lazy::new(|| {
-    new_gauge_vec(
-        "active_thread_count",
-        "Number of active threads in a given thread pool.",
-        "thread_pool",
-        &[],
-        ["pool"],
-    )
-});
+struct ThreadPoolMetrics {
+    ongoing_tasks: IntGaugeVec<1>,
+    pending_tasks: IntGaugeVec<1>,
+}
+
+impl Default for ThreadPoolMetrics {
+    fn default() -> Self {
+        ThreadPoolMetrics {
+            ongoing_tasks: new_gauge_vec(
+                "ongoing_tasks",
+                "number of tasks being currently processed by threads in the thread pool",
+                "thread_pool",
+                &[],
+                ["pool"],
+            ),
+            pending_tasks: new_gauge_vec(
+                "pending_tasks",
+                "number of tasks waiting in the queue before being processed by the thread pool",
+                "thread_pool",
+                &[],
+                ["pool"],
+            ),
+        }
+    }
+}
+
+static THREAD_POOL_METRICS: Lazy<ThreadPoolMetrics> = Lazy::new(ThreadPoolMetrics::default);
 
 #[cfg(test)]
 mod tests {
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs
index 7be3f885bb9..ed6da6347aa 100644
--- a/quickwit/quickwit-search/src/fetch_docs.rs
+++ b/quickwit/quickwit-search/src/fetch_docs.rs
@@ -184,10 +184,10 @@ async fn fetch_docs_in_split(
         .context("open-index-for-split")?;
     // we add an executor here, we could add it in open_index_with_caches, though we should verify
     // the side-effect before
-    let executor_tantivy = crate::search_thread_pool()
+    let tantivy_executor = crate::search_thread_pool()
         .get_underlying_rayon_thread_pool()
         .into();
-    index.set_executor(executor_tantivy);
+    index.set_executor(tantivy_executor);
     let index_reader = index
         .reader_builder()
         // the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index b67fcecc164..c8f02a391c2 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -97,8 +97,8 @@ pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
 pub type SearcherPool = Pool;
 
 fn search_thread_pool() -> &'static ThreadPool {
-    static SEARCH_EXECUTOR: OnceLock<ThreadPool> = OnceLock::new();
-    SEARCH_EXECUTOR.get_or_init(|| ThreadPool::new("search", None))
+    static SEARCH_THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
+    SEARCH_THREAD_POOL.get_or_init(|| ThreadPool::new("search", None))
 }
 
 /// GlobalDocAddress serves as a hit address.
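
For context on the OwnedGaugeGuard added in metrics.rs above: unlike the existing GaugeGuard, which borrows an IntGauge, the owned variant can be moved into the rayon closure and dropped there, which is what lets the pending_tasks gauge be decremented exactly when a worker picks the task up. The following is a minimal sketch of that semantics, not part of the diff; the import path quickwit_common::metrics::OwnedGaugeGuard is assumed.

use prometheus::IntGauge;
// Path assumed from the diff above.
use quickwit_common::metrics::OwnedGaugeGuard;

fn main() {
    // A stand-alone gauge for illustration; in the diff the gauge comes from
    // THREAD_POOL_METRICS.pending_tasks.with_label_values([name]).
    let pending = IntGauge::new("pending_tasks_demo", "tasks waiting for a worker").unwrap();

    let mut guard = OwnedGaugeGuard::from_gauge(pending.clone());
    guard.add(1);
    assert_eq!(pending.get(), 1);

    // Owning the gauge (rather than borrowing it like GaugeGuard) lets the
    // guard cross a thread boundary, e.g. into a rayon task.
    std::thread::spawn(move || drop(guard)).join().unwrap();

    // Dropping the guard subtracted the accumulated delta.
    assert_eq!(pending.get(), 0);
}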
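
And a hedged usage sketch of run_cpu_intensive with the two gauges, also not part of the diff. The crate path quickwit_common::thread_pool::ThreadPool, the pool name "demo", and the tokio entry point are assumptions made for illustration; the exported metric names may carry whatever prefix new_gauge_vec applies.

use quickwit_common::thread_pool::ThreadPool;

#[tokio::main]
async fn main() {
    // Build a small dedicated pool named "demo" with two threads.
    let pool = ThreadPool::new("demo", Some(2));

    // The pending_tasks gauge labeled pool="demo" is incremented before the
    // closure is handed to rayon and decremented once a worker picks it up
    // (the OwnedGaugeGuard moved into the closure is dropped first); the
    // ongoing_tasks gauge is held for the duration of the closure itself.
    let result = pool
        .run_cpu_intensive(|| (0..1_000u64).sum::<u64>())
        .await;

    // Dropping the returned future before a worker picks the task up would
    // have cancelled it (the closure checks tx.is_closed()); a panic inside
    // the closure surfaces here as Err(Panicked).
    assert_eq!(result.unwrap_or(0), 499_500);
}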