Skip to content

Commit

Permalink
disabling arm build
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 14, 2024
1 parent b565983 commit 0633405
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/publish_docker_images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ jobs:
include:
- os: ubuntu-latest
platform: linux/amd64
# Using 16 vcpu on arm as 8vcpu seems to stay stuck on runner acquisition.
# (5/13). We can switch back to 8vcpu one buildjet starts working as intended again.
- os: buildjet-16vcpu-ubuntu-2204-arm
platform: linux/arm64
#- os: buildjet-8vcpu-ubuntu-2204-arm
# platform: linux/arm64
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
Expand Down
16 changes: 13 additions & 3 deletions quickwit/quickwit-serve/src/decompression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::io::Read;
use std::sync::OnceLock;

use bytes::Bytes;
use flate2::read::GzDecoder;
Expand All @@ -29,7 +30,14 @@ use warp::Filter;

use crate::load_shield::{LoadShield, LoadShieldPermit};

static LOAD_SHIELD: LoadShield = LoadShield::new(10, 150);
fn load_shield() -> &'static LoadShield {
static LOAD_SHIELD: OnceLock<LoadShield> = OnceLock::new();
LOAD_SHIELD.get_or_init(|| {
let ingest_max_concurrency = quickwit_common::get_from_env("QW_INGEST_MAX_CONCURRENCY", 10);
let ingest_max_pending = quickwit_common::get_from_env("QW_INGEST_MAX_PENDING", 150);
LoadShield::new(ingest_max_concurrency, ingest_max_pending)
})
}

/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
Expand Down Expand Up @@ -87,8 +95,10 @@ pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = warp::R
warp::header::optional("content-encoding")
.and(warp::body::bytes())
.and_then(|encoding: Option<String>, body: Bytes| async move {
let permit = LOAD_SHIELD.acquire_permit().await?;
decompress_body(encoding, body).await.map(|content| Body::new(content, permit))
let permit = load_shield().acquire_permit().await?;
decompress_body(encoding, body)
.await
.map(|content| Body::new(content, permit))
})
}

Expand Down
21 changes: 20 additions & 1 deletion quickwit/quickwit-serve/src/load_shield.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

use tokio::sync::{Semaphore, SemaphorePermit};
Expand Down Expand Up @@ -34,7 +53,7 @@ impl LoadShield {
let Ok(load_shed_permit) = self.load_shed_semaphore.try_acquire() else {
// Wait a little to deal before load shedding. The point is to lower the load associated
// with super aggressive clients.
tokio::time::sleep(Duration::from_millis(100)).await;
// tokio::time::sleep(Duration::from_millis(100)).await;
return Err(warp::reject::custom(TooManyRequests))
};
let Ok(concurrency_permit) = self.concurrency_semaphore.acquire().await else {
Expand Down

0 comments on commit 0633405

Please sign in to comment.