diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 0b0e6abed17..d743700dfbf 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -4675,7 +4675,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "ownedbytes"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"stable_deref_trait",
]
@@ -8007,8 +8007,8 @@ dependencies = [
[[package]]
name = "tantivy"
-version = "0.22.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+version = "0.23.0"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"aho-corasick",
"arc-swap",
@@ -8060,7 +8060,7 @@ dependencies = [
[[package]]
name = "tantivy-bitpacker"
version = "0.6.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"bitpacking",
]
@@ -8068,7 +8068,7 @@ dependencies = [
[[package]]
name = "tantivy-columnar"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"downcast-rs",
"fastdivide",
@@ -8083,7 +8083,7 @@ dependencies = [
[[package]]
name = "tantivy-common"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"async-trait",
"byteorder",
@@ -8106,7 +8106,7 @@ dependencies = [
[[package]]
name = "tantivy-query-grammar"
version = "0.22.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"nom",
]
@@ -8114,7 +8114,7 @@ dependencies = [
[[package]]
name = "tantivy-sstable"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"tantivy-bitpacker",
"tantivy-common",
@@ -8125,7 +8125,7 @@ dependencies = [
[[package]]
name = "tantivy-stacker"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"murmurhash32",
"rand_distr",
@@ -8135,7 +8135,7 @@ dependencies = [
[[package]]
name = "tantivy-tokenizer-api"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"serde",
]
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index e763ce8770e..7d9dca7249b 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -1,72 +1,72 @@
[workspace]
resolver = "2"
members = [
- "quickwit-actors",
- "quickwit-aws",
- "quickwit-cli",
- "quickwit-cluster",
- "quickwit-codegen",
- "quickwit-codegen/example",
- "quickwit-common",
- "quickwit-config",
- "quickwit-control-plane",
- "quickwit-index-management",
- "quickwit-datetime",
- "quickwit-directories",
- "quickwit-doc-mapper",
- "quickwit-indexing",
- "quickwit-ingest",
- "quickwit-integration-tests",
- "quickwit-jaeger",
- "quickwit-janitor",
- "quickwit-lambda",
- "quickwit-macros",
- "quickwit-metastore",
+ "quickwit-actors",
+ "quickwit-aws",
+ "quickwit-cli",
+ "quickwit-cluster",
+ "quickwit-codegen",
+ "quickwit-codegen/example",
+ "quickwit-common",
+ "quickwit-config",
+ "quickwit-control-plane",
+ "quickwit-index-management",
+ "quickwit-datetime",
+ "quickwit-directories",
+ "quickwit-doc-mapper",
+ "quickwit-indexing",
+ "quickwit-ingest",
+ "quickwit-integration-tests",
+ "quickwit-jaeger",
+ "quickwit-janitor",
+ "quickwit-lambda",
+ "quickwit-macros",
+ "quickwit-metastore",
- # Disabling metastore-utils from the quickwit projects to ease build/deps.
- # We can reenable it when we need it.
- # "quickwit-metastore-utils",
- "quickwit-opentelemetry",
- "quickwit-proto",
- "quickwit-query",
- "quickwit-rest-client",
- "quickwit-search",
- "quickwit-serve",
- "quickwit-storage",
- "quickwit-telemetry",
+ # Disabling metastore-utils from the quickwit projects to ease build/deps.
+ # We can reenable it when we need it.
+ # "quickwit-metastore-utils",
+ "quickwit-opentelemetry",
+ "quickwit-proto",
+ "quickwit-query",
+ "quickwit-rest-client",
+ "quickwit-search",
+ "quickwit-serve",
+ "quickwit-storage",
+ "quickwit-telemetry",
]
# The following list excludes `quickwit-metastore-utils` and `quickwit-lambda`
# from the default member to ease build/deps.
default-members = [
- "quickwit-actors",
- "quickwit-aws",
- "quickwit-cli",
- "quickwit-cluster",
- "quickwit-codegen",
- "quickwit-codegen/example",
- "quickwit-common",
- "quickwit-config",
- "quickwit-control-plane",
- "quickwit-datetime",
- "quickwit-directories",
- "quickwit-doc-mapper",
- "quickwit-index-management",
- "quickwit-indexing",
- "quickwit-ingest",
- "quickwit-integration-tests",
- "quickwit-jaeger",
- "quickwit-janitor",
- "quickwit-macros",
- "quickwit-metastore",
- "quickwit-opentelemetry",
- "quickwit-proto",
- "quickwit-query",
- "quickwit-rest-client",
- "quickwit-search",
- "quickwit-serve",
- "quickwit-storage",
- "quickwit-telemetry",
+ "quickwit-actors",
+ "quickwit-aws",
+ "quickwit-cli",
+ "quickwit-cluster",
+ "quickwit-codegen",
+ "quickwit-codegen/example",
+ "quickwit-common",
+ "quickwit-config",
+ "quickwit-control-plane",
+ "quickwit-datetime",
+ "quickwit-directories",
+ "quickwit-doc-mapper",
+ "quickwit-index-management",
+ "quickwit-indexing",
+ "quickwit-ingest",
+ "quickwit-integration-tests",
+ "quickwit-jaeger",
+ "quickwit-janitor",
+ "quickwit-macros",
+ "quickwit-metastore",
+ "quickwit-opentelemetry",
+ "quickwit-proto",
+ "quickwit-query",
+ "quickwit-rest-client",
+ "quickwit-search",
+ "quickwit-serve",
+ "quickwit-storage",
+ "quickwit-telemetry",
]
[workspace.package]
@@ -91,8 +91,8 @@ bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" }
chrono = { version = "0.4", default-features = false, features = [
- "clock",
- "std",
+ "clock",
+ "std",
] }
clap = { version = "4.5.0", features = ["env", "string"] }
coarsetime = "0.1.33"
@@ -124,12 +124,12 @@ http = "0.2.9"
http-serde = "1.1.2"
humantime = "2.1.0"
hyper = { version = "0.14", features = [
- "client",
- "http1",
- "http2",
- "server",
- "stream",
- "tcp",
+ "client",
+ "http1",
+ "http2",
+ "server",
+ "stream",
+ "tcp",
] }
hyper-rustls = "0.24"
indexmap = { version = "2.1.0", features = ["serde"] }
@@ -141,12 +141,12 @@ lru = "0.12"
lindera-core = "0.27.0"
lindera-dictionary = "0.27.0"
lindera-tokenizer = { version = "0.27.0", features = [
- "cc-cedict-compress",
- "cc-cedict",
- "ipadic-compress",
- "ipadic",
- "ko-dic-compress",
- "ko-dic",
+ "cc-cedict-compress",
+ "cc-cedict",
+ "ipadic-compress",
+ "ipadic",
+ "ko-dic-compress",
+ "ko-dic",
] }
matches = "0.1.9"
md5 = "0.7"
@@ -167,7 +167,7 @@ percent-encoding = "2.3.1"
pin-project = "1.1.0"
pnet = { version = "0.33.0", features = ["std"] }
postcard = { version = "1.0.4", features = [
- "use-std",
+ "use-std",
], default-features = false }
predicates = "3"
prettyplease = "0.2.0"
@@ -175,37 +175,37 @@ proc-macro2 = "1.0.50"
prometheus = { version = "0.13", features = ["process"] }
proptest = "1"
prost = { version = "0.11.6", default-features = false, features = [
- "prost-derive",
+ "prost-derive",
] }
prost-build = "0.11.6"
prost-types = "0.11.6"
pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [
- "auth-oauth2",
- "compression",
- "tokio-runtime",
+ "auth-oauth2",
+ "compression",
+ "tokio-runtime",
] }
quote = "1.0.23"
rand = "0.8"
rand_distr = "0.4"
-rayon = "1"
+rayon = "1.10"
rdkafka = { version = "0.33", default-features = false, features = [
- "cmake-build",
- "libz",
- "ssl",
- "tokio",
- "zstd",
+ "cmake-build",
+ "libz",
+ "ssl",
+ "tokio",
+ "zstd",
] }
regex = "1.10.0"
regex-syntax = "0.8"
reqwest = { version = "0.11", default-features = false, features = [
- "json",
- "rustls-tls",
+ "json",
+ "rustls-tls",
] }
rust-embed = "6.8.1"
sea-query = { version = "0.30" }
sea-query-binder = { version = "0.5", features = [
- "runtime-tokio-rustls",
- "sqlx-postgres",
+ "runtime-tokio-rustls",
+ "sqlx-postgres",
] }
# ^1.0.184 due to serde-rs/serde#2538
serde = { version = "1.0.184", features = ["derive", "rc"] }
@@ -216,10 +216,10 @@ serde_yaml = "0.9"
siphasher = "0.3"
smallvec = "1"
sqlx = { version = "0.7", features = [
- "migrate",
- "postgres",
- "runtime-tokio-rustls",
- "time",
+ "migrate",
+ "postgres",
+ "runtime-tokio-rustls",
+ "time",
] }
syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] }
sync_wrapper = "0.1.2"
@@ -237,19 +237,19 @@ toml = "0.7.6"
tonic = { version = "0.9.0", features = ["gzip"] }
tonic-build = "0.9.0"
tower = { version = "0.4.13", features = [
- "balance",
- "buffer",
- "load",
- "retry",
- "util",
+ "balance",
+ "buffer",
+ "load",
+ "retry",
+ "util",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3.16", features = [
- "env-filter",
- "std",
- "time",
+ "env-filter",
+ "std",
+ "time",
] }
ttl_cache = "0.5"
typetag = "0.2"
@@ -258,10 +258,10 @@ username = "0.2"
utoipa = "4.2.0"
uuid = { version = "1.8", features = ["v4", "serde"] }
vrl = { version = "0.8.1", default-features = false, features = [
- "compiler",
- "diagnostic",
- "stdlib",
- "value",
+ "compiler",
+ "diagnostic",
+ "stdlib",
+ "value",
] }
warp = "0.3"
whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" }
@@ -279,10 +279,10 @@ aws-types = "1.2"
azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.13.0", default-features = false, features = [
- "enable_reqwest_rustls",
+ "enable_reqwest_rustls",
] }
azure_storage_blobs = { version = "0.13.0", default-features = false, features = [
- "enable_reqwest_rustls",
+ "enable_reqwest_rustls",
] }
opendal = { version = "0.44", default-features = false }
@@ -317,11 +317,11 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }
-tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9737b5f", default-features = false, features = [
- "lz4-compression",
- "mmap",
- "quickwit",
- "zstd-compression",
+tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [
+ "lz4-compression",
+ "mmap",
+ "quickwit",
+ "zstd-compression",
] }
# This is actually not used directly the goal is to fix the version
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index c26817e3b27..9b5609b14e1 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -30,7 +30,7 @@ pin-project = { workspace = true }
pnet = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
-rayon = { worskpace = true }
+rayon = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
siphasher = { workspace = true }
diff --git a/quickwit/quickwit-common/src/executor.rs b/quickwit/quickwit-common/src/executor.rs
new file mode 100644
index 00000000000..51d2d57251e
--- /dev/null
+++ b/quickwit/quickwit-common/src/executor.rs
@@ -0,0 +1,175 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::fmt;
+use std::sync::Arc;
+
+use once_cell::sync::Lazy;
+use prometheus::IntGauge;
+use tokio::sync::oneshot;
+use tracing::error;
+
+use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec};
+
+/// An executor backed by a thread pool to run CPU intensive tasks.
+///
+/// tokio::spawn_blocking should only be used for IO-bound tasks, as it has no limit on its
+/// thread count.
+#[derive(Clone)]
+pub struct Executor {
+    thread_pool: Arc<rayon::ThreadPool>,
+    active_threads_gauge: IntGauge,
+}
+
+impl Executor {
+    pub fn new(name: &'static str, num_threads_opt: Option<usize>) -> Executor {
+        let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new()
+            .thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}"))
+            .panic_handler(move |_my_panic| {
+                error!("task running in the quickwit {name} pool panicked");
+            });
+        if let Some(num_threads) = num_threads_opt {
+            rayon_pool_builder = rayon_pool_builder.num_threads(num_threads);
+        }
+        let thread_pool = rayon_pool_builder
+            .build()
+            .expect("failed to spawn the executor thread pool");
+        let active_threads_gauge = ACTIVE_THREAD_COUNT.with_label_values([name]);
+        Executor {
+            thread_pool: Arc::new(thread_pool),
+            active_threads_gauge,
+        }
+    }
+
+    pub fn get_underlying_rayon_thread_pool(&self) -> Arc<rayon::ThreadPool> {
+        self.thread_pool.clone()
+    }
+
+    /// Function similar to `tokio::spawn_blocking`.
+    ///
+    /// Here are two important differences however:
+    ///
+    /// 1) The task is running on a rayon thread pool managed by quickwit.
+    ///    This pool is specifically used only to run CPU intensive work
+    ///    and is configured to contain `num_cpus` cores.
+    ///
+    /// 2) Before the task is effectively scheduled, we check that
+    ///    the spawner is still interested in its result.
+    ///
+    /// It is therefore required to `await` the result of this
+    /// function to get any work done.
+    ///
+    /// This is nice, because it makes work that has been scheduled
+    /// but is not running yet "cancellable".
+    pub async fn run_cpu_intensive<F, R>(&self, cpu_heavy_task: F) -> Result<R, Panicked>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        let span = tracing::Span::current();
+        let gauge = self.active_threads_gauge.clone();
+        let (tx, rx) = oneshot::channel();
+        self.thread_pool.spawn(move || {
+            if tx.is_closed() {
+                return;
+            }
+            let _guard = span.enter();
+            let mut active_thread_guard = GaugeGuard::from_gauge(&gauge);
+            active_thread_guard.add(1i64);
+            let result = cpu_heavy_task();
+            let _ = tx.send(result);
+        });
+        rx.await.map_err(|_| Panicked)
+    }
+}
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct Panicked;
+
+impl fmt::Display for Panicked {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Scheduled job panicked")
+    }
+}
+
+impl std::error::Error for Panicked {}
+
+pub static ACTIVE_THREAD_COUNT: Lazy<IntGaugeVec<1>> = Lazy::new(|| {
+    new_gauge_vec(
+        "active_thread_count",
+        "Number of active threads in a given thread pool.",
+        "thread_pool",
+        &[],
+        ["pool"],
+    )
+});
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    use super::*;
+
+    fn test_executor() -> &'static Executor {
+        static TEST_EXECUTOR: std::sync::OnceLock<Executor> = std::sync::OnceLock::new();
+        &*TEST_EXECUTOR.get_or_init(|| Executor::new("test", Some(1)))
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive() {
+        assert_eq!(test_executor().run_cpu_intensive(|| 1).await, Ok(1));
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_panicks() {
+        assert!(test_executor()
+            .run_cpu_intensive(|| panic!(""))
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() {
+        for _ in 0..100 {
+            assert!(test_executor()
+                .run_cpu_intensive(|| panic!(""))
+                .await
+                .is_err());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_abort() {
+        let counter: Arc<AtomicU64> = Default::default();
+        let mut futures = Vec::new();
+        for _ in 0..1_000 {
+            let counter_clone = counter.clone();
+            let fut = test_executor().run_cpu_intensive(move || {
+                std::thread::sleep(Duration::from_millis(5));
+                counter_clone.fetch_add(1, Ordering::SeqCst)
+            });
+            // The first few num_cores tasks should run, but the others should get cancelled.
+            futures.push(tokio::time::timeout(Duration::from_millis(1), fut));
+        }
+        futures::future::join_all(futures).await;
+        assert!(counter.load(Ordering::SeqCst) < 100);
+    }
+}
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index e3a0b57a211..4453826a77a 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -22,6 +22,7 @@
mod coolid;
pub mod binary_heap;
+mod executor;
pub mod fs;
pub mod io;
mod kill_switch;
@@ -43,7 +44,6 @@ pub mod stream_utils;
pub mod temp_dir;
#[cfg(any(test, feature = "testsuite"))]
pub mod test_utils;
-mod executor;
pub mod tower;
pub mod type_map;
pub mod uri;
diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs
index 30ab074ee2d..c6b1dd8c18a 100644
--- a/quickwit/quickwit-common/src/metrics.rs
+++ b/quickwit/quickwit-common/src/metrics.rs
@@ -361,13 +361,3 @@ impl InFlightDataGauges {
}
pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default);
-
-pub static ACTIVE_THREAD_COUNT: Lazy<IntGaugeVec<1>> = Lazy::new(|| {
- new_gauge_vec(
- "active_thread_count",
- "Number of active threads in a given thread pool.",
- "threads",
- &[],
- ["pool"]
- )
-});
diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs
index 13143b8eb85..93b8bb47061 100644
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -361,8 +361,8 @@ struct IndexingWorkbench {
// We use this value to set the `delete_opstamp` of the workbench splits.
last_delete_opstamp: u64,
// Number of bytes declared as used by tantivy.
- memory_usage: GaugeGuard,
- split_builders_guard: GaugeGuard,
+ memory_usage: GaugeGuard<'static>,
+ split_builders_guard: GaugeGuard<'static>,
cooperative_indexing_period: Option,
}
diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs
index 0dff364c6f3..6236728d99c 100644
--- a/quickwit/quickwit-indexing/src/models/indexed_split.rs
+++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs
@@ -184,8 +184,8 @@ pub struct IndexedSplitBatchBuilder {
pub publish_token_opt: Option,
pub commit_trigger: CommitTrigger,
pub batch_parent_span: Span,
- pub memory_usage: GaugeGuard,
- pub _split_builders_guard: GaugeGuard,
+ pub memory_usage: GaugeGuard<'static>,
+ pub _split_builders_guard: GaugeGuard<'static>,
}
/// Sends notifications to the Publisher that the last batch of splits was emtpy.
diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs
index cd9d1df0e24..14a75298d29 100644
--- a/quickwit/quickwit-indexing/src/models/processed_doc.rs
+++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs
@@ -46,7 +46,7 @@ pub struct ProcessedDocBatch {
pub docs: Vec,
pub checkpoint_delta: SourceCheckpointDelta,
pub force_commit: bool,
- _gauge_guard: GaugeGuard,
+ _gauge_guard: GaugeGuard<'static>,
}
impl ProcessedDocBatch {
diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
index f9cf0720c0b..8882c51e961 100644
--- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
+++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
@@ -29,7 +29,7 @@ pub struct RawDocBatch {
pub docs: Vec,
pub checkpoint_delta: SourceCheckpointDelta,
pub force_commit: bool,
- _gauge_guard: GaugeGuard,
+ _gauge_guard: GaugeGuard<'static>,
}
impl RawDocBatch {
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index f42fbd444a9..1803dc6a7f5 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -485,7 +485,7 @@ pub(super) struct BatchBuilder {
num_bytes: u64,
checkpoint_delta: SourceCheckpointDelta,
force_commit: bool,
- gauge_guard: GaugeGuard,
+ gauge_guard: GaugeGuard<'static>,
}
impl BatchBuilder {
diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs
index c4411e8ea9d..ffa7bba4450 100644
--- a/quickwit/quickwit-search/src/fetch_docs.rs
+++ b/quickwit/quickwit-search/src/fetch_docs.rs
@@ -184,7 +184,9 @@ async fn fetch_docs_in_split(
.context("open-index-for-split")?;
// we add an executor here, we could add it in open_index_with_caches, though we should verify
// the side-effect before
- let executor_tantivy = crate::search_executor().get_underlying_rayon_thread_pool().into();
+ let executor_tantivy = crate::search_executor()
+ .get_underlying_rayon_thread_pool()
+ .into();
index.set_executor(executor_tantivy);
let index_reader = index
.reader_builder()
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 1f859cdc740..f563691d3fd 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -378,14 +378,15 @@ async fn leaf_search_single_split(
warmup(&searcher, &warmup_info).await?;
let span = info_span!("tantivy_search");
- let leaf_search_response = crate::search_executor().run_cpu_intensive(move || {
- let _span_guard = span.enter();
- searcher.search(&query, &quickwit_collector)
- })
- .await
- .map_err(|_| {
- crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
- })??;
+ let leaf_search_response = crate::search_executor()
+ .run_cpu_intensive(move || {
+ let _span_guard = span.enter();
+ searcher.search(&query, &quickwit_collector)
+ })
+ .await
+ .map_err(|_| {
+ crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
+ })??;
searcher_context
.leaf_search_cache
@@ -921,7 +922,8 @@ pub async fn leaf_search(
}
}
- crate::search_executor().run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into))
+ crate::search_executor()
+ .run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into))
.instrument(info_span!("incremental_merge_finalize"))
.await
.context("failed to merge split search responses")?
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index 921e782a483..88f11b62eba 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -45,10 +45,9 @@ pub(crate) mod top_k_collector;
mod metrics;
-
fn search_executor() -> &'static Executor {
-    static SEARCH_EXECUTOR: OnceCell<Executor> = OnceCell::new();
- &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("quickwit-search", None))
+    static SEARCH_EXECUTOR: OnceLock<Executor> = OnceLock::new();
+ &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("search", None))
}
#[cfg(test)]
@@ -56,7 +55,6 @@ mod tests;
pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
-use once_cell::sync::OnceCell;
use quickwit_common::tower::Pool;
use quickwit_common::Executor;
use quickwit_doc_mapper::DocMapper;
@@ -69,7 +67,7 @@ use tantivy::schema::NamedFieldDocument;
pub type Result<T> = std::result::Result<T, SearchError>;
use std::net::{Ipv4Addr, SocketAddr};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
pub use find_trace_ids_collector::FindTraceIdsCollector;
use quickwit_config::SearcherConfig;
diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs
index 39ac1a4e890..47e06129d61 100644
--- a/quickwit/quickwit-search/src/metrics.rs
+++ b/quickwit/quickwit-search/src/metrics.rs
@@ -20,9 +20,7 @@
// See https://prometheus.io/docs/practices/naming/
use once_cell::sync::Lazy;
-use quickwit_common::metrics::{
- new_counter, new_histogram, Histogram, IntCounter
-};
+use quickwit_common::metrics::{new_counter, new_histogram, Histogram, IntCounter};
pub struct SearchMetrics {
pub leaf_searches_splits_total: IntCounter,
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index e24c53b95b1..28c02b14756 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -667,13 +667,14 @@ pub(crate) async fn search_partial_hits_phase(
let leaf_search_responses: Vec> =
leaf_search_responses.into_iter().map(Ok).collect_vec();
let span = info_span!("merge_fruits");
- let leaf_search_response = crate::search_executor().run_cpu_intensive(move || {
- let _span_guard = span.enter();
- merge_collector.merge_fruits(leaf_search_responses)
- })
- .await
- .context("failed to merge leaf search responses")?
- .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?;
+ let leaf_search_response = crate::search_executor()
+ .run_cpu_intensive(move || {
+ let _span_guard = span.enter();
+ merge_collector.merge_fruits(leaf_search_responses)
+ })
+ .await
+ .context("failed to merge leaf search responses")?
+ .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?;
debug!(
num_hits = leaf_search_response.num_hits,
failed_splits = ?leaf_search_response.failed_splits,
diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs
index 37e68074949..39627a48218 100644
--- a/quickwit/quickwit-serve/src/decompression.rs
+++ b/quickwit/quickwit-serve/src/decompression.rs
@@ -22,11 +22,16 @@ use std::io::Read;
use bytes::Bytes;
use flate2::read::GzDecoder;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
+use quickwit_common::Executor;
use thiserror::Error;
-use tokio::task;
use warp::reject::Reject;
use warp::Filter;
+fn decompression_executor() -> &'static Executor {
+    static DECOMPRESSION_POOL: std::sync::OnceLock<Executor> = std::sync::OnceLock::new();
+ &*DECOMPRESSION_POOL.get_or_init(|| Executor::new("decompression", Some(2)))
+}
+
/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
/// - Fetch the body and then decompress the bytes
@@ -37,26 +42,28 @@ use warp::Filter;
async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
match encoding.as_deref() {
Some("gzip" | "x-gzip") => {
- let decompressed = task::spawn_blocking(move || {
- let mut decompressed = Vec::new();
- let mut decoder = GzDecoder::new(body.as_ref());
- decoder
- .read_to_end(&mut decompressed)
- .map_err(|_| warp::reject::custom(CorruptedData))?;
- Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
- })
- .await
- .map_err(|_| warp::reject::custom(CorruptedData))??;
+ let decompressed = decompression_executor()
+ .run_cpu_intensive(move || {
+ let mut decompressed = Vec::new();
+ let mut decoder = GzDecoder::new(body.as_ref());
+ decoder
+ .read_to_end(&mut decompressed)
+ .map_err(|_| warp::reject::custom(CorruptedData))?;
+ Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
+ })
+ .await
+ .map_err(|_| warp::reject::custom(CorruptedData))??;
Ok(decompressed)
}
Some("zstd") => {
- let decompressed = task::spawn_blocking(move || {
- zstd::decode_all(body.as_ref())
- .map(Bytes::from)
- .map_err(|_| warp::reject::custom(CorruptedData))
- })
- .await
- .map_err(|_| warp::reject::custom(CorruptedData))??;
+ let decompressed = decompression_executor()
+ .run_cpu_intensive(move || {
+ zstd::decode_all(body.as_ref())
+ .map(Bytes::from)
+ .map_err(|_| warp::reject::custom(CorruptedData))
+ })
+ .await
+ .map_err(|_| warp::reject::custom(CorruptedData))??;
Ok(decompressed)
}
Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding(
@@ -89,7 +96,7 @@ pub(crate) fn get_body_bytes() -> impl Filter,
}
impl From for Body {