
Commit

Apply suggestions from code review
Co-authored-by: Adrien Guillo <[email protected]>
fulmicoton and guilload committed May 11, 2024
1 parent 006eb73 commit 327a256
Showing 4 changed files with 95 additions and 33 deletions.
37 changes: 37 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
@@ -216,6 +216,43 @@ impl<'a> Drop for GaugeGuard<'a> {
}
}

pub struct OwnedGaugeGuard {
gauge: IntGauge,
delta: i64,
}

impl std::fmt::Debug for OwnedGaugeGuard {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
self.delta.fmt(f)
}
}

impl OwnedGaugeGuard {
pub fn from_gauge(gauge: IntGauge) -> Self {
Self { gauge, delta: 0i64 }
}

pub fn get(&self) -> i64 {
self.delta
}

pub fn add(&mut self, delta: i64) {
self.gauge.add(delta);
self.delta += delta;
}

pub fn sub(&mut self, delta: i64) {
self.gauge.sub(delta);
self.delta -= delta;
}
}

impl Drop for OwnedGaugeGuard {
fn drop(&mut self) {
self.gauge.sub(self.delta)
}
}

pub fn metrics_text_payload() -> String {
let metric_families = prometheus::gather();
let mut buffer = Vec::new();
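The `OwnedGaugeGuard` added above is an owning counterpart to the borrowing `GaugeGuard<'a>`: because it holds its own `IntGauge`, it is `'static` and can be moved into closures handed to other threads, and whatever delta it has accumulated is subtracted again when the guard is dropped. The sketch below illustrates that contract; the `enqueue_task` helper and the gauge passed to it are invented for illustration, only `OwnedGaugeGuard` and its methods come from this diff. It mirrors how `run_cpu_intensive` in the next file moves its `pending_tasks_guard` into the rayon closure.

use prometheus::IntGauge;
use quickwit_common::metrics::OwnedGaugeGuard;

// Hypothetical helper: `pending` is some already-registered IntGauge counting queued work.
fn enqueue_task(pending: IntGauge) {
    // The guard owns its gauge, so it can cross thread boundaries.
    let mut pending_guard = OwnedGaugeGuard::from_gauge(pending);
    pending_guard.add(1); // gauge +1 while the task waits in the queue
    std::thread::spawn(move || {
        // Dropping the guard subtracts the accumulated delta, so the gauge
        // goes back down as soon as the task leaves the queue.
        drop(pending_guard);
        // ... the actual work would run here ...
    });
}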
83 changes: 54 additions & 29 deletions quickwit/quickwit-common/src/thread_pool.rs
@@ -26,35 +26,38 @@ use prometheus::IntGauge;
use tokio::sync::oneshot;
use tracing::error;

use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec};
use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard};

/// An executor backed by a thread pool to run CPU intensive tasks.
/// An executor backed by a thread pool to run CPU-intensive tasks.
///
/// tokio::spawn_blocking should only be used for IO-bound tasks, as it has no limit on its
/// thread count.
#[derive(Clone)]
pub struct ThreadPool {
thread_pool: Arc<rayon::ThreadPool>,
active_threads_gauge: IntGauge,
ongoing_tasks: IntGauge,
pending_tasks: IntGauge,
}

impl ThreadPool {
pub fn new(name: &'static str, num_threads_opt: Option<usize>) -> ThreadPool {
let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new()
.thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}"))
.panic_handler(|_my_panic| {
error!("task running in the quickwit search pool panicked");
.panic_handler(move |_my_panic| {
error!("task running in the quickwit {name} thread pool panicked");
});
if let Some(num_threads) = num_threads_opt {
rayon_pool_builder = rayon_pool_builder.num_threads(num_threads);
}
let thread_pool = rayon_pool_builder
.build()
.expect("Failed to spawn the spawning pool");
let active_threads_gauge = ACTIVE_THREAD_COUNT.with_label_values([name]);
.expect("failed to spawn the spawning pool");
let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]);
let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]);
ThreadPool {
thread_pool: Arc::new(thread_pool),
active_threads_gauge,
ongoing_tasks,
pending_tasks,
}
}

@@ -66,17 +69,17 @@ impl ThreadPool {
///
/// Here are two important differences however:
///
/// 1) The task is running on a rayon thread pool managed by quickwit.
/// This pool is specifically used only to run CPU intensive work
/// 1) The task runs on a rayon thread pool managed by Quickwit.
/// This pool is specifically used only to run CPU-intensive work
/// and is configured to contain `num_cpus` cores.
///
/// 2) Before the task is effectively scheduled, we check that
/// the spawner is still interested by its result.
/// the spawner is still interested in its result.
///
/// It is therefore required to `await` the result of this
/// function to get anywork done.
/// function to get any work done.
///
/// This is nice, because it makes work that has been scheduled
/// This is nice because it makes work that has been scheduled
/// but is not running yet "cancellable".
pub fn run_cpu_intensive<F, R>(
&self,
@@ -87,26 +90,30 @@
R: Send + 'static,
{
let span = tracing::Span::current();
let gauge = self.active_threads_gauge.clone();
let ongoing_tasks = self.ongoing_tasks.clone();
let mut pending_tasks_guard: OwnedGaugeGuard =
OwnedGaugeGuard::from_gauge(self.pending_tasks.clone());
pending_tasks_guard.add(1i64);
let (tx, rx) = oneshot::channel();
self.thread_pool.spawn(move || {
drop(pending_tasks_guard);
if tx.is_closed() {
return;
}
let _guard = span.enter();
let mut active_thread_guard = GaugeGuard::from_gauge(&gauge);
active_thread_guard.add(1i64);
let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks);
ongoing_task_guard.add(1i64);
let result = cpu_heavy_task();
let _ = tx.send(result);
});
rx.map_err(|_| Panicked)
}
}

/// Run a small (<200ms) CPU intensive task on a dedicated thread pool with a few threads.
/// Run a small (<200ms) CPU-intensive task on a dedicated thread pool with a few threads.
///
/// When running blocking io (or side-effects in general), prefer use `tokio::spawn_blocking`
/// instead. When running long tasks or a set of tasks that you expect should take more than 33% of
/// When running blocking io (or side-effects in general), prefer using `tokio::spawn_blocking`
/// instead. When running long tasks or a set of tasks that you expect to take more than 33% of
/// your vCPUs, use a dedicated thread/runtime or executor instead.
///
/// Disclaimer: The function will not be executed if the Future is dropped.
@@ -130,21 +137,39 @@ pub struct Panicked;

impl fmt::Display for Panicked {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Scheduled job panicked")
write!(f, "scheduled task panicked")
}
}

impl std::error::Error for Panicked {}

pub static ACTIVE_THREAD_COUNT: Lazy<IntGaugeVec<1>> = Lazy::new(|| {
new_gauge_vec(
"active_thread_count",
"Number of active threads in a given thread pool.",
"thread_pool",
&[],
["pool"],
)
});
struct ThreadPoolMetrics {
ongoing_tasks: IntGaugeVec<1>,
pending_tasks: IntGaugeVec<1>,
}

impl Default for ThreadPoolMetrics {
fn default() -> Self {
ThreadPoolMetrics {
ongoing_tasks: new_gauge_vec(
"ongoing_tasks",
"number of tasks being currently processed by threads in the thread pool",
"thread_pool",
&[],
["pool"],
),
pending_tasks: new_gauge_vec(
"pending_tasks",
"number of tasks waiting in the queue before being processed by the thread pool",
"thread_pool",
&[],
["pool"],
),
}
}
}

static THREAD_POOL_METRICS: Lazy<ThreadPoolMetrics> = Lazy::new(ThreadPoolMetrics::default);

#[cfg(test)]
mod tests {
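To make the new task accounting concrete, here is a sketch of how a caller might drive `ThreadPool::run_cpu_intensive`. The tokio entry point and the toy workload are invented for illustration, and the import path assumes the `thread_pool` module is exported by quickwit-common; only `ThreadPool`, `run_cpu_intensive`, and `Panicked` come from the code above. The `pending_tasks` gauge stays incremented from spawn time until a rayon worker picks the closure up, `ongoing_tasks` covers the execution itself, and dropping the returned future before the closure runs makes it return early without doing the work.

use quickwit_common::thread_pool::ThreadPool;

#[tokio::main]
async fn main() {
    // The pool name only feeds the thread names and the metrics label.
    let pool = ThreadPool::new("demo", None);

    // Awaiting the returned future yields the closure's result, or an error
    // if the closure panicked on the rayon thread.
    let sum = pool
        .run_cpu_intensive(|| (0u64..1_000_000).sum::<u64>())
        .await
        .unwrap_or_else(|_panicked| panic!("task panicked"));
    assert_eq!(sum, 499_999_500_000);

    // Dropping the future closes the oneshot channel; if the closure has not
    // started yet, it sees `tx.is_closed()` and returns without doing the
    // CPU-heavy work, which is the "cancellable" behaviour the doc comment describes.
    let cancelled = pool.run_cpu_intensive(expensive_work);
    drop(cancelled);
}

fn expensive_work() -> u64 {
    (0u64..10_000_000).map(|i| i.wrapping_mul(i)).sum()
}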
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/fetch_docs.rs
@@ -184,10 +184,10 @@ async fn fetch_docs_in_split(
.context("open-index-for-split")?;
// we add an executor here; we could add it in open_index_with_caches, though we should verify
// the side effects first
let executor_tantivy = crate::search_thread_pool()
let tantivy_executor = crate::search_thread_pool()
.get_underlying_rayon_thread_pool()
.into();
index.set_executor(executor_tantivy);
index.set_executor(tantivy_executor);
let index_reader = index
.reader_builder()
// the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/lib.rs
@@ -97,8 +97,8 @@ pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
pub type SearcherPool = Pool<SocketAddr, SearchServiceClient>;

fn search_thread_pool() -> &'static ThreadPool {
static SEARCH_EXECUTOR: OnceLock<ThreadPool> = OnceLock::new();
SEARCH_EXECUTOR.get_or_init(|| ThreadPool::new("search", None))
static SEARCH_THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
SEARCH_THREAD_POOL.get_or_init(|| ThreadPool::new("search", None))
}

/// GlobalDocAddress serves as a hit address.
