use search_after in scroll
trinity-1686a committed Dec 14, 2023
1 parent 9f20660 commit 5666fa7
Showing 3 changed files with 61 additions and 11 deletions.
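The core idea of the change: instead of serving later scroll pages by re-running the query with an ever larger window (the old test expected leaf requests for 2 × SCROLL_BATCH_LEN, then 3 × SCROLL_BATCH_LEN hits), each follow-up request now carries the last PartialHit of the previous page as a `search_after` cursor and asks for a single batch of SCROLL_BATCH_LEN hits sorted strictly after it, while the scroll cache is trimmed to a bounded window. The sketch below is only an illustration of that cursor-style paging, not Quickwit's API; it assumes hits are served in descending (split_id, doc_id) order, mirroring the comparison the updated mock helper emulates.

```rust
/// Minimal illustration of cursor-style ("search_after") paging; the types and
/// ordering here are assumptions for the example, not Quickwit's actual API.
#[derive(Debug)]
struct Hit {
    split_id: String,
    doc_id: u32,
}

/// Return the next `batch_len` hits that sort strictly after `search_after`,
/// or the first `batch_len` hits when no cursor is given.
fn next_page<'a>(
    sorted_hits: &'a [Hit], // assumed sorted in descending (split_id, doc_id) order
    search_after: Option<&Hit>,
    batch_len: usize,
) -> &'a [Hit] {
    let start = match search_after {
        // Find the first hit that comes strictly after the cursor in the descending order.
        Some(cursor) => sorted_hits
            .iter()
            .position(|hit| {
                (hit.split_id.as_str(), hit.doc_id) < (cursor.split_id.as_str(), cursor.doc_id)
            })
            .unwrap_or(sorted_hits.len()),
        None => 0,
    };
    let end = (start + batch_len).min(sorted_hits.len());
    &sorted_hits[start..end]
}

fn main() {
    // Two fake splits with five documents each, already in descending order.
    let hits: Vec<Hit> = ["split2", "split1"]
        .iter()
        .flat_map(|split_id| {
            (0..5u32).rev().map(move |doc_id| Hit {
                split_id: split_id.to_string(),
                doc_id,
            })
        })
        .collect();

    let page1 = next_page(&hits, None, 3);
    let cursor = page1.last().unwrap(); // the last hit of a page becomes the cursor
    let page2 = next_page(&hits, Some(cursor), 3);

    assert_eq!(page1.last().unwrap().doc_id, 2);
    assert_eq!(page2.first().unwrap().doc_id, 1);
    println!("page1 = {page1:?}");
    println!("page2 = {page2:?}");
}
```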
38 changes: 30 additions & 8 deletions quickwit/quickwit-search/src/root.rs
@@ -407,7 +407,7 @@ async fn search_partial_hits_phase_with_scroll(
// This is a scroll request.
//
// We increase max hits to add populate the scroll cache.
- search_request.max_hits = SCROLL_BATCH_LEN as u64;
+ search_request.max_hits = search_request.max_hits.max(SCROLL_BATCH_LEN as u64);
search_request.scroll_ttl_secs = None;
let mut leaf_search_resp = search_partial_hits_phase(
searcher_context,
@@ -422,7 +422,7 @@

let scroll_context_search_request =
simplify_search_request_for_scroll_api(&search_request)?;
- let scroll_ctx = ScrollContext {
+ let mut scroll_ctx = ScrollContext {
indexes_metas_for_leaf_search: indexes_metas_for_leaf_search.clone(),
split_metadatas: split_metadatas.to_vec(),
search_request: scroll_context_search_request,
@@ -438,6 +438,7 @@
)
.next_page(leaf_search_resp.partial_hits.len() as u64);

+ scroll_ctx.truncate_start();
let payload: Vec<u8> = scroll_ctx.serialize();
let scroll_key = scroll_key_and_start_offset.scroll_key();
cluster_client
@@ -2933,7 +2934,11 @@ mod tests {
assert_eq!(timestamp_range_extractor.end_timestamp, Some(1620283880));
}

- fn create_search_resp(index_uri: &str, hit_range: Range<usize>) -> LeafSearchResponse {
+ fn create_search_resp(
+ index_uri: &str,
+ hit_range: Range<usize>,
+ search_after: Option<PartialHit>,
+ ) -> LeafSearchResponse {
let (num_total_hits, split_id) = match index_uri {
"ram:///test-index-1" => (TOTAL_NUM_HITS_INDEX_1, "split1"),
"ram:///test-index-2" => (TOTAL_NUM_HITS_INDEX_2, "split2"),
@@ -2942,6 +2947,17 @@

let doc_ids = (0..num_total_hits)
.rev()
+ .filter(|elem| {
+ if let Some(search_after) = &search_after {
+ if split_id == search_after.split_id {
+ *elem < (search_after.doc_id as usize)
+ } else {
+ split_id < search_after.split_id.as_str()
+ }
+ } else {
+ true
+ }
+ })
.skip(hit_range.start)
.take(hit_range.end - hit_range.start);
quickwit_proto::search::LeafSearchResponse {
@@ -2989,43 +3005,49 @@
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
- let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+ let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+ assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
+ search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
- let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+ let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
- assert_eq!(search_req.max_hits as usize, 2 * SCROLL_BATCH_LEN);
+ assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+ assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
+ search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
- let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+ let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
- assert_eq!(search_req.max_hits as usize, 3 * SCROLL_BATCH_LEN);
+ assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+ assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
+ search_req.search_after,
))
},
);
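A rough way to see what the new mock expectations buy: the removed assertions suggest the k-th scroll page used to trigger leaf requests for k × SCROLL_BATCH_LEN hits from offset 0, while the new assertions show every follow-up request asking for a constant SCROLL_BATCH_LEN hits after a `search_after` cursor. Back-of-the-envelope arithmetic, with illustrative numbers only (not Quickwit code):

```rust
fn main() {
    // Purely illustrative figures; SCROLL_BATCH_LEN's real value is not assumed here.
    let batch_len: u64 = 1_000;
    let pages: u64 = 10;

    // Old behaviour suggested by the removed assertions: page k fetches k * batch_len hits.
    let offset_based: u64 = (1..=pages).map(|k| k * batch_len).sum();
    // New behaviour: every page fetches exactly batch_len hits after the cursor.
    let cursor_based: u64 = pages * batch_len;

    assert_eq!(offset_based, 55_000);
    assert_eq!(cursor_based, 10_000);
    println!("offset-based total: {offset_based} hits, search_after total: {cursor_based} hits");
}
```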
33 changes: 30 additions & 3 deletions quickwit/quickwit-search/src/scroll_context.rs
@@ -85,6 +85,17 @@ impl ScrollContext {
&truncated_partial_hits[..num_partial_hits]
}

+ /// Truncate the first few stored partial hits if we have more than SCROLL_BATCH_LEN of them.
+ pub fn truncate_start(&mut self) {
+ if self.cached_partial_hits.len() <= SCROLL_BATCH_LEN {
+ return;
+ }
+
+ let to_truncate = self.cached_partial_hits.len() - SCROLL_BATCH_LEN;
+ self.cached_partial_hits.drain(..to_truncate);
+ self.cached_partial_hits_start_offset += to_truncate as u64;
+ }
+
pub fn serialize(&self) -> Vec<u8> {
let uncompressed_payload = serde_json::to_string(self).unwrap();
uncompressed_payload.as_bytes().to_vec()
@@ -104,11 +115,27 @@ impl ScrollContext {
cluster_client: &ClusterClient,
searcher_context: &SearcherContext,
) -> crate::Result<bool> {
- if self.cached_partial_hits_start_offset <= start_offset && self.last_page_in_cache() {
+ // if start_offset - 1 isn't in cache, either we got a query which we already answered
+ // earlier, or we got a request from the future (forged, or we failed to write in the kv
+ // store)
+ if !(self.cached_partial_hits_start_offset
+ ..self.cached_partial_hits_start_offset + self.cached_partial_hits.len() as u64)
+ .contains(&(start_offset - 1))
+ {
+ return Err(crate::SearchError::InvalidQuery(
+ "Reused scroll_id".to_string(),
+ ));
+ }
+
+ if self.last_page_in_cache() {
return Ok(false);
}
- self.search_request.max_hits = SCROLL_BATCH_LEN as u64;
- self.search_request.start_offset = start_offset;
+
+ let previous_last_hit = self.cached_partial_hits
+ [(start_offset - 1 - self.cached_partial_hits_start_offset) as usize]
+ .clone();
+
+ self.search_request.search_after = Some(previous_last_hit);
let leaf_search_response: LeafSearchResponse = crate::root::search_partial_hits_phase(
searcher_context,
&self.indexes_metas_for_leaf_search,
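`truncate_start` and the new range check work together: the context only keeps a bounded window of recent partial hits, the start offset records where that window begins, and a follow-up scroll request is accepted only if the hit at `start_offset - 1` is still inside the window, because that hit becomes the `search_after` cursor. A stand-alone model of that sliding window, with hypothetical names rather than Quickwit's `ScrollContext`:

```rust
/// A toy model of the scroll cache window: it keeps at most `batch_len` recent
/// entries and remembers the absolute offset of the first cached entry.
/// Illustrative only; types and method names are hypothetical.
struct CacheWindow<T> {
    start_offset: u64,
    cached: Vec<T>,
    batch_len: usize,
}

impl<T: Clone> CacheWindow<T> {
    fn new(batch_len: usize) -> Self {
        CacheWindow { start_offset: 0, cached: Vec::new(), batch_len }
    }

    /// Append newly fetched hits to the cache.
    fn extend(&mut self, hits: &[T]) {
        self.cached.extend_from_slice(hits);
    }

    /// Drop the oldest entries so that at most `batch_len` remain,
    /// advancing `start_offset` by the number of dropped entries.
    fn truncate_start(&mut self) {
        if self.cached.len() <= self.batch_len {
            return;
        }
        let to_truncate = self.cached.len() - self.batch_len;
        self.cached.drain(..to_truncate);
        self.start_offset += to_truncate as u64;
    }

    /// Look up the hit at absolute offset `offset`, if it is still cached.
    /// The scroll path would use this to recover the hit at `start_offset - 1`
    /// and turn it into the `search_after` cursor.
    fn get(&self, offset: u64) -> Option<&T> {
        offset
            .checked_sub(self.start_offset)
            .and_then(|relative| self.cached.get(relative as usize))
    }
}

fn main() {
    let mut window = CacheWindow::new(3);
    window.extend(&[10, 11, 12, 13, 14]); // absolute offsets 0..5
    window.truncate_start();
    assert_eq!(window.start_offset, 2);
    assert_eq!(window.get(1), None); // evicted
    assert_eq!(window.get(4), Some(&14)); // last cached hit
}
```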
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/service.rs
@@ -357,6 +357,7 @@ pub(crate) async fn scroll(

if let Some(scroll_ttl_secs) = scroll_request.scroll_ttl_secs {
if scroll_context_modified {
+ scroll_context.truncate_start();
let payload = scroll_context.serialize();
let scroll_ttl = Duration::from_secs(scroll_ttl_secs as u64);
cluster_client
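The one-line change in the service matters because the scroll context is re-serialized and stored back in the KV store whenever it has been modified: trimming the cached hits first keeps that payload roughly constant instead of growing with how far the client has scrolled. A quick illustration of the size difference (uses the `serde_json` crate, as the context itself is JSON-serialized; the numbers and the idea of one integer per cached hit are assumptions for the example):

```rust
fn main() {
    let scroll_batch_len = 1_000usize; // stands in for SCROLL_BATCH_LEN
    // Pretend each cached partial hit serializes to a single integer.
    let all_cached_hits: Vec<u64> = (0..50_000u64).collect();

    // Without trimming, the serialized scroll state grows with every page.
    let untrimmed = serde_json::to_string(&all_cached_hits).unwrap();
    // With truncate_start-style trimming, only the last SCROLL_BATCH_LEN hits are kept.
    let trimmed =
        serde_json::to_string(&all_cached_hits[all_cached_hits.len() - scroll_batch_len..])
            .unwrap();

    println!("untrimmed: {} bytes, trimmed: {} bytes", untrimmed.len(), trimmed.len());
}
```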
