diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0b0e6abed17..d743700dfbf 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4675,7 +4675,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "stable_deref_trait", ] @@ -8007,8 +8007,8 @@ dependencies = [ [[package]] name = "tantivy" -version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +version = "0.23.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "aho-corasick", "arc-swap", @@ -8060,7 +8060,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "bitpacking", ] @@ -8068,7 +8068,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "downcast-rs", "fastdivide", @@ -8083,7 +8083,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "async-trait", "byteorder", @@ -8106,7 +8106,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "nom", ] @@ -8114,7 +8114,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8125,7 +8125,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "murmurhash32", "rand_distr", @@ -8135,7 +8135,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=9737b5f#9737b5fae0ae56309429b28f27b3cd8cf16c2434" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=6181c1e#6181c1eb5e2e0126ec16ba352b219825a9640a9d" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index e763ce8770e..7d9dca7249b 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -1,72 +1,72 @@ [workspace] resolver = "2" members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-index-management", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-lambda", - "quickwit-macros", - "quickwit-metastore", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-index-management", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-lambda", + "quickwit-macros", + "quickwit-metastore", - # Disabling metastore-utils from the quickwit projects to ease build/deps. - # We can reenable it when we need it. - # "quickwit-metastore-utils", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + # Disabling metastore-utils from the quickwit projects to ease build/deps. + # We can reenable it when we need it. + # "quickwit-metastore-utils", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] # The following list excludes `quickwit-metastore-utils` and `quickwit-lambda` # from the default member to ease build/deps. default-members = [ - "quickwit-actors", - "quickwit-aws", - "quickwit-cli", - "quickwit-cluster", - "quickwit-codegen", - "quickwit-codegen/example", - "quickwit-common", - "quickwit-config", - "quickwit-control-plane", - "quickwit-datetime", - "quickwit-directories", - "quickwit-doc-mapper", - "quickwit-index-management", - "quickwit-indexing", - "quickwit-ingest", - "quickwit-integration-tests", - "quickwit-jaeger", - "quickwit-janitor", - "quickwit-macros", - "quickwit-metastore", - "quickwit-opentelemetry", - "quickwit-proto", - "quickwit-query", - "quickwit-rest-client", - "quickwit-search", - "quickwit-serve", - "quickwit-storage", - "quickwit-telemetry", + "quickwit-actors", + "quickwit-aws", + "quickwit-cli", + "quickwit-cluster", + "quickwit-codegen", + "quickwit-codegen/example", + "quickwit-common", + "quickwit-config", + "quickwit-control-plane", + "quickwit-datetime", + "quickwit-directories", + "quickwit-doc-mapper", + "quickwit-index-management", + "quickwit-indexing", + "quickwit-ingest", + "quickwit-integration-tests", + "quickwit-jaeger", + "quickwit-janitor", + "quickwit-macros", + "quickwit-metastore", + "quickwit-opentelemetry", + "quickwit-proto", + "quickwit-query", + "quickwit-rest-client", + "quickwit-search", + "quickwit-serve", + "quickwit-storage", + "quickwit-telemetry", ] [workspace.package] @@ -91,8 +91,8 @@ bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "d039699" } chrono = { version = "0.4", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } clap = { version = "4.5.0", features = ["env", "string"] } coarsetime = "0.1.33" @@ -124,12 +124,12 @@ http = "0.2.9" http-serde = "1.1.2" humantime = "2.1.0" hyper = { version = "0.14", features = [ - "client", - "http1", - "http2", - "server", - "stream", - "tcp", + "client", + "http1", + "http2", + "server", + "stream", + "tcp", ] } hyper-rustls = "0.24" indexmap = { version = "2.1.0", features = ["serde"] } @@ -141,12 +141,12 @@ lru = "0.12" lindera-core = "0.27.0" lindera-dictionary = "0.27.0" lindera-tokenizer = { version = "0.27.0", features = [ - "cc-cedict-compress", - "cc-cedict", - "ipadic-compress", - "ipadic", - "ko-dic-compress", - "ko-dic", + "cc-cedict-compress", + "cc-cedict", + "ipadic-compress", + "ipadic", + "ko-dic-compress", + "ko-dic", ] } matches = "0.1.9" md5 = "0.7" @@ -167,7 +167,7 @@ percent-encoding = "2.3.1" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } postcard = { version = "1.0.4", features = [ - "use-std", + "use-std", ], default-features = false } predicates = "3" prettyplease = "0.2.0" @@ -175,37 +175,37 @@ proc-macro2 = "1.0.50" prometheus = { version = "0.13", features = ["process"] } proptest = "1" prost = { version = "0.11.6", default-features = false, features = [ - "prost-derive", + "prost-derive", ] } prost-build = "0.11.6" prost-types = "0.11.6" pulsar = { git = "https://github.com/quickwit-oss/pulsar-rs.git", rev = "f9eff04", default-features = false, features = [ - "auth-oauth2", - "compression", - "tokio-runtime", + "auth-oauth2", + "compression", + "tokio-runtime", ] } quote = "1.0.23" rand = "0.8" rand_distr = "0.4" -rayon = "1" +rayon = "1.10" rdkafka = { version = "0.33", default-features = false, features = [ - "cmake-build", - "libz", - "ssl", - "tokio", - "zstd", + "cmake-build", + "libz", + "ssl", + "tokio", + "zstd", ] } regex = "1.10.0" regex-syntax = "0.8" reqwest = { version = "0.11", default-features = false, features = [ - "json", - "rustls-tls", + "json", + "rustls-tls", ] } rust-embed = "6.8.1" sea-query = { version = "0.30" } sea-query-binder = { version = "0.5", features = [ - "runtime-tokio-rustls", - "sqlx-postgres", + "runtime-tokio-rustls", + "sqlx-postgres", ] } # ^1.0.184 due to serde-rs/serde#2538 serde = { version = "1.0.184", features = ["derive", "rc"] } @@ -216,10 +216,10 @@ serde_yaml = "0.9" siphasher = "0.3" smallvec = "1" sqlx = { version = "0.7", features = [ - "migrate", - "postgres", - "runtime-tokio-rustls", - "time", + "migrate", + "postgres", + "runtime-tokio-rustls", + "time", ] } syn = { version = "2.0.11", features = ["extra-traits", "full", "parsing"] } sync_wrapper = "0.1.2" @@ -237,19 +237,19 @@ toml = "0.7.6" tonic = { version = "0.9.0", features = ["gzip"] } tonic-build = "0.9.0" tower = { version = "0.4.13", features = [ - "balance", - "buffer", - "load", - "retry", - "util", + "balance", + "buffer", + "load", + "retry", + "util", ] } tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] } tracing = "0.1.37" tracing-opentelemetry = "0.20.0" tracing-subscriber = { version = "0.3.16", features = [ - "env-filter", - "std", - "time", + "env-filter", + "std", + "time", ] } ttl_cache = "0.5" typetag = "0.2" @@ -258,10 +258,10 @@ username = "0.2" utoipa = "4.2.0" uuid = { version = "1.8", features = ["v4", "serde"] } vrl = { version = "0.8.1", default-features = false, features = [ - "compiler", - "diagnostic", - "stdlib", - "value", + "compiler", + "diagnostic", + "stdlib", + "value", ] } warp = "0.3" whichlang = { git = "https://github.com/quickwit-oss/whichlang", rev = "fe406416" } @@ -279,10 +279,10 @@ aws-types = "1.2" azure_core = { version = "0.13.0", features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } azure_storage_blobs = { version = "0.13.0", default-features = false, features = [ - "enable_reqwest_rustls", + "enable_reqwest_rustls", ] } opendal = { version = "0.44", default-features = false } @@ -317,11 +317,11 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "9737b5f", default-features = false, features = [ - "lz4-compression", - "mmap", - "quickwit", - "zstd-compression", +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "6181c1e", default-features = false, features = [ + "lz4-compression", + "mmap", + "quickwit", + "zstd-compression", ] } # This is actually not used directly the goal is to fix the version diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index c26817e3b27..9b5609b14e1 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -30,7 +30,7 @@ pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } rand = { workspace = true } -rayon = { worskpace = true } +rayon = { workspace = true } regex = { workspace = true } serde = { workspace = true } siphasher = { workspace = true } diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index e3a0b57a211..6de4a337c0a 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -43,7 +43,7 @@ pub mod stream_utils; pub mod temp_dir; #[cfg(any(test, feature = "testsuite"))] pub mod test_utils; -mod executor; +pub mod thread_pool; pub mod tower; pub mod type_map; pub mod uri; @@ -55,7 +55,6 @@ use std::ops::{Range, RangeInclusive}; use std::str::FromStr; pub use coolid::new_coolid; -pub use executor::Executor; pub use kill_switch::KillSwitch; pub use path_hasher::PathHasher; pub use progress::{Progress, ProtectedZoneGuard}; diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 30ab074ee2d..c6b1dd8c18a 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -361,13 +361,3 @@ impl InFlightDataGauges { } pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); - -pub static ACTIVE_THREAD_COUNT: Lazy> = Lazy::new(|| { - new_gauge_vec( - "active_thread_count", - "Number of active threads in a given thread pool.", - "threads", - &[], - ["pool"] - ) -}); diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs new file mode 100644 index 00000000000..a1859ff9e9d --- /dev/null +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -0,0 +1,190 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::fmt; +use std::sync::Arc; + +use futures::{Future, TryFutureExt}; +use once_cell::sync::Lazy; +use prometheus::IntGauge; +use tokio::sync::oneshot; +use tracing::error; + +use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec}; + +/// An executor backed by a thread pool to run CPU intensive tasks. +/// +/// tokio::spawn_blocking should only used for IO-bound tasks, as it has not limit on its +/// thread count. +#[derive(Clone)] +pub struct ThreadPool { + thread_pool: Arc, + active_threads_gauge: IntGauge, +} + +impl ThreadPool { + pub fn new(name: &'static str, num_threads_opt: Option) -> ThreadPool { + let mut rayon_pool_builder = rayon::ThreadPoolBuilder::new() + .thread_name(move |thread_id| format!("quickwit-{name}-{thread_id}")) + .panic_handler(|_my_panic| { + error!("task running in the quickwit search pool panicked"); + }); + if let Some(num_threads) = num_threads_opt { + rayon_pool_builder = rayon_pool_builder.num_threads(num_threads); + } + let thread_pool = rayon_pool_builder + .build() + .expect("Failed to spawn the spawning pool"); + let active_threads_gauge = ACTIVE_THREAD_COUNT.with_label_values([name]); + ThreadPool { + thread_pool: Arc::new(thread_pool), + active_threads_gauge, + } + } + + pub fn get_underlying_rayon_thread_pool(&self) -> Arc { + self.thread_pool.clone() + } + + /// Function similar to `tokio::spawn_blocking`. + /// + /// Here are two important differences however: + /// + /// 1) The task is running on a rayon thread pool managed by quickwit. + /// This pool is specifically used only to run CPU intensive work + /// and is configured to contain `num_cpus` cores. + /// + /// 2) Before the task is effectively scheduled, we check that + /// the spawner is still interested by its result. + /// + /// It is therefore required to `await` the result of this + /// function to get anywork done. + /// + /// This is nice, because it makes work that has been scheduled + /// but is not running yet "cancellable". + pub fn run_cpu_intensive( + &self, + cpu_heavy_task: F, + ) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let span = tracing::Span::current(); + let gauge = self.active_threads_gauge.clone(); + let (tx, rx) = oneshot::channel(); + self.thread_pool.spawn(move || { + if tx.is_closed() { + return; + } + let _guard = span.enter(); + let mut active_thread_guard = GaugeGuard::from_gauge(&gauge); + active_thread_guard.add(1i64); + let result = cpu_heavy_task(); + let _ = tx.send(result); + }); + rx.map_err(|_| Panicked) + } +} + +/// Run a small (<200ms) CPU intensive task on a dedicated thread pool with a few threads. +/// +/// When running blocking io (or side-effects in general), prefer use `tokio::spawn_blocking` +/// instead. When running long tasks or a set of tasks that you expect should take more than 33% of +/// your vCPUs, use a dedicated thread/runtime or executor instead. +/// +/// Disclaimer: The function will no be executed if the Future is dropped. +#[must_use = "run_cpu_intensive will not run if the future it returns is dropped"] +pub fn run_cpu_intensive(cpu_heavy_task: F) -> impl Future> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + static SMALL_TASK_EXECUTOR: std::sync::OnceLock = std::sync::OnceLock::new(); + SMALL_TASK_EXECUTOR + .get_or_init(|| { + let num_threads: usize = (crate::num_cpus() / 3).max(2); + ThreadPool::new("small_tasks", Some(num_threads)) + }) + .run_cpu_intensive(cpu_heavy_task) +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct Panicked; + +impl fmt::Display for Panicked { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Scheduled job panicked") + } +} + +impl std::error::Error for Panicked {} + +pub static ACTIVE_THREAD_COUNT: Lazy> = Lazy::new(|| { + new_gauge_vec( + "active_thread_count", + "Number of active threads in a given thread pool.", + "thread_pool", + &[], + ["pool"], + ) +}); + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::Arc; + use std::time::Duration; + + use super::*; + + #[tokio::test] + async fn test_run_cpu_intensive() { + assert_eq!(run_cpu_intensive(|| 1).await, Ok(1)); + } + + #[tokio::test] + async fn test_run_cpu_intensive_panicks() { + assert!(run_cpu_intensive(|| panic!("")).await.is_err()); + } + + #[tokio::test] + async fn test_run_cpu_intensive_panicks_do_not_shrink_thread_pool() { + for _ in 0..100 { + assert!(run_cpu_intensive(|| panic!("")).await.is_err()); + } + } + + #[tokio::test] + async fn test_run_cpu_intensive_abort() { + let counter: Arc = Default::default(); + let mut futures = Vec::new(); + for _ in 0..1_000 { + let counter_clone = counter.clone(); + let fut = run_cpu_intensive(move || { + std::thread::sleep(Duration::from_millis(5)); + counter_clone.fetch_add(1, Ordering::SeqCst) + }); + // The first few num_cores tasks should run, but the other should get cancelled. + futures.push(tokio::time::timeout(Duration::from_millis(1), fut)); + } + futures::future::join_all(futures).await; + assert!(counter.load(Ordering::SeqCst) < 100); + } +} diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 13143b8eb85..93b8bb47061 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -361,8 +361,8 @@ struct IndexingWorkbench { // We use this value to set the `delete_opstamp` of the workbench splits. last_delete_opstamp: u64, // Number of bytes declared as used by tantivy. - memory_usage: GaugeGuard, - split_builders_guard: GaugeGuard, + memory_usage: GaugeGuard<'static>, + split_builders_guard: GaugeGuard<'static>, cooperative_indexing_period: Option, } diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 0dff364c6f3..6236728d99c 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -184,8 +184,8 @@ pub struct IndexedSplitBatchBuilder { pub publish_token_opt: Option, pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, - pub memory_usage: GaugeGuard, - pub _split_builders_guard: GaugeGuard, + pub memory_usage: GaugeGuard<'static>, + pub _split_builders_guard: GaugeGuard<'static>, } /// Sends notifications to the Publisher that the last batch of splits was emtpy. diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index cd9d1df0e24..14a75298d29 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -46,7 +46,7 @@ pub struct ProcessedDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard, + _gauge_guard: GaugeGuard<'static>, } impl ProcessedDocBatch { diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f9cf0720c0b..8882c51e961 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -29,7 +29,7 @@ pub struct RawDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard, + _gauge_guard: GaugeGuard<'static>, } impl RawDocBatch { diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index f42fbd444a9..1803dc6a7f5 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -485,7 +485,7 @@ pub(super) struct BatchBuilder { num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, - gauge_guard: GaugeGuard, + gauge_guard: GaugeGuard<'static>, } impl BatchBuilder { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 8fe8c173ca5..4cb6819ab08 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -31,6 +31,7 @@ use bytes::Bytes; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; +use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ @@ -286,7 +287,7 @@ impl IndexesMetadataResponseExt for IndexesMetadataResponse { indexes_metadata: Vec, failures: Vec, ) -> MetastoreResult { - let indexes_metadata_json_zstd = tokio::task::spawn_blocking(move || { + let indexes_metadata_json_zstd = run_cpu_intensive(move || { serde_utils::to_json_zstd(&indexes_metadata, 0).map(Bytes::from) }) .await @@ -302,14 +303,12 @@ impl IndexesMetadataResponseExt for IndexesMetadataResponse { } async fn deserialize_indexes_metadata(self) -> MetastoreResult> { - tokio::task::spawn_blocking(move || { - serde_utils::from_json_zstd(&self.indexes_metadata_json_zstd) - }) - .await - .map_err(|join_error| MetastoreError::Internal { - message: "failed to deserialize indexes metadata".to_string(), - cause: join_error.to_string(), - })? + run_cpu_intensive(move || serde_utils::from_json_zstd(&self.indexes_metadata_json_zstd)) + .await + .map_err(|join_error| MetastoreError::Internal { + message: "failed to deserialize indexes metadata".to_string(), + cause: join_error.to_string(), + })? } } @@ -338,7 +337,7 @@ impl ListIndexesMetadataResponseExt for ListIndexesMetadataResponse { async fn try_from_indexes_metadata( indexes_metadata: Vec, ) -> MetastoreResult { - let indexes_metadata_json_zstd = tokio::task::spawn_blocking(move || { + let indexes_metadata_json_zstd = run_cpu_intensive(move || { serde_utils::to_json_zstd(&indexes_metadata, 0).map(Bytes::from) }) .await @@ -354,7 +353,7 @@ impl ListIndexesMetadataResponseExt for ListIndexesMetadataResponse { } async fn deserialize_indexes_metadata(self) -> MetastoreResult> { - tokio::task::spawn_blocking(move || { + run_cpu_intensive(move || { if let Some(indexes_metadata_json) = &self.indexes_metadata_json_opt { return serde_utils::from_json_str(indexes_metadata_json); }; diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs index fdfa1a58b38..f803bcca1ad 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs @@ -23,6 +23,7 @@ use std::collections::{btree_set, BTreeSet, HashMap}; use async_trait::async_trait; use prost::Message; use quickwit_common::rate_limited_error; +use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; use quickwit_ingest::{ @@ -262,7 +263,7 @@ impl OtlpGrpcLogsService { num_log_records, num_parse_errors, error_message, - } = tokio::task::spawn_blocking({ + } = run_cpu_intensive({ let parent_span = RuntimeSpan::current(); || Self::parse_logs(request, parent_span, index_id) }) diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs index 40e211697b8..f4ed43fd0b0 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use async_trait::async_trait; use prost::Message; +use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; use quickwit_ingest::{ @@ -711,7 +712,7 @@ impl OtlpGrpcTracesService { num_spans, num_parse_errors, error_message, - } = tokio::task::spawn_blocking({ + } = run_cpu_intensive({ let parent_span = RuntimeSpan::current(); || Self::parse_spans(request, parent_span, index_id) }) diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index c4411e8ea9d..7be3f885bb9 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -184,7 +184,9 @@ async fn fetch_docs_in_split( .context("open-index-for-split")?; // we add an executor here, we could add it in open_index_with_caches, though we should verify // the side-effect before - let executor_tantivy = crate::search_executor().get_underlying_rayon_thread_pool().into(); + let executor_tantivy = crate::search_thread_pool() + .get_underlying_rayon_thread_pool() + .into(); index.set_executor(executor_tantivy); let index_reader = index .reader_builder() diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 1f859cdc740..a983d3e9015 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -378,14 +378,15 @@ async fn leaf_search_single_split( warmup(&searcher, &warmup_info).await?; let span = info_span!("tantivy_search"); - let leaf_search_response = crate::search_executor().run_cpu_intensive(move || { - let _span_guard = span.enter(); - searcher.search(&query, &quickwit_collector) - }) - .await - .map_err(|_| { - crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) - })??; + let leaf_search_response = crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + searcher.search(&query, &quickwit_collector) + }) + .await + .map_err(|_| { + crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) + })??; searcher_context .leaf_search_cache @@ -921,7 +922,8 @@ pub async fn leaf_search( } } - crate::search_executor().run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) + crate::search_thread_pool() + .run_cpu_intensive(|| incremental_merge_collector.finalize().map_err(Into::into)) .instrument(info_span!("incremental_merge_finalize")) .await .context("failed to merge split search responses")? diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 921e782a483..336ecabdb17 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -45,20 +45,13 @@ pub(crate) mod top_k_collector; mod metrics; - -fn search_executor() -> &'static Executor { - static SEARCH_EXECUTOR: OnceCell = OnceCell::new(); - &*SEARCH_EXECUTOR.get_or_init(|| quickwit_common::Executor::new("quickwit-search", None)) -} - #[cfg(test)] mod tests; pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; -use once_cell::sync::OnceCell; +use quickwit_common::thread_pool::ThreadPool; use quickwit_common::tower::Pool; -use quickwit_common::Executor; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, @@ -69,7 +62,7 @@ use tantivy::schema::NamedFieldDocument; pub type Result = std::result::Result; use std::net::{Ipv4Addr, SocketAddr}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; pub use find_trace_ids_collector::FindTraceIdsCollector; use quickwit_config::SearcherConfig; @@ -103,6 +96,11 @@ pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl}; /// A pool of searcher clients identified by their gRPC socket address. pub type SearcherPool = Pool; +fn search_thread_pool() -> &'static ThreadPool { + static SEARCH_EXECUTOR: OnceLock = OnceLock::new(); + &*SEARCH_EXECUTOR.get_or_init(|| ThreadPool::new("search", None)) +} + /// GlobalDocAddress serves as a hit address. #[derive(Clone, Eq, Debug, PartialEq, Hash, Ord, PartialOrd)] pub struct GlobalDocAddress { diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 39ac1a4e890..47e06129d61 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -20,9 +20,7 @@ // See https://prometheus.io/docs/practices/naming/ use once_cell::sync::Lazy; -use quickwit_common::metrics::{ - new_counter, new_histogram, Histogram, IntCounter -}; +use quickwit_common::metrics::{new_counter, new_histogram, Histogram, IntCounter}; pub struct SearchMetrics { pub leaf_searches_splits_total: IntCounter, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index e24c53b95b1..b5c8a2c1e27 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -667,13 +667,14 @@ pub(crate) async fn search_partial_hits_phase( let leaf_search_responses: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); - let leaf_search_response = crate::search_executor().run_cpu_intensive(move || { - let _span_guard = span.enter(); - merge_collector.merge_fruits(leaf_search_responses) - }) - .await - .context("failed to merge leaf search responses")? - .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?; + let leaf_search_response = crate::search_thread_pool() + .run_cpu_intensive(move || { + let _span_guard = span.enter(); + merge_collector.merge_fruits(leaf_search_responses) + }) + .await + .context("failed to merge leaf search responses")? + .map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?; debug!( num_hits = leaf_search_response.num_hits, failed_splits = ?leaf_search_response.failed_splits, diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index fce1d3a4709..cca351ef4e2 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -196,7 +196,7 @@ async fn leaf_search_stream_single_split( let _ = span.enter(); let m_request_fields = request_fields.clone(); - let collect_handle = crate::search_executor().run_cpu_intensive(move || { + let collect_handle = crate::search_thread_pool().run_cpu_intensive(move || { let mut buffer = Vec::new(); match m_request_fields.fast_field_types() { (Type::I64, None) => { diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index 37e68074949..7e6e2946b37 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -22,8 +22,8 @@ use std::io::Read; use bytes::Bytes; use flate2::read::GzDecoder; use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::thread_pool::run_cpu_intensive; use thiserror::Error; -use tokio::task; use warp::reject::Reject; use warp::Filter; @@ -37,7 +37,7 @@ use warp::Filter; async fn decompress_body(encoding: Option, body: Bytes) -> Result { match encoding.as_deref() { Some("gzip" | "x-gzip") => { - let decompressed = task::spawn_blocking(move || { + let decompressed = run_cpu_intensive(move || { let mut decompressed = Vec::new(); let mut decoder = GzDecoder::new(body.as_ref()); decoder @@ -50,7 +50,7 @@ async fn decompress_body(encoding: Option, body: Bytes) -> Result { - let decompressed = task::spawn_blocking(move || { + let decompressed = run_cpu_intensive(move || { zstd::decode_all(body.as_ref()) .map(Bytes::from) .map_err(|_| warp::reject::custom(CorruptedData)) @@ -89,7 +89,7 @@ pub(crate) fn get_body_bytes() -> impl Filter, } impl From for Body {