From ff17f3844543bebbf667e2c6161f4da01323c9fa Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 23 Apr 2024 14:47:10 +0900
Subject: [PATCH] Measure the memory usage of the REST body more accurately.

This PR uses content-length to make sure we account for the memory of the
REST body upfront.
---
 quickwit/quickwit-common/src/metrics.rs      |   6 +
 quickwit/quickwit-serve/src/decompression.rs | 155 +++++++++++++------
 2 files changed, 111 insertions(+), 50 deletions(-)

diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs
index 323362e65ed..7f42f409218 100644
--- a/quickwit/quickwit-common/src/metrics.rs
+++ b/quickwit/quickwit-common/src/metrics.rs
@@ -199,6 +199,12 @@ impl GaugeGuard {
         self.delta
     }
 
+    pub fn set_delta(&mut self, new_delta: i64) {
+        let diff_delta = new_delta - self.delta;
+        self.delta = new_delta;
+        self.gauge.add(diff_delta);
+    }
+
     pub fn add(&mut self, delta: i64) {
         self.gauge.add(delta);
         self.delta += delta;
diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs
index 37e68074949..06cc091c303 100644
--- a/quickwit/quickwit-serve/src/decompression.rs
+++ b/quickwit/quickwit-serve/src/decompression.rs
@@ -19,52 +19,19 @@
 use std::io::Read;
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use flate2::read::GzDecoder;
+use futures_util::{Stream, StreamExt};
 use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
 use thiserror::Error;
-use tokio::task;
+use tokio::sync::{Semaphore, SemaphorePermit};
 use warp::reject::Reject;
 use warp::Filter;
 
-/// There are two ways to decompress the body:
-/// - Stream the body through an async decompressor
-/// - Fetch the body and then decompress the bytes
-///
-/// The first approach lowers the latency, while the second approach is more CPU efficient.
-/// Ingesting data is usually CPU bound and there is considerable latency until the data is
-/// searchable, so the second approach is more suitable for this use case.
-async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
-    match encoding.as_deref() {
-        Some("gzip" | "x-gzip") => {
-            let decompressed = task::spawn_blocking(move || {
-                let mut decompressed = Vec::new();
-                let mut decoder = GzDecoder::new(body.as_ref());
-                decoder
-                    .read_to_end(&mut decompressed)
-                    .map_err(|_| warp::reject::custom(CorruptedData))?;
-                Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
-            })
-            .await
-            .map_err(|_| warp::reject::custom(CorruptedData))??;
-            Ok(decompressed)
-        }
-        Some("zstd") => {
-            let decompressed = task::spawn_blocking(move || {
-                zstd::decode_all(body.as_ref())
-                    .map(Bytes::from)
-                    .map_err(|_| warp::reject::custom(CorruptedData))
-            })
-            .await
-            .map_err(|_| warp::reject::custom(CorruptedData))??;
-            Ok(decompressed)
-        }
-        Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding(
-            encoding.to_string(),
-        ))),
-        _ => Ok(body),
-    }
-}
+/// Semaphore used to ensure we do not decompress too many bodies at the same time.
+/// (The spawn_blocking thread pool is usually meant for IO and does not really
+/// limit the number of threads.)
+static DECOMPRESSION_PERMITS: Semaphore = Semaphore::const_new(3);
 
 #[derive(Debug, Error)]
 #[error("Error while decompressing the data")]
@@ -72,19 +39,107 @@
 pub(crate) struct CorruptedData;
 
 impl Reject for CorruptedData {}
 
+#[derive(Debug, Error)]
+#[error("Error while receiving the body")]
+pub(crate) struct BodyTransferError;
+
+impl Reject for BodyTransferError {}
+
 #[derive(Debug, Error)]
 #[error("Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'", self.0)]
 pub(crate) struct UnsupportedEncoding(String);
 
 impl Reject for UnsupportedEncoding {}
 
+enum Compression {
+    Gzip,
+    Zstd,
+}
+
+impl TryFrom<String> for Compression {
+    type Error = warp::Rejection;
+
+    fn try_from(unknown_encoding: String) -> Result<Compression, warp::Rejection> {
+        match unknown_encoding.as_str() {
+            "gzip" | "x-gzip" => Ok(Compression::Gzip),
+            "zstd" => Ok(Compression::Zstd),
+            _ => Err(warp::reject::custom(UnsupportedEncoding(unknown_encoding))),
+        }
+    }
+}
+
+fn append_buf_to_vec(mut buf: impl Buf, output: &mut Vec<u8>) {
+    output.reserve(buf.remaining());
+    while buf.has_remaining() {
+        let chunk = buf.chunk();
+        output.extend_from_slice(chunk);
+        buf.advance(chunk.len());
+    }
+}
+
+/// There are two ways to decompress the body:
+/// - Stream the body through an async decompressor
+/// - Fetch the body and then decompress the bytes
+///
+/// The first approach lowers the latency, while the second approach is more CPU efficient.
+/// Ingesting data is usually CPU bound and there is considerable latency until the data is
+/// searchable, so the second approach is more suitable for this use case.
+fn decompress_body(
+    encoding: String,
+    compressed_body: &[u8],
+    mut gauge_guard: GaugeGuard,
+) -> Result<Body, warp::Rejection> {
+    let compression: Compression = Compression::try_from(encoding)?;
+    let compressed_body_len = compressed_body.len();
+    let decompressed_body_len = compressed_body_len * 10;
+    gauge_guard.add(decompressed_body_len as i64);
+    let mut decompressed_body = Vec::with_capacity(decompressed_body_len);
+    match compression {
+        Compression::Gzip => GzDecoder::new(compressed_body).read_to_end(&mut decompressed_body),
+        Compression::Zstd => zstd::Decoder::with_buffer(compressed_body)
+            .and_then(|mut decoder| decoder.read_to_end(&mut decompressed_body)),
+    }
+    .map_err(|_| warp::reject::custom(CorruptedData))?;
+    Ok(Body::new(decompressed_body, gauge_guard))
+}
+
+/// Gets the body from the stream and decompresses it if necessary.
+async fn get_decompressed_body(
+    content_length_opt: Option<u64>,
+    encoding: Option<String>,
+    mut body_stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + Send + Sync,
+) -> Result<Body, warp::Rejection> {
+    let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server);
+    let mut buffer = Vec::new();
+    if let Some(content_length) = content_length_opt {
+        gauge_guard.add(content_length as i64);
+        buffer.reserve_exact(content_length as usize);
+    }
+    while let Some(body_chunk_res) = body_stream.next().await {
+        let Ok(body_chunk) = body_chunk_res else {
+            return Err(warp::reject::custom(BodyTransferError));
+        };
+        append_buf_to_vec(body_chunk, &mut buffer);
+        gauge_guard.set_delta(buffer.capacity() as i64);
+    }
+    // At this point we have the entire buffer.
+    // We may still need to decompress it.
+    if let Some(encoding) = encoding {
+        let _decompression_permit: SemaphorePermit =
+            DECOMPRESSION_PERMITS.acquire().await.unwrap();
+        tokio::task::spawn_blocking(move || decompress_body(encoding, &buffer[..], gauge_guard))
+            .await
+            .map_err(|_| warp::reject::custom(CorruptedData))?
+    } else {
+        Ok(Body::new(buffer, gauge_guard))
+    }
+}
+
 /// Custom filter for optional decompression
 pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = warp::Rejection> + Clone {
-    warp::header::optional("content-encoding")
-        .and(warp::body::bytes())
-        .and_then(|encoding: Option<String>, body: Bytes| async move {
-            decompress_body(encoding, body).await.map(Body::from)
-        })
+    warp::header::optional::<u64>("content-length")
+        .and(warp::header::optional("content-encoding"))
+        .and(warp::body::stream())
+        .and_then(get_decompressed_body)
 }
 
 pub(crate) struct Body {
@@ -92,12 +147,12 @@ pub(crate) struct Body {
     _gauge_guard: GaugeGuard,
 }
 
-impl From<Bytes> for Body {
-    fn from(content: Bytes) -> Self {
-        let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server);
-        gauge_guard.add(content.len() as i64);
+impl Body {
+    pub fn new(mut content: Vec<u8>, mut gauge_guard: GaugeGuard) -> Body {
+        content.shrink_to_fit();
+        gauge_guard.set_delta(content.capacity() as i64);
         Body {
-            content,
+            content: Bytes::from(content),
             _gauge_guard: gauge_guard,
         }
     }
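
The key primitive here is `GaugeGuard::set_delta`: it publishes only the difference between the new delta and whatever was previously recorded, so the estimate can be revised any number of times (content-length upfront, actual capacity per chunk, shrunk capacity at the end) without double counting, and dropping the guard still gives back exactly what is currently recorded. A minimal self-contained sketch of these semantics, with the prometheus gauge modeled as a plain `AtomicI64` (illustrative types, not the actual `quickwit_common` ones):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for the real metrics gauge.
struct Gauge(AtomicI64);

impl Gauge {
    fn add(&self, value: i64) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }
}

// Stand-in for quickwit_common::metrics::GaugeGuard.
struct GaugeGuard<'a> {
    gauge: &'a Gauge,
    delta: i64,
}

impl GaugeGuard<'_> {
    fn set_delta(&mut self, new_delta: i64) {
        // Publish only the difference: the gauge ends up reflecting exactly
        // `new_delta` for this guard, however many times we correct it.
        let diff_delta = new_delta - self.delta;
        self.delta = new_delta;
        self.gauge.add(diff_delta);
    }
}

impl Drop for GaugeGuard<'_> {
    fn drop(&mut self) {
        // Give back whatever is currently recorded.
        self.gauge.add(-self.delta);
    }
}

fn main() {
    let gauge = Gauge(AtomicI64::new(0));
    {
        let mut guard = GaugeGuard { gauge: &gauge, delta: 0 };
        guard.set_delta(1_000); // upfront content-length estimate
        guard.set_delta(1_500); // corrected once the buffer capacity is known
        assert_eq!(gauge.0.load(Ordering::Relaxed), 1_500);
    }
    // Guard dropped: the memory is no longer accounted for.
    assert_eq!(gauge.0.load(Ordering::Relaxed), 0);
}
```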
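`DECOMPRESSION_PERMITS` is worth calling out: `spawn_blocking` alone does not bound CPU usage, because tokio's blocking pool is sized for IO and can grow to hundreds of threads. Gating the blocking call behind a semaphore caps the number of concurrent CPU-bound decompressions. The pattern in isolation (hypothetical names; `inflate` stands in for the real gzip/zstd work):

```rust
use tokio::sync::Semaphore;

// At most three decompressions may run on the blocking pool at once.
static DECOMPRESSION_PERMITS: Semaphore = Semaphore::const_new(3);

async fn decompress_limited(payload: Vec<u8>, inflate: fn(Vec<u8>) -> Vec<u8>) -> Vec<u8> {
    // Acquire a slot *before* spawning; the permit is held (and the
    // concurrency therefore bounded) until this future completes.
    let _permit = DECOMPRESSION_PERMITS
        .acquire()
        .await
        .expect("semaphore is never closed");
    tokio::task::spawn_blocking(move || inflate(payload))
        .await
        .expect("decompression task panicked")
}

#[tokio::main]
async fn main() {
    let out = decompress_limited(b"example".to_vec(), |bytes| bytes).await;
    assert_eq!(out, b"example");
}
```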
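`append_buf_to_vec` walks chunks explicitly because a `Buf` yielded by `warp::body::stream()` is not necessarily contiguous. A quick self-contained check, restating the helper from the patch and using `Buf::chain` to build a two-chunk buffer:

```rust
use bytes::Buf;

// Same helper as in the patch: copy a possibly non-contiguous Buf
// into a Vec chunk by chunk.
fn append_buf_to_vec(mut buf: impl Buf, output: &mut Vec<u8>) {
    output.reserve(buf.remaining());
    while buf.has_remaining() {
        let chunk = buf.chunk();
        output.extend_from_slice(chunk);
        buf.advance(chunk.len());
    }
}

fn main() {
    // `chain` yields a Buf made of two chunks, the case the loop handles.
    let multi_chunk = (&b"hello "[..]).chain(&b"world"[..]);
    let mut output = Vec::new();
    append_buf_to_vec(multi_chunk, &mut output);
    assert_eq!(output, b"hello world");
}
```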
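Finally, the 10x factor in `decompress_body` is an optimistic pre-accounting of the decompressed size: the gauge is charged `compressed_len * 10` before inflating, and `Body::new` then shrinks the vector and calls `set_delta` with the real capacity, correcting any over- or under-estimate. The estimate in isolation, using the same flate2 decoding call as the patch (the encoding round-trip is added only to have something to decode):

```rust
use std::io::{Read, Write};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    // Build a small gzip payload to decode.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(&vec![b'a'; 4096])?;
    let compressed = encoder.finish()?;

    // Optimistic capacity: ten times the compressed size; this is what
    // the gauge gets charged before the real size is known.
    let mut decompressed = Vec::with_capacity(compressed.len() * 10);
    GzDecoder::new(&compressed[..]).read_to_end(&mut decompressed)?;

    // What Body::new does afterwards: shrink, then account the true size.
    decompressed.shrink_to_fit();
    assert_eq!(decompressed.len(), 4096);
    Ok(())
}
```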