Skip to content

Commit

Permalink
mini step
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 7, 2023
1 parent 5a7fa16 commit be99a19
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 29 deletions.
31 changes: 5 additions & 26 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,36 +239,13 @@ pub(crate) fn validate_request(
#[instrument(skip(search_request, cluster_client))]
pub async fn root_search_aux(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
search_request: SearchRequest,
index_uri: &Uri,
doc_mapper: Arc<dyn DocMapper>,
query_ast: &QueryAst,
query_ast_resolved: QueryAst,
split_metadatas: Vec<SplitMetadata>,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
validate_request(&*doc_mapper, &search_request)?;

let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast)
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?;

if let Some(timestamp_field) = doc_mapper.timestamp_field_name() {
refine_start_end_timestamp_from_ast(
&query_ast_resolved,
timestamp_field,
&mut search_request.start_timestamp,
&mut search_request.end_timestamp,
);
}

// Validates the query by effectively building it against the current schema.
doc_mapper.query(doc_mapper.schema(), &query_ast_resolved, true)?;

search_request.query_ast = serde_json::to_string(&query_ast_resolved).map_err(|err| {
SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}"))
})?;

let doc_mapper_str = serde_json::to_string(&*doc_mapper).map_err(|err| {
SearchError::InternalError(format!("Failed to serialize doc mapper: Cause {err}"))
})?;
Expand Down Expand Up @@ -399,6 +376,8 @@ pub async fn root_search_aux(
query_ast: query_ast_resolved,
split_metadatas,
search_request: simplify_search_request_for_scroll_api(&search_request),
cached_partial_hits_start_offset: search_request.start_offset,
cached_partial_hits: Vec::new(),
};
let scroll_key_and_start_offset: ScrollKeyAndStartOffset =
ScrollKeyAndStartOffset::new_with_start_offset(scroll_ctx.search_request.start_offset);
Expand Down Expand Up @@ -512,7 +491,7 @@ pub async fn root_search(
search_request,
&index_config.index_uri,
doc_mapper.clone(),
&query_ast_resolved,
query_ast_resolved,
split_metadatas,
cluster_client,
)
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-search/src/scroll_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use base64::Engine;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::SplitMetadata;
use quickwit_proto::SearchRequest;
use quickwit_proto::{PartialHit, SearchRequest};
use quickwit_query::query_ast::QueryAst;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
Expand All @@ -50,9 +50,15 @@ pub(crate) struct ScrollContext {
pub index_uri: Uri,
pub doc_mapper: Arc<dyn DocMapper>,
pub query_ast: QueryAst,
pub cached_partial_hits_start_offset: u64,
pub cached_partial_hits: Vec<PartialHit>,
}

impl ScrollContext {
pub fn advance(&mut self, num_hits: u64) {
self.search_request.start_offset += num_hits;
}

pub fn serialize(&self) -> Vec<u8> {
let uncompressed_payload = postcard::to_allocvec(self).unwrap();
// TODO add compression
Expand Down
11 changes: 9 additions & 2 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use quickwit_metastore::Metastore;
use quickwit_proto::{
FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse,
LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse,
ListTermsRequest, ListTermsResponse, PutKvRequest, ScrollRequest, SearchRequest,
ListTermsRequest, ListTermsResponse, PartialHit, PutKvRequest, ScrollRequest, SearchRequest,
SearchResponse, SearchStreamRequest,
};
use quickwit_storage::{Cache, MemorySizedCache, QuickwitCache, StorageResolver};
Expand All @@ -49,6 +49,8 @@ use crate::{
SearchError,
};

const SCROLL_CACHE_CAPACITY: usize = 1_000;

#[derive(Clone)]
/// The search service implementation.
pub struct SearchServiceImpl {
Expand Down Expand Up @@ -311,15 +313,20 @@ impl SearchService for SearchServiceImpl {
query_ast,
split_metadatas,
mut search_request,
cached_partial_hits,
cached_partial_hits_start_offset,
} = ScrollContext::load(&payload)
.map_err(|_| SearchError::InternalError("Corrupted scroll context.".to_string()))?;
let start_offset = std::cmp::max(search_request.start_offset, request_start_offset);
let offset_range = start_offset..start_offset + search_request.max_hits;
let partial_hits: &[PartialHit] =
&cached_partial_hits[offset_range.start as usize..offset_range.end as usize];
let mut search_response = root_search_aux(
&self.searcher_context,
search_request,
&index_uri,
doc_mapper,
&query_ast,
query_ast,
split_metadatas,
&self.cluster_client,
)
Expand Down

0 comments on commit be99a19

Please sign in to comment.