diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 09be8f99d91..a6e99be6ecf 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5073,6 +5073,7 @@ dependencies = [ "anyhow", "assert-json-diff 2.0.2", "async-trait", + "base64 0.21.2", "bytes", "chitchat", "fnv", @@ -5112,6 +5113,8 @@ dependencies = [ "tower", "tracing", "tracing-opentelemetry", + "ttl_cache", + "ulid", "utoipa", ] @@ -5129,6 +5132,7 @@ dependencies = [ "futures", "futures-util", "http-serde", + "humantime", "hyper", "itertools", "mime_guess", @@ -7232,6 +7236,15 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "ttl_cache" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "tungstenite" version = "0.18.0" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 181649791e9..329d195d394 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -172,6 +172,7 @@ tracing-subscriber = { version = "0.3.16", features = [ "std", "env-filter", ] } +ttl_cache = "0.5" typetag = "0.2" ulid = "1.0" username = "0.2" diff --git a/quickwit/quickwit-proto/protos/quickwit/search_api.proto b/quickwit/quickwit-proto/protos/quickwit/search_api.proto index 2b5485c5e26..7154d0530c5 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search_api.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search_api.proto @@ -59,6 +59,30 @@ service SearchService { // it to other nodes. 
// - it should be applied on the given subset of splits rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse); + + rpc Scroll(ScrollRequest) returns (SearchResponse); + rpc PutKV(PutKVRequest) returns (PutKVResponse); + rpc GetKV(GetKVRequest) returns (GetKVResponse); +} + +message ScrollRequest { + string scroll_id = 1; +} + +message PutKVRequest { + bytes key = 1; + bytes payload = 2; + uint32 ttl_secs = 3; +} + +message PutKVResponse {} + +message GetKVRequest { + bytes key = 1; +} + +message GetKVResponse { + optional bytes payload = 1; } // -- Search ------------------- @@ -110,6 +134,11 @@ message SearchRequest { // Fields to extract snippet on repeated string snippet_fields = 12; + + // If set, the search response will include a search id + // that will make it possible to paginate through the results + // in a consistent manner. + optional uint32 scroll_ttl_secs = 14; } enum SortOrder { @@ -134,6 +163,8 @@ message SearchResponse { // Serialized aggregation response optional string aggregation = 5; + // Scroll Id (only set if scroll_secs was set in the request) + optional string scroll_id = 6; } message SplitSearchError { diff --git a/quickwit/quickwit-proto/src/quickwit.rs b/quickwit/quickwit-proto/src/quickwit.rs index b8646a4f9ab..666d7e6a93e 100644 --- a/quickwit/quickwit-proto/src/quickwit.rs +++ b/quickwit/quickwit-proto/src/quickwit.rs @@ -1,4 +1,40 @@ #[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScrollRequest { + #[prost(string, tag = "1")] + pub scroll_id: ::prost::alloc::string::String, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PutKvRequest { + #[prost(bytes = "vec", tag = "1")] + pub key: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub payload: ::prost::alloc::vec::Vec, + 
#[prost(uint32, tag = "3")] + pub ttl_secs: u32, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PutKvResponse {} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKvRequest { + #[prost(bytes = "vec", tag = "1")] + pub key: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKvResponse { + #[prost(bytes = "vec", optional, tag = "1")] + pub payload: ::core::option::Option<::prost::alloc::vec::Vec>, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -42,6 +78,11 @@ pub struct SearchRequest { /// Fields to extract snippet on #[prost(string, repeated, tag = "12")] pub snippet_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// If set, the search response will include a search id + /// that will make it possible to paginate through the results + /// in a consistent manner. 
+ #[prost(uint32, optional, tag = "14")] + pub scroll_ttl_secs: ::core::option::Option, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -63,6 +104,9 @@ pub struct SearchResponse { /// Serialized aggregation response #[prost(string, optional, tag = "5")] pub aggregation: ::core::option::Option<::prost::alloc::string::String>, + /// Scroll Id (only set if scroll_secs was set in the request) + #[prost(string, optional, tag = "6")] + pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -734,6 +778,72 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.SearchService", "LeafListTerms")); self.inner.unary(req, path, codec).await } + pub async fn scroll( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/Scroll", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.SearchService", "Scroll")); + self.inner.unary(req, path, codec).await + } + pub async fn put_kv( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/PutKV", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.SearchService", "PutKV")); + self.inner.unary(req, path, 
codec).await + } + pub async fn get_kv( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/GetKV", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.SearchService", "GetKV")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -817,6 +927,18 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + async fn scroll( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn put_kv( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn get_kv( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1169,6 +1291,136 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.SearchService/Scroll" => { + #[allow(non_camel_case_types)] + struct ScrollSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for ScrollSvc { + type Response = super::SearchResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).scroll(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let 
method = ScrollSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.SearchService/PutKV" => { + #[allow(non_camel_case_types)] + struct PutKVSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService for PutKVSvc { + type Response = super::PutKvResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).put_kv(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PutKVSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.SearchService/GetKV" => { + #[allow(non_camel_case_types)] + struct GetKVSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService for GetKVSvc { + type Response = super::GetKvResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = 
Arc::clone(&self.0); + let fut = async move { (*inner).get_kv(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetKVSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index 82a5576af9e..53924ac1b54 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } fnv = { workspace = true } futures = { workspace = true } @@ -33,9 +34,11 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tokio-util = { workspace = true } tower = { workspace = true } +ttl_cache = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } opentelemetry = { workspace = true } +ulid = { workspace = true } utoipa = { workspace = true } quickwit-cluster = { workspace = true } diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 2c3c6fbcb4b..32c8e12189a 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -29,7 +29,9 @@ use 
quickwit_config::service::QuickwitService; use quickwit_grpc_clients::service_client_pool::ServiceClient; use quickwit_proto::tonic::codegen::InterceptedService; use quickwit_proto::tonic::transport::Endpoint; -use quickwit_proto::{tonic, LeafSearchStreamResponse, SpanContextInterceptor}; +use quickwit_proto::{ + tonic, GetKvRequest, LeafSearchStreamResponse, PutKvRequest, SpanContextInterceptor, +}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Channel; use tonic::Request; @@ -245,6 +247,38 @@ impl SearchServiceClient { SearchServiceClientImpl::Local(service) => service.leaf_list_terms(request).await, } } + + pub async fn get_kv(&mut self, get_kv_req: GetKvRequest) -> crate::Result>> { + match &mut self.client_impl { + SearchServiceClientImpl::Local(service) => { + let search_after_context_opt = service.get_kv(get_kv_req).await; + Ok(search_after_context_opt) + } + SearchServiceClientImpl::Grpc(grpc_client) => { + let grpc_resp: tonic::Response = grpc_client + .get_kv(get_kv_req) + .await + .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + let get_search_after_context_resp = grpc_resp.into_inner(); + Ok(get_search_after_context_resp.payload) + } + } + } + + pub async fn put_kv(&mut self, put_kv_req: PutKvRequest) -> crate::Result<()> { + match &mut self.client_impl { + SearchServiceClientImpl::Local(service) => { + service.put_kv(put_kv_req).await; + } + SearchServiceClientImpl::Grpc(grpc_client) => { + grpc_client + .put_kv(put_kv_req) + .await + .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + } + } + Ok(()) + } } /// Creates a [`SearchServiceClient`] from a socket address. 
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index 37ffa35f37e..243e9a23c55 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -19,20 +19,24 @@ use futures::StreamExt; use quickwit_proto::{ - FetchDocsRequest, FetchDocsResponse, LeafListTermsRequest, LeafListTermsResponse, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, + PutKvRequest, }; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::debug; +use tracing::{debug, error}; use crate::retry::search::LeafSearchRetryPolicy; use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds}; use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy}; use crate::{SearchError, SearchJobPlacer, SearchServiceClient}; +/// We attempt to store search after context on two nodes. +const MAX_SEARCH_AFTER_REPLICATION: usize = 2; + /// Client that executes placed requests (Request, `SearchServiceClient`) and provides /// retry policies for `FetchDocsRequest`, `LeafSearchRequest` and `LeafSearchStreamRequest` /// to retry on other `SearchServiceClient`. @@ -157,6 +161,40 @@ impl ClusterClient { // TODO: implement retry client.leaf_list_terms(request.clone()).await } + + /// Attempts to store a given search context within the cluster. + /// + /// This function may fail silently, if no clients was available. 
+ pub async fn put_kv(&self, put_request: PutKvRequest) { + let clients = self + .search_job_placer + .best_nodes_per_affinity(&put_request.key) + .await; + let mut successful_replication = 0; + for mut client in clients { + if client.put_kv(put_request.clone()).await.is_ok() { + successful_replication += 1; + if successful_replication == MAX_SEARCH_AFTER_REPLICATION { + return; + } + } + } + if successful_replication == 0 { + error!(successful_replication=%successful_replication,"failed-to-replicate-search-after"); + } + } + + /// Returns a search_after context + pub async fn get_kv(&self, key: Vec) -> Option> { + let clients = self.search_job_placer.best_nodes_per_affinity(&key).await; + let get_request = GetKvRequest { key }; + for mut client in clients.take(3) { + if let Ok(Some(search_after_resp)) = client.get_kv(get_request.clone()).await { + return Some(search_after_resp); + } + } + None + } } // Merge initial leaf search results with results obtained from a retry. diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index cf3da3404bd..2897346cae8 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -33,6 +33,7 @@ mod leaf; mod leaf_cache; mod retry; mod root; +mod search_after_cache; mod search_job_placer; mod search_response_rest; mod search_stream; @@ -266,6 +267,7 @@ pub async fn single_node_search( .iter() .map(|error| format!("{error:?}")) .collect_vec(), + scroll_id: None, }) } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index e0eb17b4b27..ef5cf7eee59 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -18,17 +18,19 @@ // along with this program. If not, see . 
use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use anyhow::Context; use futures::future::try_join_all; use itertools::Itertools; +use quickwit_common::uri::Uri; use quickwit_config::{build_doc_mapper, IndexConfig}; use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME}; use quickwit_metastore::{Metastore, SplitMetadata}; use quickwit_proto::{ FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, ListTermsRequest, ListTermsResponse, PartialHit, - SearchRequest, SearchResponse, SplitIdAndFooterOffsets, + PutKvRequest, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, }; use quickwit_query::query_ast::{ BoolQuery, QueryAst, QueryAstVisitor, RangeQuery, TermQuery, TermSetQuery, @@ -43,6 +45,7 @@ use tracing::{debug, error, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; use crate::find_trace_ids_collector::Span; +use crate::search_after_cache::{ScrollContext, ScrollKeyAndStartOffset}; use crate::search_job_placer::Job; use crate::service::SearcherContext; use crate::{ @@ -145,6 +148,27 @@ fn validate_requested_snippet_fields( Ok(()) } +fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> SearchRequest { + // We do not mutate + SearchRequest { + index_id: req.index_id.clone(), + query_ast: req.query_ast.clone(), + start_timestamp: req.start_timestamp, + end_timestamp: req.end_timestamp, + max_hits: req.max_hits, + start_offset: req.start_offset, + sort_order: req.sort_order, + sort_by_field: req.sort_by_field.clone(), + // We remove all aggregation request. + // The aggregation will not be computed for each scroll request. + aggregation_request: None, + // We remove the snippet fields. This feature is not supported for scroll requests. + snippet_fields: Vec::new(), + // We remove the scroll ttl parameter. 
It is irrelevant to process later request + scroll_ttl_secs: None, + } +} + fn validate_sort_by_field(field_name: &str, schema: &Schema) -> crate::Result<()> { if field_name == "_score" { return Ok(()); @@ -212,28 +236,21 @@ pub(crate) fn validate_request( /// 2. Merges the search results. /// 3. Sends fetch docs requests to multiple leaf nodes. /// 4. Builds the response with docs and returns. -#[instrument(skip(search_request, cluster_client, metastore))] -pub async fn root_search( +#[instrument(skip(search_request, cluster_client))] +pub async fn root_search_aux( searcher_context: &SearcherContext, mut search_request: SearchRequest, - metastore: &dyn Metastore, + index_uri: &Uri, + doc_mapper: Arc, + query_ast: &QueryAst, + split_metadatas: Vec, cluster_client: &ClusterClient, ) -> crate::Result { - let start_instant = tokio::time::Instant::now(); - - let index_metadata = metastore.index_metadata(&search_request.index_id).await?; - let index_uid = index_metadata.index_uid.clone(); - let index_config = index_metadata.into_index_config(); - - let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) - .map_err(|err| { - SearchError::InternalError(format!("Failed to build doc mapper. 
Cause: {err}")) - })?; - validate_request(&*doc_mapper, &search_request)?; let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast) .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?; if let Some(timestamp_field) = doc_mapper.timestamp_field_name() { @@ -252,13 +269,10 @@ pub async fn root_search( SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}")) })?; - let doc_mapper_str = serde_json::to_string(&doc_mapper).map_err(|err| { + let doc_mapper_str = serde_json::to_string(&*doc_mapper).map_err(|err| { SearchError::InternalError(format!("Failed to serialize doc mapper: Cause {err}")) })?; - let split_metadatas: Vec = - list_relevant_splits(index_uid, &search_request, metastore).await?; - let split_offsets_map: HashMap = split_metadatas .iter() .map(|metadata| { @@ -269,8 +283,6 @@ pub async fn root_search( }) .collect(); - let index_uri = &index_config.index_uri; - let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); let assigned_leaf_search_jobs = cluster_client @@ -374,6 +386,37 @@ pub async fn root_search( cluster_client.fetch_docs(fetch_docs_req, client) }); + let aggregation: Option = finalize_aggregation( + leaf_search_response.intermediate_aggregation_result, + aggregations, + searcher_context, + )?; + + let scroll_id = if let Some(ttl_secs) = search_request.scroll_ttl_secs.clone() { + let scroll_ctx = ScrollContext { + index_uri: index_uri.clone(), + doc_mapper, + query_ast: query_ast_resolved, + split_metadatas, + search_request: simplify_search_request_for_scroll_api(&search_request), + }; + let scroll_key_and_start_offset: ScrollKeyAndStartOffset = + ScrollKeyAndStartOffset::new_with_start_offset(scroll_ctx.search_request.start_offset); + let payload: Vec = scroll_ctx.serialize(); + let put_scroll_context_req = PutKvRequest { + key: u128::from(scroll_key_and_start_offset.scroll_ulid) + 
.to_le_bytes() + .to_vec(), + payload, + ttl_secs, + }; + cluster_client.put_kv(put_scroll_context_req).await; + let scroll_id = scroll_key_and_start_offset.to_string(); + Some(scroll_id) + } else { + None + }; + let fetch_docs_resps: Vec = try_join_all(fetch_docs_resp_futures).await?; // Merge the fetched docs. @@ -407,23 +450,78 @@ pub async fn root_search( .map(|(_position, hit)| hit) .collect(); - let elapsed = start_instant.elapsed(); - - let aggregation: Option = finalize_aggregation( - leaf_search_response.intermediate_aggregation_result, - aggregations, - searcher_context, - )?; - Ok(SearchResponse { aggregation, num_hits: leaf_search_response.num_hits, hits, - elapsed_time_micros: elapsed.as_micros() as u64, + elapsed_time_micros: 0u64, errors: Vec::new(), + scroll_id, }) } +/// Performs a distributed search. +/// 1. Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Sends fetch docs requests to multiple leaf nodes. +/// 4. Builds the response with docs and returns. +#[instrument(skip(search_request, cluster_client, metastore))] +pub async fn root_search( + searcher_context: &SearcherContext, + mut search_request: SearchRequest, + metastore: &dyn Metastore, + cluster_client: &ClusterClient, +) -> crate::Result { + let start_instant = tokio::time::Instant::now(); + let index_metadata = metastore.index_metadata(&search_request.index_id).await?; + let index_uid = index_metadata.index_uid.clone(); + let index_config = index_metadata.into_index_config(); + + let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) + .map_err(|err| { + SearchError::InternalError(format!("Failed to build doc mapper. 
Cause: {err}")) + })?; + + validate_request(&*doc_mapper, &search_request)?; + + let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?; + + if let Some(timestamp_field) = doc_mapper.timestamp_field_name() { + refine_start_end_timestamp_from_ast( + &query_ast_resolved, + timestamp_field, + &mut search_request.start_timestamp, + &mut search_request.end_timestamp, + ); + } + + // Validates the query by effectively building it against the current schema. + doc_mapper.query(doc_mapper.schema(), &query_ast_resolved, true)?; + + search_request.query_ast = serde_json::to_string(&query_ast_resolved).map_err(|err| { + SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}")) + })?; + + let split_metadatas: Vec = + list_relevant_splits(index_uid, &search_request, metastore).await?; + + let mut search_response = root_search_aux( + searcher_context, + search_request, + &index_config.index_uri, + doc_mapper.clone(), + &query_ast_resolved, + split_metadatas, + cluster_client, + ) + .await?; + + search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + Ok(search_response) +} + pub(crate) fn refine_start_end_timestamp_from_ast( query_ast: &QueryAst, timestamp_field: &str, diff --git a/quickwit/quickwit-search/src/search_after_cache.rs b/quickwit/quickwit-search/src/search_after_cache.rs new file mode 100644 index 00000000000..afcf7f7903e --- /dev/null +++ b/quickwit/quickwit-search/src/search_after_cache.rs @@ -0,0 +1,150 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use quickwit_common::uri::Uri; +use quickwit_doc_mapper::DocMapper; +use quickwit_metastore::SplitMetadata; +use quickwit_proto::SearchRequest; +use quickwit_query::query_ast::QueryAst; +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; +use ttl_cache::TtlCache; +use ulid::Ulid; + +/// Maximum capacity of the search after cache. +/// +/// For the moment this value is hardcoded. +/// TODO make configurable. +/// +/// Assuming a search context of 1MB, this can +/// amount to up to 1GB. 
+const SCROLL_CACHE_CAPACITY: usize = 1_000; + +#[derive(Serialize, Deserialize)] +pub(crate) struct ScrollContext { + pub split_metadatas: Vec, + pub search_request: SearchRequest, + pub index_uri: Uri, + pub doc_mapper: Arc, + pub query_ast: QueryAst, +} + +impl ScrollContext { + pub fn serialize(&self) -> Vec { + let uncompressed_payload = postcard::to_allocvec(self).unwrap(); + // TODO add compression + uncompressed_payload + } + + pub fn load(payload: &[u8]) -> anyhow::Result { + let scroll_context = + postcard::from_bytes(payload).context("Failed to deserialize context")?; + Ok(scroll_context) + } +} + +#[derive(Clone)] +pub(crate) struct MiniKV { + ttl_with_cache: Arc, Vec>>>, +} + +impl Default for MiniKV { + fn default() -> MiniKV { + MiniKV { + ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SCROLL_CACHE_CAPACITY))), + } + } +} + +impl MiniKV { + pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { + let mut cache_lock = self.ttl_with_cache.write().await; + cache_lock.insert(key, payload, ttl); + } + + pub async fn get(&self, key: &[u8]) -> Option> { + let cache_lock = self.ttl_with_cache.read().await; + let search_after_context_bytes = cache_lock.get(key)?; + Some(search_after_context_bytes.clone()) + } +} + +#[derive(Serialize, Deserialize)] +pub struct ScrollKeyAndStartOffset { + pub scroll_ulid: Ulid, + pub start_offset: u64, +} + +impl ScrollKeyAndStartOffset { + pub fn new_with_start_offset(start_offset: u64) -> ScrollKeyAndStartOffset { + let scroll_ulid: Ulid = Ulid::new(); + ScrollKeyAndStartOffset { + scroll_ulid: Ulid::new(), + start_offset, + } + } + + pub fn scroll_key(&self) -> Vec { + u128::from(self.scroll_ulid).to_le_bytes().to_vec() + } +} + +impl ToString for ScrollKeyAndStartOffset { + fn to_string(&self) -> String { + let mut payload = [0u8; 24]; + payload[..16].copy_from_slice(&u128::from(self.scroll_ulid).to_le_bytes()); + payload[16..].copy_from_slice(&self.start_offset.to_le_bytes()); + 
BASE64_STANDARD.encode(&payload) + } +} + +impl FromStr for ScrollKeyAndStartOffset { + type Err = &'static str; + + fn from_str(scroll_id_str: &str) -> Result { + let base64_decoded: Vec = BASE64_STANDARD + .decode(scroll_id_str) + .map_err(|_| "scroll id is invalid base64.")?; + if base64_decoded.len() != 24 { + return Err("scroll id payload is not 8 bytes long."); + } + let (scroll_id, from_bytes) = base64_decoded.split_at(16); + let scroll_ulid_bytes: [u8; 16] = scroll_id.try_into().unwrap(); + let scroll_ulid = u128::from_le_bytes(scroll_ulid_bytes).into(); + let from = u64::from_le_bytes(from_bytes.try_into().unwrap()); + Ok(ScrollKeyAndStartOffset { + scroll_ulid, + start_offset: from, + }) + } +} + +#[cfg(test)] +mod tests { + + #[test] + fn test_scroll_id() {} +} diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index 3b28033958b..4cc77aaa1ef 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -66,8 +66,33 @@ impl SearchJobPlacer { } } +struct SocketAddrAndClient((SocketAddr, SearchServiceClient)); + +impl Hash for SocketAddrAndClient { + fn hash(&self, hasher: &mut H) { + self.0 .0.hash(hasher); + } +} + impl SearchJobPlacer { - /// Assign the given job to the clients. + pub async fn best_nodes_per_affinity( + &self, + affinity_key: &[u8], + ) -> impl Iterator { + let mut nodes: Vec = self + .searcher_pool + .all() + .await + .into_iter() + .map(SocketAddrAndClient) + .collect(); + sort_by_rendez_vous_hash(&mut nodes[..], affinity_key); + nodes + .into_iter() + .map(|socket_addr_and_client| socket_addr_and_client.0 .1) + } + + /// Assign the given job to the clients /// Returns a list of pair (SocketAddr, `Vec`) /// /// When exclude_addresses filters all clients it is ignored. 
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index fc0a8683505..4d6de64f7bd 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -18,7 +18,9 @@ // along with this program. If not, see . use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; @@ -27,9 +29,10 @@ use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::Metastore; use quickwit_proto::{ - FetchDocsRequest, FetchDocsResponse, LeafListTermsRequest, LeafListTermsResponse, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, - ListTermsRequest, ListTermsResponse, SearchRequest, SearchResponse, SearchStreamRequest, + ListTermsRequest, ListTermsResponse, PutKvRequest, ScrollRequest, SearchRequest, + SearchResponse, SearchStreamRequest, }; use quickwit_storage::{Cache, MemorySizedCache, QuickwitCache, StorageResolver}; use tantivy::aggregation::AggregationLimits; @@ -38,6 +41,8 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::info; use crate::leaf_cache::LeafSearchCache; +use crate::root::root_search_aux; +use crate::search_after_cache::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{ fetch_docs, leaf_list_terms, leaf_search, root_list_terms, root_search, ClusterClient, @@ -51,6 +56,7 @@ pub struct SearchServiceImpl { storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, + search_after_cache: MiniKV, } /// Trait representing a search service. 
@@ -112,6 +118,12 @@ pub trait SearchService: 'static + Send + Sync { &self, request: LeafListTermsRequest, ) -> crate::Result; + + async fn scroll(&self, scroll_request: ScrollRequest) -> crate::Result; + + async fn put_kv(&self, put_kv: PutKvRequest); + + async fn get_kv(&self, get_kv: GetKvRequest) -> Option>; } impl SearchServiceImpl { @@ -128,6 +140,7 @@ impl SearchServiceImpl { storage_resolver, cluster_client, searcher_context, + search_after_cache: MiniKV::default(), } } } @@ -149,7 +162,6 @@ impl SearchService for SearchServiceImpl { &self.cluster_client, ) .await?; - Ok(search_result) } @@ -279,6 +291,53 @@ impl SearchService for SearchServiceImpl { Ok(leaf_search_response) } + + async fn scroll(&self, scroll_request: ScrollRequest) -> crate::Result { + let start = Instant::now(); + let ScrollKeyAndStartOffset { + scroll_ulid, + start_offset, + } = ScrollKeyAndStartOffset::from_str(&scroll_request.scroll_id) + .map_err(|msg| SearchError::InvalidArgument(msg.to_string()))?; + let payload = self + .cluster_client + .get_kv(u128::from(scroll_ulid).to_le_bytes().to_vec()) + .await; + let payload = payload + .ok_or_else(|| SearchError::InternalError("scroll key not found.".to_string()))?; + let ScrollContext { + index_uri, + doc_mapper, + query_ast, + split_metadatas, + mut search_request, + } = ScrollContext::load(&payload) + .map_err(|_| SearchError::InternalError("Corrupted scroll context.".to_string()))?; + let mut search_response = root_search_aux( + &self.searcher_context, + search_request, + &index_uri, + doc_mapper, + &query_ast, + split_metadatas, + &self.cluster_client, + ) + .await?; + search_response.elapsed_time_micros = start.elapsed().as_micros() as u64; + Ok(search_response) + } + + async fn put_kv(&self, put_request: PutKvRequest) { + let ttl = Duration::from_secs(put_request.ttl_secs as u64); + self.search_after_cache + .put(put_request.key, put_request.payload, ttl) + .await; + } + + async fn get_kv(&self, get_request: GetKvRequest) -> 
Option> { + let payload: Vec = self.search_after_cache.get(&get_request.key).await?; + Some(payload) + } } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 875eda897a3..995b5346db1 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -17,6 +17,7 @@ byte-unit = { workspace = true } elasticsearch-dsl = "0.4" futures = { workspace = true } futures-util = { workspace = true } +humantime = { workspace = true } http-serde = { workspace = true } hyper = { workspace = true } itertools = { workspace = true } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs index bcc65242ee2..a113105a717 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs @@ -24,7 +24,9 @@ use warp::reject::LengthRequired; use warp::{Filter, Rejection}; use super::model::MultiSearchQueryParams; -use crate::elastic_search_api::model::{ElasticIngestOptions, SearchBody, SearchQueryParams}; +use crate::elastic_search_api::model::{ + ElasticIngestOptions, ScrollQueryParams, SearchBody, SearchQueryParams, +}; const BODY_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(1_000_000); const CONTENT_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(10 * 1024 * 1024); // 10MiB @@ -157,3 +159,30 @@ pub(crate) fn elastic_multi_search_filter( .and(warp::post()) .and(serde_qs::warp::query(serde_qs::Config::default())) } + +fn merge_scroll_body_params( + from_query_string: ScrollQueryParams, + from_body: ScrollQueryParams, +) -> ScrollQueryParams { + ScrollQueryParams { + scroll: from_query_string.scroll.or(from_body.scroll), + scroll_id: from_query_string.scroll_id.or(from_body.scroll_id), + } +} + +#[utoipa::path(post, tag = "Search", path = "/_search/scroll")] +pub(crate) fn elastic_scroll_filter( +) -> impl Filter + Clone { + 
warp::path!("_elastic" / "_search" / "scroll") + .and(warp::body::content_length_limit( + BODY_LENGTH_LIMIT.get_bytes(), + )) + .and(warp::get().or(warp::post()).unify()) + .and(serde_qs::warp::query(serde_qs::Config::default())) + .and(json_or_empty()) + .map( + |scroll_query_params: ScrollQueryParams, scroll_body: ScrollQueryParams| { + merge_scroll_body_params(scroll_query_params, scroll_body) + }, + ) +} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index bb43350ada9..3e0a32e9707 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -30,8 +30,8 @@ use quickwit_config::QuickwitConfig; use quickwit_ingest::IngestServiceClient; use quickwit_search::SearchService; use rest_handler::{ - es_compat_cluster_info_handler, es_compat_index_multi_search_handler, - es_compat_index_search_handler, es_compat_search_handler, + es_compat_index_multi_search_handler, es_compat_index_search_handler, es_compat_scroll_handler, + es_compat_search_handler, es_compat_cluster_info_handler }; use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; @@ -50,6 +50,7 @@ pub fn elastic_api_handlers( es_compat_cluster_info_handler(quickwit_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) .or(es_compat_index_search_handler(search_service.clone())) + .or(es_compat_scroll_handler(search_service.clone())) .or(es_compat_index_multi_search_handler(search_service)) .or(es_compat_bulk_handler(ingest_service.clone())) .or(es_compat_index_bulk_handler(ingest_service)) diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs index 3f1313df3e8..9a64c3e9358 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs @@ -21,6 +21,7 @@ mod bulk_body; mod 
bulk_query_params; mod error; mod multi_search; +mod scroll; mod search_body; mod search_query_params; @@ -31,6 +32,7 @@ pub use multi_search::{ MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, }; use quickwit_proto::SortOrder; +pub use scroll::ScrollQueryParams; pub use search_body::SearchBody; pub use search_query_params::SearchQueryParams; diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs new file mode 100644 index 00000000000..56649b7057f --- /dev/null +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs @@ -0,0 +1,26 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use serde::Deserialize; + +#[derive(Deserialize, Default)] +pub struct ScrollQueryParams { + pub scroll: Option, + pub scroll_id: Option, +} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs index 2afad504afd..c108f055fe1 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::str::FromStr; +use std::time::Duration; use quickwit_proto::SortOrder; use quickwit_query::BooleanOperand; @@ -189,6 +190,20 @@ impl SearchQueryParams { } Ok(Some(sort_fields)) } + + /// Returns the scroll duration supplied by the user. + /// + /// This function returns an error if the scroll duration is not in the expected format. (`40s` + /// etc.) + pub fn parse_scroll_ttl(&self) -> Result, SearchError> { + let Some(scroll_str) = self.scroll.as_ref() else { + return Ok(None); + }; + let duration: Duration = humantime::parse_duration(scroll_str).map_err(|_err| { + SearchError::InvalidArgument(format!("Invalid scroll duration: `{scroll_str}`")) + })?; + Ok(Some(duration)) + } } #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 2a2d49fa0bb..eb3f33f471a 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -19,7 +19,7 @@ use std::str::from_utf8; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use bytes::Bytes; use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticSearchResponse}; @@ -29,7 +29,7 @@ use hyper::StatusCode; use itertools::Itertools; use 
quickwit_common::truncate_str; use quickwit_config::QuickwitConfig; -use quickwit_proto::{SearchResponse, ServiceErrorCode}; +use quickwit_proto::{ScrollRequest, SearchResponse, ServiceErrorCode}; use quickwit_query::query_ast::{QueryAst, UserInputQuery}; use quickwit_query::BooleanOperand; use quickwit_search::{SearchError, SearchService}; @@ -38,11 +38,11 @@ use warp::{Filter, Rejection}; use super::filter::{ elastic_cluster_info_filter, elastic_index_search_filter, elastic_multi_search_filter, - elastic_search_filter, + elastic_search_filter, elastic_scroll_filter }; use super::model::{ ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, - MultiSearchSingleResponse, SearchBody, SearchQueryParams, + MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; use crate::elastic_search_api::model::SortField; use crate::format::BodyFormat; @@ -99,7 +99,17 @@ pub fn es_compat_index_search_handler( .map(make_elastic_api_response) } -/// POST _elastic/_msearch +/// GET or POST _elastic/_search/scroll +pub fn es_compat_scroll_handler( + search_service: Arc, +) -> impl Filter + Clone { + elastic_scroll_filter() + .and(with_arg(search_service)) + .then(es_scroll) + .map(make_elastic_api_response) +} + +/// POST _elastic/_msearch pub fn es_compat_index_multi_search_handler( search_service: Arc, ) -> impl Filter + Clone { @@ -163,6 +173,9 @@ fn build_request_for_es_api( (None, None) }; + let scroll_duration: Option = search_params.parse_scroll_ttl()?; + let scroll_ttl_secs: Option = scroll_duration.map(|duration| duration.as_secs() as u32); + Ok(quickwit_proto::SearchRequest { index_id, query_ast: serde_json::to_string(&query_ast).expect("Failed to serialize QueryAst"), @@ -171,7 +184,10 @@ fn build_request_for_es_api( aggregation_request, sort_by_field, sort_order, - ..Default::default() + start_timestamp: None, + end_timestamp: None, + snippet_fields: Vec::new(), + scroll_ttl_secs, }) } @@ -287,6 +303,23 @@ async fn 
es_compat_index_multi_search( Ok(multi_search_response) } +async fn es_scroll( + scroll_query_params: ScrollQueryParams, + search_service: Arc, +) -> Result { + let start_instant = Instant::now(); + let Some(scroll_id) = scroll_query_params.scroll_id.clone() else { + return Err(SearchError::InvalidArgument("Missing scroll_id".to_string()).into()); + }; + let scroll_request = ScrollRequest { scroll_id }; + let search_response: SearchResponse = search_service.scroll(scroll_request).await?; + let elapsed = start_instant.elapsed(); + let mut search_response_rest: ElasticSearchResponse = + convert_to_es_search_response(search_response); + search_response_rest.took = elapsed.as_millis() as u32; + Ok(search_response_rest) +} + fn convert_to_es_search_response(resp: SearchResponse) -> ElasticSearchResponse { let hits: Vec = resp.hits.into_iter().map(convert_hit).collect(); let aggregations: Option = if let Some(aggregation_json) = resp.aggregation { diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 86fbe34bad5..fc2f609bdae 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -23,7 +23,8 @@ use async_trait::async_trait; use futures::TryStreamExt; use quickwit_proto::{ convert_to_grpc_result, search_service_server as grpc, set_parent_span_from_request_metadata, - tonic, LeafSearchStreamRequest, LeafSearchStreamResponse, ServiceError, + tonic, GetKvRequest, GetKvResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, + ServiceError, }; use quickwit_search::SearchService; use tracing::instrument; @@ -46,8 +47,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let search_request = request.into_inner(); - let search_res = self.0.root_search(search_request).await; - convert_to_grpc_result(search_res) + let search_result = 
self.0.root_search(search_request).await; + convert_to_grpc_result(search_result) } #[instrument(skip(self, request))] @@ -57,8 +58,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let leaf_search_request = request.into_inner(); - let leaf_search_res = self.0.leaf_search(leaf_search_request).await; - convert_to_grpc_result(leaf_search_res) + let leaf_search_result = self.0.leaf_search(leaf_search_request).await; + convert_to_grpc_result(leaf_search_result) } #[instrument(skip(self, request))] @@ -68,8 +69,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let fetch_docs_request = request.into_inner(); - let fetch_docs_res = self.0.fetch_docs(fetch_docs_request).await; - convert_to_grpc_result(fetch_docs_res) + let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await; + convert_to_grpc_result(fetch_docs_result) } type LeafSearchStreamStream = std::pin::Pin< @@ -102,8 +103,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let search_request = request.into_inner(); - let search_res = self.0.root_list_terms(search_request).await; - convert_to_grpc_result(search_res) + let search_result = self.0.root_list_terms(search_request).await; + convert_to_grpc_result(search_result) } #[instrument(skip(self, request))] @@ -113,7 +114,39 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let leaf_search_request = request.into_inner(); - let leaf_search_res = self.0.leaf_list_terms(leaf_search_request).await; - convert_to_grpc_result(leaf_search_res) + let leaf_search_result = self.0.leaf_list_terms(leaf_search_request).await; + convert_to_grpc_result(leaf_search_result) + } + + async fn scroll( + &self, + 
request: tonic::Request, + ) -> Result, tonic::Status> { + let scroll_request = request.into_inner(); + let scroll_result = self.0.scroll(scroll_request).await; + convert_to_grpc_result(scroll_result) + } + + #[instrument(skip(self, request))] + async fn put_kv( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let put_request = request.into_inner(); + self.0.put_kv(put_request).await; + Ok(tonic::Response::new(quickwit_proto::PutKvResponse {})) + } + + #[instrument(skip(self, request))] + async fn get_kv( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let get_search_after_context_request = request.into_inner(); + let payload = self.0.get_kv(get_search_after_context_request).await; + let get_response = GetKvResponse { payload }; + Ok(tonic::Response::new(get_response)) } } diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index edd248c0d4f..6bff3f8b1c7 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -205,6 +205,7 @@ async fn search_endpoint( .map(|agg| serde_json::to_string(&agg).expect("could not serialize JsonValue")), sort_order, sort_by_field, + scroll_ttl_secs: None, }; let search_response = search_service.root_search(search_request).await?; let search_response_rest = SearchResponseRest::try_from(search_response)?; diff --git a/quickwit/rest-api-tests/rest_api_test.py b/quickwit/rest-api-tests/rest_api_test.py index 647df620625..89decb590d7 100755 --- a/quickwit/rest-api-tests/rest_api_test.py +++ b/quickwit/rest-api-tests/rest_api_test.py @@ -36,13 +36,15 @@ def open_scenario(scenario_filepath): if type(step_dict) == dict: yield step_dict -def run_step(step): +def run_step(step, previous_result): + result = {} if "method" in step: methods = 
step["method"] if type(methods) != list: methods = [methods] for method in methods: - run_request_step(method, step) + result = run_request_step(method, step, previous_result) + return result def load_data(path): if path.endswith("gz"): @@ -50,7 +52,24 @@ def load_data(path): else: return open(path, 'rb').read() -def run_request_step(method, step): +def resolve_previous_result(c, previous_result): + if type(c) == dict: + result = {} + if len(c) == 1 and "$previous" in c: + return eval(c["$previous"], None, {"val": previous_result}) + for (k, v) in c.items(): + result[k] = resolve_previous_result(v, previous_result) + return result + if type(c) == list: + return [ + resolve_previous_result(v, previous_result) + for v in c + ] + return c + +print(resolve_previous_result({"hello": {"$previous": "val[\"scroll_id\"]"}}, {"scroll_id": "123"})) + +def run_request_step(method, step, previous_result): assert method in {"GET", "POST", "PUT", "DELETE"} if "headers" not in step: step["headers"] = {'user-agent': 'my-app/0.0.1'} @@ -66,6 +85,7 @@ def run_request_step(method, step): if body_from_file is not None: body_from_file = osp.join(step["cwd"], body_from_file) kvargs["data"] = load_data(body_from_file) + kvargs = resolve_previous_result(kvargs, previous_result) ndjson = step.get("ndjson", None) if ndjson is not None: kvargs["data"] = "\n".join([json.dumps(doc) for doc in ndjson]) @@ -77,12 +97,14 @@ def run_request_step(method, step): print(r.text) raise Exception("Wrong status code. 
Got %s, expected %s" % (r.status_code, expected_status_code)) expected_resp = step.get("expected", None) + json_res = r.json() if expected_resp is not None: try: - check_result(r.json(), expected_resp) + check_result(json_res, expected_resp, context_path="") except Exception as e: - print(json.dumps(r.json(), indent=2)) + print(json.dumps(json_res, indent=2)) raise e + return json_res def check_result(result, expected, context_path = ""): if type(expected) == dict and "$expect" in expected: @@ -192,6 +214,7 @@ def run_scenario(self, path, script): steps = list(open_scenario(scenario_path)) num_steps_executed = 0 num_steps_skipped = 0 + previous_result = {} for (i, step) in enumerate(steps, 1): step = stack_dicts(self.context, step) applicable_engine = step.get("engines", None) @@ -200,7 +223,7 @@ def run_scenario(self, path, script): num_steps_skipped += 1 continue try: - run_step(step) + previous_result = run_step(step, previous_result) num_steps_executed += 1 except Exception as e: print("🔴 %s" % scenario_path) diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml new file mode 100644 index 00000000000..b172a3685eb --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml @@ -0,0 +1,60 @@ +params: + size: 1 + scroll: 30m +json: + query: + match_all: {} + sort: + - actor.id: + order: desc + aggs: + mytypeagg: + terms: + field: type + size: 5 +store: + scroll_id: _scroll_id +expected: + _scroll_id: + $expect: "len(val) > 4" + aggregations: + mytypeagg: {} + hits: + hits: + - _source: {actor: {login: "miyuotsuki"}} + total: + value: 100 + relation: "eq" +--- +method: GET +endpoint: "_search/scroll" +params: + scroll: 30m +json: + scroll_id: + $previous: "val[\"_scroll_id\"]" +expected: + _scroll_id: + $expect: "len(val) > 4" + hits: + hits: + - _source: {actor: {login: "ScottThiessen"}} + total: + value: 100 +--- +method: GET 
+endpoint: "_search/scroll" +params: + scroll: 30m +json: + scroll_id: + $previous: "val[\"_scroll_id\"]" +expected: + _scroll_id: + $expect: "len(val) > 4" + hits: + hits: + - _source: {actor: {login: "seenajon"}} + total: + value: 100 +