Skip to content

Commit

Permalink
share aggregation limit on node
Browse files Browse the repository at this point in the history
Changes how aggregation limits are shared. Previously they were per
request, now they are per node.
The counters are only updated after an allocation in tantivy, which
should keep the contention risk relatively low.
  • Loading branch information
PSeitz committed Aug 28, 2024
1 parent 60368df commit 9853776
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ pub async fn multi_leaf_search(
.map(|doc_mapper| deserialize_doc_mapper(doc_mapper))
.collect::<crate::Result<_>>()?;
// Creates a collector which merges responses into one
let aggregation_limits = searcher_context.create_new_aggregation_limits();
let aggregation_limits = searcher_context.get_aggregation_limits();
// TODO: to avoid lockstep, we should pull up the future creation over the list of split ids
// and have the semaphore on this level.
// This will lower resource consumption due to less in-flight futures and avoid contention.
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ pub(crate) async fn search_partial_hits_phase(
// Creates a collector which merges responses into one
let merge_collector = make_merge_collector(
search_request,
&searcher_context.create_new_aggregation_limits(),
&searcher_context.get_aggregation_limits(),
)?;

// Merging is a cpu-bound task.
Expand Down Expand Up @@ -966,7 +966,7 @@ fn finalize_aggregation(
let final_aggregation_results: AggregationResults = intermediate_aggregation_results
.into_final_result(
aggregations,
&searcher_context.create_new_aggregation_limits(),
&searcher_context.get_aggregation_limits(),
)?;
serde_json::to_string(&final_aggregation_results)?
}
Expand Down
17 changes: 11 additions & 6 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@ pub struct SearcherContext {
pub split_cache_opt: Option<Arc<SplitCache>>,
/// List fields cache. Caches the list fields response for a given split.
pub list_fields_cache: ListFieldsCache,
/// The aggregation limits are passed to limit the memory usage.
pub aggregation_limit: AggregationLimits,
}

impl std::fmt::Debug for SearcherContext {
Expand Down Expand Up @@ -498,6 +500,10 @@ impl SearcherContext {
LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize);
let list_fields_cache =
ListFieldsCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize);
let aggregation_limit = AggregationLimits::new(
Some(searcher_config.aggregation_memory_limit.as_u64()),
Some(searcher_config.aggregation_bucket_limit),
);

Self {
searcher_config,
Expand All @@ -508,14 +514,13 @@ impl SearcherContext {
leaf_search_cache,
list_fields_cache,
split_cache_opt,
aggregation_limit

}
}

/// Returns a new instance to track the aggregation memory usage.
pub fn create_new_aggregation_limits(&self) -> AggregationLimits {
AggregationLimits::new(
Some(self.searcher_config.aggregation_memory_limit.as_u64()),
Some(self.searcher_config.aggregation_bucket_limit),
)
/// Returns the shared instance to track the aggregation memory usage.
pub fn get_aggregation_limits(&self) -> AggregationLimits {
self.aggregation_limit.clone()
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
let searcher_context: Arc<SearcherContext> =
Arc::new(SearcherContext::new(SearcherConfig::default(), None));

let agg_limits = searcher_context.create_new_aggregation_limits();
let agg_limits = searcher_context.get_aggregation_limits();

let search_response = leaf_search(
searcher_context,
Expand Down

0 comments on commit 9853776

Please sign in to comment.