Skip to content

Commit

Permalink
use search_after in scroll (#4280)
Browse files Browse the repository at this point in the history
* use search_after in scroll

* truncate more documents of scroll cache

* send search_after key as part of scroll id
  • Loading branch information
trinity-1686a authored Jan 25, 2024
1 parent 9eef3bb commit ca38897
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 52 deletions.
235 changes: 225 additions & 10 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// We remove the scroll ttl parameter. It is irrelevant to process later request
scroll_ttl_secs: None,
search_after: None,
count_hits: req.count_hits,
// request is simplified after initial query, and we cache the hit count, so we don't need
// to recompute it afterward.
count_hits: quickwit_proto::search::CountHits::Underestimate as i32,
})
}

Expand Down Expand Up @@ -526,7 +528,7 @@ async fn search_partial_hits_phase_with_scroll(
// This is a scroll request.
//
// We increase max hits to populate the scroll cache.
search_request.max_hits = SCROLL_BATCH_LEN as u64;
search_request.max_hits = search_request.max_hits.max(SCROLL_BATCH_LEN as u64);
search_request.scroll_ttl_secs = None;
let mut leaf_search_resp = search_partial_hits_phase(
searcher_context,
Expand All @@ -538,10 +540,15 @@ async fn search_partial_hits_phase_with_scroll(
.await?;
let cached_partial_hits = leaf_search_resp.partial_hits.clone();
leaf_search_resp.partial_hits.truncate(max_hits as usize);
let last_hit = leaf_search_resp
.partial_hits
.last()
.cloned()
.unwrap_or_default();

let scroll_context_search_request =
simplify_search_request_for_scroll_api(&search_request)?;
let scroll_ctx = ScrollContext {
let mut scroll_ctx = ScrollContext {
indexes_metas_for_leaf_search: indexes_metas_for_leaf_search.clone(),
split_metadatas: split_metadatas.to_vec(),
search_request: scroll_context_search_request,
Expand All @@ -554,9 +561,11 @@ async fn search_partial_hits_phase_with_scroll(
ScrollKeyAndStartOffset::new_with_start_offset(
scroll_ctx.search_request.start_offset,
max_hits as u32,
last_hit.clone(),
)
.next_page(leaf_search_resp.partial_hits.len() as u64);
.next_page(leaf_search_resp.partial_hits.len() as u64, last_hit);

scroll_ctx.clear_cache_if_unneeded();
let payload: Vec<u8> = scroll_ctx.serialize();
let scroll_key = scroll_key_and_start_offset.scroll_key();
cluster_client
Expand Down Expand Up @@ -3714,7 +3723,11 @@ mod tests {
assert_eq!(timestamp_range_extractor.end_timestamp, Some(1620283880));
}

fn create_search_resp(index_uri: &str, hit_range: Range<usize>) -> LeafSearchResponse {
fn create_search_resp(
index_uri: &str,
hit_range: Range<usize>,
search_after: Option<PartialHit>,
) -> LeafSearchResponse {
let (num_total_hits, split_id) = match index_uri {
"ram:///test-index-1" => (TOTAL_NUM_HITS_INDEX_1, "split1"),
"ram:///test-index-2" => (TOTAL_NUM_HITS_INDEX_2, "split2"),
Expand All @@ -3723,6 +3736,17 @@ mod tests {

let doc_ids = (0..num_total_hits)
.rev()
.filter(|elem| {
if let Some(search_after) = &search_after {
if split_id == search_after.split_id {
*elem < (search_after.doc_id as usize)
} else {
split_id < search_after.split_id.as_str()
}
} else {
true
}
})
.skip(hit_range.start)
.take(hit_range.end - hit_range.start);
quickwit_proto::search::LeafSearchResponse {
Expand All @@ -3738,6 +3762,7 @@ mod tests {
const TOTAL_NUM_HITS_INDEX_1: usize = 2_005;
const TOTAL_NUM_HITS_INDEX_2: usize = 10;
const MAX_HITS_PER_PAGE: usize = 93;
const MAX_HITS_PER_PAGE_LARGE: usize = 1_005;

#[tokio::test]
async fn test_root_search_with_scroll() {
Expand Down Expand Up @@ -3770,43 +3795,49 @@ mod tests {
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, 2 * SCROLL_BATCH_LEN);
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, 3 * SCROLL_BATCH_LEN);
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
Expand Down Expand Up @@ -3917,6 +3948,190 @@ mod tests {
);
}

    #[tokio::test]
    async fn test_root_search_with_scroll_large_page() {
        // Scroll-API pagination test using a page size (MAX_HITS_PER_PAGE_LARGE = 1_005)
        // chosen to be larger than in `test_root_search_with_scroll` — presumably larger
        // than SCROLL_BATCH_LEN, so every scroll page needs a fresh leaf search rather
        // than being served from the scroll cache (TODO confirm against SCROLL_BATCH_LEN).
        // It checks that the `search_after` key is threaded through the scroll id:
        // absent on the first leaf search, present on all subsequent ones.
        let mut metastore = MetastoreServiceClient::mock();
        // Two indexes, one split each, so results interleave across splits.
        let index_metadata = IndexMetadata::for_test("test-index-1", "ram:///test-index-1");
        let index_uid = index_metadata.index_uid.clone();
        let index_metadata_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2");
        let index_uid_2 = index_metadata_2.index_uid.clone();
        metastore
            .expect_list_indexes_metadata()
            .returning(move |_index_ids_query| {
                let indexes_metadata = vec![index_metadata.clone(), index_metadata_2.clone()];
                Ok(
                    ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata)
                        .unwrap(),
                )
            });
        metastore.expect_list_splits().returning(move |_filter| {
            let splits = vec![
                MockSplitBuilder::new("split1")
                    .with_index_uid(&index_uid)
                    .build(),
                MockSplitBuilder::new("split2")
                    .with_index_uid(&index_uid_2)
                    .build(),
            ];
            let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
            Ok(ServiceStream::from(vec![Ok(splits_response)]))
        });
        let mut mock_search_service = MockSearchService::new();
        // Page 1: one leaf search per split (2 calls). The initial request must not
        // carry a search_after key, must strip scroll_ttl, and asks for exactly the
        // requested page size.
        mock_search_service.expect_leaf_search().times(2).returning(
            |req: quickwit_proto::search::LeafSearchRequest| {
                let search_req = req.search_request.unwrap();
                // the leaf request does not need to know about the scroll_ttl.
                assert_eq!(search_req.start_offset, 0u64);
                assert!(search_req.scroll_ttl_secs.is_none());
                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
                assert!(search_req.search_after.is_none());
                Ok(create_search_resp(
                    &req.index_uri,
                    search_req.start_offset as usize
                        ..(search_req.start_offset + search_req.max_hits) as usize,
                    search_req.search_after,
                ))
            },
        );
        // Page 2: same shape, but now search_after must be set (taken from the last
        // hit of the previous page) while start_offset stays 0 — pagination happens
        // via the sort key, not via an offset.
        mock_search_service.expect_leaf_search().times(2).returning(
            |req: quickwit_proto::search::LeafSearchRequest| {
                let search_req = req.search_request.unwrap();
                // the leaf request does not need to know about the scroll_ttl.
                assert_eq!(search_req.start_offset, 0u64);
                assert!(search_req.scroll_ttl_secs.is_none());
                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
                assert!(search_req.search_after.is_some());
                Ok(create_search_resp(
                    &req.index_uri,
                    search_req.start_offset as usize
                        ..(search_req.start_offset + search_req.max_hits) as usize,
                    search_req.search_after,
                ))
            },
        );
        // Page 3 (the final partial page): still search_after-driven, still the same
        // max_hits — the root never inflates max_hits across pages in this mode.
        mock_search_service.expect_leaf_search().times(2).returning(
            |req: quickwit_proto::search::LeafSearchRequest| {
                let search_req = req.search_request.unwrap();
                // the leaf request does not need to know about the scroll_ttl.
                assert_eq!(search_req.start_offset, 0u64);
                assert!(search_req.scroll_ttl_secs.is_none());
                assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
                assert!(search_req.search_after.is_some());
                Ok(create_search_resp(
                    &req.index_uri,
                    search_req.start_offset as usize
                        ..(search_req.start_offset + search_req.max_hits) as usize,
                    search_req.search_after,
                ))
            },
        );
        // In-memory stand-in for the distributed KV store that persists the
        // serialized ScrollContext between scroll calls.
        let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
        let kv_clone = kv.clone();
        mock_search_service
            .expect_put_kv()
            .returning(move |put_kv_req| {
                kv_clone
                    .write()
                    .unwrap()
                    .insert(put_kv_req.key, put_kv_req.payload);
            });
        mock_search_service
            .expect_get_kv()
            .returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
        mock_search_service.expect_fetch_docs().returning(
            |fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
                // The doc-fetch phase must never be asked for more hits than one page.
                assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE);
                Ok(quickwit_proto::search::FetchDocsResponse {
                    hits: get_doc_for_fetch_req(fetch_docs_req),
                })
            },
        );
        let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
        let search_job_placer = SearchJobPlacer::new(searcher_pool);
        let searcher_context = SearcherContext::for_test();
        let cluster_client = ClusterClient::new(search_job_placer.clone());

        // Running tally to verify at the end that every hit was returned exactly once.
        let mut count_seen_hits = 0;

        // Initial (non-scroll-id) request: full page expected, plus a scroll_id to
        // continue from.
        let mut scroll_id: String = {
            let search_request = quickwit_proto::search::SearchRequest {
                index_id_patterns: vec!["test-index-*".to_string()],
                query_ast: qast_json_helper("test", &["body"]),
                max_hits: MAX_HITS_PER_PAGE_LARGE as u64,
                scroll_ttl_secs: Some(60),
                ..Default::default()
            };
            let search_response = root_search(
                &searcher_context,
                search_request,
                MetastoreServiceClient::from(metastore),
                &cluster_client,
            )
            .await
            .unwrap();
            assert_eq!(
                search_response.num_hits,
                (TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
            );
            assert_eq!(search_response.hits.len(), MAX_HITS_PER_PAGE_LARGE);
            // Expected global order mirrors `create_search_resp`: doc ids descending
            // within a split, split2's docs ranking before split1's.
            let expected = (0..TOTAL_NUM_HITS_INDEX_2)
                .rev()
                .zip(std::iter::repeat("split2"))
                .chain(
                    (0..TOTAL_NUM_HITS_INDEX_1)
                        .rev()
                        .zip(std::iter::repeat("split1")),
                );
            for (hit, (doc_id, split)) in search_response.hits.iter().zip(expected) {
                assert_eq!(
                    hit.partial_hit.as_ref().unwrap(),
                    &mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
                );
            }
            count_seen_hits += search_response.hits.len();
            search_response.scroll_id.unwrap()
        };
        // Follow the scroll chain until an empty page signals exhaustion, checking
        // each page against the same expected global ordering, offset by page index.
        for page in 1.. {
            let scroll_req = ScrollRequest {
                scroll_id,
                scroll_ttl_secs: Some(60),
            };
            let scroll_resp =
                crate::service::scroll(scroll_req, &cluster_client, &searcher_context)
                    .await
                    .unwrap();
            // num_hits (the total) must stay stable across scroll pages.
            assert_eq!(
                scroll_resp.num_hits,
                (TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
            );
            let expected = (0..TOTAL_NUM_HITS_INDEX_2)
                .rev()
                .zip(std::iter::repeat("split2"))
                .chain(
                    (0..TOTAL_NUM_HITS_INDEX_1)
                        .rev()
                        .zip(std::iter::repeat("split1")),
                )
                .skip(page * MAX_HITS_PER_PAGE_LARGE);
            for (hit, (doc_id, split)) in scroll_resp.hits.iter().zip(expected) {
                assert_eq!(
                    hit.partial_hit.as_ref().unwrap(),
                    &mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
                );
            }
            // Each response carries a fresh scroll_id for the next page.
            scroll_id = scroll_resp.scroll_id.unwrap();
            count_seen_hits += scroll_resp.hits.len();
            if scroll_resp.hits.is_empty() {
                break;
            }
        }

        // Every document across both indexes was returned exactly once overall.
        assert_eq!(
            count_seen_hits,
            TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2
        );
    }

#[tokio::test]
async fn test_root_search_multi_indices() -> anyhow::Result<()> {
let search_request = quickwit_proto::search::SearchRequest {
Expand Down
Loading

0 comments on commit ca38897

Please sign in to comment.