diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 95af1d67c60..1530b29f02f 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4667,7 +4667,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "stable_deref_trait", ] @@ -7998,7 +7998,7 @@ dependencies = [ [[package]] name = "tantivy" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "aho-corasick", "arc-swap", @@ -8021,7 +8021,6 @@ dependencies = [ "lz4_flex", "measure_time", "memmap2", - "num_cpus", "once_cell", "oneshot", "rayon", @@ -8051,7 +8050,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "bitpacking", ] @@ -8059,7 +8058,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "downcast-rs", "fastdivide", @@ -8074,7 +8073,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "async-trait", "byteorder", @@ -8097,7 +8096,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "nom", ] @@ -8105,7 +8104,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -8116,7 +8115,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "murmurhash32", "rand_distr", @@ -8126,7 +8125,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=92b5526#92b5526310ae8863bdc9b160d3042be0f14018cb" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=1ee5f907#1ee5f90761f8890ddc94282b8397240e9eded350" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 80a654c99a1..1b80aa94bb5 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -321,7 +321,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "92b5526", default-features = false, features = [ +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs index a08a7c379b5..b7ce4bfc429 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/date_time_type.rs @@ -21,8 +21,7 @@ use indexmap::IndexSet; use quickwit_datetime::{DateTimeInputFormat, DateTimeOutputFormat}; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; -use tantivy::schema::OwnedValue as TantivyValue; -use tantivy::DateTimePrecision; +use tantivy::schema::{DateTimePrecision, OwnedValue as TantivyValue}; use super::default_as_true; diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index a7351749dbd..4fdf1af030c 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -26,11 +26,11 @@ use anyhow::bail; use itertools::Itertools; use serde_json::Value as JsonValue; use tantivy::schema::{ - BytesOptions, Field, IntoIpv6Addr, IpAddrOptions, JsonObjectOptions, NumericOptions, - OwnedValue as TantivyValue, SchemaBuilder, TextOptions, + BytesOptions, DateOptions, Field, IntoIpv6Addr, IpAddrOptions, JsonObjectOptions, + NumericOptions, OwnedValue as TantivyValue, SchemaBuilder, TextOptions, }; use tantivy::tokenizer::{PreTokenizedString, Token}; -use tantivy::{DateOptions, TantivyDocument as Document}; +use tantivy::TantivyDocument as Document; use tracing::warn; use super::date_time_type::QuickwitDateTimeOptions; diff --git a/quickwit/quickwit-doc-mapper/src/query_builder.rs b/quickwit/quickwit-doc-mapper/src/query_builder.rs index d26284d29b8..3054ac0661f 100644 --- a/quickwit/quickwit-doc-mapper/src/query_builder.rs +++ b/quickwit/quickwit-doc-mapper/src/query_builder.rs @@ -274,8 +274,8 @@ mod test { use quickwit_query::create_default_quickwit_tokenizer_manager; use quickwit_query::query_ast::query_ast_from_user_text; use tantivy::columnar::MonotonicallyMappableToU64; - use tantivy::schema::{Schema, FAST, INDEXED, STORED, TEXT}; - use tantivy::{DateOptions, DateTime, DateTimePrecision}; + use tantivy::schema::{DateOptions, DateTimePrecision, Schema, FAST, INDEXED, STORED, TEXT}; + use tantivy::DateTime; use super::build_query; use crate::{DYNAMIC_FIELD_NAME, SOURCE_FIELD_NAME}; @@ -298,7 +298,7 @@ mod test { schema_builder.add_ip_addr_field("ip_notff", STORED); let date_options = DateOptions::default() .set_fast() - .set_precision(tantivy::DateTimePrecision::Milliseconds); + .set_precision(DateTimePrecision::Milliseconds); schema_builder.add_date_field("dt", date_options); schema_builder.add_u64_field("u64_fast", FAST | STORED); schema_builder.add_i64_field("i64_fast", FAST | STORED); diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 85f338270db..19b8208be31 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -43,8 +43,9 @@ use quickwit_proto::types::PipelineUid; use quickwit_query::get_quickwit_fastfield_normalizer_manager; use quickwit_query::query_ast::QueryAst; use tantivy::directory::{Advice, DirectoryClone, MmapDirectory, RamDirectory}; +use tantivy::index::SegmentId; use tantivy::tokenizer::TokenizerManager; -use tantivy::{DateTime, Directory, Index, IndexMeta, IndexWriter, SegmentId, SegmentReader}; +use tantivy::{DateTime, Directory, Index, IndexMeta, IndexWriter, SegmentReader}; use tokio::runtime::Handle; use tracing::{debug, info, instrument, warn}; diff --git a/quickwit/quickwit-query/src/query_ast/range_query.rs b/quickwit/quickwit-query/src/query_ast/range_query.rs index bccc5e6278c..225c26fa815 100644 --- a/quickwit/quickwit-query/src/query_ast/range_query.rs +++ b/quickwit/quickwit-query/src/query_ast/range_query.rs @@ -354,8 +354,7 @@ fn map_bound(bound: &Bound, transform: impl Fn(&TFrom) -> TTo mod tests { use std::ops::Bound; - use tantivy::schema::{Schema, FAST, STORED, TEXT}; - use tantivy::DateOptions; + use tantivy::schema::{DateOptions, DateTimePrecision, Schema, FAST, STORED, TEXT}; use super::RangeQuery; use crate::query_ast::tantivy_query_ast::TantivyBoolQuery; @@ -372,7 +371,7 @@ mod tests { schema_builder.add_text_field("my_str_field", FAST); let date_options = DateOptions::default() .set_fast() - .set_precision(tantivy::DateTimePrecision::Milliseconds); + .set_precision(DateTimePrecision::Milliseconds); schema_builder.add_date_field("my_date_field", date_options); schema_builder.add_u64_field("my_u64_not_fastfield", STORED); if dynamic_mode { diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index ace50b5f261..a3b4cc166b3 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -30,11 +30,13 @@ use quickwit_proto::search::{ use quickwit_storage::Storage; use tantivy::query::Query; use tantivy::schema::{Document as DocumentTrait, Field, OwnedValue, TantivyDocument, Value}; -use tantivy::{ReloadPolicy, Score, Searcher, SnippetGenerator, Term}; +use tantivy::snippet::SnippetGenerator; +use tantivy::{ReloadPolicy, Score, Searcher, Term}; use tracing::{error, Instrument}; use crate::leaf::open_index_with_caches; use crate::service::SearcherContext; +use crate::thread_pool::search_executor; use crate::{convert_document_to_json_string, GlobalDocAddress}; const SNIPPET_MAX_NUM_CHARS: usize = 150; @@ -172,7 +174,7 @@ async fn fetch_docs_in_split( global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful // when fetching docs as we will fetch them only once. - let index = open_index_with_caches( + let mut index = open_index_with_caches( &searcher_context, index_storage, split, @@ -181,6 +183,11 @@ async fn fetch_docs_in_split( ) .await .context("open-index-for-split")?; + // we add an executor here, we could add it in open_index_with_caches, though we should verify + // the side-effect before + index + .set_shared_multithread_executor(search_executor()) + .context("failed to set search pool")?; let index_reader = index .reader_builder() // the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine diff --git a/quickwit/quickwit-search/src/thread_pool.rs b/quickwit/quickwit-search/src/thread_pool.rs index b4ffb8ddb85..3f4f9aba2b0 100644 --- a/quickwit/quickwit-search/src/thread_pool.rs +++ b/quickwit/quickwit-search/src/thread_pool.rs @@ -18,22 +18,28 @@ // along with this program. If not, see . use std::fmt; +use std::sync::Arc; use once_cell::sync::OnceCell; use quickwit_common::metrics::GaugeGuard; +use tantivy::Executor; use tracing::error; -fn search_thread_pool() -> &'static rayon::ThreadPool { - static SEARCH_THREAD_POOL: OnceCell = OnceCell::new(); - SEARCH_THREAD_POOL.get_or_init(|| { - rayon::ThreadPoolBuilder::new() - .thread_name(|thread_id| format!("quickwit-search-{thread_id}")) - .panic_handler(|_my_panic| { - error!("task running in the quickwit search pool panicked"); - }) - .build() - .expect("Failed to spawn the spawning pool") - }) +static SEARCH_THREAD_POOL: OnceCell> = OnceCell::new(); + +fn build_executor() -> Arc { + let rayon_pool = rayon::ThreadPoolBuilder::new() + .thread_name(|thread_id| format!("quickwit-search-{thread_id}")) + .panic_handler(|_my_panic| { + error!("task running in the quickwit search pool panicked"); + }) + .build() + .expect("Failed to spawn the spawning pool"); + Arc::new(Executor::ThreadPool(rayon_pool)) +} + +pub(crate) fn search_executor() -> Arc { + SEARCH_THREAD_POOL.get_or_init(build_executor).clone() } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -68,20 +74,17 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (tx, rx) = tokio::sync::oneshot::channel(); let span = tracing::Span::current(); - search_thread_pool().spawn(move || { - let _guard = span.enter(); - let mut active_thread_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count); - active_thread_guard.add(1i64); - if tx.is_closed() { - return; - } - let task_result = cpu_heavy_task(); - let _ = tx.send(task_result); - }); - rx.await.map_err(|_| Panicked) + search_executor() + .spawn_blocking(move || { + let _guard = span.enter(); + let mut active_thread_guard = + GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count); + active_thread_guard.add(1i64); + cpu_heavy_task() + }) + .await + .map_err(|_| Panicked) } #[cfg(test)]