From 926241838687983c8951fbb17e647353c9d06947 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 21 Mar 2024 15:04:20 +0900 Subject: [PATCH] Using a runtime with no lifo slot. As spotted by @Pseitz, the unstealable lifo-slot is causing the doc processor and the indexer to run on the same thread. --- .github/workflows/ci.yml | 2 +- .github/workflows/coverage.yml | 1 + Dockerfile | 1 + quickwit/.cargo/config.toml | 2 ++ quickwit/quickwit-common/src/runtimes.rs | 9 ++++++++- quickwit/quickwit-serve/src/lib.rs | 12 +----------- .../src/object_storage/s3_compatible_storage.rs | 8 ++------ 7 files changed, 16 insertions(+), 19 deletions(-) create mode 100644 quickwit/.cargo/config.toml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6389653b262..f87cdf504a8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ env: QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev RUST_BACKTRACE: 1 RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links - RUSTFLAGS: -Dwarnings + RUSTFLAGS: "--cfg tokio_unstable -Dwarnings" # Ensures that we cancel running jobs for the same PR / same workflow. concurrency: diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index ef9422951ad..38b1fa39f7d 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -21,6 +21,7 @@ env: QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container. 
QW_S3_FORCE_PATH_STYLE_ACCESS: 1 QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev + RUSTFLAGS: "--cfg tokio_unstable" jobs: test: diff --git a/Dockerfile b/Dockerfile index 796c6a421a1..7fab6042abd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -38,6 +38,7 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build WORKDIR /quickwit +ENV RUSTFLAGS="--cfg tokio_unstable" RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \ && cargo build \ -p quickwit-cli \ diff --git a/quickwit/.cargo/config.toml b/quickwit/.cargo/config.toml new file mode 100644 index 00000000000..bff29e6e175 --- /dev/null +++ b/quickwit/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index be1e8ce16d0..6761e59fa9c 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -83,8 +83,15 @@ impl Default for RuntimesConfig { } fn start_runtimes(config: RuntimesConfig) -> HashMap { + let disable_lifo_slot: bool = + crate::get_from_env("DISABLE_LIFO_SLOT", false); + let mut runtimes = HashMap::default(); - let blocking_runtime = tokio::runtime::Builder::new_multi_thread() + let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread(); + if disable_lifo_slot { + blocking_runtime_builder.disable_lifo_slot(); + } + let blocking_runtime = blocking_runtime_builder .worker_threads(config.num_threads_blocking) .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c14d6885cfb..424d0691517 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -141,17 +141,7 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn { } fn 
get_metastore_client_max_concurrency() -> usize { - std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() - .and_then(|metastore_client_max_concurrency_str| { - if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::<usize>() { - info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}"); - Some(metastore_client_max_concurrency) - } else { - error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`"); - None - } - }) - .unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) + quickwit_common::get_from_env(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY, DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) } static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index ccd9ee8ea79..2eff65df1ab 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -22,7 +22,7 @@ use std::ops::Range; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{env, fmt, io}; +use std::{fmt, io}; use anyhow::anyhow; use async_trait::async_trait; @@ -57,11 +57,7 @@ use crate::{ /// Semaphore to limit the number of concurent requests to the object store. Some object stores /// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted. static REQUEST_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| { - let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY") - .as_deref() - .unwrap_or("10000") - .parse() - .expect("QW_S3_MAX_CONCURRENCY value should be a number."); + let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize); Semaphore::new(num_permits) });