
Commit e715689

Merge remote-tracking branch 'origin/count_opt' into trinity/placin-algorithm-fulmicoton-for-airmail

fulmicoton committed Jun 4, 2024
2 parents bc1904e + 14af027
Showing 16 changed files with 385 additions and 21 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/publish_docker_images.yml
@@ -22,8 +22,8 @@ jobs:
include:
  - os: ubuntu-latest
    platform: linux/amd64
-  - os: buildjet-8vcpu-ubuntu-2204-arm
-    platform: linux/arm64
+  #- os: buildjet-8vcpu-ubuntu-2204-arm
+  #  platform: linux/arm64
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
2 changes: 2 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto

@@ -353,6 +353,8 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_start = 4;
// The highest timestamp appearing in the split, in seconds since epoch
optional int64 timestamp_end = 5;
+// The number of docs in the split
+uint64 num_docs = 6;
}

// Hits returned by a FetchDocRequest.

Some generated files are not rendered by default.
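The new num_docs field is what lets a leaf search answer a pure count query from split metadata alone, without opening the split at all. A minimal sketch of that idea (the LeafSearchResponse field names are assumed from the search proto; this is not the committed body of get_leaf_resp_from_count):

fn get_leaf_resp_from_count(num_docs: u64) -> LeafSearchResponse {
    LeafSearchResponse {
        // Every document in the split matches a MatchAll count query.
        num_hits: num_docs,
        // No hits were requested, so none are returned.
        partial_hits: Vec::new(),
        num_attempted_splits: 1,
        ..Default::default()
    }
}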

5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/cluster_client.rs
@@ -380,6 +380,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
}],
..Default::default()
}
@@ -406,13 +407,15 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
],
}],
@@ -441,13 +444,15 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
],
}
158 changes: 139 additions & 19 deletions quickwit/quickwit-search/src/leaf.rs
@@ -363,6 +363,17 @@ async fn leaf_search_single_split(
return Ok(cached_answer);
}

+let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
+.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
+
+// CanSplitDoBetter or rewrite_request may have changed the request to be a count-only request.
+// This may be the case for an AllQuery with a sort by date and a time filter, where the current
+// split can't have better results.
+if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+return Ok(get_leaf_resp_from_count(split.num_docs));
+}

let split_id = split.split_id.to_string();
let index = open_index_with_caches(
searcher_context,
@@ -382,19 +393,6 @@

let mut collector =
make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
-let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
-.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
-
-// CanSplitDoBetter or rewrite_request may have changed the request to be a count only request
-// This may be the case for AllQuery with a sort by date, where the current split can't have
-// better results.
-//
-// TODO: SplitIdAndFooterOffsets could carry the number of docs in a split, so we could save
-// opening the index and execute this earlier. Opening splits is typically served from the
-// cache, so there may be no gain adding that info to SplitIdAndFooterOffsets.
-if is_metadata_count_request_with_ast(&query_ast, &search_request) {
-return Ok(get_leaf_resp_from_count(searcher.num_docs() as u64));
-}

let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;
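With num_docs carried on SplitIdAndFooterOffsets, the count-only early exit above now runs before open_index_with_caches, answering from split.num_docs instead of searcher.num_docs(). A rough sketch of what such a predicate has to guarantee (inferred from the guards in optimize below; not the committed definition of is_metadata_count_request_with_ast):

fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool {
    // A count can only be served from metadata if the query matches every document
    // and the client asked for no hits, no aggregations, and no time pruning.
    matches!(query_ast, QueryAst::MatchAll)
        && request.max_hits == 0
        && request.start_offset == 0
        && request.aggregation_request.is_none()
        && request.start_timestamp.is_none()
        && request.end_timestamp.is_none()
}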

@@ -879,6 +877,130 @@ impl CanSplitDoBetter {
}
}

+/// This function tries to detect upfront which splits contain the top n hits and converts the
+/// other split searches to count-only searches. It also optimizes the split order.
+///
+/// Returns the search requests with their split.
+fn optimize(
+&self,
+request: Arc<SearchRequest>,
+mut splits: Vec<SplitIdAndFooterOffsets>,
+) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+self.optimize_split_order(&mut splits);
+// TODO: we may want some deduplication + Cow logic here
+let mut split_with_req = splits
+.into_iter()
+.map(|split| (split, (*request).clone()))
+.collect::<Vec<_>>();
+
+if request.aggregation_request.is_some() || request.search_after.is_some() {
+return Ok(split_with_req);
+}
+let query_ast: QueryAst = serde_json::from_str(request.query_ast.as_str())
+.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
+
+// TODO: update the logic to handle start_timestamp/end_timestamp ranges
+if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
+return Ok(split_with_req);
+}
+
+if !matches!(query_ast, QueryAst::MatchAll) {
+return Ok(split_with_req);
+}
+
+// Count the number of splits which together contain enough documents.
+let count_required_splits =
+move |split_with_req: &[(SplitIdAndFooterOffsets, SearchRequest)],
+num_requested_docs: u64| {
+let mut num_docs = 0;
+split_with_req
+.iter()
+.take_while(|(split, _req)| {
+let need_more_docs = num_docs < num_requested_docs;
+num_docs += split.num_docs;
+need_more_docs
+})
+.count()
+};
+
+// Reuse the sort order detected in split_filter: we want to detect cases where we can
+// convert some split queries to count-only queries.
+let num_requested_docs = request.start_offset + request.max_hits;
+match self {
+CanSplitDoBetter::SplitIdHigher(_) => {
+// In this case there is no sort order; we order by split id.
+// If the first splits have enough documents, we can convert the other queries to
+// count-only queries.
+let num_splits = count_required_splits(&split_with_req, num_requested_docs);
+for (_split, ref mut request) in &mut split_with_req[num_splits..] {
+disable_search_request_hits(request);
+}
+}
+CanSplitDoBetter::Uninformative => {}
+CanSplitDoBetter::SplitTimestampLower(_) => {
+// We order by timestamp asc. split_with_req is sorted by timestamp_start.
+//
+// Calculate the number of splits which are guaranteed to deliver enough documents.
+let num_splits = count_required_splits(&split_with_req, num_requested_docs);
+assert!(
+num_splits > 0,
+"We should always have at least one split to search"
+);
+// If we know that some splits will deliver enough documents, we can convert the
+// others to count-only queries.
+// Since we only have start and end ranges and don't know the distribution, we make
+// sure the splits don't overlap. The distribution of two splits could look like
+// this (each dot is a document on a timestamp x axis), for a top 2 query:
+// ```
+// [. .] Split1 has enough docs, but its last doc is not in the top 2
+// [.. .] Split2's first doc is in the top 2
+// ```
+// Let's get the biggest timestamp_end of the first num_splits splits.
+let biggest_end_timestamp = split_with_req
+.iter()
+.take(num_splits)
+.map(|(split, _)| split.timestamp_end())
+.max()
+.unwrap();
+for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
+if split.timestamp_start() > biggest_end_timestamp {
+disable_search_request_hits(request);
+}
+}
+}
+CanSplitDoBetter::SplitTimestampHigher(_) => {
+// We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
+//
+// Calculate the number of splits which are guaranteed to deliver enough documents.
+let num_splits = count_required_splits(&split_with_req, num_requested_docs);
+assert!(
+num_splits > 0,
+"We should always have at least one split to search"
+);
+// We have the number of splits we need to search to get enough docs; now we need to
+// find the splits that don't overlap.
+//
+// Let's get the smallest timestamp_start of the first num_splits splits.
+let smallest_start_timestamp = split_with_req
+.iter()
+.take(num_splits)
+.map(|(split, _)| split.timestamp_start())
+.min()
+.unwrap();
+for (split, ref mut request) in split_with_req.iter_mut().skip(num_splits) {
+if split.timestamp_end() < smallest_start_timestamp {
+disable_search_request_hits(request);
+}
+}
+}
+CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
+}
+
+Ok(split_with_req)
+}

/// Returns whether the given split can possibly give documents better than the one already
/// known to match.
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool {
@@ -1071,14 +1193,14 @@ pub async fn leaf_search(
searcher_context: Arc<SearcherContext>,
request: Arc<SearchRequest>,
index_storage: Arc<dyn Storage>,
-mut splits: Vec<SplitIdAndFooterOffsets>,
+splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
-split_filter.optimize_split_order(&mut splits);
+let split_with_req = split_filter.optimize(request.clone(), splits)?;

// if the client wants a full count, or we are doing an aggregation, we want to run every split.
// However if the aggregation is the tracing aggregation, we don't actually need all splits.
@@ -1088,22 +1210,20 @@

let split_filter = Arc::new(RwLock::new(split_filter));

-let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());
+let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(split_with_req.len());

let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

-for split in splits {
+for (split, mut request) in split_with_req {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");

-let mut request = (*request).clone();

let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter);
if !can_be_better && !run_all_splits {
continue;
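The core of the new optimize pass is the count_required_splits closure: walk the splits in their optimized order and count how many are needed before the cumulative num_docs covers start_offset + max_hits; requests for splits past that prefix (and, for timestamp sorts, provably non-overlapping with it) are downgraded to count-only. A standalone, simplified sketch with a worked example (plain tuples stand in for SplitIdAndFooterOffsets and SearchRequest):

// Simplified: each split is (split_id, num_docs), already in optimized order.
fn count_required_splits(splits: &[(&str, u64)], num_requested_docs: u64) -> usize {
    let mut num_docs = 0;
    splits
        .iter()
        .take_while(|(_id, split_num_docs)| {
            let need_more_docs = num_docs < num_requested_docs;
            num_docs += split_num_docs;
            need_more_docs
        })
        .count()
}

fn main() {
    let splits = [("split_1", 1), ("split_2", 3), ("split_3", 5)];
    // Top-2 query: split_1 alone holds only 1 doc, so split_2 is also required.
    assert_eq!(count_required_splits(&splits, 2), 2);
    // Top-10 query: 1 + 3 + 5 = 9 < 10, so all three splits are required.
    assert_eq!(count_required_splits(&splits, 10), 3);
}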
5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/leaf_cache.rs
@@ -207,6 +207,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
@@ -215,6 +216,7 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};

let query_1 = SearchRequest {
@@ -269,20 +271,23 @@
split_footer_end: 100,
timestamp_start: Some(100),
timestamp_end: Some(199),
+num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
+num_docs: 0,
};
let split_3 = SplitIdAndFooterOffsets {
split_id: "split_3".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: Some(150),
timestamp_end: Some(249),
+num_docs: 0,
};

let query_1 = SearchRequest {
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/lib.rs
@@ -173,6 +173,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAndFooterOffsets {
.time_range
.as_ref()
.map(|time_range| *time_range.end()),
+num_docs: split_metadata.num_docs as u64,
}
}

2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/list_fields_cache.rs
@@ -87,6 +87,7 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};

let split_2 = SplitIdAndFooterOffsets {
@@ -95,6 +96,7 @@
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};

let result = ListFieldsEntryResponse {
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/retry/mod.rs
@@ -133,6 +133,7 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};
let client_for_retry = retry_client(
&search_job_placer,
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search.rs
@@ -96,13 +96,15 @@ mod tests {
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
},
],
}],
2 changes: 2 additions & 0 deletions quickwit/quickwit-search/src/retry/search_stream.rs
@@ -110,13 +110,15 @@ mod tests {
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};
let split_2 = SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_end: 100,
split_footer_start: 0,
timestamp_start: None,
timestamp_end: None,
+num_docs: 0,
};
let retry_policy = LeafSearchStreamRetryPolicy {};
let request = LeafSearchStreamRequest {
