
Commit

blop
fulmicoton committed May 7, 2024
1 parent dddfcd1 commit 123572a
Showing 3 changed files with 52 additions and 2 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-serve/Cargo.toml

@@ -27,6 +27,7 @@ humantime = { workspace = true }
 hyper = { workspace = true }
 itertools = { workspace = true }
 mime_guess = { workspace = true }
+rayon = { workspace = true }
 num_cpus = { workspace = true }
 once_cell = { workspace = true }
 opentelemetry = { workspace = true }
52 changes: 50 additions & 2 deletions quickwit/quickwit-serve/src/decompression.rs

@@ -21,11 +21,59 @@ use std::io::Read;
 
 use bytes::Bytes;
 use flate2::read::GzDecoder;
+use once_cell::sync::OnceCell;
+use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
 use thiserror::Error;
 use tracing::error;
 use warp::reject::Reject;
 use warp::Filter;
 
+fn thread_pool() -> &'static rayon::ThreadPool {
+    static THREAD_POOL: OnceCell<rayon::ThreadPool> = OnceCell::new();
+    THREAD_POOL.get_or_init(|| {
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(1)
+            .thread_name(|thread_id| format!("quickwit-rest-{thread_id}"))
+            .panic_handler(|_my_panic| {
+                error!("task running in the quickwit rest pool panicked");
+            })
+            .build()
+            .expect("Failed to spawn the spawning pool")
+    })
+}
+
+/// Function similar to `tokio::task::spawn_blocking`.
+///
+/// There are, however, two important differences:
+///
+/// 1) The task runs on a rayon thread pool managed by Quickwit.
+/// This pool is used exclusively for CPU-intensive work
+/// and is configured to contain `num_cpus` threads.
+///
+/// 2) Before the task is effectively scheduled, we check that
+/// the spawner is still interested in its result.
+///
+/// It is therefore required to `await` the result of this
+/// function to get any work done.
+///
+/// This is nice, because it makes work that has been scheduled
+/// but is not running yet "cancellable".
+pub async fn run_cpu_intensive<F, R>(cpu_heavy_task: F) -> Result<R, ()>
+where
+    F: FnOnce() -> R + Send + 'static,
+    R: Send + 'static,
+{
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    thread_pool().spawn(move || {
+        if tx.is_closed() {
+            return;
+        }
+        let task_result = cpu_heavy_task();
+        let _ = tx.send(task_result);
+    });
+    rx.await.map_err(|_| ())
+}
+
 /// There are two ways to decompress the body:
 /// - Stream the body through an async decompressor
 /// - Fetch the body and then decompress the bytes
@@ -36,7 +84,7 @@ use warp::Filter;
 async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
     match encoding.as_deref() {
         Some("gzip" | "x-gzip") => {
-            let decompressed = quickwit_search::run_cpu_intensive(move || {
+            let decompressed = run_cpu_intensive(move || {
                 let mut decompressed = Vec::new();
                 let mut decoder = GzDecoder::new(body.as_ref());
                 decoder
@@ -49,7 +97,7 @@ async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
             Ok(decompressed)
        }
         Some("zstd") => {
-            let decompressed = quickwit_search::run_cpu_intensive(move || {
+            let decompressed = run_cpu_intensive(move || {
                 zstd::decode_all(body.as_ref())
                     .map(Bytes::from)
                     .map_err(|_| warp::reject::custom(CorruptedData))
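
For illustration, here is a minimal usage sketch of the new `run_cpu_intensive` helper. It is not part of this commit: the async caller `sum_bytes` and its checksum logic are hypothetical, and the sketch assumes it lives in the same module so the private helper is in scope.

// Hypothetical caller, for illustration only (not part of this commit).
// The closure executes on the dedicated rayon pool rather than on a tokio
// worker thread, and the caller must `.await` the result to get any work done.
async fn sum_bytes(payload: bytes::Bytes) -> Result<u32, ()> {
    run_cpu_intensive(move || {
        // CPU-bound work: fold over the payload without yielding to tokio.
        payload
            .iter()
            .fold(0u32, |acc, byte| acc.wrapping_add(u32::from(*byte)))
    })
    .await
}

If the caller drops the returned future before the rayon worker reaches the task, `tx.is_closed()` returns true and the closure is skipped, which is what makes queued-but-not-yet-running work cancellable.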

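Likewise, a hedged sketch of how `decompress_body` handles a gzip-encoded payload. The test-style function `gzip_roundtrip_example` is hypothetical and assumes it sits in the same module as `decompress_body` (the function is private); the flate2 encoder is only used here to fabricate a compressed body.

// Hypothetical exercise of `decompress_body`, for illustration only.
async fn gzip_roundtrip_example() -> Result<(), warp::Rejection> {
    use std::io::Write;

    use flate2::write::GzEncoder;
    use flate2::Compression;

    // Fabricate a gzip body, as a client sending `Content-Encoding: gzip` would.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder
        .write_all(b"{\"query\": \"*\"}")
        .expect("writing to an in-memory buffer should not fail");
    let compressed =
        bytes::Bytes::from(encoder.finish().expect("finishing the gzip stream should not fail"));

    // The "gzip" encoding routes the decompression through `run_cpu_intensive`.
    let decompressed = decompress_body(Some("gzip".to_string()), compressed).await?;
    assert_eq!(&decompressed[..], &b"{\"query\": \"*\"}"[..]);
    Ok(())
}

Decompression failures are rejected with the custom `CorruptedData` rejection, as shown in the hunks above.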