Measure the memory usage of the REST body more accurately.
This PR uses content-length to make sure we account for the memory
of the REST body upfront.
fulmicoton committed Apr 23, 2024
1 parent ffbf6cf commit 021accc
Showing 2 changed files with 111 additions and 50 deletions.
6 changes: 6 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
@@ -199,6 +199,12 @@ impl GaugeGuard {
self.delta
}

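/// Sets the guard's total delta to `new_delta`, adding the difference to the underlying gauge.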
pub fn set_delta(&mut self, new_delta: i64) {
let diff_delta = new_delta - self.delta;
self.delta = new_delta;
self.gauge.add(diff_delta);
}

pub fn add(&mut self, delta: i64) {
self.gauge.add(delta);
self.delta += delta;
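To make the new set_delta semantics concrete, here is a minimal, self-contained Rust sketch using toy Gauge and GaugeGuard types (an assumption: the real GaugeGuard wraps a Prometheus gauge; only the add/set_delta logic mirrors the diff above). add accumulates the guard's contribution, while set_delta re-bases it to an absolute value:

struct Gauge {
    value: i64,
}

impl Gauge {
    fn add(&mut self, v: i64) {
        self.value += v;
    }
}

struct GaugeGuard<'a> {
    gauge: &'a mut Gauge,
    delta: i64,
}

impl GaugeGuard<'_> {
    // Accumulates: the guard's contribution grows by `delta`.
    fn add(&mut self, delta: i64) {
        self.gauge.add(delta);
        self.delta += delta;
    }

    // Re-bases: the guard's contribution becomes exactly `new_delta`.
    fn set_delta(&mut self, new_delta: i64) {
        let diff_delta = new_delta - self.delta;
        self.delta = new_delta;
        self.gauge.add(diff_delta);
    }
}

fn main() {
    let mut gauge = Gauge { value: 0 };
    let mut guard = GaugeGuard { gauge: &mut gauge, delta: 0 };
    guard.add(100);       // gauge: 100
    guard.set_delta(250); // gauge: 250, not 350
    guard.set_delta(80);  // gauge: 80
    assert_eq!(guard.gauge.value, 80);
}

This is exactly what get_decompressed_body relies on below: it first adds the Content-Length estimate, then repeatedly re-bases the gauge as the real buffer capacity becomes known.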
155 changes: 105 additions & 50 deletions quickwit/quickwit-serve/src/decompression.rs
@@ -19,85 +19,140 @@

use std::io::Read;

use bytes::Bytes;
use bytes::{Buf, Bytes};
use flate2::read::GzDecoder;
use futures_util::{Stream, StreamExt};
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use thiserror::Error;
use tokio::task;
use tokio::sync::{Semaphore, SemaphorePermit};
use warp::reject::Reject;
use warp::Filter;

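// Removed in this commit: the previous, non-streaming implementation
// (replaced by the memory-accounted version further below).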
/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
/// - Fetch the body and then decompress the bytes
///
/// The first approach lowers the latency, while the second approach is more CPU-efficient.
/// Ingesting data is usually CPU-bound and there is considerable latency until the data is
/// searchable, so the second approach is more suitable for this use case.
async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
match encoding.as_deref() {
Some("gzip" | "x-gzip") => {
let decompressed = task::spawn_blocking(move || {
let mut decompressed = Vec::new();
let mut decoder = GzDecoder::new(body.as_ref());
decoder
.read_to_end(&mut decompressed)
.map_err(|_| warp::reject::custom(CorruptedData))?;
Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
})
.await
.map_err(|_| warp::reject::custom(CorruptedData))??;
Ok(decompressed)
}
Some("zstd") => {
let decompressed = task::spawn_blocking(move || {
zstd::decode_all(body.as_ref())
.map(Bytes::from)
.map_err(|_| warp::reject::custom(CorruptedData))
})
.await
.map_err(|_| warp::reject::custom(CorruptedData))??;
Ok(decompressed)
}
Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding(
encoding.to_string(),
))),
_ => Ok(body),
}
}
/// Semaphore used to ensure we do not decompress too many bodies at the same time.
/// (The `spawn_blocking` thread pool is usually meant for IO and does not really limit
/// the number of threads.)
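/// Each decompression holds one of these permits for the whole duration of its `spawn_blocking` call.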
static DECOMPRESSION_PERMITS: Semaphore = Semaphore::const_new(3);

#[derive(Debug, Error)]
#[error("Error while decompressing the data")]
pub(crate) struct CorruptedData;

impl Reject for CorruptedData {}

#[derive(Debug, Error)]
#[error("Error while receiving the body")]
pub(crate) struct BodyTransferError;

impl Reject for BodyTransferError {}

#[derive(Debug, Error)]
#[error("Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'", self.0)]
pub(crate) struct UnsupportedEncoding(String);

impl Reject for UnsupportedEncoding {}

enum Compression {
Gzip,
Zstd,
}

impl TryFrom<String> for Compression {
type Error = warp::Rejection;

fn try_from(unknown_encoding: String) -> Result<Self, Self::Error> {
match unknown_encoding.as_str() {
"gzip" | "x-gzip" => Ok(Compression::Gzip),
"zstd" => Ok(Compression::Zstd),
_ => Err(warp::reject::custom(UnsupportedEncoding(unknown_encoding))),
}
}
}

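/// Copies every remaining chunk of `buf` into `output`, reserving the required capacity upfront.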
fn append_buf_to_vec(mut buf: impl Buf, output: &mut Vec<u8>) {
output.reserve(buf.remaining());
while buf.has_remaining() {
let chunk = buf.chunk();
output.extend_from_slice(chunk);
buf.advance(chunk.len());
}
}

/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
/// - Fetch the body and then decompress the bytes
///
/// The first approach lowers the latency, while the second approach is more CPU-efficient.
/// Ingesting data is usually CPU-bound and there is considerable latency until the data is
/// searchable, so the second approach is more suitable for this use case.
fn decompress_body(
encoding: String,
compressed_body: &[u8],
mut gauge_guard: GaugeGuard,
) -> Result<Body, warp::Rejection> {
let compression: Compression = Compression::try_from(encoding)?;
let compressed_body_len = compressed_body.len();
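// Heuristic: pre-account memory assuming a 10x compression ratio. `Body::new`
// re-bases the gauge to the exact footprint once decompression completes.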
let decompressed_body_len = compressed_body_len * 10;
gauge_guard.add(decompressed_body_len as i64);
let mut decompressed_body = Vec::with_capacity(decompressed_body_len);
match compression {
Compression::Gzip => GzDecoder::new(compressed_body).read_to_end(&mut decompressed_body),
Compression::Zstd => zstd::Decoder::with_buffer(compressed_body)
.and_then(|mut decoder| decoder.read_to_end(&mut decompressed_body)),
}
.map_err(|_| warp::reject::custom(CorruptedData))?;
Ok(Body::new(decompressed_body, gauge_guard))
}

/// Gets the body from the stream and decompresses it if necessary.
async fn get_decompressed_body(
content_length_opt: Option<u64>,
encoding: Option<String>,
mut body_stream: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + Send + Sync,
) -> Result<Body, warp::Rejection> {
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server);
let mut buffer = Vec::new();
if let Some(content_length) = content_length_opt {
gauge_guard.add(content_length as i64);
buffer.reserve_exact(content_length as usize);
}
while let Some(body_chunk_res) = body_stream.next().await {
let Ok(body_chunk) = body_chunk_res else {
return Err(warp::reject::custom(BodyTransferError));
};
append_buf_to_vec(body_chunk, &mut buffer);
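// Re-base the gauge on the buffer's actual capacity: the Content-Length hint
// may be absent or inaccurate.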
gauge_guard.set_delta(buffer.capacity() as i64);
}
// At this point we have the entire buffer.
// We may still need to decompress it.
if let Some(encoding) = encoding {
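// The static semaphore is never closed, so `acquire()` cannot fail.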
let _decompression_permit: SemaphorePermit = DECOMPRESSION_PERMITS.acquire().await.unwrap();
tokio::task::spawn_blocking(move || decompress_body(encoding, &buffer[..], gauge_guard))
.await
.map_err(|_| warp::reject::custom(CorruptedData))?
} else {
Ok(Body::new(buffer, gauge_guard))
}
}

// Before this commit:
/// Custom filter for optional decompression
pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = warp::Rejection> + Clone {
    warp::header::optional("content-encoding")
        .and(warp::body::bytes())
        .and_then(|encoding: Option<String>, body: Bytes| async move {
            decompress_body(encoding, body).await.map(Body::from)
        })
}

// After this commit:
/// Custom filter for optional decompression
pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = warp::Rejection> + Clone {
    warp::header::optional::<u64>("content-length")
        .and(warp::header::optional("content-encoding"))
        .and(warp::body::stream())
        .and_then(get_decompressed_body)
}

pub(crate) struct Body {
pub content: Bytes,
_gauge_guard: GaugeGuard,
}

// Before this commit:
impl From<Bytes> for Body {
    fn from(content: Bytes) -> Self {
        let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server);
        gauge_guard.add(content.len() as i64);
        Body {
            content,
            _gauge_guard: gauge_guard,
        }
    }
}

// After this commit:
impl Body {
    pub fn new(mut content: Vec<u8>, mut gauge_guard: GaugeGuard) -> Body {
        content.shrink_to_fit();
        gauge_guard.set_delta(content.capacity() as i64);
        Body {
            content: Bytes::from(content),
            _gauge_guard: gauge_guard,
        }
    }
}

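For orientation, here is a hedged sketch of how a filter like get_body_bytes() composes into a warp route; the route path and handler are illustrative assumptions, not part of this commit:

use warp::Filter;

// Hypothetical handler: the real ingest handlers live elsewhere in quickwit-serve.
async fn ingest_handler(body: Body) -> Result<impl warp::Reply, warp::Rejection> {
    Ok(format!("received {} bytes", body.content.len()))
}

// Hypothetical route: wires the memory-accounted body filter into a POST endpoint.
fn ingest_route() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
    warp::path!("api" / "v1" / "ingest")
        .and(warp::post())
        .and(get_body_bytes())
        .and_then(ingest_handler)
}

With this wiring, the rest_server in-flight gauge rises by the Content-Length as soon as a request arrives and settles on the exact decompressed footprint once the Body is built.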