From be99a195d48071656579744d553911814975989a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 6 Jul 2023 17:40:42 +0900 Subject: [PATCH] mini step --- quickwit/quickwit-search/src/root.rs | 31 +++---------------- .../quickwit-search/src/scroll_context.rs | 8 ++++- quickwit/quickwit-search/src/service.rs | 11 +++++-- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 2e00487f9cc..9e4ca6cbe4b 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -239,36 +239,13 @@ pub(crate) fn validate_request( #[instrument(skip(search_request, cluster_client))] pub async fn root_search_aux( searcher_context: &SearcherContext, - mut search_request: SearchRequest, + search_request: SearchRequest, index_uri: &Uri, doc_mapper: Arc, - query_ast: &QueryAst, + query_ast_resolved: QueryAst, split_metadatas: Vec, cluster_client: &ClusterClient, ) -> crate::Result { - validate_request(&*doc_mapper, &search_request)?; - - let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - - let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?; - - if let Some(timestamp_field) = doc_mapper.timestamp_field_name() { - refine_start_end_timestamp_from_ast( - &query_ast_resolved, - timestamp_field, - &mut search_request.start_timestamp, - &mut search_request.end_timestamp, - ); - } - - // Validates the query by effectively building it against the current schema. - doc_mapper.query(doc_mapper.schema(), &query_ast_resolved, true)?; - - search_request.query_ast = serde_json::to_string(&query_ast_resolved).map_err(|err| { - SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}")) - })?; - let doc_mapper_str = serde_json::to_string(&*doc_mapper).map_err(|err| { SearchError::InternalError(format!("Failed to serialize doc mapper: Cause {err}")) })?; @@ -399,6 +376,8 @@ pub async fn root_search_aux( query_ast: query_ast_resolved, split_metadatas, search_request: simplify_search_request_for_scroll_api(&search_request), + cached_partial_hits_start_offset: search_request.start_offset, + cached_partial_hits: Vec::new(), }; let scroll_key_and_start_offset: ScrollKeyAndStartOffset = ScrollKeyAndStartOffset::new_with_start_offset(scroll_ctx.search_request.start_offset); @@ -512,7 +491,7 @@ pub async fn root_search( search_request, &index_config.index_uri, doc_mapper.clone(), - &query_ast_resolved, + query_ast_resolved, split_metadatas, cluster_client, ) diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index afcf7f7903e..17d6aeb6014 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -27,7 +27,7 @@ use base64::Engine; use quickwit_common::uri::Uri; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::SplitMetadata; -use quickwit_proto::SearchRequest; +use quickwit_proto::{PartialHit, SearchRequest}; use quickwit_query::query_ast::QueryAst; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -50,9 +50,15 @@ pub(crate) struct ScrollContext { pub index_uri: Uri, pub doc_mapper: Arc, pub query_ast: QueryAst, + pub cached_partial_hits_start_offset: u64, + pub cached_partial_hits: Vec, } impl ScrollContext { + pub fn advance(&mut self, num_hits: u64) { + self.search_request.start_offset += num_hits; + } + pub fn serialize(&self) -> Vec { let uncompressed_payload = postcard::to_allocvec(self).unwrap(); // TODO add compression diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index f6e746fab08..818799b46ab 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -31,7 +31,7 @@ use quickwit_metastore::Metastore; use quickwit_proto::{ FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, - ListTermsRequest, ListTermsResponse, PutKvRequest, ScrollRequest, SearchRequest, + ListTermsRequest, ListTermsResponse, PartialHit, PutKvRequest, ScrollRequest, SearchRequest, SearchResponse, SearchStreamRequest, }; use quickwit_storage::{Cache, MemorySizedCache, QuickwitCache, StorageResolver}; @@ -49,6 +49,8 @@ use crate::{ SearchError, }; +const SCROLL_CACHE_CAPACITY: usize = 1_000; + #[derive(Clone)] /// The search service implementation. pub struct SearchServiceImpl { @@ -311,15 +313,20 @@ impl SearchService for SearchServiceImpl { query_ast, split_metadatas, mut search_request, + cached_partial_hits, + cached_partial_hits_start_offset, } = ScrollContext::load(&payload) .map_err(|_| SearchError::InternalError("Corrupted scroll context.".to_string()))?; let start_offset = std::cmp::max(search_request.start_offset, request_start_offset); + let offset_range = start_offset..start_offset + search_request.max_hits; + let partial_hits: &[PartialHit] = + &cached_partial_hits[offset_range.start as usize..offset_range.end as usize]; let mut search_response = root_search_aux( &self.searcher_context, search_request, &index_uri, doc_mapper, - &query_ast, + query_ast, split_metadatas, &self.cluster_client, )