use search_after in scroll #4280

Merged 7 commits on Jan 25, 2024
Changes from 5 commits
235 changes: 225 additions & 10 deletions quickwit/quickwit-search/src/root.rs
@@ -350,7 +350,9 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// We remove the scroll ttl parameter. It is irrelevant when processing later requests.
scroll_ttl_secs: None,
search_after: None,
count_hits: req.count_hits,
// request is simplified after initial query, and we cache the hit count, so we don't need
// to recompute it afterward.
count_hits: quickwit_proto::search::CountHits::Underestimate as i32,
})
}

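For readers skimming the diff: the heart of this PR is switching scroll pagination from a growing start offset to a search_after cursor. The last hit of each page becomes the cursor for the next one, so a leaf only needs to keep the top hits after that cursor instead of collecting offset + page_size hits. A minimal, self-contained sketch of that idiom (SortKey and page_after are illustrative names, not Quickwit's actual types):

```rust
// Illustrative sketch of search_after (cursor) pagination, not the actual
// Quickwit implementation. Hits are assumed to be globally sorted in
// descending (split_id, doc_id) order, mirroring the test fixture below.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SortKey {
    split_id: String,
    doc_id: u32,
}

fn page_after(sorted_hits: &[SortKey], cursor: Option<&SortKey>, page_size: usize) -> Vec<SortKey> {
    sorted_hits
        .iter()
        .filter(|hit| match cursor {
            // Keep only hits that come strictly after the cursor in the
            // descending sort order, i.e. hits with a strictly smaller key.
            Some(cursor) => *hit < cursor,
            None => true,
        })
        .take(page_size)
        .cloned()
        .collect()
}
```

The mock leaf in the test changes further down applies essentially this predicate per split.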
@@ -526,7 +528,7 @@ async fn search_partial_hits_phase_with_scroll(
// This is a scroll request.
//
// We increase max hits to populate the scroll cache.
search_request.max_hits = SCROLL_BATCH_LEN as u64;
search_request.max_hits = search_request.max_hits.max(SCROLL_BATCH_LEN as u64);
search_request.scroll_ttl_secs = None;
let mut leaf_search_resp = search_partial_hits_phase(
searcher_context,
@@ -538,10 +540,15 @@
.await?;
let cached_partial_hits = leaf_search_resp.partial_hits.clone();
leaf_search_resp.partial_hits.truncate(max_hits as usize);
let last_hit = leaf_search_resp
.partial_hits
.last()
.cloned()
.unwrap_or_default();

let scroll_context_search_request =
simplify_search_request_for_scroll_api(&search_request)?;
let scroll_ctx = ScrollContext {
let mut scroll_ctx = ScrollContext {
indexes_metas_for_leaf_search: indexes_metas_for_leaf_search.clone(),
split_metadatas: split_metadatas.to_vec(),
search_request: scroll_context_search_request,
@@ -554,9 +561,11 @@
ScrollKeyAndStartOffset::new_with_start_offset(
scroll_ctx.search_request.start_offset,
max_hits as u32,
last_hit.clone(),
)
.next_page(leaf_search_resp.partial_hits.len() as u64);
.next_page(leaf_search_resp.partial_hits.len() as u64, last_hit);

scroll_ctx.clear_cache_if_unneeded();
let payload: Vec<u8> = scroll_ctx.serialize();
let scroll_key = scroll_key_and_start_offset.scroll_key();
cluster_client
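Stepping back from the hunks above: the scroll context caches the first batch of partial hits in the KV store together with the last hit handed out, and clear_cache_if_unneeded presumably drops the cached hits once they are no longer useful. A rough, hypothetical sketch of that serving pattern (CachedScrollPage is an illustrative stand-in, not the real ScrollContext):

```rust
// Hypothetical stand-in for the scroll cache, for illustration only.
struct CachedScrollPage {
    cached_hits: Vec<u64>, // stand-in for the cached PartialHits
    start_offset: usize,   // where the next page starts inside the cache
}

impl CachedScrollPage {
    /// Serves the next page from the cache; `None` means the cache is
    /// exhausted and the caller must re-run the leaf search, passing the
    /// last hit it handed out as `search_after`, then refill the cache.
    fn next_page(&mut self, page_size: usize) -> Option<Vec<u64>> {
        if self.start_offset >= self.cached_hits.len() {
            return None;
        }
        let end = (self.start_offset + page_size).min(self.cached_hits.len());
        let page = self.cached_hits[self.start_offset..end].to_vec();
        self.start_offset = end;
        Some(page)
    }
}
```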
@@ -3699,7 +3708,11 @@ mod tests {
assert_eq!(timestamp_range_extractor.end_timestamp, Some(1620283880));
}

fn create_search_resp(index_uri: &str, hit_range: Range<usize>) -> LeafSearchResponse {
fn create_search_resp(
index_uri: &str,
hit_range: Range<usize>,
search_after: Option<PartialHit>,
) -> LeafSearchResponse {
let (num_total_hits, split_id) = match index_uri {
"ram:///test-index-1" => (TOTAL_NUM_HITS_INDEX_1, "split1"),
"ram:///test-index-2" => (TOTAL_NUM_HITS_INDEX_2, "split2"),
@@ -3708,6 +3721,17 @@

let doc_ids = (0..num_total_hits)
.rev()
.filter(|elem| {
if let Some(search_after) = &search_after {
if split_id == search_after.split_id {
*elem < (search_after.doc_id as usize)
} else {
split_id < search_after.split_id.as_str()
}
} else {
true
}
})
.skip(hit_range.start)
.take(hit_range.end - hit_range.start);
quickwit_proto::search::LeafSearchResponse {
@@ -3723,6 +3747,7 @@
const TOTAL_NUM_HITS_INDEX_1: usize = 2_005;
const TOTAL_NUM_HITS_INDEX_2: usize = 10;
const MAX_HITS_PER_PAGE: usize = 93;
const MAX_HITS_PER_PAGE_LARGE: usize = 1_005;

#[tokio::test]
async fn test_root_search_with_scroll() {
@@ -3755,43 +3780,49 @@
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, 2 * SCROLL_BATCH_LEN);
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req: &SearchRequest = req.search_request.as_ref().unwrap();
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, 3 * SCROLL_BATCH_LEN);
assert_eq!(search_req.max_hits as usize, SCROLL_BATCH_LEN);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
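Note on the assertions above: the leaf mocks no longer expect max_hits to grow with each page (previously 2 and 3 times SCROLL_BATCH_LEN); every page now asks for SCROLL_BATCH_LEN hits and carries a search_after cursor instead. The cursor is honored in create_search_resp by a per-split predicate; a condensed, self-contained restatement of it for reference (Cursor is an illustrative stand-in for PartialHit):

```rust
// Condensed, illustrative restatement of the search_after predicate used by
// create_search_resp above; `Cursor` stands in for the real PartialHit.
struct Cursor {
    split_id: String,
    doc_id: u32,
}

/// A (split_id, doc_id) hit passes if it sorts strictly after the cursor in
/// the descending order these tests use: higher split_ids first, then higher
/// doc_ids within the same split.
fn is_after(split_id: &str, doc_id: u32, cursor: &Cursor) -> bool {
    if split_id == cursor.split_id {
        doc_id < cursor.doc_id
    } else {
        split_id < cursor.split_id.as_str()
    }
}
```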
@@ -3902,6 +3933,190 @@
);
}

#[tokio::test]
async fn test_root_search_with_scroll_large_page() {
let mut metastore = MetastoreServiceClient::mock();
let index_metadata = IndexMetadata::for_test("test-index-1", "ram:///test-index-1");
let index_uid = index_metadata.index_uid.clone();
let index_metadata_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2");
let index_uid_2 = index_metadata_2.index_uid.clone();
metastore
.expect_list_indexes_metadata()
.returning(move |_index_ids_query| {
let indexes_metadata = vec![index_metadata.clone(), index_metadata_2.clone()];
Ok(
ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata)
.unwrap(),
)
});
metastore.expect_list_splits().returning(move |_filter| {
let splits = vec![
MockSplitBuilder::new("split1")
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new("split2")
.with_index_uid(&index_uid_2)
.build(),
];
let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(splits_response)]))
});
let mut mock_search_service = MockSearchService::new();
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_none());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
mock_search_service.expect_leaf_search().times(2).returning(
|req: quickwit_proto::search::LeafSearchRequest| {
let search_req = req.search_request.unwrap();
// the leaf request does not need to know about the scroll_ttl.
assert_eq!(search_req.start_offset, 0u64);
assert!(search_req.scroll_ttl_secs.is_none());
assert_eq!(search_req.max_hits as usize, MAX_HITS_PER_PAGE_LARGE);
assert!(search_req.search_after.is_some());
Ok(create_search_resp(
&req.index_uri,
search_req.start_offset as usize
..(search_req.start_offset + search_req.max_hits) as usize,
search_req.search_after,
))
},
);
let kv: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>> = Default::default();
let kv_clone = kv.clone();
mock_search_service
.expect_put_kv()
.returning(move |put_kv_req| {
kv_clone
.write()
.unwrap()
.insert(put_kv_req.key, put_kv_req.payload);
});
mock_search_service
.expect_get_kv()
.returning(move |get_kv_req| kv.read().unwrap().get(&get_kv_req.key).cloned());
mock_search_service.expect_fetch_docs().returning(
|fetch_docs_req: quickwit_proto::search::FetchDocsRequest| {
assert!(fetch_docs_req.partial_hits.len() <= MAX_HITS_PER_PAGE_LARGE);
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_docs_req),
})
},
);
let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let searcher_context = SearcherContext::for_test();
let cluster_client = ClusterClient::new(search_job_placer.clone());

let mut count_seen_hits = 0;

let mut scroll_id: String = {
let search_request = quickwit_proto::search::SearchRequest {
index_id_patterns: vec!["test-index-*".to_string()],
query_ast: qast_json_helper("test", &["body"]),
max_hits: MAX_HITS_PER_PAGE_LARGE as u64,
scroll_ttl_secs: Some(60),
..Default::default()
};
let search_response = root_search(
&searcher_context,
search_request,
MetastoreServiceClient::from(metastore),
&cluster_client,
)
.await
.unwrap();
assert_eq!(
search_response.num_hits,
(TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
);
assert_eq!(search_response.hits.len(), MAX_HITS_PER_PAGE_LARGE);
let expected = (0..TOTAL_NUM_HITS_INDEX_2)
.rev()
.zip(std::iter::repeat("split2"))
.chain(
(0..TOTAL_NUM_HITS_INDEX_1)
.rev()
.zip(std::iter::repeat("split1")),
);
for (hit, (doc_id, split)) in search_response.hits.iter().zip(expected) {
assert_eq!(
hit.partial_hit.as_ref().unwrap(),
&mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
);
}
count_seen_hits += search_response.hits.len();
search_response.scroll_id.unwrap()
};
for page in 1.. {
let scroll_req = ScrollRequest {
scroll_id,
scroll_ttl_secs: Some(60),
};
let scroll_resp =
crate::service::scroll(scroll_req, &cluster_client, &searcher_context)
.await
.unwrap();
assert_eq!(
scroll_resp.num_hits,
(TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2) as u64
);
let expected = (0..TOTAL_NUM_HITS_INDEX_2)
.rev()
.zip(std::iter::repeat("split2"))
.chain(
(0..TOTAL_NUM_HITS_INDEX_1)
.rev()
.zip(std::iter::repeat("split1")),
)
.skip(page * MAX_HITS_PER_PAGE_LARGE);
for (hit, (doc_id, split)) in scroll_resp.hits.iter().zip(expected) {
assert_eq!(
hit.partial_hit.as_ref().unwrap(),
&mock_partial_hit_opt_sort_value(split, None, doc_id as u32)
);
}
scroll_id = scroll_resp.scroll_id.unwrap();
count_seen_hits += scroll_resp.hits.len();
if scroll_resp.hits.is_empty() {
break;
}
}

assert_eq!(
count_seen_hits,
TOTAL_NUM_HITS_INDEX_1 + TOTAL_NUM_HITS_INDEX_2
);
}

#[tokio::test]
async fn test_root_search_multi_indices() -> anyhow::Result<()> {
let search_request = quickwit_proto::search::SearchRequest {