diff --git a/.github/workflows/publish_docker_images.yml b/.github/workflows/publish_docker_images.yml index cb95d6a78f3..69b4de2e13c 100644 --- a/.github/workflows/publish_docker_images.yml +++ b/.github/workflows/publish_docker_images.yml @@ -22,10 +22,8 @@ jobs: include: - os: ubuntu-latest platform: linux/amd64 - # Using 16 vcpu on arm as 8vcpu seems to stay stuck on runner acquisition. - # (5/13). We can switch back to 8vcpu one buildjet starts working as intended again. - - os: buildjet-16vcpu-ubuntu-2204-arm - platform: linux/arm64 + #- os: buildjet-8vcpu-ubuntu-2204-arm + # platform: linux/arm64 runs-on: ${{ matrix.os }} steps: - name: Checkout diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index 02cc8926064..bfb1eee3ad7 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -18,6 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::io::Read; +use std::sync::OnceLock; use bytes::Bytes; use flate2::read::GzDecoder; @@ -29,7 +30,14 @@ use warp::Filter; use crate::load_shield::{LoadShield, LoadShieldPermit}; -static LOAD_SHIELD: LoadShield = LoadShield::new(10, 150); +fn load_shield() -> &'static LoadShield { + static LOAD_SHIELD: OnceLock<LoadShield> = OnceLock::new(); + LOAD_SHIELD.get_or_init(|| { + let ingest_max_concurrency = quickwit_common::get_from_env("QW_INGEST_MAX_CONCURRENCY", 10); + let ingest_max_pending = quickwit_common::get_from_env("QW_INGEST_MAX_PENDING", 150); + LoadShield::new(ingest_max_concurrency, ingest_max_pending) + }) +} /// There are two ways to decompress the body: /// - Stream the body through an async decompressor @@ -87,8 +95,10 @@ pub(crate) fn get_body_bytes() -> impl Filter, body: Bytes| async move { - let permit = LOAD_SHIELD.acquire_permit().await?; - decompress_body(encoding, body).await.map(|content| Body::new(content, permit)) + let permit = load_shield().acquire_permit().await?; + decompress_body(encoding, 
body) + .await + .map(|content| Body::new(content, permit)) }) } diff --git a/quickwit/quickwit-serve/src/load_shield.rs b/quickwit/quickwit-serve/src/load_shield.rs index dbd67ce1e13..19838c5c57d 100644 --- a/quickwit/quickwit-serve/src/load_shield.rs +++ b/quickwit/quickwit-serve/src/load_shield.rs @@ -1,3 +1,22 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + use std::time::Duration; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -34,7 +53,7 @@ impl LoadShield { let Ok(load_shed_permit) = self.load_shed_semaphore.try_acquire() else { // Wait a little to deal before load shedding. The point is to lower the load associated // with super aggressive clients. - tokio::time::sleep(Duration::from_millis(100)).await; + // tokio::time::sleep(Duration::from_millis(100)).await; return Err(warp::reject::custom(TooManyRequests)) }; let Ok(concurrency_permit) = self.concurrency_semaphore.acquire().await else {