From 123572a80d4714f5d5aa00574931575f68bea74c Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 7 May 2024 17:45:27 +0900 Subject: [PATCH] blop --- quickwit/Cargo.lock | 1 + quickwit/quickwit-serve/Cargo.toml | 1 + quickwit/quickwit-serve/src/decompression.rs | 52 +++++++++++++++++++- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 536d69f428b..84cb5e58582 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6327,6 +6327,7 @@ dependencies = [ "quickwit-search", "quickwit-storage", "quickwit-telemetry", + "rayon", "regex", "rust-embed", "serde", diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index b45e64971c1..b5347458408 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -27,6 +27,7 @@ humantime = { workspace = true } hyper = { workspace = true } itertools = { workspace = true } mime_guess = { workspace = true } +rayon = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } opentelemetry = { workspace = true } diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index ad882bf98a1..cc299fc445c 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -21,11 +21,59 @@ use std::io::Read; use bytes::Bytes; use flate2::read::GzDecoder; +use once_cell::sync::OnceCell; use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use thiserror::Error; +use tracing::error; use warp::reject::Reject; use warp::Filter; +fn thread_pool() -> &'static rayon::ThreadPool { + static THREAD_POOL: OnceCell = OnceCell::new(); + THREAD_POOL.get_or_init(|| { + rayon::ThreadPoolBuilder::new() + .num_threads(1) + .thread_name(|thread_id| format!("quickwit-rest-{thread_id}")) + .panic_handler(|_my_panic| { + error!("task running in the quickwit rest pool panicked"); + }) + .build() + .expect("Failed to spawn the spawning pool") + }) +} + +/// Function similar to `tokio::spawn_blocking`. +/// +/// Here are two important differences however: +/// +/// 1) The task is running on a rayon thread pool managed by quickwit. +/// This pool is specifically used only to run CPU intensive work +/// and is configured to contain `num_cpus` cores. +/// +/// 2) Before the task is effectively scheduled, we check that +/// the spawner is still interested by its result. +/// +/// It is therefore required to `await` the result of this +/// function to get anywork done. +/// +/// This is nice, because it makes work that has been scheduled +/// but is not running yet "cancellable". +pub async fn run_cpu_intensive(cpu_heavy_task: F) -> Result +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + thread_pool().spawn(move || { + if tx.is_closed() { + return; + } + let task_result = cpu_heavy_task(); + let _ = tx.send(task_result); + }); + rx.await.map_err(|_| ()) +} + /// There are two ways to decompress the body: /// - Stream the body through an async decompressor /// - Fetch the body and then decompress the bytes @@ -36,7 +84,7 @@ use warp::Filter; async fn decompress_body(encoding: Option, body: Bytes) -> Result { match encoding.as_deref() { Some("gzip" | "x-gzip") => { - let decompressed = quickwit_search::run_cpu_intensive(move || { + let decompressed = run_cpu_intensive(move || { let mut decompressed = Vec::new(); let mut decoder = GzDecoder::new(body.as_ref()); decoder @@ -49,7 +97,7 @@ async fn decompress_body(encoding: Option, body: Bytes) -> Result { - let decompressed = quickwit_search::run_cpu_intensive(move || { + let decompressed = run_cpu_intensive(move || { zstd::decode_all(body.as_ref()) .map(Bytes::from) .map_err(|_| warp::reject::custom(CorruptedData))