diff --git a/.github/workflows/cbench.yml b/.github/workflows/cbench.yml index 641fe071bbe..cb30ed6e90e 100644 --- a/.github/workflows/cbench.yml +++ b/.github/workflows/cbench.yml @@ -13,11 +13,13 @@ on: # pull request. pull_request_target: +env: + RUSTFLAGS: "--cfg tokio_unstable" + concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true - jobs: tests: name: Benchmark diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d667aa84643..c4b4823df0d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ env: QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev RUST_BACKTRACE: 1 RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links - RUSTFLAGS: -Dwarnings + RUSTFLAGS: "--cfg tokio_unstable -Dwarnings" # Ensures that we cancel running jobs for the same PR / same workflow. concurrency: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 530d4e2bbde..11bf4da86c7 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -21,6 +21,7 @@ env: QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container. QW_S3_FORCE_PATH_STYLE_ACCESS: 1 QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev + RUSTFLAGS: "--cfg tokio_unstable" jobs: test: diff --git a/Dockerfile b/Dockerfile index 796c6a421a1..7fab6042abd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -38,6 +38,7 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build WORKDIR /quickwit +ENV RUSTFLAGS="--cfg tokio_unstable" RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \ && cargo build \ -p quickwit-cli \ diff --git a/quickwit/.cargo/config.toml b/quickwit/.cargo/config.toml new file mode 100644 index 00000000000..bff29e6e175 --- /dev/null +++ b/quickwit/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index 42435ce6662..fa7a4486e43 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -83,8 +83,14 @@ impl Default for RuntimesConfig { } fn start_runtimes(config: RuntimesConfig) -> HashMap { + let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", true); + let mut runtimes = HashMap::default(); - let blocking_runtime = tokio::runtime::Builder::new_multi_thread() + let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread(); + if disable_lifo_slot { + blocking_runtime_builder.disable_lifo_slot(); + } + let blocking_runtime = blocking_runtime_builder .worker_threads(config.num_threads_blocking) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5e92984328c..9f0e31d28dd 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn { } fn get_metastore_client_max_concurrency() -> usize { - std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() - .and_then(|metastore_client_max_concurrency_str| { - if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::() { - info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}"); - Some(metastore_client_max_concurrency) - } else { - error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`"); - None - } - }) - .unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) + quickwit_common::get_from_env( + METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY, + DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY, + ) } static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index f43e2a4ae42..3f912799a53 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -22,7 +22,7 @@ use std::ops::Range; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{env, fmt, io}; +use std::{fmt, io}; use anyhow::{anyhow, Context as AnyhhowContext}; use async_trait::async_trait; @@ -59,11 +59,7 @@ use crate::{ /// Semaphore to limit the number of concurent requests to the object store. Some object stores /// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted. static REQUEST_SEMAPHORE: Lazy = Lazy::new(|| { - let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY") - .as_deref() - .unwrap_or("10000") - .parse() - .expect("QW_S3_MAX_CONCURRENCY value should be a number."); + let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize); Semaphore::new(num_permits) });