Skip to content

Commit

Permalink
give tantivy a shared thread pool to run fetch doc on (#4942)
Browse files Browse the repository at this point in the history
* give tantivy a shared thread pool to run fetch doc on

* remove use of unsafe in thread pool

* Updated tantivy rev

---------

Co-authored-by: Paul Masurel <[email protected]>
  • Loading branch information
trinity-1686a and fulmicoton authored May 9, 2024
1 parent a20009e commit 4a327a1
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 49 deletions.
19 changes: 9 additions & 10 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "92b5526", default-features = false, features = [
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "1ee5f907", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use indexmap::IndexSet;
use quickwit_datetime::{DateTimeInputFormat, DateTimeOutputFormat};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use tantivy::schema::OwnedValue as TantivyValue;
use tantivy::DateTimePrecision;
use tantivy::schema::{DateTimePrecision, OwnedValue as TantivyValue};

use super::default_as_true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use anyhow::bail;
use itertools::Itertools;
use serde_json::Value as JsonValue;
use tantivy::schema::{
BytesOptions, Field, IntoIpv6Addr, IpAddrOptions, JsonObjectOptions, NumericOptions,
OwnedValue as TantivyValue, SchemaBuilder, TextOptions,
BytesOptions, DateOptions, Field, IntoIpv6Addr, IpAddrOptions, JsonObjectOptions,
NumericOptions, OwnedValue as TantivyValue, SchemaBuilder, TextOptions,
};
use tantivy::tokenizer::{PreTokenizedString, Token};
use tantivy::{DateOptions, TantivyDocument as Document};
use tantivy::TantivyDocument as Document;
use tracing::warn;

use super::date_time_type::QuickwitDateTimeOptions;
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-doc-mapper/src/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ mod test {
use quickwit_query::create_default_quickwit_tokenizer_manager;
use quickwit_query::query_ast::query_ast_from_user_text;
use tantivy::columnar::MonotonicallyMappableToU64;
use tantivy::schema::{Schema, FAST, INDEXED, STORED, TEXT};
use tantivy::{DateOptions, DateTime, DateTimePrecision};
use tantivy::schema::{DateOptions, DateTimePrecision, Schema, FAST, INDEXED, STORED, TEXT};
use tantivy::DateTime;

use super::build_query;
use crate::{DYNAMIC_FIELD_NAME, SOURCE_FIELD_NAME};
Expand All @@ -298,7 +298,7 @@ mod test {
schema_builder.add_ip_addr_field("ip_notff", STORED);
let date_options = DateOptions::default()
.set_fast()
.set_precision(tantivy::DateTimePrecision::Milliseconds);
.set_precision(DateTimePrecision::Milliseconds);
schema_builder.add_date_field("dt", date_options);
schema_builder.add_u64_field("u64_fast", FAST | STORED);
schema_builder.add_i64_field("i64_fast", FAST | STORED);
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ use quickwit_proto::types::PipelineUid;
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use quickwit_query::query_ast::QueryAst;
use tantivy::directory::{Advice, DirectoryClone, MmapDirectory, RamDirectory};
use tantivy::index::SegmentId;
use tantivy::tokenizer::TokenizerManager;
use tantivy::{DateTime, Directory, Index, IndexMeta, IndexWriter, SegmentId, SegmentReader};
use tantivy::{DateTime, Directory, Index, IndexMeta, IndexWriter, SegmentReader};
use tokio::runtime::Handle;
use tracing::{debug, info, instrument, warn};

Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-query/src/query_ast/range_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ fn map_bound<TFrom, TTo>(bound: &Bound<TFrom>, transform: impl Fn(&TFrom) -> TTo
mod tests {
use std::ops::Bound;

use tantivy::schema::{Schema, FAST, STORED, TEXT};
use tantivy::DateOptions;
use tantivy::schema::{DateOptions, DateTimePrecision, Schema, FAST, STORED, TEXT};

use super::RangeQuery;
use crate::query_ast::tantivy_query_ast::TantivyBoolQuery;
Expand All @@ -372,7 +371,7 @@ mod tests {
schema_builder.add_text_field("my_str_field", FAST);
let date_options = DateOptions::default()
.set_fast()
.set_precision(tantivy::DateTimePrecision::Milliseconds);
.set_precision(DateTimePrecision::Milliseconds);
schema_builder.add_date_field("my_date_field", date_options);
schema_builder.add_u64_field("my_u64_not_fastfield", STORED);
if dynamic_mode {
Expand Down
11 changes: 9 additions & 2 deletions quickwit/quickwit-search/src/fetch_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ use quickwit_proto::search::{
use quickwit_storage::Storage;
use tantivy::query::Query;
use tantivy::schema::{Document as DocumentTrait, Field, OwnedValue, TantivyDocument, Value};
use tantivy::{ReloadPolicy, Score, Searcher, SnippetGenerator, Term};
use tantivy::snippet::SnippetGenerator;
use tantivy::{ReloadPolicy, Score, Searcher, Term};
use tracing::{error, Instrument};

use crate::leaf::open_index_with_caches;
use crate::service::SearcherContext;
use crate::thread_pool::search_executor;
use crate::{convert_document_to_json_string, GlobalDocAddress};

const SNIPPET_MAX_NUM_CHARS: usize = 150;
Expand Down Expand Up @@ -172,7 +174,7 @@ async fn fetch_docs_in_split(
global_doc_addrs.sort_by_key(|doc| doc.doc_addr);
// Opens the index without the ephemeral unbounded cache, this cache is indeed not useful
// when fetching docs as we will fetch them only once.
let index = open_index_with_caches(
let mut index = open_index_with_caches(
&searcher_context,
index_storage,
split,
Expand All @@ -181,6 +183,11 @@ async fn fetch_docs_in_split(
)
.await
.context("open-index-for-split")?;
// Attach the shared search executor here. It could instead be set inside
// `open_index_with_caches`, but the side effects of doing so must be verified first.
index
.set_shared_multithread_executor(search_executor())
.context("failed to set search pool")?;
let index_reader = index
.reader_builder()
// the docs are presorted so a cache size of NUM_CONCURRENT_REQUESTS is fine
Expand Down
51 changes: 27 additions & 24 deletions quickwit/quickwit-search/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
use std::sync::Arc;

use once_cell::sync::OnceCell;
use quickwit_common::metrics::GaugeGuard;
use tantivy::Executor;
use tracing::error;

fn search_thread_pool() -> &'static rayon::ThreadPool {
static SEARCH_THREAD_POOL: OnceCell<rayon::ThreadPool> = OnceCell::new();
SEARCH_THREAD_POOL.get_or_init(|| {
rayon::ThreadPoolBuilder::new()
.thread_name(|thread_id| format!("quickwit-search-{thread_id}"))
.panic_handler(|_my_panic| {
error!("task running in the quickwit search pool panicked");
})
.build()
.expect("Failed to spawn the spawning pool")
})
/// Process-wide cell holding the shared search executor. It starts empty and is filled on the
/// first call to `search_executor`, which initializes it with `build_executor`.
static SEARCH_THREAD_POOL: OnceCell<Arc<Executor>> = OnceCell::new();

fn build_executor() -> Arc<Executor> {
let rayon_pool = rayon::ThreadPoolBuilder::new()
.thread_name(|thread_id| format!("quickwit-search-{thread_id}"))
.panic_handler(|_my_panic| {
error!("task running in the quickwit search pool panicked");
})
.build()
.expect("Failed to spawn the spawning pool");
Arc::new(Executor::ThreadPool(rayon_pool))
}

/// Returns a handle to the shared search executor.
///
/// The executor is created with `build_executor` the first time this function is called;
/// every call hands back a new `Arc` pointing at that same executor.
pub(crate) fn search_executor() -> Arc<Executor> {
    let shared_executor: &Arc<Executor> = SEARCH_THREAD_POOL.get_or_init(build_executor);
    Arc::clone(shared_executor)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -68,20 +74,17 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let span = tracing::Span::current();
search_thread_pool().spawn(move || {
let _guard = span.enter();
let mut active_thread_guard =
GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count);
active_thread_guard.add(1i64);
if tx.is_closed() {
return;
}
let task_result = cpu_heavy_task();
let _ = tx.send(task_result);
});
rx.await.map_err(|_| Panicked)
search_executor()
.spawn_blocking(move || {
let _guard = span.enter();
let mut active_thread_guard =
GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count);
active_thread_guard.add(1i64);
cpu_heavy_task()
})
.await
.map_err(|_| Panicked)
}

#[cfg(test)]
Expand Down

0 comments on commit 4a327a1

Please sign in to comment.