Skip to content

Commit

Permalink
fix jobs group_by (#4922)
Browse files Browse the repository at this point in the history
Same bug as #4880, but on other code paths; unify the code paths.
  • Loading branch information
PSeitz authored Apr 29, 2024
1 parent 32edb28 commit f1f6cd1
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 41 deletions.
11 changes: 8 additions & 3 deletions quickwit/quickwit-search/src/list_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;

use crate::leaf::open_split_bundle;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::service::SearcherContext;
use crate::{list_relevant_splits, resolve_index_patterns, ClusterClient, SearchError, SearchJob};

Expand Down Expand Up @@ -356,20 +357,24 @@ pub fn jobs_to_leaf_requests(
let search_request_for_leaf = request.clone();
let mut leaf_search_requests = Vec::new();
// Group jobs by index uid.
for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
let index_meta = index_uid_to_id.get(&index_uid).ok_or_else(|| {
group_jobs_by_index_id(jobs, |job_group| {
let index_uid = &job_group[0].index_uid;
let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| {
SearchError::Internal(format!(
"received list fields job for an unknown index {index_uid}. it should never happen"
))
})?;

let leaf_search_request = LeafListFieldsRequest {
index_id: index_meta.index_id.to_string(),
index_uri: index_meta.index_uri.to_string(),
fields: search_request_for_leaf.fields.clone(),
split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
};
leaf_search_requests.push(leaf_search_request);
}
Ok(())
})?;

Ok(leaf_search_requests)
}

Expand Down
11 changes: 7 additions & 4 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::search_job_placer::group_jobs_by_index_id;
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
Expand Down Expand Up @@ -184,20 +185,22 @@ pub fn jobs_to_leaf_requests(
) -> crate::Result<Vec<LeafListTermsRequest>> {
let search_request_for_leaf = request.clone();
let mut leaf_search_requests = Vec::new();
// Group jobs by index uid.
for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
let index_uri = index_uid_to_uri.get(&index_uid).ok_or_else(|| {
group_jobs_by_index_id(jobs, |job_group| {
let index_uid = &job_group[0].index_uid;
let index_uri = index_uid_to_uri.get(index_uid).ok_or_else(|| {
SearchError::Internal(format!(
"received list fields job for an unknown index {index_uid}. it should never happen"
))
})?;

let leaf_search_request = LeafListTermsRequest {
list_terms_request: Some(search_request_for_leaf.clone()),
index_uri: index_uri.to_string(),
split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
};
leaf_search_requests.push(leaf_search_request);
}
Ok(())
})?;
Ok(leaf_search_requests)
}

Expand Down
75 changes: 42 additions & 33 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::cluster_client::ClusterClient;
use crate::collector::{make_merge_collector, QuickwitAggregations};
use crate::find_trace_ids_collector::Span;
use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
use crate::search_job_placer::Job;
use crate::search_job_placer::{group_by, group_jobs_by_index_id, Job};
use crate::service::SearcherContext;
use crate::{
extract_split_and_footer_offsets, list_relevant_splits, SearchError, SearchJobPlacer,
Expand Down Expand Up @@ -1399,28 +1399,30 @@ fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
pub fn jobs_to_leaf_requests(
request: &SearchRequest,
search_indexes_metadatas: &IndexesMetasForLeafSearch,
mut jobs: Vec<SearchJob>,
jobs: Vec<SearchJob>,
) -> crate::Result<Vec<LeafSearchRequest>> {
let mut search_request_for_leaf = request.clone();
search_request_for_leaf.start_offset = 0;
search_request_for_leaf.max_hits += request.start_offset;
let mut leaf_search_requests = Vec::new();
// Group jobs by index uid.
jobs.sort_by(|job1, job2| job1.index_uid.cmp(&job2.index_uid));
for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
let search_index_meta = search_indexes_metadatas.get(&index_uid).ok_or_else(|| {
group_jobs_by_index_id(jobs, |job_group| {
let index_uid = &job_group[0].index_uid;
let search_index_meta = search_indexes_metadatas.get(index_uid).ok_or_else(|| {
SearchError::Internal(format!(
"received search job for an unknown index {index_uid}. it should never happen"
"received job for an unknown index {index_uid}. it should never happen"
))
})?;

let leaf_search_request = LeafSearchRequest {
search_request: Some(search_request_for_leaf.clone()),
split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
doc_mapper: search_index_meta.doc_mapper_str.clone(),
index_uri: search_index_meta.index_uri.to_string(),
};
leaf_search_requests.push(leaf_search_request);
}
Ok(())
})?;
Ok(leaf_search_requests)
}

Expand All @@ -1432,32 +1434,39 @@ pub fn jobs_to_fetch_docs_requests(
) -> crate::Result<Vec<FetchDocsRequest>> {
let mut fetch_docs_requests = Vec::new();
// Group jobs by index uid.
for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
let index_meta = indexes_metas_for_leaf_search
.get(&index_uid)
.ok_or_else(|| {
SearchError::Internal(format!(
"received search job for an unknown index {index_uid}"
))
})?;
let fetch_docs_jobs: Vec<FetchDocsJob> = job_group.collect();
let partial_hits: Vec<PartialHit> = fetch_docs_jobs
.iter()
.flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
.collect();
let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
.into_iter()
.map(|fetch_doc_job| fetch_doc_job.into())
.collect();
let fetch_docs_req = FetchDocsRequest {
partial_hits,
split_offsets,
index_uri: index_meta.index_uri.to_string(),
snippet_request: snippet_request_opt.clone(),
doc_mapper: index_meta.doc_mapper_str.clone(),
};
fetch_docs_requests.push(fetch_docs_req);
}
group_by(
jobs,
|job| &job.index_uid,
|fetch_docs_jobs| {
let index_uid = &fetch_docs_jobs[0].index_uid;

let index_meta = indexes_metas_for_leaf_search
.get(index_uid)
.ok_or_else(|| {
SearchError::Internal(format!(
"received search job for an unknown index {index_uid}"
))
})?;
let partial_hits: Vec<PartialHit> = fetch_docs_jobs
.iter()
.flat_map(|fetch_doc_job| fetch_doc_job.partial_hits.iter().cloned())
.collect();
let split_offsets: Vec<SplitIdAndFooterOffsets> = fetch_docs_jobs
.into_iter()
.map(|fetch_doc_job| fetch_doc_job.into())
.collect();
let fetch_docs_req = FetchDocsRequest {
partial_hits,
split_offsets,
index_uri: index_meta.index_uri.to_string(),
snippet_request: snippet_request_opt.clone(),
doc_mapper: index_meta.doc_mapper_str.clone(),
};
fetch_docs_requests.push(fetch_docs_req);

Ok(())
},
)?;
Ok(fetch_docs_requests)
}

Expand Down
91 changes: 90 additions & 1 deletion quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};

use crate::{SearchServiceClient, SearcherPool};
use crate::{SearchJob, SearchServiceClient, SearcherPool};

/// Job.
/// The unit in which distributed search is performed.
Expand Down Expand Up @@ -234,11 +234,100 @@ impl PartialEq for CandidateNodes {

impl Eq for CandidateNodes {}

/// Groups `jobs` by index uid and invokes `cb` once per group, in ascending
/// uid order, handing each group to the callback as an owned `Vec<SearchJob>`.
///
/// # Errors
/// Stops at the first group for which `cb` returns an error and propagates it.
pub fn group_jobs_by_index_id(
    jobs: Vec<SearchJob>,
    cb: impl FnMut(Vec<SearchJob>) -> crate::Result<()>,
) -> crate::Result<()> {
    // Delegate to the generic helper, keyed on the job's index uid.
    group_by(jobs, |job| &job.index_uid, cb)
}

/// Note: The data will be sorted.
///
/// Returns slices of the input data grouped by passed closure.
/// Sorts `data` by the key extracted by `compare_by` and invokes `callback`
/// once per run of equal-keyed elements, in ascending key order. Each group
/// is handed to the callback as an owned `Vec<T>`.
///
/// Note: the input is consumed and reordered; groups are NOT emitted in the
/// original element order.
///
/// # Errors
/// Stops at the first group for which `callback` returns an error and
/// propagates it.
pub fn group_by<T, K: Ord, F>(
    mut data: Vec<T>,
    compare_by: impl Fn(&T) -> &K,
    mut callback: F,
) -> crate::Result<()>
where
    F: FnMut(Vec<T>) -> crate::Result<()>,
{
    // Sort in descending key order so the smallest key sits at the tail:
    // `split_off` can then peel each group off the end in O(group_len)
    // without shifting the remaining elements.
    data.sort_by(|left, right| compare_by(right).cmp(compare_by(left)));
    while !data.is_empty() {
        let last_element = data.last().unwrap();
        // Length of the run at the tail sharing the (currently smallest) key.
        let count = data
            .iter()
            .rev()
            .take_while(|&x| compare_by(x) == compare_by(last_element))
            .count();

        let group = data.split_off(data.len() - count);
        callback(group)?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{searcher_pool_for_test, MockSearchService, SearchJob};

#[test]
fn test_group_by_1() {
    // Mixed group sizes: expect one group per distinct value, in ascending order.
    let input = vec![1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 5];
    let mut collected: Vec<Vec<i32>> = Vec::new();
    group_by(
        input,
        |value| value,
        |group| {
            collected.push(group);
            Ok(())
        },
    )
    .unwrap();
    let expected: Vec<Vec<i32>> = vec![
        vec![1, 1],
        vec![2, 2, 2],
        vec![3],
        vec![4, 4],
        vec![5, 5, 5],
    ];
    assert_eq!(collected, expected);
}
#[test]
fn test_group_by_all_same() {
    // A single distinct value must yield exactly one group holding everything.
    let mut collected: Vec<Vec<i32>> = Vec::new();
    group_by(
        vec![1, 1],
        |value| value,
        |group| {
            collected.push(group);
            Ok(())
        },
    )
    .unwrap();
    assert_eq!(collected, vec![vec![1, 1]]);
}
#[test]
fn test_group_by_empty() {
    // An empty input must never invoke the callback.
    let mut collected: Vec<Vec<i32>> = Vec::new();
    group_by(
        Vec::new(),
        |value| value,
        |group| {
            collected.push(group);
            Ok(())
        },
    )
    .unwrap();
    assert!(collected.is_empty());
}

#[tokio::test]
async fn test_search_job_placer() {
{
Expand Down

0 comments on commit f1f6cd1

Please sign in to comment.