From 5666fa78ae777a5a6a87e55861ff133eb8ad3be4 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Thu, 14 Dec 2023 17:59:01 +0100 Subject: [PATCH] use search_after in scroll --- quickwit/quickwit-search/src/root.rs | 38 +++++++++++++++---- .../quickwit-search/src/scroll_context.rs | 33 ++++++++++++++-- quickwit/quickwit-search/src/service.rs | 1 + 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 05252a65969..54663097be3 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -407,7 +407,7 @@ async fn search_partial_hits_phase_with_scroll( // This is a scroll request. // // We increase max hits to add populate the scroll cache. - search_request.max_hits = SCROLL_BATCH_LEN as u64; + search_request.max_hits = search_request.max_hits.max(SCROLL_BATCH_LEN as u64); search_request.scroll_ttl_secs = None; let mut leaf_search_resp = search_partial_hits_phase( searcher_context, @@ -422,7 +422,7 @@ async fn search_partial_hits_phase_with_scroll( let scroll_context_search_request = simplify_search_request_for_scroll_api(&search_request)?; - let scroll_ctx = ScrollContext { + let mut scroll_ctx = ScrollContext { indexes_metas_for_leaf_search: indexes_metas_for_leaf_search.clone(), split_metadatas: split_metadatas.to_vec(), search_request: scroll_context_search_request, @@ -438,6 +438,7 @@ async fn search_partial_hits_phase_with_scroll( ) .next_page(leaf_search_resp.partial_hits.len() as u64); + scroll_ctx.truncate_start(); let payload: Vec = scroll_ctx.serialize(); let scroll_key = scroll_key_and_start_offset.scroll_key(); cluster_client @@ -2933,7 +2934,11 @@ mod tests { assert_eq!(timestamp_range_extractor.end_timestamp, Some(1620283880)); } - fn create_search_resp(index_uri: &str, hit_range: Range) -> LeafSearchResponse { + fn create_search_resp( + index_uri: &str, + hit_range: Range, + search_after: Option, + ) -> LeafSearchResponse { let (num_total_hits, split_id) = match index_uri { "ram:///test-index-1" => (TOTAL_NUM_HITS_INDEX_1, "split1"), "ram:///test-index-2" => (TOTAL_NUM_HITS_INDEX_2, "split2"), @@ -2942,6 +2947,17 @@ mod tests { let doc_ids = (0..num_total_hits) .rev() + .filter(|elem| { + if let Some(search_after) = &search_after { + if split_id == search_after.split_id { + *elem < (search_after.doc_id as usize) + } else { + split_id < search_after.split_id.as_str() + } + } else { + true + } + }) .skip(hit_range.start) .take(hit_range.end - hit_range.start); quickwit_proto::search::LeafSearchResponse { @@ -2989,43 +3005,49 @@ mod tests { let mut mock_search_service = MockSearchService::new(); mock_search_service.expect_leaf_search().times(2).returning( |req: quickwit_proto::search::LeafSearchRequest| { - let search_req: &SearchRequest = req.search_request.as_ref().unwrap(); + let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); assert!(search_req.scroll_ttl_secs.is_none()); assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_none()); Ok(create_search_resp( &req.index_uri, search_req.start_offset as usize ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, )) }, ); mock_search_service.expect_leaf_search().times(2).returning( |req: quickwit_proto::search::LeafSearchRequest| { - let search_req: &SearchRequest = req.search_request.as_ref().unwrap(); + let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); assert!(search_req.scroll_ttl_secs.is_none()); - assert_eq!(search_req.max_hits as usize, 2 * SCROLL_BATCH_LEN); + assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_some()); Ok(create_search_resp( &req.index_uri, search_req.start_offset as usize ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, )) }, ); mock_search_service.expect_leaf_search().times(2).returning( |req: quickwit_proto::search::LeafSearchRequest| { - let search_req: &SearchRequest = req.search_request.as_ref().unwrap(); + let search_req = req.search_request.unwrap(); // the leaf request does not need to know about the scroll_ttl. assert_eq!(search_req.start_offset, 0u64); assert!(search_req.scroll_ttl_secs.is_none()); - assert_eq!(search_req.max_hits as usize, 3 * SCROLL_BATCH_LEN); + assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN); + assert!(search_req.search_after.is_some()); Ok(create_search_resp( &req.index_uri, search_req.start_offset as usize ..(search_req.start_offset + search_req.max_hits) as usize, + search_req.search_after, )) }, ); diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index c82e4f15210..2ed80f2b786 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -85,6 +85,17 @@ impl ScrollContext { &truncated_partial_hits[..num_partial_hits] } + /// Truncate the first few stored partial hits if we have more than SCROLL_BATCH_LEN of them. + pub fn truncate_start(&mut self) { + if self.cached_partial_hits.len() <= SCROLL_BATCH_LEN { + return; + } + + let to_truncate = self.cached_partial_hits.len() - SCROLL_BATCH_LEN; + self.cached_partial_hits.drain(..to_truncate); + self.cached_partial_hits_start_offset += to_truncate as u64; + } + pub fn serialize(&self) -> Vec { let uncompressed_payload = serde_json::to_string(self).unwrap(); uncompressed_payload.as_bytes().to_vec() @@ -104,11 +115,27 @@ impl ScrollContext { cluster_client: &ClusterClient, searcher_context: &SearcherContext, ) -> crate::Result { - if self.cached_partial_hits_start_offset <= start_offset && self.last_page_in_cache() { + // if start_offset - 1 isn't in cache, either we got a query which we already answered + // earlier, or we got a request from the future (forged, or we failed to write in the kv + // store) + if !(self.cached_partial_hits_start_offset + ..self.cached_partial_hits_start_offset + self.cached_partial_hits.len() as u64) + .contains(&(start_offset - 1)) + { + return Err(crate::SearchError::InvalidQuery( + "Reused scroll_id".to_string(), + )); + } + + if self.last_page_in_cache() { return Ok(false); } - self.search_request.max_hits = SCROLL_BATCH_LEN as u64; - self.search_request.start_offset = start_offset; + + let previous_last_hit = self.cached_partial_hits + [(start_offset - 1 - self.cached_partial_hits_start_offset) as usize] + .clone(); + + self.search_request.search_after = Some(previous_last_hit); let leaf_search_response: LeafSearchResponse = crate::root::search_partial_hits_phase( searcher_context, &self.indexes_metas_for_leaf_search, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 48d0d6e2e05..2baaf7fdaa7 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -357,6 +357,7 @@ pub(crate) async fn scroll( if let Some(scroll_ttl_secs) = scroll_request.scroll_ttl_secs { if scroll_context_modified { + scroll_context.truncate_start(); let payload = scroll_context.serialize(); let scroll_ttl = Duration::from_secs(scroll_ttl_secs as u64); cluster_client