From c00dd571d2aff2f9e19734f5c8724b4788b76d16 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Fri, 30 Aug 2024 18:10:06 +0800 Subject: [PATCH] share aggregation limit on node (#5357) * share aggregation limit on node Changes how aggregation limits are shared. Previously they were per request, now they are per node. The counters are only updated after an allocation in tantivy, which should keep the contention risk relatively low. * fix randomness in test * update documentation --- docs/configuration/node-config.md | 2 +- quickwit/quickwit-search/src/leaf.rs | 2 +- quickwit/quickwit-search/src/root.rs | 11 +++-------- quickwit/quickwit-search/src/service.rs | 16 ++++++++++------ quickwit/quickwit-search/src/tests.rs | 2 +- .../sort_orders/0001-sort-elasticapi.yaml | 10 +++++----- 6 files changed, 21 insertions(+), 22 deletions(-) diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index e51313e9eda..371c788583c 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -191,7 +191,7 @@ This section contains the configuration options for a Searcher. | Property | Description | Default value | | --- | --- | --- | -| `aggregation_memory_limit` | Controls the maximum amount of memory that can be used for aggregations before aborting. This limit is per request and single leaf query (a leaf query is querying one or multiple splits concurrently). It is used to prevent excessive memory usage during the aggregation phase, which can lead to performance degradation or crashes. Since it is per request, concurrent requests can exceed the limit. | `500M`| +| `aggregation_memory_limit` | Controls the maximum amount of memory that can be used for aggregations before aborting. This limit is per searcher node. A node may run concurrent queries, which share the limit. The first query that will hit the limit will be aborted and frees its memory. It is used to prevent excessive memory usage during the aggregation phase, which can lead to performance degradation or crashes. | `500M`| | `aggregation_bucket_limit` | Determines the maximum number of buckets returned to the client. | `65000` | | `fast_field_cache_capacity` | Fast field in memory cache capacity on a Searcher. If your filter by dates, run aggregations, range queries, or if you use the search stream API, or even for tracing, it might worth increasing this parameter. The [metrics](../reference/metrics.md) starting by `quickwit_cache_fastfields_cache` can help you make an informed choice when setting this value. | `1G` | | `split_footer_cache_capacity` | Split footer in memory cache (it is essentially the hotcache) capacity on a Searcher.| `500M` | diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index e9e923c0a48..761c51bd16b 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -1072,7 +1072,7 @@ pub async fn multi_leaf_search( .map(|doc_mapper| deserialize_doc_mapper(doc_mapper)) .collect::>()?; // Creates a collector which merges responses into one - let aggregation_limits = searcher_context.create_new_aggregation_limits(); + let aggregation_limits = searcher_context.get_aggregation_limits(); // TODO: to avoid lockstep, we should pull up the future creation over the list of split ids // and have the semaphore on this level. // This will lower resource consumption due to less in-flight futures and avoid contention. diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index f76e06ccd99..dfed9722c9c 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -684,10 +684,8 @@ pub(crate) async fn search_partial_hits_phase( }; // Creates a collector which merges responses into one - let merge_collector = make_merge_collector( - search_request, - &searcher_context.create_new_aggregation_limits(), - )?; + let merge_collector = + make_merge_collector(search_request, &searcher_context.get_aggregation_limits())?; // Merging is a cpu-bound task. // It should be executed by Tokio's blocking threads. @@ -964,10 +962,7 @@ fn finalize_aggregation( Default::default() }; let final_aggregation_results: AggregationResults = intermediate_aggregation_results - .into_final_result( - aggregations, - &searcher_context.create_new_aggregation_limits(), - )?; + .into_final_result(aggregations, &searcher_context.get_aggregation_limits())?; serde_json::to_string(&final_aggregation_results)? } }; diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index bee1dd61253..1fb529c3d42 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -458,6 +458,8 @@ pub struct SearcherContext { pub split_cache_opt: Option>, /// List fields cache. Caches the list fields response for a given split. pub list_fields_cache: ListFieldsCache, + /// The aggregation limits are passed to limit the memory usage. + pub aggregation_limit: AggregationLimits, } impl std::fmt::Debug for SearcherContext { @@ -498,6 +500,10 @@ impl SearcherContext { LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); let list_fields_cache = ListFieldsCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); + let aggregation_limit = AggregationLimits::new( + Some(searcher_config.aggregation_memory_limit.as_u64()), + Some(searcher_config.aggregation_bucket_limit), + ); Self { searcher_config, @@ -508,14 +514,12 @@ impl SearcherContext { leaf_search_cache, list_fields_cache, split_cache_opt, + aggregation_limit, } } - /// Returns a new instance to track the aggregation memory usage. - pub fn create_new_aggregation_limits(&self) -> AggregationLimits { - AggregationLimits::new( - Some(self.searcher_config.aggregation_memory_limit.as_u64()), - Some(self.searcher_config.aggregation_bucket_limit), - ) + /// Returns the shared instance to track the aggregation memory usage. + pub fn get_aggregation_limits(&self) -> AggregationLimits { + self.aggregation_limit.clone() } } diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 772eb2f9ef3..a13286b948d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1056,7 +1056,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { let searcher_context: Arc = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); - let agg_limits = searcher_context.create_new_aggregation_limits(); + let agg_limits = searcher_context.get_aggregation_limits(); let search_response = leaf_search( searcher_context, diff --git a/quickwit/rest-api-tests/scenarii/sort_orders/0001-sort-elasticapi.yaml b/quickwit/rest-api-tests/scenarii/sort_orders/0001-sort-elasticapi.yaml index 0c44e8f39a0..f235915c365 100644 --- a/quickwit/rest-api-tests/scenarii/sort_orders/0001-sort-elasticapi.yaml +++ b/quickwit/rest-api-tests/scenarii/sort_orders/0001-sort-elasticapi.yaml @@ -9,17 +9,17 @@ json: match: count: 10 sort: - - count: {"order" : "desc"} + - id: {"order" : "desc"} expected: hits: total: value: 4 relation: "eq" hits: - - _source: { "count": 15, "id": 2 } - - _source: { "count": -2.5, "id": 4 } - _source: { "id": 5 } + - _source: { "count": -2.5, "id": 4 } - _source: { "id": 3 } + - _source: { "count": 15, "id": 2 } --- endpoint: _elastic/sortorder/_search json: @@ -29,16 +29,16 @@ json: match: count: 10 sort: - - count: {"order" : "asc"} + - id: {"order" : "asc"} expected: hits: total: value: 4 relation: "eq" hits: - - _source: {"count": -2.5, "id": 4} - _source: {"count": 15, "id": 2 } - _source: {"id": 3} + - _source: {"count": -2.5, "id": 4} - _source: {"id": 5} --- endpoint: _elastic/sortorder/_search