diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 86e45fff0f5..062e0d371e5 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -350,7 +350,9 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<SearchRequest> {
         // We remove the scroll ttl parameter. It is irrelevant to process later requests.
         scroll_ttl_secs: None,
         search_after: None,
-        count_hits: req.count_hits,
+        // The request is simplified after the initial query, and we cache the hit count, so we
+        // don't need to recompute it afterward.
+        count_hits: quickwit_proto::search::CountHits::Underestimate as i32,
     })
 }
 
@@ -526,7 +528,7 @@ async fn search_partial_hits_phase_with_scroll(
     // This is a scroll request.
     //
     // We increase max hits to populate the scroll cache.
-    search_request.max_hits = SCROLL_BATCH_LEN as u64;
+    search_request.max_hits = search_request.max_hits.max(SCROLL_BATCH_LEN as u64);
     search_request.scroll_ttl_secs = None;
     let mut leaf_search_resp = search_partial_hits_phase(
         searcher_context,
@@ -538,10 +540,15 @@ async fn search_partial_hits_phase_with_scroll(
         .await?;
     let cached_partial_hits = leaf_search_resp.partial_hits.clone();
     leaf_search_resp.partial_hits.truncate(max_hits as usize);
+    let last_hit = leaf_search_resp
+        .partial_hits
+        .last()
+        .cloned()
+        .unwrap_or_default();
 
     let scroll_context_search_request =
         simplify_search_request_for_scroll_api(&search_request)?;
-    let scroll_ctx = ScrollContext {
+    let mut scroll_ctx = ScrollContext {
         indexes_metas_for_leaf_search: indexes_metas_for_leaf_search.clone(),
         split_metadatas: split_metadatas.to_vec(),
         search_request: scroll_context_search_request,
@@ -554,9 +561,11 @@ async fn search_partial_hits_phase_with_scroll(
         ScrollKeyAndStartOffset::new_with_start_offset(
             scroll_ctx.search_request.start_offset,
             max_hits as u32,
+            last_hit.clone(),
         )
-        .next_page(leaf_search_resp.partial_hits.len() as u64);
+        .next_page(leaf_search_resp.partial_hits.len() as u64, last_hit);
 
+    scroll_ctx.clear_cache_if_unneeded();
     let payload: Vec<u8> = scroll_ctx.serialize();
     let scroll_key = scroll_key_and_start_offset.scroll_key();
     cluster_client
@@ -3714,7 +3723,11 @@ mod tests {
         assert_eq!(timestamp_range_extractor.end_timestamp, Some(1620283880));
     }
 
-    fn create_search_resp(index_uri: &str, hit_range: Range<usize>) -> LeafSearchResponse {
+    fn create_search_resp(
+        index_uri: &str,
+        hit_range: Range<usize>,
+        search_after: Option<PartialHit>,
+    ) -> LeafSearchResponse {
         let (num_total_hits, split_id) = match index_uri {
             "ram:///test-index-1" => (TOTAL_NUM_HITS_INDEX_1, "split1"),
             "ram:///test-index-2" => (TOTAL_NUM_HITS_INDEX_2, "split2"),
@@ -3723,6 +3736,17 @@ mod tests {
         let doc_ids = (0..num_total_hits)
             .rev()
+            .filter(|elem| {
+                if let Some(search_after) = &search_after {
+                    if split_id == search_after.split_id {
+                        *elem < (search_after.doc_id as usize)
+                    } else {
+                        split_id < search_after.split_id.as_str()
+                    }
+                } else {
+                    true
+                }
+            })
             .skip(hit_range.start)
             .take(hit_range.end - hit_range.start);
         quickwit_proto::search::LeafSearchResponse {
@@ -3738,6 +3762,7 @@ mod tests {
     const TOTAL_NUM_HITS_INDEX_1: usize = 2_005;
     const TOTAL_NUM_HITS_INDEX_2: usize = 10;
     const MAX_HITS_PER_PAGE: usize = 93;
+    const MAX_HITS_PER_PAGE_LARGE: usize = 1_005;
 
     #[tokio::test]
     async fn test_root_search_with_scroll() {
@@ -3770,43 +3795,49 @@
         let mut mock_search_service = MockSearchService::new();
         mock_search_service.expect_leaf_search().times(2).returning(
             |req: quickwit_proto::search::LeafSearchRequest| {
-                let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+                let search_req = req.search_request.unwrap();
                 // the leaf request does not need to know about the scroll_ttl.
                 assert_eq!(search_req.start_offset, 0u64);
                 assert!(search_req.scroll_ttl_secs.is_none());
                 assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+                assert!(search_req.search_after.is_none());
                 Ok(create_search_resp(
                     &req.index_uri,
                     search_req.start_offset as usize
                         ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
                 ))
             },
         );
         mock_search_service.expect_leaf_search().times(2).returning(
             |req: quickwit_proto::search::LeafSearchRequest| {
-                let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+                let search_req = req.search_request.unwrap();
                 // the leaf request does not need to know about the scroll_ttl.
                 assert_eq!(search_req.start_offset, 0u64);
                 assert!(search_req.scroll_ttl_secs.is_none());
-                assert_eq!(search_req.max_hits as usize, 2 * SCROLL_BATCH_LEN);
+                assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+                assert!(search_req.search_after.is_some());
                 Ok(create_search_resp(
                     &req.index_uri,
                     search_req.start_offset as usize
                         ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
                 ))
             },
         );
         mock_search_service.expect_leaf_search().times(2).returning(
             |req: quickwit_proto::search::LeafSearchRequest| {
-                let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
+                let search_req = req.search_request.unwrap();
                 // the leaf request does not need to know about the scroll_ttl.
                 assert_eq!(search_req.start_offset, 0u64);
                 assert!(search_req.scroll_ttl_secs.is_none());
-                assert_eq!(search_req.max_hits as usize, 3 * SCROLL_BATCH_LEN);
+                assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
+                assert!(search_req.search_after.is_some());
                 Ok(create_search_resp(
                     &req.index_uri,
                     search_req.start_offset as usize
                         ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
                 ))
             },
         );
@@ -3917,6 +3948,190 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_root_search_with_scroll_large_page() {
+        let mut metastore = MetastoreServiceClient::mock();
+        let index_metadata = IndexMetadata::for_test("test-index-1", "ram:///test-index-1");
+        let index_uid = index_metadata.index_uid.clone();
+        let index_metadata_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2");
+        let index_uid_2 = index_metadata_2.index_uid.clone();
+        metastore
+            .expect_list_indexes_metadata()
+            .returning(move |_index_ids_query| {
+                let indexes_metadata = vec![index_metadata.clone(), index_metadata_2.clone()];
+                Ok(
+                    ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata)
+                        .unwrap(),
+                )
+            });
+        metastore.expect_list_splits().returning(move |_filter| {
+            let splits = vec![
+                MockSplitBuilder::new("split1")
+                    .with_index_uid(&index_uid)
+                    .build(),
+                MockSplitBuilder::new("split2")
+                    .with_index_uid(&index_uid_2)
+                    .build(),
+            ];
+            let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
+            Ok(ServiceStream::from(vec![Ok(splits_response)]))
+        });
+        let mut mock_search_service = MockSearchService::new();
+        mock_search_service.expect_leaf_search().times(2).returning(
+            |req: quickwit_proto::search::LeafSearchRequest| {
+                let search_req = req.search_request.unwrap();
+                // the leaf request does not need to know about the scroll_ttl.
+                assert_eq!(search_req.start_offset, 0u64);
+                assert!(search_req.scroll_ttl_secs.is_none());
+                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
+                assert!(search_req.search_after.is_none());
+                Ok(create_search_resp(
+                    &req.index_uri,
+                    search_req.start_offset as usize
+                        ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
+                ))
+            },
+        );
+        mock_search_service.expect_leaf_search().times(2).returning(
+            |req: quickwit_proto::search::LeafSearchRequest| {
+                let search_req = req.search_request.unwrap();
+                // the leaf request does not need to know about the scroll_ttl.
+                assert_eq!(search_req.start_offset, 0u64);
+                assert!(search_req.scroll_ttl_secs.is_none());
+                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
+                assert!(search_req.search_after.is_some());
+                Ok(create_search_resp(
+                    &req.index_uri,
+                    search_req.start_offset as usize
+                        ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
+                ))
+            },
+        );
+        mock_search_service.expect_leaf_search().times(2).returning(
+            |req: quickwit_proto::search::LeafSearchRequest| {
+                let search_req = req.search_request.unwrap();
+                // the leaf request does not need to know about the scroll_ttl.
+                assert_eq!(search_req.start_offset, 0u64);
+                assert!(search_req.scroll_ttl_secs.is_none());
+                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
+                assert!(search_req.search_after.is_some());
+                Ok(create_search_resp(
+                    &req.index_uri,
+                    search_req.start_offset as usize
+                        ..(search_req.start_offset + search_req.max_hits) as usize,
+                    search_req.search_after,
+                ))
+            },
+        );
+        let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
+        let kv_clone = kv.clone();
+        mock_search_service
+            .expect_put_kv()
+            .returning(move |put_kv_req| {
+                kv_clone
+                    .write()
+                    .unwrap()
+                    .insert(put_kv_req.key, put_kv_req.payload);
+            });
+        mock_search_service
+            .expect_get_kv()
+            .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
+        mock_search_service.expect_fetch_docs().returning(
+            |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
+                assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE);
+                Ok(quickwit_proto::search::FetchDocsResponse {
+                    hits: get_doc_for_fetch_req(fetch_docs_req),
+                })
+            },
+        );
+        let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
+        let search_job_placer = SearchJobPlacer::new(searcher_pool);
+        let searcher_context = SearcherContext::for_test();
+        let cluster_client = ClusterClient::new(search_job_placer.clone());
+
+        let mut count_seen_hits = 0;
+
+        let mut scroll_id: String = {
+            let search_request = quickwit_proto::search::SearchRequest {
+                index_id_patterns: vec!["test-index-*".to_string()],
+                query_ast: qast_json_helper("test", &["body"]),
+                max_hits: MAX_HITS_PER_PAGE_LARGE as u64,
+                scroll_ttl_secs: Some(60),
+                ..Default::default()
+            };
+            let search_response = root_search(
+                &searcher_context,
+                search_request,
+                MetastoreServiceClient::from(metastore),
+                &cluster_client,
+            )
+            .await
+            .unwrap();
+            assert_eq!(
+                search_response.num_hits,
+                (TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
+            );
+            assert_eq!(search_response.hits.len(), MAX_HITS_PER_PAGE_LARGE);
+            let expected = (0..TOTAL_NUM_HITS_INDEX_2)
+                .rev()
+                .zip(std::iter::repeat("split2"))
+                .chain(
+                    (0..TOTAL_NUM_HITS_INDEX_1)
+                        .rev()
+                        .zip(std::iter::repeat("split1")),
+                );
+            for (hit, (doc_id, split)) in search_response.hits.iter().zip(expected) {
+                assert_eq!(
+                    hit.partial_hit.as_ref().unwrap(),
+                    &mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
+                );
+            }
+            count_seen_hits += search_response.hits.len();
+            search_response.scroll_id.unwrap()
+        };
+        for page in 1.. {
+            let scroll_req = ScrollRequest {
+                scroll_id,
+                scroll_ttl_secs: Some(60),
+            };
+            let scroll_resp =
+                crate::service::scroll(scroll_req, &cluster_client, &searcher_context)
+                    .await
+                    .unwrap();
+            assert_eq!(
+                scroll_resp.num_hits,
+                (TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
+            );
+            let expected = (0..TOTAL_NUM_HITS_INDEX_2)
+                .rev()
+                .zip(std::iter::repeat("split2"))
+                .chain(
+                    (0..TOTAL_NUM_HITS_INDEX_1)
+                        .rev()
+                        .zip(std::iter::repeat("split1")),
+                )
+                .skip(page * MAX_HITS_PER_PAGE_LARGE);
+            for (hit, (doc_id, split)) in scroll_resp.hits.iter().zip(expected) {
+                assert_eq!(
+                    hit.partial_hit.as_ref().unwrap(),
+                    &mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
+                );
+            }
+            scroll_id = scroll_resp.scroll_id.unwrap();
+            count_seen_hits += scroll_resp.hits.len();
+            if scroll_resp.hits.is_empty() {
+                break;
+            }
+        }
+
+        assert_eq!(
+            count_seen_hits,
+            TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2
+        );
+    }
+
     #[tokio::test]
     async fn test_root_search_multi_indices() -> anyhow::Result<()> {
         let search_request = quickwit_proto::search::SearchRequest {
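Note for reviewers: the `filter` added to `create_search_resp` mimics how a `search_after` cursor selects the remaining hits when results are ordered by descending `(split_id, doc_id)`. A minimal standalone sketch of that predicate, with an illustrative `Cursor` stand-in for the relevant `PartialHit` fields (not part of the patch):

```rust
/// Illustrative stand-in for the `(split_id, doc_id)` part of a `PartialHit` cursor.
struct Cursor {
    split_id: String,
    doc_id: u32,
}

/// Keep only hits that sort strictly after the cursor, given the descending
/// `(split_id, doc_id)` order used by the test fixture.
fn is_after(cursor: &Cursor, split_id: &str, doc_id: u32) -> bool {
    if split_id == cursor.split_id {
        doc_id < cursor.doc_id
    } else {
        split_id < cursor.split_id.as_str()
    }
}

fn main() {
    let cursor = Cursor { split_id: "split2".to_string(), doc_id: 3 };
    // Same split: only smaller doc ids remain after the cursor.
    assert!(is_after(&cursor, "split2", 2));
    assert!(!is_after(&cursor, "split2", 3));
    // Different split: "split1" < "split2", so all of split1 remains.
    assert!(is_after(&cursor, "split1", 9));
}
```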
diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs
index 32a2dbdac64..3b5d9f93f39 100644
--- a/quickwit/quickwit-search/src/scroll_context.rs
+++ b/quickwit/quickwit-search/src/scroll_context.rs
@@ -59,11 +59,6 @@ pub(crate) struct ScrollContext {
 }
 
 impl ScrollContext {
-    /// Returns true if the current page in cache is incomplete.
-    pub fn last_page_in_cache(&self) -> bool {
-        self.cached_partial_hits.len() < SCROLL_BATCH_LEN
-    }
-
     /// Returns as many results as possible from the cache.
     pub fn get_cached_partial_hits(&self, doc_range: Range<u64>) -> &[PartialHit] {
         if doc_range.end <= doc_range.start {
@@ -85,6 +80,13 @@ impl ScrollContext {
         &truncated_partial_hits[..num_partial_hits]
     }
 
+    /// Clears the cache if it wouldn't be useful, i.e. if the page size is greater than SCROLL_BATCH_LEN.
+    pub fn clear_cache_if_unneeded(&mut self) {
+        if self.search_request.max_hits > SCROLL_BATCH_LEN as u64 {
+            self.cached_partial_hits.clear();
+        }
+    }
+
     pub fn serialize(&self) -> Vec<u8> {
         let uncompressed_payload = serde_json::to_string(self).unwrap();
         uncompressed_payload.as_bytes().to_vec()
@@ -101,14 +103,11 @@ impl ScrollContext {
     pub async fn load_batch_starting_at(
         &mut self,
         start_offset: u64,
+        previous_last_hit: PartialHit,
         cluster_client: &ClusterClient,
         searcher_context: &SearcherContext,
     ) -> crate::Result<bool> {
-        if self.cached_partial_hits_start_offset <= start_offset && self.last_page_in_cache() {
-            return Ok(false);
-        }
-        self.search_request.max_hits = SCROLL_BATCH_LEN as u64;
-        self.search_request.start_offset = start_offset;
+        self.search_request.search_after = Some(previous_last_hit);
         let leaf_search_response: LeafSearchResponse = crate::root::search_partial_hits_phase(
             searcher_context,
             &self.indexes_metas_for_leaf_search,
@@ -149,31 +148,42 @@ impl MiniKV {
     }
 }
 
-#[derive(Serialize, Deserialize, Copy, Clone, Eq, PartialEq, Debug)]
+#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
 pub(crate) struct ScrollKeyAndStartOffset {
     scroll_ulid: Ulid,
     pub(crate) start_offset: u64,
+    // this is set to zero if there are no more documents
     pub(crate) max_hits_per_page: u32,
+    pub(crate) search_after: PartialHit,
 }
 
 impl ScrollKeyAndStartOffset {
     pub fn new_with_start_offset(
         start_offset: u64,
        max_hits_per_page: u32,
+        search_after: PartialHit,
     ) -> ScrollKeyAndStartOffset {
         let scroll_ulid: Ulid = Ulid::new();
+        // technically we could initialize search_after only on the first call to next_page, and
+        // use default() before, but that feels like partial initialization.
         ScrollKeyAndStartOffset {
             scroll_ulid,
             start_offset,
             max_hits_per_page,
+            search_after,
         }
     }
 
-    pub fn next_page(mut self, found_hits_in_current_page: u64) -> ScrollKeyAndStartOffset {
+    pub fn next_page(
+        mut self,
+        found_hits_in_current_page: u64,
+        last_hit: PartialHit,
+    ) -> ScrollKeyAndStartOffset {
         self.start_offset += found_hits_in_current_page;
         if found_hits_in_current_page < self.max_hits_per_page as u64 {
             self.max_hits_per_page = 0;
         }
+        self.search_after = last_hit;
         self
     }
 
@@ -184,10 +194,12 @@ impl ScrollKeyAndStartOffset {
 
 impl ToString for ScrollKeyAndStartOffset {
     fn to_string(&self) -> String {
-        let mut payload = [0u8; 28];
+        let mut payload = vec![0u8; 28];
         payload[..16].copy_from_slice(&u128::from(self.scroll_ulid).to_le_bytes());
         payload[16..24].copy_from_slice(&self.start_offset.to_le_bytes());
         payload[24..28].copy_from_slice(&self.max_hits_per_page.to_le_bytes());
+        serde_json::to_writer(&mut payload, &self.search_after)
+            .expect("serializing PartialHit should never fail");
         BASE64_STANDARD.encode(payload)
     }
 }
@@ -199,8 +211,8 @@ impl FromStr for ScrollKeyAndStartOffset {
         let base64_decoded: Vec<u8> = BASE64_STANDARD
             .decode(scroll_id_str)
             .map_err(|_| "scroll id is invalid base64.")?;
-        if base64_decoded.len() != 16 + 8 + 4 {
-            return Err("scroll id payload is not 8 bytes long");
+        if base64_decoded.len() <= 16 + 8 + 4 {
+            return Err("scroll id payload is truncated");
         }
         let (scroll_ulid_bytes, from_bytes, max_hits_bytes) = (
             &base64_decoded[..16],
@@ -210,10 +222,13 @@ impl FromStr for ScrollKeyAndStartOffset {
         let scroll_ulid = u128::from_le_bytes(scroll_ulid_bytes.try_into().unwrap()).into();
         let from = u64::from_le_bytes(from_bytes.try_into().unwrap());
         let max_hits = u32::from_le_bytes(max_hits_bytes.try_into().unwrap());
+        let search_after =
+            serde_json::from_slice(&base64_decoded[28..]).map_err(|_| "scroll id is malformed")?;
         Ok(ScrollKeyAndStartOffset {
             scroll_ulid,
             start_offset: from,
             max_hits_per_page: max_hits,
+            search_after,
         })
     }
 }
@@ -222,11 +237,20 @@ impl FromStr for ScrollKeyAndStartOffset {
 mod tests {
     use std::str::FromStr;
 
+    use quickwit_proto::search::PartialHit;
+
     use crate::scroll_context::ScrollKeyAndStartOffset;
 
     #[test]
     fn test_scroll_id() {
-        let scroll = ScrollKeyAndStartOffset::new_with_start_offset(10, 100);
+        let partial_hit = PartialHit {
+            sort_value: None,
+            sort_value2: None,
+            split_id: "split".to_string(),
+            segment_ord: 1,
+            doc_id: 2,
+        };
+        let scroll = ScrollKeyAndStartOffset::new_with_start_offset(10, 100, partial_hit);
         let scroll_str = scroll.to_string();
         let ser_deser_scroll = ScrollKeyAndStartOffset::from_str(&scroll_str).unwrap();
         assert_eq!(scroll, ser_deser_scroll);
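The scroll id serialized above is a base64 payload with a fixed 28-byte prefix (16-byte ULID, 8-byte start offset, 4-byte max hits per page, all little-endian) followed by the JSON-encoded `search_after` hit, which is why the length check becomes `<=` rather than `!=`. A minimal sketch of that layout using only the standard library; the JSON tail is represented by an opaque byte slice, and the `ulid`, `serde_json`, and base64 handling stay as in the patch:

```rust
/// Packs the fixed-size prefix of a scroll id (ULID, start offset, max hits per
/// page), followed by an opaque, variable-length JSON tail for the search_after hit.
fn pack(scroll_ulid: u128, start_offset: u64, max_hits: u32, json_tail: &[u8]) -> Vec<u8> {
    let mut payload = vec![0u8; 28];
    payload[..16].copy_from_slice(&scroll_ulid.to_le_bytes());
    payload[16..24].copy_from_slice(&start_offset.to_le_bytes());
    payload[24..28].copy_from_slice(&max_hits.to_le_bytes());
    payload.extend_from_slice(json_tail);
    payload
}

/// Splits a decoded payload back into its fields; a payload without a tail is
/// rejected, mirroring the `<= 16 + 8 + 4` check in `FromStr`.
fn unpack(payload: &[u8]) -> Result<(u128, u64, u32, &[u8]), &'static str> {
    if payload.len() <= 28 {
        return Err("scroll id payload is truncated");
    }
    let ulid = u128::from_le_bytes(payload[..16].try_into().unwrap());
    let start_offset = u64::from_le_bytes(payload[16..24].try_into().unwrap());
    let max_hits = u32::from_le_bytes(payload[24..28].try_into().unwrap());
    Ok((ulid, start_offset, max_hits, &payload[28..]))
}

fn main() {
    let payload = pack(42, 1_005, 0, br#"{"split_id":"split"}"#);
    let (ulid, start, max_hits, tail) = unpack(&payload).unwrap();
    assert_eq!((ulid, start, max_hits), (42, 1_005, 0));
    assert_eq!(tail, &br#"{"split_id":"split"}"#[..]);
}
```

Putting the variable-length JSON after the fixed prefix keeps the old fields parseable at fixed offsets while letting the cursor grow with the sort values it carries.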
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index dde05550c97..faccb190f41 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -380,33 +380,21 @@ pub(crate) async fn scroll(
     let mut partial_hits = Vec::new();
     let mut scroll_context_modified = false;
 
-    loop {
-        let current_doc = start_doc + partial_hits.len() as u64;
-        partial_hits
-            .extend_from_slice(scroll_context.get_cached_partial_hits(current_doc..end_doc));
-        if partial_hits.len() as u64 >= scroll_context.max_hits_per_page {
-            break;
-        }
-        let cursor: u64 = start_doc + partial_hits.len() as u64;
-        if !scroll_context
-            .load_batch_starting_at(cursor, cluster_client, searcher_context)
-            .await?
-        {
-            break;
-        }
+    let cached_results = scroll_context.get_cached_partial_hits(start_doc..end_doc);
+    partial_hits.extend_from_slice(cached_results);
+    if (partial_hits.len() as u64) < current_scroll.max_hits_per_page as u64 {
+        let search_after = partial_hits
+            .last()
+            .cloned()
+            .unwrap_or_else(|| current_scroll.search_after.clone());
+        let cursor = start_doc + partial_hits.len() as u64;
+        scroll_context
+            .load_batch_starting_at(cursor, search_after, cluster_client, searcher_context)
+            .await?;
+        partial_hits.extend_from_slice(scroll_context.get_cached_partial_hits(cursor..end_doc));
         scroll_context_modified = true;
     }
 
-    if let Some(scroll_ttl_secs) = scroll_request.scroll_ttl_secs {
-        if scroll_context_modified {
-            let payload = scroll_context.serialize();
-            let scroll_ttl = Duration::from_secs(scroll_ttl_secs as u64);
-            cluster_client
-                .put_kv(&scroll_key, &payload, scroll_ttl)
-                .await;
-        }
-    }
-
     // Fetch the actual documents.
     let hits: Vec<Hit> = fetch_docs_phase(
         &scroll_context.indexes_metas_for_leaf_search,
@@ -417,13 +405,27 @@ pub(crate) async fn scroll(
     )
     .await?;
 
-    let next_scroll_id = Some(current_scroll.next_page(hits.len() as u64));
+    let next_scroll_id = current_scroll.next_page(
+        hits.len() as u64,
+        partial_hits.last().cloned().unwrap_or_default(),
+    );
+
+    if let Some(scroll_ttl_secs) = scroll_request.scroll_ttl_secs {
+        if scroll_context_modified {
+            scroll_context.clear_cache_if_unneeded();
+            let payload = scroll_context.serialize();
+            let scroll_ttl = Duration::from_secs(scroll_ttl_secs as u64);
+            cluster_client
+                .put_kv(&scroll_key, &payload, scroll_ttl)
+                .await;
+        }
+    }
 
     Ok(SearchResponse {
         hits,
         num_hits: scroll_context.total_num_hits,
         elapsed_time_micros: start.elapsed().as_micros() as u64,
-        scroll_id: next_scroll_id.as_ref().map(ToString::to_string),
+        scroll_id: Some(next_scroll_id.to_string()),
         errors: Vec::new(),
         aggregation: None,
     })
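Taken together, the `scroll` rewrite above replaces the old refill loop with a single pass: serve whatever the cache covers, and if that falls short of a full page, issue one `search_after` query keyed off the last hit already gathered (falling back to the cursor stored in the scroll id). A simplified, self-contained sketch of that control flow; `fetch_more` is an in-memory stand-in for the real `load_batch_starting_at` leaf query, and all names here are illustrative:

```rust
/// Illustrative hit type; the real code pages over `PartialHit`s.
type Hit = u64;

/// Stand-in for `load_batch_starting_at`: fetch up to `batch` hits strictly
/// after `search_after` from a pretend full result set.
fn fetch_more(all_hits: &[Hit], search_after: Option<Hit>, batch: usize) -> Vec<Hit> {
    all_hits
        .iter()
        .copied()
        .filter(|hit| search_after.map_or(true, |after| *hit > after))
        .take(batch)
        .collect()
}

/// One scroll page: drain the cache first, then top up with one search_after query.
fn scroll_page(
    cache: &[Hit],
    all_hits: &[Hit],
    last_returned: Option<Hit>,
    page_size: usize,
) -> Vec<Hit> {
    let mut partial_hits: Vec<Hit> = cache.to_vec();
    if partial_hits.len() < page_size {
        // Key the follow-up query off the last hit we already have, falling
        // back to the cursor carried by the scroll id (here: `last_returned`).
        let search_after = partial_hits.last().copied().or(last_returned);
        let missing = page_size - partial_hits.len();
        partial_hits.extend(fetch_more(all_hits, search_after, missing));
    }
    partial_hits
}

fn main() {
    let all_hits: Vec<Hit> = (1..=10).collect();
    // Cache covers hits 4 and 5; a page size of 4 forces one search_after fetch.
    let page = scroll_page(&[4, 5], &all_hits, Some(3), 4);
    assert_eq!(page, vec![4, 5, 6, 7]);
}
```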