Skip to content

Commit

Permalink
Using a runtime with no lifo slot.
Browse files Browse the repository at this point in the history
As spotted by @PSeitz, the unstealable lifo-slot is causing the
doc processor and the indexer to run on the same thread.
  • Loading branch information
fulmicoton committed May 15, 2024
1 parent a0c1760 commit bc71017
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 20 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/cbench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ on:
# pull request.
pull_request_target:

env:
RUSTFLAGS: "--cfg tokio_unstable"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true


jobs:
tests:
name: Benchmark
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUST_BACKTRACE: 1
RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links
RUSTFLAGS: -Dwarnings
RUSTFLAGS: "--cfg tokio_unstable -Dwarnings"

# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env:
QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container.
QW_S3_FORCE_PATH_STYLE_ACCESS: 1
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUSTFLAGS: "--cfg tokio_unstable"

jobs:
test:
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build

WORKDIR /quickwit

ENV RUSTFLAGS="--cfg tokio_unstable"
RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \
&& cargo build \
-p quickwit-cli \
Expand Down
2 changes: 2 additions & 0 deletions quickwit/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
8 changes: 7 additions & 1 deletion quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,14 @@ impl Default for RuntimesConfig {
}

fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
let disable_lifo_slot: bool = crate::get_from_env("QW_DISABLE_TOKIO_LIFO_SLOT", true);

let mut runtimes = HashMap::default();
let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
let mut blocking_runtime_builder = tokio::runtime::Builder::new_multi_thread();
if disable_lifo_slot {
blocking_runtime_builder.disable_lifo_slot();
}
let blocking_runtime = blocking_runtime_builder
.worker_threads(config.num_threads_blocking)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
Expand Down
15 changes: 4 additions & 11 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,10 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn {
}

fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
.and_then(|metastore_client_max_concurrency_str| {
if let Ok(metastore_client_max_concurrency) = metastore_client_max_concurrency_str.parse::<usize>() {
info!("overriding max concurrent metastore requests to {metastore_client_max_concurrency}");
Some(metastore_client_max_concurrency)
} else {
error!("failed to parse environment variable `{METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY}={metastore_client_max_concurrency_str}`");
None
}
})
.unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY)
quickwit_common::get_from_env(
METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY,
DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY,
)
}

static CP_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::ops::Range;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, fmt, io};
use std::{fmt, io};

use anyhow::{anyhow, Context as AnyhhowContext};
use async_trait::async_trait;
Expand Down Expand Up @@ -59,11 +59,7 @@ use crate::{
/// Semaphore to limit the number of concurent requests to the object store. Some object stores
/// (R2, SeaweedFs...) return errors when too many concurrent requests are emitted.
static REQUEST_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| {
let num_permits: usize = env::var("QW_S3_MAX_CONCURRENCY")
.as_deref()
.unwrap_or("10000")
.parse()
.expect("QW_S3_MAX_CONCURRENCY value should be a number.");
let num_permits: usize = quickwit_common::get_from_env("QW_S3_MAX_CONCURRENCY", 10_000usize);
Semaphore::new(num_permits)
});

Expand Down

0 comments on commit bc71017

Please sign in to comment.