diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 0b0e6abed17..d743700dfbf 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -4675,7 +4675,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "ownedbytes"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"stable_deref_trait",
]
@@ -8007,8 +8007,8 @@ dependencies = [
[[package]]
name = "tantivy"
-version = "0.22.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+version = "0.23.0"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"aho-corasick",
"arc-swap",
@@ -8060,7 +8060,7 @@ dependencies = [
[[package]]
name = "tantivy-bitpacker"
version = "0.6.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"bitpacking",
]
@@ -8068,7 +8068,7 @@ dependencies = [
[[package]]
name = "tantivy-columnar"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"downcast-rs",
"fastdivide",
@@ -8083,7 +8083,7 @@ dependencies = [
[[package]]
name = "tantivy-common"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"async-trait",
"byteorder",
@@ -8106,7 +8106,7 @@ dependencies = [
[[package]]
name = "tantivy-query-grammar"
version = "0.22.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"nom",
]
@@ -8114,7 +8114,7 @@ dependencies = [
[[package]]
name = "tantivy-sstable"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"tantivy-bitpacker",
"tantivy-common",
@@ -8125,7 +8125,7 @@ dependencies = [
[[package]]
name = "tantivy-stacker"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"murmurhash32",
"rand_distr",
@@ -8135,7 +8135,7 @@ dependencies = [
[[package]]
name = "tantivy-tokenizer-api"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d"
dependencies = [
"serde",
]
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index e763ce8770e..daf3cac5850 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -1,72 +1,72 @@
[workspace]
resolver = "2"
members = [
- "quickwit-actors",
- "quickwit-aws",
- "quickwit-cli",
- "quickwit-cluster",
- "quickwit-codegen",
- "quickwit-codegen/example",
- "quickwit-common",
- "quickwit-config",
- "quickwit-control-plane",
- "quickwit-index-management",
- "quickwit-datetime",
- "quickwit-directories",
- "quickwit-doc-mapper",
- "quickwit-indexing",
- "quickwit-ingest",
- "quickwit-integration-tests",
- "quickwit-jaeger",
- "quickwit-janitor",
- "quickwit-lambda",
- "quickwit-macros",
- "quickwit-metastore",
+ "quickwit-actors",
+ "quickwit-aws",
+ "quickwit-cli",
+ "quickwit-cluster",
+ "quickwit-codegen",
+ "quickwit-codegen/example",
+ "quickwit-common",
+ "quickwit-config",
+ "quickwit-control-plane",
+ "quickwit-index-management",
+ "quickwit-datetime",
+ "quickwit-directories",
+ "quickwit-doc-mapper",
+ "quickwit-indexing",
+ "quickwit-ingest",
+ "quickwit-integration-tests",
+ "quickwit-jaeger",
+ "quickwit-janitor",
+ "quickwit-lambda",
+ "quickwit-macros",
+ "quickwit-metastore",
- # Disabling metastore-utils from the quickwit projects to ease build/deps.
- # We can reenable it when we need it.
- # "quickwit-metastore-utils",
- "quickwit-opentelemetry",
- "quickwit-proto",
- "quickwit-query",
- "quickwit-rest-client",
- "quickwit-search",
- "quickwit-serve",
- "quickwit-storage",
- "quickwit-telemetry",
+ # Disabling metastore-utils from the quickwit projects to ease build/deps.
+ # We can reenable it when we need it.
+ # "quickwit-metastore-utils",
+ "quickwit-opentelemetry",
+ "quickwit-proto",
+ "quickwit-query",
+ "quickwit-rest-client",
+ "quickwit-search",
+ "quickwit-serve",
+ "quickwit-storage",
+ "quickwit-telemetry",
]
# The following list excludes `quickwit-metastore-utils` and `quickwit-lambda`
# from the default member to ease build/deps.
default-members = [
- "quickwit-actors",
- "quickwit-aws",
- "quickwit-cli",
- "quickwit-cluster",
- "quickwit-codegen",
- "quickwit-codegen/example",
- "quickwit-common",
- "quickwit-config",
- "quickwit-control-plane",
- "quickwit-datetime",
- "quickwit-directories",
- "quickwit-doc-mapper",
- "quickwit-index-management",
- "quickwit-indexing",
- "quickwit-ingest",
- "quickwit-integration-tests",
- "quickwit-jaeger",
- "quickwit-janitor",
- "quickwit-macros",
- "quickwit-metastore",
- "quickwit-opentelemetry",
- "quickwit-proto",
- "quickwit-query",
- "quickwit-rest-client",
- "quickwit-search",
- "quickwit-serve",
- "quickwit-storage",
- "quickwit-telemetry",
+ "quickwit-actors",
+ "quickwit-aws",
+ "quickwit-cli",
+ "quickwit-cluster",
+ "quickwit-codegen",
+ "quickwit-codegen/example",
+ "quickwit-common",
+ "quickwit-config",
+ "quickwit-control-plane",
+ "quickwit-datetime",
+ "quickwit-directories",
+ "quickwit-doc-mapper",
+ "quickwit-index-management",
+ "quickwit-indexing",
+ "quickwit-ingest",
+ "quickwit-integration-tests",
+ "quickwit-jaeger",
+ "quickwit-janitor",
+ "quickwit-macros",
+ "quickwit-metastore",
+ "quickwit-opentelemetry",
+ "quickwit-proto",
+ "quickwit-query",
+ "quickwit-rest-client",
+ "quickwit-search",
+ "quickwit-serve",
+ "quickwit-storage",
+ "quickwit-telemetry",
]
[workspace.package]
@@ -91,8 +91,8 @@ bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" }
chrono = { version = "0.4", default-features = false, features = [
- "clock",
- "std",
+ "clock",
+ "std",
] }
clap = { version = "4.5.0", features = ["env", "string"] }
coarsetime = "0.1.33"
@@ -124,12 +124,12 @@ http = "0.2.9"
http-serde = "1.1.2"
humantime = "2.1.0"
hyper = { version = "0.14", features = [
- "client",
- "http1",
- "http2",
- "server",
- "stream",
- "tcp",
+ "client",
+ "http1",
+ "http2",
+ "server",
+ "stream",
+ "tcp",
] }
hyper-rustls = "0.24"
indexmap = { version = "2.1.0", features = ["serde"] }
@@ -141,12 +141,12 @@ lru = "0.12"
lindera-core = "0.27.0"
lindera-dictionary = "0.27.0"
lindera-tokenizer = { version = "0.27.0", features = [
- "cc-cedict-compress",
- "cc-cedict",
- "ipadic-compress",
- "ipadic",
- "ko-dic-compress",
- "ko-dic",
+ "cc-cedict-compress",
+ "cc-cedict",
+ "ipadic-compress",
+ "ipadic",
+ "ko-dic-compress",
+ "ko-dic",
] }
matches = "0.1.9"
md5 = "0.7"
@@ -167,7 +167,7 @@ percent-encoding = "2.3.1"
pin-project = "1.1.0"
pnet = { version = "0.33.0", features = ["std"] }
postcard = { version = "1.0.4", features = [
- "use-std",
+ "use-std",
], default-features = false }
predicates = "3"
prettyplease = "0.2.0"
@@ -175,37 +175,37 @@ proc-macro2 = "1.0.50"
prometheus = { version = "0.13", features = ["process"] }
proptest = "1"
prost = { version = "0.11.6", default-features = false, features = [
- "prost-derive",
+ "prost-derive",
] }
prost-build = "0.11.6"
prost-types = "0.11.6"
pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [
- "auth-oauth2",
- "compression",
- "tokio-runtime",
+ "auth-oauth2",
+ "compression",
+ "tokio-runtime",
] }
quote = "1.0.23"
rand = "0.8"
rand_distr = "0.4"
-rayon = "1"
rdkafka = { version = "0.33", default-features = false, features = [
- "cmake-build",
- "libz",
- "ssl",
- "tokio",
- "zstd",
+ "cmake-build",
+ "libz",
+ "ssl",
+ "tokio",
+ "zstd",
] }
+rayon = "1.4"
regex = "1.10.0"
regex-syntax = "0.8"
reqwest = { version = "0.11", default-features = false, features = [
- "json",
- "rustls-tls",
+ "json",
+ "rustls-tls",
] }
rust-embed = "6.8.1"
sea-query = { version = "0.30" }
sea-query-binder = { version = "0.5", features = [
- "runtime-tokio-rustls",
- "sqlx-postgres",
+ "runtime-tokio-rustls",
+ "sqlx-postgres",
] }
# ^1.0.184 due to serde-rs/serde#2538
serde = { version = "1.0.184", features = ["derive", "rc"] }
@@ -216,10 +216,10 @@ serde_yaml = "0.9"
siphasher = "0.3"
smallvec = "1"
sqlx = { version = "0.7", features = [
- "migrate",
- "postgres",
- "runtime-tokio-rustls",
- "time",
+ "migrate",
+ "postgres",
+ "runtime-tokio-rustls",
+ "time",
] }
syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] }
sync_wrapper = "0.1.2"
@@ -237,19 +237,19 @@ toml = "0.7.6"
tonic = { version = "0.9.0", features = ["gzip"] }
tonic-build = "0.9.0"
tower = { version = "0.4.13", features = [
- "balance",
- "buffer",
- "load",
- "retry",
- "util",
+ "balance",
+ "buffer",
+ "load",
+ "retry",
+ "util",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3.16", features = [
- "env-filter",
- "std",
- "time",
+ "env-filter",
+ "std",
+ "time",
] }
ttl_cache = "0.5"
typetag = "0.2"
@@ -258,10 +258,10 @@ username = "0.2"
utoipa = "4.2.0"
uuid = { version = "1.8", features = ["v4", "serde"] }
vrl = { version = "0.8.1", default-features = false, features = [
- "compiler",
- "diagnostic",
- "stdlib",
- "value",
+ "compiler",
+ "diagnostic",
+ "stdlib",
+ "value",
] }
warp = "0.3"
whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" }
@@ -279,10 +279,10 @@ aws-types = "1.2"
azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.13.0", default-features = false, features = [
- "enable_reqwest_rustls",
+ "enable_reqwest_rustls",
] }
azure_storage_blobs = { version = "0.13.0", default-features = false, features = [
- "enable_reqwest_rustls",
+ "enable_reqwest_rustls",
] }
opendal = { version = "0.44", default-features = false }
@@ -317,11 +317,11 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }
-tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9737b5f", default-features = false, features = [
- "lz4-compression",
- "mmap",
- "quickwit",
- "zstd-compression",
+tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [
+ "lz4-compression",
+ "mmap",
+ "quickwit",
+ "zstd-compression",
] }
# This is actually not used directly the goal is to fix the version
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index c26817e3b27..9b5609b14e1 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -30,7 +30,7 @@ pin-project = { workspace = true }
pnet = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
-rayon = { worskpace = true }
+rayon = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
siphasher = { workspace = true }
diff --git a/quickwit/quickwit-common/src/executor.rs b/quickwit/quickwit-common/src/executor.rs
new file mode 100644
index 00000000000..3ee69a38ee2
--- /dev/null
+++ b/quickwit/quickwit-common/src/executor.rs
@@ -0,0 +1,153 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::fmt;
+use std::sync::Arc;
+
+use prometheus::IntGauge;
+use tokio::sync::oneshot;
+use crate::metrics::GaugeGuard;
+use tracing::error;
+
+/// A named rayon thread pool, plus the `ACTIVE_THREAD_COUNT` gauge labelled with its name.
+#[derive(Clone)]
+pub struct Executor {
+    thread_pool: Arc<rayon::ThreadPool>,
+    active_threads_gauge: IntGauge,
+}
+
+impl Executor {
+    /// Builds a pool of `num_threads_opt` threads (rayon's default count if `None`), named
+    /// `quickwit-{name}-{thread_id}`. Panics if the thread pool cannot be built.
+    pub fn new(name: &'static str, num_threads_opt: Option<usize>) -> Executor {
+        let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new()
+            .thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}"))
+            .panic_handler(move |_my_panic| {
+                error!("task running in the quickwit {name} thread pool panicked");
+            });
+        if let Some(num_threads) = num_threads_opt {
+            rayon_pool_builder = rayon_pool_builder.num_threads(num_threads);
+        }
+        let thread_pool = rayon_pool_builder
+            .build()
+            .expect("Failed to spawn the spawning pool");
+        let active_threads_gauge = crate::metrics::ACTIVE_THREAD_COUNT.with_label_values([name]);
+        Executor {
+            thread_pool: Arc::new(thread_pool),
+            active_threads_gauge,
+        }
+    }
+
+    /// Returns a shared handle to the underlying rayon thread pool.
+    pub fn get_underlying_rayon_thread_pool(&self) -> Arc<rayon::ThreadPool> {
+        self.thread_pool.clone()
+    }
+
+    /// Function similar to `tokio::spawn_blocking`.
+    ///
+    /// Here are two important differences however:
+    ///
+    /// 1) The task runs on this executor's rayon thread pool, which is meant
+    ///    for CPU-intensive work only.
+    ///
+    /// 2) Right before the task starts running, we check that the caller is still
+    ///    interested in its result: scheduled work that is not running yet is "cancellable".
+    ///
+    /// Nothing is scheduled until the returned future is polled, so it must be
+    /// `await`ed to get any work done. Returns `Err(Panicked)` if the task panicked.
+    pub async fn run_cpu_intensive<F, R>(&self, cpu_heavy_task: F) -> Result<R, Panicked>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        let span = tracing::Span::current();
+        let gauge = self.active_threads_gauge.clone();
+        let (tx, rx) = oneshot::channel();
+        self.thread_pool.spawn(move || {
+            if tx.is_closed() {
+                return;
+            }
+            let _guard = span.enter();
+            let mut active_thread_guard = GaugeGuard::from_gauge(&gauge);
+            active_thread_guard.add(1i64);
+            let result = cpu_heavy_task();
+            let _ = tx.send(result);
+        });
+        rx.await.map_err(|_| Panicked)
+    }
+}
+
+/// Error returned by [`Executor::run_cpu_intensive`] when the scheduled task
+/// went away without sending back a result, e.g. because it panicked.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct Panicked;
+
+impl fmt::Display for Panicked {
+    /// Writes the fixed, human-readable description of this error.
+    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        formatter.write_str("Scheduled job panicked")
+    }
+}
+
+// The default `Error` methods are enough: there is no underlying source to expose.
+impl std::error::Error for Panicked {}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::time::Duration;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive() {
+        assert_eq!(Executor::new("test", None).run_cpu_intensive(|| 1).await, Ok(1));
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_panicks() {
+        assert!(Executor::new("test", None).run_cpu_intensive(|| panic!("")).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() {
+        // A single pool is reused so that every panic hits the same set of threads.
+        let executor = Executor::new("test", None);
+        for _ in 0..100 {
+            assert!(executor.run_cpu_intensive(|| panic!("")).await.is_err());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_run_cpu_intensive_abort() {
+        let executor = Executor::new("test", None);
+        let counter: Arc<AtomicU64> = Default::default();
+        let mut futures = Vec::new();
+        for _ in 0..1_000 {
+            let counter_clone = counter.clone();
+            let fut = executor.run_cpu_intensive(move || {
+                std::thread::sleep(Duration::from_millis(5));
+                counter_clone.fetch_add(1, Ordering::SeqCst)
+            });
+            // The first few tasks (about `num_cores`) should run; the others get cancelled.
+            futures.push(tokio::time::timeout(Duration::from_millis(1), fut));
+        }
+        futures::future::join_all(futures).await;
+        assert!(counter.load(Ordering::SeqCst) < 100);
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs
index 13143b8eb85..93b8bb47061 100644
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -361,8 +361,8 @@ struct IndexingWorkbench {
// We use this value to set the `delete_opstamp` of the workbench splits.
last_delete_opstamp: u64,
// Number of bytes declared as used by tantivy.
- memory_usage: GaugeGuard,
- split_builders_guard: GaugeGuard,
+ memory_usage: GaugeGuard<'static>,
+ split_builders_guard: GaugeGuard<'static>,
cooperative_indexing_period: Option,
}
diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs
index 0dff364c6f3..6236728d99c 100644
--- a/quickwit/quickwit-indexing/src/models/indexed_split.rs
+++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs
@@ -184,8 +184,8 @@ pub struct IndexedSplitBatchBuilder {
pub publish_token_opt: Option,
pub commit_trigger: CommitTrigger,
pub batch_parent_span: Span,
- pub memory_usage: GaugeGuard,
- pub _split_builders_guard: GaugeGuard,
+ pub memory_usage: GaugeGuard<'static>,
+ pub _split_builders_guard: GaugeGuard<'static>,
}
/// Sends notifications to the Publisher that the last batch of splits was emtpy.
diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs
index cd9d1df0e24..14a75298d29 100644
--- a/quickwit/quickwit-indexing/src/models/processed_doc.rs
+++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs
@@ -46,7 +46,7 @@ pub struct ProcessedDocBatch {
pub docs: Vec,
pub checkpoint_delta: SourceCheckpointDelta,
pub force_commit: bool,
- _gauge_guard: GaugeGuard,
+ _gauge_guard: GaugeGuard<'static>,
}
impl ProcessedDocBatch {
diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
index f9cf0720c0b..8882c51e961 100644
--- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
+++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
@@ -29,7 +29,7 @@ pub struct RawDocBatch {
pub docs: Vec,
pub checkpoint_delta: SourceCheckpointDelta,
pub force_commit: bool,
- _gauge_guard: GaugeGuard,
+ _gauge_guard: GaugeGuard<'static>,
}
impl RawDocBatch {
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index f42fbd444a9..1803dc6a7f5 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -485,7 +485,7 @@ pub(super) struct BatchBuilder {
num_bytes: u64,
checkpoint_delta: SourceCheckpointDelta,
force_commit: bool,
- gauge_guard: GaugeGuard,
+ gauge_guard: GaugeGuard<'static>,
}
impl BatchBuilder {
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index 921e782a483..769d52371c3 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -47,8 +47,8 @@ mod metrics;
fn search_executor() -> &'static Executor {
- static SEARCH_EXECUTOR: OnceCell = OnceCell::new();
- &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("quickwit-search", None))
+ static SEARCH_EXECUTOR: OnceLock = OnceLock::new();
+ &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("search", None))
}
#[cfg(test)]
@@ -56,7 +56,6 @@ mod tests;
pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
-use once_cell::sync::OnceCell;
use quickwit_common::tower::Pool;
use quickwit_common::Executor;
use quickwit_doc_mapper::DocMapper;
@@ -69,7 +68,7 @@ use tantivy::schema::NamedFieldDocument;
pub type Result = std::result::Result;
use std::net::{Ipv4Addr, SocketAddr};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
pub use find_trace_ids_collector::FindTraceIdsCollector;
use quickwit_config::SearcherConfig;
diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs
index 37e68074949..e954830467d 100644
--- a/quickwit/quickwit-serve/src/decompression.rs
+++ b/quickwit/quickwit-serve/src/decompression.rs
@@ -22,11 +22,17 @@ use std::io::Read;
use bytes::Bytes;
use flate2::read::GzDecoder;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
+use quickwit_common::Executor;
use thiserror::Error;
use tokio::task;
use warp::reject::Reject;
use warp::Filter;
+/// Returns the lazily-initialized, process-wide 2-thread executor named "decompression".
+fn decompression_executor() -> &'static Executor {
+    static DECOMPRESSION_POOL: std::sync::OnceLock<Executor> = std::sync::OnceLock::new();
+    DECOMPRESSION_POOL.get_or_init(|| Executor::new("decompression", Some(2)))
+}
/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
/// - Fetch the body and then decompress the bytes
@@ -37,7 +43,7 @@ use warp::Filter;
async fn decompress_body(encoding: Option, body: Bytes) -> Result {
match encoding.as_deref() {
Some("gzip" | "x-gzip") => {
- let decompressed = task::spawn_blocking(move || {
+ let decompressed = decompression_executor().run_cpu_intensive(move || {
let mut decompressed = Vec::new();
let mut decoder = GzDecoder::new(body.as_ref());
decoder
@@ -89,7 +95,7 @@ pub(crate) fn get_body_bytes() -> impl Filter,
}
impl From for Body {
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 5e92984328c..ffee44fbe46 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -142,6 +142,8 @@ pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn {
Arc::new(|_| Ok(()))
}
+
+
fn get_metastore_client_max_concurrency() -> usize {
std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok()
.and_then(|metastore_client_max_concurrency_str| {
@@ -176,6 +178,7 @@ static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy =
static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy =
Lazy::new(|| GrpcMetricsLayer::new("metastore", "server"));
+
struct QuickwitServices {
pub node_config: Arc,
pub cluster: Cluster,