Skip to content

Commit

Permalink
Attempting to make the rest server more stable under load of an
Browse files Browse the repository at this point in the history
aggressive client.
  • Loading branch information
fulmicoton committed May 7, 2024
1 parent f62e425 commit dddfcd1
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 4 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ tower = { version = "0.4.13", features = [
"load",
"retry",
"util",
"load-shed",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
pub use crate::search_stream::root_search_stream;
pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl};
use crate::thread_pool::run_cpu_intensive;
pub use crate::thread_pool::run_cpu_intensive;

/// A pool of searcher clients identified by their gRPC socket address.
pub type SearcherPool = Pool<SocketAddr, SearchServiceClient>;
Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-serve/src/decompression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use bytes::Bytes;
use flate2::read::GzDecoder;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use thiserror::Error;
use tokio::task;
use warp::reject::Reject;
use warp::Filter;

Expand All @@ -37,7 +36,7 @@ use warp::Filter;
async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
match encoding.as_deref() {
Some("gzip" | "x-gzip") => {
let decompressed = task::spawn_blocking(move || {
let decompressed = quickwit_search::run_cpu_intensive(move || {
let mut decompressed = Vec::new();
let mut decoder = GzDecoder::new(body.as_ref());
decoder
Expand All @@ -50,7 +49,7 @@ async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes,
Ok(decompressed)
}
Some("zstd") => {
let decompressed = task::spawn_blocking(move || {
let decompressed = quickwit_search::run_cpu_intensive(move || {
zstd::decode_all(body.as_ref())
.map(Bytes::from)
.map_err(|_| warp::reject::custom(CorruptedData))
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ pub(crate) async fn start_rest_server(
let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins);

let service = ServiceBuilder::new()
.concurrency_limit(20)
.load_shed()
.concurrency_limit(150)
.layer(
CompressionLayer::new()
.gzip(true)
Expand Down

0 comments on commit dddfcd1

Please sign in to comment.