Skip to content

Commit

Permalink
Change in the leaf search semaphore acquisition logic. (#3785)
Browse files Browse the repository at this point in the history
Instead, of spawning N tasks and acquiring the semaphore within
the tasks, we do the acquisition before spawning the task.

This has two effect:
- we end up running less tasks at a given time.
- In presence of concurrent request,
we should naturally acquire permits in bulks.

In theory, the latter should decrease average latency.
Ideally we should have some scheduler logic...
  • Loading branch information
fulmicoton authored Sep 7, 2023
1 parent ef9a31e commit 04a6e4a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
45 changes: 25 additions & 20 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,20 +442,20 @@ pub async fn leaf_search(
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(splits, 5));
let request = Arc::new(request.clone());
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
.map(|split| {
let split = split.clone();
let doc_mapper_clone = doc_mapper.clone();
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
let request = request.clone();
tokio::spawn(
async move {
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore
.acquire()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
for split in splits {
let searcher_context_clone = searcher_context.clone();
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
let split = split.clone();
let doc_mapper_clone = doc_mapper.clone();
let index_storage_clone = index_storage.clone();
let request = request.clone();
let leaf_search_single_split_future = tokio::spawn(
async move {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
.leaf_search_split_duration_secs
Expand All @@ -469,11 +469,16 @@ pub async fn leaf_search(
)
.await;
timer.observe_duration();
// We explicitly drop it, to highlight it to the reader and to force the move.
drop(leaf_split_search_permit);
leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err))
}.in_current_span())
})
.collect();
let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await;
}
.in_current_span(),
);
leaf_search_single_split_futures.push(leaf_search_single_split_future);
}
let split_search_results: Vec<Result<Result<LeafSearchResponse, _>, _>> =
futures::future::join_all(leaf_search_single_split_futures).await;

// the result wrapping is only for the collector api merge_fruits
// (Vec<tantivy::Result<LeafSearchResponse>>)
Expand Down Expand Up @@ -638,8 +643,8 @@ pub async fn leaf_list_terms(
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore
.acquire()
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
// TODO dedicated counter and timer?
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub struct SearcherContext {
/// Fast fields cache.
pub fast_fields_cache: Arc<dyn StorageCache>,
/// Counting semaphore to limit concurrent leaf search split requests.
pub leaf_search_split_semaphore: Semaphore,
pub leaf_search_split_semaphore: Arc<Semaphore>,
/// Split footer cache.
pub split_footer_cache: MemorySizedCache<String>,
/// Counting semaphore to limit concurrent split stream requests.
Expand Down Expand Up @@ -422,8 +422,9 @@ impl SearcherContext {
capacity_in_bytes,
&quickwit_storage::STORAGE_METRICS.split_footer_cache,
);
let leaf_search_split_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_searches);
let leaf_search_split_semaphore = Arc::new(Semaphore::new(
searcher_config.max_num_concurrent_split_searches,
));
let split_stream_semaphore =
Semaphore::new(searcher_config.max_num_concurrent_split_streams);
let fast_field_cache_capacity =
Expand Down

0 comments on commit 04a6e4a

Please sign in to comment.