share aggregation limit on node (#5357)
* share aggregation limit on node

Changes how aggregation limits are shared: previously they were per
request; now they are per node.
The counters are only updated after an allocation in tantivy, which
should keep the contention risk relatively low.
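
For illustration, a minimal self-contained sketch of the mechanism (hypothetical types, not Quickwit's actual code): every clone of the limits handle shares one atomic byte counter, so concurrent queries on a node draw from a single budget, and the counter is only touched when an allocation is reported.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Hypothetical stand-in for a node-wide aggregation limit: clones
    /// share the same byte counter, which is what makes the budget per node.
    #[derive(Clone)]
    struct SharedAggregationLimit {
        allocated_bytes: Arc<AtomicU64>,
        memory_limit: u64,
    }

    impl SharedAggregationLimit {
        fn new(memory_limit: u64) -> Self {
            Self {
                allocated_bytes: Arc::new(AtomicU64::new(0)),
                memory_limit,
            }
        }

        /// Only called when an aggregation actually allocates, which keeps
        /// contention on the shared counter low.
        fn add_memory_consumed(&self, bytes: u64) -> Result<(), String> {
            let total = self.allocated_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
            if total > self.memory_limit {
                // The query that crosses the limit aborts; releasing its
                // accounted memory on abort is not modeled here.
                return Err(format!("aggregation memory limit exceeded: {total} bytes"));
            }
            Ok(())
        }
    }

    fn main() {
        let node_limit = SharedAggregationLimit::new(500 * 1024 * 1024);
        // Two concurrent queries hold clones of the same counter.
        let query_a = node_limit.clone();
        let query_b = node_limit.clone();
        query_a.add_memory_consumed(400 * 1024 * 1024).unwrap();
        assert!(query_b.add_memory_consumed(200 * 1024 * 1024).is_err());
    }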

* fix randomness in test

* update documentation
PSeitz authored Aug 30, 2024
1 parent 67091c5 commit c00dd57
Showing 6 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/node-config.md
@@ -191,7 +191,7 @@ This section contains the configuration options for a Searcher.

| Property | Description | Default value |
| --- | --- | --- |
- | `aggregation_memory_limit` | Controls the maximum amount of memory that can be used for aggregations before aborting. This limit is per request and single leaf query (a leaf query is querying one or multiple splits concurrently). It is used to prevent excessive memory usage during the aggregation phase, which can lead to performance degradation or crashes. Since it is per request, concurrent requests can exceed the limit. | `500M`|
+ | `aggregation_memory_limit` | Controls the maximum amount of memory that can be used for aggregations before aborting. This limit is per searcher node; concurrent queries on a node share it. The first query to hit the limit is aborted, and its memory is freed. It is used to prevent excessive memory usage during the aggregation phase, which can lead to performance degradation or crashes. | `500M`|
| `aggregation_bucket_limit` | Determines the maximum number of buckets returned to the client. | `65000` |
| `fast_field_cache_capacity` | Fast field in memory cache capacity on a Searcher. If your filter by dates, run aggregations, range queries, or if you use the search stream API, or even for tracing, it might worth increasing this parameter. The [metrics](../reference/metrics.md) starting by `quickwit_cache_fastfields_cache` can help you make an informed choice when setting this value. | `1G` |
| `split_footer_cache_capacity` | Split footer in memory cache (it is essentially the hotcache) capacity on a Searcher.| `500M` |
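
A sketch of how these options could be set in the node config file, assuming the `searcher` section documented on this page (the values shown are the documented defaults):

    searcher:
      aggregation_memory_limit: 500M
      aggregation_bucket_limit: 65000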
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/leaf.rs
@@ -1072,7 +1072,7 @@ pub async fn multi_leaf_search(
.map(|doc_mapper| deserialize_doc_mapper(doc_mapper))
.collect::<crate::Result<_>>()?;
// Creates a collector which merges responses into one
- let aggregation_limits = searcher_context.create_new_aggregation_limits();
+ let aggregation_limits = searcher_context.get_aggregation_limits();
// TODO: to avoid lockstep, we should pull up the future creation over the list of split ids
// and have the semaphore on this level.
// This will lower resource consumption due to less in-flight futures and avoid contention.
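The TODO above suggests pulling the semaphore up over future creation; a minimal sketch of that pattern with tokio::sync::Semaphore (hypothetical function and names, not Quickwit's actual executor) is:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    /// Bounds the number of in-flight leaf-search futures: a permit is
    /// acquired before each future is spawned, so queued work does not
    /// allocate resources up front.
    async fn search_splits_bounded(split_ids: Vec<String>, max_concurrent: usize) {
        let semaphore = Arc::new(Semaphore::new(max_concurrent));
        let mut handles = Vec::new();
        for split_id in split_ids {
            let permit = semaphore
                .clone()
                .acquire_owned()
                .await
                .expect("semaphore closed");
            handles.push(tokio::spawn(async move {
                let _permit = permit; // released when the task finishes
                // ... run the leaf search for `split_id` here ...
                let _ = split_id;
            }));
        }
        for handle in handles {
            let _ = handle.await;
        }
    }

    #[tokio::main]
    async fn main() {
        search_splits_bounded(vec!["split-1".into(), "split-2".into()], 2).await;
    }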
11 changes: 3 additions & 8 deletions quickwit/quickwit-search/src/root.rs
@@ -684,10 +684,8 @@ pub(crate) async fn search_partial_hits_phase(
};

// Creates a collector which merges responses into one
- let merge_collector = make_merge_collector(
-     search_request,
-     &searcher_context.create_new_aggregation_limits(),
- )?;
+ let merge_collector =
+     make_merge_collector(search_request, &searcher_context.get_aggregation_limits())?;

// Merging is a cpu-bound task.
// It should be executed by Tokio's blocking threads.
@@ -964,10 +962,7 @@ fn finalize_aggregation(
Default::default()
};
let final_aggregation_results: AggregationResults = intermediate_aggregation_results
-     .into_final_result(
-         aggregations,
-         &searcher_context.create_new_aggregation_limits(),
-     )?;
+     .into_final_result(aggregations, &searcher_context.get_aggregation_limits())?;
serde_json::to_string(&final_aggregation_results)?
}
};
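The comment in the hunk above notes that merging is a CPU-bound task that should run on Tokio's blocking threads; a minimal sketch of that pattern (the summation is an illustrative stand-in for the real merge-collector work):

    use tokio::task::spawn_blocking;

    /// Runs a CPU-bound merge step off the async runtime threads so it
    /// cannot stall other in-flight search futures.
    async fn merge_off_runtime(partial_results: Vec<u64>) -> u64 {
        spawn_blocking(move || partial_results.into_iter().sum())
            .await
            .expect("merge task panicked")
    }

    #[tokio::main]
    async fn main() {
        assert_eq!(merge_off_runtime(vec![1, 2, 3]).await, 6);
    }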
16 changes: 10 additions & 6 deletions quickwit/quickwit-search/src/service.rs
@@ -458,6 +458,8 @@ pub struct SearcherContext {
pub split_cache_opt: Option<Arc<SplitCache>>,
/// List fields cache. Caches the list fields response for a given split.
pub list_fields_cache: ListFieldsCache,
+ /// Shared aggregation limits, used to cap aggregation memory usage on this node.
+ pub aggregation_limit: AggregationLimits,
}

impl std::fmt::Debug for SearcherContext {
@@ -498,6 +500,10 @@ impl SearcherContext {
LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize);
let list_fields_cache =
ListFieldsCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize);
+ let aggregation_limit = AggregationLimits::new(
+     Some(searcher_config.aggregation_memory_limit.as_u64()),
+     Some(searcher_config.aggregation_bucket_limit),
+ );

Self {
searcher_config,
@@ -508,14 +514,12 @@
leaf_search_cache,
list_fields_cache,
split_cache_opt,
+ aggregation_limit,
}
}

- /// Returns a new instance to track the aggregation memory usage.
- pub fn create_new_aggregation_limits(&self) -> AggregationLimits {
-     AggregationLimits::new(
-         Some(self.searcher_config.aggregation_memory_limit.as_u64()),
-         Some(self.searcher_config.aggregation_bucket_limit),
-     )
+ /// Returns the shared instance to track the aggregation memory usage.
+ pub fn get_aggregation_limits(&self) -> AggregationLimits {
+     self.aggregation_limit.clone()
  }
}
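
Downstream, each query passes a clone of the shared limits into tantivy. A sketch against tantivy's public aggregation API (the AggregationCollector::from_aggs signature is assumed from tantivy, not shown in this diff):

    use tantivy::aggregation::agg_req::Aggregations;
    use tantivy::aggregation::{AggregationCollector, AggregationLimits};

    /// Every query on the node receives a clone of the same AggregationLimits;
    /// clones share the underlying byte counter, which is what turns the
    /// per-request limit into a node-wide one.
    fn collector_for_query(
        aggs: Aggregations,
        shared_limits: &AggregationLimits,
    ) -> AggregationCollector {
        AggregationCollector::from_aggs(aggs, shared_limits.clone())
    }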
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/tests.rs
@@ -1056,7 +1056,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
let searcher_context: Arc<SearcherContext> =
Arc::new(SearcherContext::new(SearcherConfig::default(), None));

- let agg_limits = searcher_context.create_new_aggregation_limits();
+ let agg_limits = searcher_context.get_aggregation_limits();

let search_response = leaf_search(
searcher_context,
@@ -9,17 +9,17 @@ json:
match:
count: 10
sort:
-   - count: {"order" : "desc"}
+   - id: {"order" : "desc"}
expected:
hits:
total:
value: 4
relation: "eq"
hits:
- _source: { "count": 15, "id": 2 }
- _source: { "count": -2.5, "id": 4 }
- _source: { "id": 5 }
- _source: { "count": -2.5, "id": 4 }
- _source: { "id": 3 }
- _source: { "count": 15, "id": 2 }
---
endpoint: _elastic/sortorder/_search
json:
@@ -29,16 +29,16 @@
match:
count: 10
sort:
-   - count: {"order" : "asc"}
+   - id: {"order" : "asc"}
expected:
hits:
total:
value: 4
relation: "eq"
hits:
- _source: {"count": -2.5, "id": 4}
- _source: {"count": 15, "id": 2 }
- _source: {"id": 3}
- _source: {"count": -2.5, "id": 4}
- _source: {"id": 5}
---
endpoint: _elastic/sortorder/_search
