
Commit

count optimization for multisplits (#5048)
* count optimization for multisplits

* optimize requests by passing the threshold in leaf search
* Execute query.count() instead of QuickwitCollector for count searches

We allow 100 concurrent split searches by default, but only num_cpus worker
threads. This means most search futures will wait to be scheduled. When they
are scheduled, they can check the threshold updated by the preceding searches
and possibly skip the search.

Switches to an RwLock for the threshold, since we now read it more often than
we write it.
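
A minimal sketch of that scheduling-time check, assuming hypothetical stand-ins
(Threshold, Split, full_search, count_only_search) rather than Quickwit's actual
types: each split search re-reads the shared threshold behind the RwLock once it
gets a worker thread, and downgrades itself to a count-only search when it can
no longer beat the hits collected so far.

use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `CanSplitDoBetter`: the worst hit currently kept in the
/// merged top-k; a split whose best possible value cannot beat it has nothing to add.
#[derive(Default)]
struct Threshold {
    worst_hit_in_top_k: Option<u64>,
}

/// Hypothetical split metadata: an upper bound on the sort value the split can produce.
struct Split {
    id: &'static str,
    max_sort_value: u64,
}

fn full_search(split: &Split) -> (u64, u64) {
    // Pretend full search: returns (worst hit now in the top-k, number of hits).
    (split.max_sort_value, 1)
}

fn count_only_search(_split: &Split) -> u64 {
    // Pretend count-only search: no hits are materialized, only a count is returned.
    1
}

fn search_split(split: &Split, threshold: &Arc<RwLock<Threshold>>) -> u64 {
    // Re-check the threshold *after* being scheduled: earlier split searches may have
    // raised it while this future was waiting for one of the num_cpus worker threads.
    let can_do_better = threshold
        .read()
        .unwrap()
        .worst_hit_in_top_k
        .map_or(true, |worst| split.max_sort_value > worst);
    if !can_do_better {
        // Downgrade to a count-only request: max_hits = 0, no sort fields, no docs fetched.
        return count_only_search(split);
    }
    let (new_worst, num_hits) = full_search(split);
    // Writes are rare compared to reads, hence RwLock instead of Mutex.
    let mut guard = threshold.write().unwrap();
    if guard.worst_hit_in_top_k.map_or(true, |current| new_worst > current) {
        guard.worst_hit_in_top_k = Some(new_worst);
    }
    num_hits
}

fn main() {
    let threshold = Arc::new(RwLock::new(Threshold::default()));
    for split in [
        Split { id: "split-newest", max_sort_value: 10 },
        Split { id: "split-older", max_sort_value: 3 },
    ] {
        println!("{} -> {} hit(s)", split.id, search_split(&split, &threshold));
    }
}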

Future Work:
In some cases we still run num_cpus full searches before the threshold kicks
in. But sometimes we could statically analyze which split the best results
must come from and generate count-only requests for the others (see the sketch
below).
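
A rough sketch of that idea, assuming a match-all query sorted by timestamp
descending and splits with disjoint time ranges (all names here are hypothetical
and not part of this change): splits are ordered by their newest document, full
top-k searches are planned only until enough documents are covered, and every
remaining split is issued a count-only request up front.

/// Hypothetical per-split metadata, for illustration only.
struct SplitMeta {
    id: &'static str,
    max_timestamp: i64,
    num_docs: u64,
}

#[derive(Debug)]
enum PlannedRequest {
    Full { split_id: &'static str },
    CountOnly { split_id: &'static str },
}

/// Assumes disjoint time ranges per split and a match-all query sorted by timestamp
/// descending: the top hits must come from the splits holding the newest documents,
/// so splits beyond the first `max_hits` covered documents only need a count.
fn plan_requests(mut splits: Vec<SplitMeta>, max_hits: u64) -> Vec<PlannedRequest> {
    splits.sort_by_key(|split| std::cmp::Reverse(split.max_timestamp));
    let mut docs_covered = 0u64;
    splits
        .into_iter()
        .map(|split| {
            let request = if docs_covered < max_hits {
                PlannedRequest::Full { split_id: split.id }
            } else {
                PlannedRequest::CountOnly { split_id: split.id }
            };
            docs_covered += split.num_docs;
            request
        })
        .collect()
}

fn main() {
    let plan = plan_requests(
        vec![
            SplitMeta { id: "2024-03", max_timestamp: 1_711_000_000, num_docs: 50_000 },
            SplitMeta { id: "2024-05", max_timestamp: 1_717_000_000, num_docs: 80_000 },
            SplitMeta { id: "2024-04", max_timestamp: 1_714_000_000, num_docs: 60_000 },
        ],
        20,
    );
    println!("{plan:?}");
}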

Addresses #5032

* add comments
PSeitz authored May 31, 2024
1 parent 85d99a3 commit c90edeb
Showing 4 changed files with 172 additions and 44 deletions.
58 changes: 47 additions & 11 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions quickwit/quickwit-search/src/collector.rs
@@ -722,6 +722,18 @@ pub(crate) struct QuickwitCollector {
}

impl QuickwitCollector {
pub fn is_count_only(&self) -> bool {
self.max_hits == 0 && self.aggregation.is_none()
}
/// Updates search parameters affecting the returned documents.
/// Does not update aggregations.
pub fn update_search_param(&mut self, search_request: &SearchRequest) {
let sort_by = sort_by_from_request(search_request);
self.sort_by = sort_by;
self.max_hits = search_request.max_hits as usize;
self.start_offset = search_request.start_offset as usize;
self.search_after.clone_from(&search_request.search_after);
}
pub fn fast_field_names(&self) -> HashSet<String> {
let mut fast_field_names = HashSet::default();
self.sort_by.first.add_fast_field(&mut fast_field_names);
123 changes: 95 additions & 28 deletions quickwit/quickwit-search/src/leaf.rs
@@ -21,7 +21,7 @@ use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

use anyhow::Context;
use futures::future::try_join_all;
@@ -47,6 +47,7 @@ use tantivy::{DateTime, Index, ReloadPolicy, Searcher, Term};
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
use crate::root::is_metadata_count_request_with_ast;
use crate::service::{deserialize_doc_mapper, SearcherContext};
use crate::{QuickwitAggregations, SearchError};

@@ -330,13 +331,24 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
Ok(())
}

fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse {
LeafSearchResponse {
num_hits: count,
partial_hits: Vec::new(),
failed_splits: Vec::new(),
num_attempted_splits: 1,
intermediate_aggregation_result: None,
}
}

/// Apply a leaf search on a single split.
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
doc_mapper: Arc<dyn DocMapper>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
aggregations_limits: AggregationLimits,
) -> crate::Result<LeafSearchResponse> {
rewrite_request(
@@ -362,32 +374,67 @@ async fn leaf_search_single_split(
.await?;
let split_schema = index.schema();

let quickwit_collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
let (query, mut warmup_info) = doc_mapper.query(split_schema, &query_ast, false)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let collector_warmup_info = quickwit_collector.warmup_info();
let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
// This may be the case for AllQuery with a sort by date, where the current split can't have
// better results.
//
// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
// opening the index and execute this earlier. Opening splits is typically served from the
// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;

let collector_warmup_info = collector.warmup_info();
warmup_info.merge(collector_warmup_info);
warmup_info.simplify();

warmup(&searcher, &warmup_info).await?;
let span = info_span!("tantivy_search");
let leaf_search_response = crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
searcher.search(&query, &quickwit_collector)
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??;

let (search_request, leaf_search_response) = {
let split = split.clone();

crate::search_thread_pool()
.run_cpu_intensive(move || {
let _span_guard = span.enter();
// Our search execution has been scheduled, let's check if we can improve the
// request based on the results of the preceding searches
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
if is_metadata_count_request_with_ast(&query_ast, &search_request) {
return Ok((
search_request,
get_leaf_resp_from_count(searcher.num_docs() as u64),
));
}
if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
Ok((search_request, get_leaf_resp_from_count(count)))
} else {
searcher
.search(&query, &collector)
.map(|resp| (search_request, resp))
}
})
.await
.map_err(|_| {
crate::SearchError::Internal(format!("leaf search panicked. split={split_id}"))
})??
};

searcher_context
.leaf_search_cache
@@ -850,7 +897,7 @@ impl CanSplitDoBetter {

/// Record the new worst-of-the-top document, that is, the document which would first be
/// evicted from the list of best documents, if a better document was found. Only call this
/// funciton if you have at least max_hits documents already.
/// function if you have at least max_hits documents already.
fn record_new_worst_hit(&mut self, hit: &PartialHit) {
match self {
CanSplitDoBetter::Uninformative => (),
@@ -990,6 +1037,29 @@ async fn resolve_storage_and_leaf_search(
.await
}

/// Optimizes the search_request based on CanSplitDoBetter
/// Returns true if the split can return better results
fn check_optimize_search_request(
search_request: &mut SearchRequest,
split: &SplitIdAndFooterOffsets,
split_filter: &Arc<RwLock<CanSplitDoBetter>>,
) -> bool {
let can_be_better = split_filter.read().unwrap().can_be_better(split);
if !can_be_better {
disable_search_request_hits(search_request);
}
can_be_better
}

/// Alter the search request so it does not return any docs.
///
/// This is usually done since it cannot provide better hits results than existing fetched results.
fn disable_search_request_hits(search_request: &mut SearchRequest) {
search_request.max_hits = 0;
search_request.start_offset = 0;
search_request.sort_fields.clear();
}

/// `leaf` step of search.
///
/// The leaf search collects all kind of information, and returns a set of
@@ -1016,7 +1086,7 @@ pub async fn leaf_search(
|| (request.aggregation_request.is_some()
&& !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_)));

let split_filter = Arc::new(Mutex::new(split_filter));
let split_filter = Arc::new(RwLock::new(split_filter));

let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());

@@ -1034,13 +1104,9 @@ pub async fn leaf_search(

let mut request = (*request).clone();

if !split_filter.lock().unwrap().can_be_better(&split) {
if !run_all_splits {
continue;
}
request.max_hits = 0;
request.start_offset = 0;
request.sort_fields.clear();
let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
}

leaf_search_single_split_futures.push(tokio::spawn(
@@ -1100,7 +1166,7 @@ async fn leaf_search_single_split_wrapper(
index_storage: Arc<dyn Storage>,
doc_mapper: Arc<dyn DocMapper>,
split: SplitIdAndFooterOffsets,
split_filter: Arc<Mutex<CanSplitDoBetter>>,
split_filter: Arc<RwLock<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
aggregations_limits: AggregationLimits,
@@ -1115,6 +1181,7 @@ async fn leaf_search_single_split_wrapper(
index_storage,
split.clone(),
doc_mapper,
split_filter.clone(),
aggregations_limits,
)
.await;
@@ -1144,10 +1211,10 @@ async fn leaf_search_single_split_wrapper(
}),
}
if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() {
// TODO: we could use a RWLock instead and read the value instead of updateing it
// TODO: we could use the RWLock instead and read the value instead of updateing it
// unconditionally.
split_filter
.lock()
.write()
.unwrap()
.record_new_worst_hit(last_hit.as_ref());
}
