From 04a6e4ad959ab9592f78afaaccf437f66916415c Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Sep 2023 11:28:01 +0900 Subject: [PATCH] Change in the leaf search semaphore acquisition logic. (#3785) Instead, of spawning N tasks and acquiring the semaphore within the tasks, we do the acquisition before spawning the task. This has two effect: - we end up running less tasks at a given time. - In presence of concurrent request, we should naturally acquire permits in bulks. In theory, the latter should decrease average latency. Ideally we should have some scheduler logic... --- quickwit/quickwit-search/src/leaf.rs | 45 ++++++++++++++----------- quickwit/quickwit-search/src/service.rs | 7 ++-- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 94f1ddda049..8b6d9620d2d 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -442,20 +442,20 @@ pub async fn leaf_search( ) -> Result { info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(splits, 5)); let request = Arc::new(request.clone()); - let leaf_search_single_split_futures: Vec<_> = splits - .iter() - .map(|split| { - let split = split.clone(); - let doc_mapper_clone = doc_mapper.clone(); - let index_storage_clone = index_storage.clone(); - let searcher_context_clone = searcher_context.clone(); - let request = request.clone(); - tokio::spawn( - async move { - let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore - .acquire() - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); + for split in splits { + let searcher_context_clone = searcher_context.clone(); + let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore + .clone() + .acquire_owned() + .await + .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + let split = split.clone(); + let doc_mapper_clone = doc_mapper.clone(); + let index_storage_clone = index_storage.clone(); + let request = request.clone(); + let leaf_search_single_split_future = tokio::spawn( + async move { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS .leaf_search_split_duration_secs @@ -469,11 +469,16 @@ pub async fn leaf_search( ) .await; timer.observe_duration(); + // We explicitly drop it, to highlight it to the reader and to force the move. + drop(leaf_split_search_permit); leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err)) - }.in_current_span()) - }) - .collect(); - let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await; + } + .in_current_span(), + ); + leaf_search_single_split_futures.push(leaf_search_single_split_future); + } + let split_search_results: Vec, _>> = + futures::future::join_all(leaf_search_single_split_futures).await; // the result wrapping is only for the collector api merge_fruits // (Vec>) @@ -638,8 +643,8 @@ pub async fn leaf_list_terms( let index_storage_clone = index_storage.clone(); let searcher_context_clone = searcher_context.clone(); async move { - let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore - .acquire() + let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone() + .acquire_owned() .await .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); // TODO dedicated counter and timer? diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 4733f5e576a..227baadd555 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -393,7 +393,7 @@ pub struct SearcherContext { /// Fast fields cache. pub fast_fields_cache: Arc, /// Counting semaphore to limit concurrent leaf search split requests. - pub leaf_search_split_semaphore: Semaphore, + pub leaf_search_split_semaphore: Arc, /// Split footer cache. pub split_footer_cache: MemorySizedCache, /// Counting semaphore to limit concurrent split stream requests. @@ -422,8 +422,9 @@ impl SearcherContext { capacity_in_bytes, &quickwit_storage::STORAGE_METRICS.split_footer_cache, ); - let leaf_search_split_semaphore = - Semaphore::new(searcher_config.max_num_concurrent_split_searches); + let leaf_search_split_semaphore = Arc::new(Semaphore::new( + searcher_config.max_num_concurrent_split_searches, + )); let split_stream_semaphore = Semaphore::new(searcher_config.max_num_concurrent_split_streams); let fast_field_cache_capacity =