diff --git a/docs/internals/scroll.md b/docs/internals/scroll.md
new file mode 100644
index 00000000000..524f0549f20
--- /dev/null
+++ b/docs/internals/scroll.md
@@ -0,0 +1,66 @@
+# Scroll API
+
+The scroll API has been implemented to offer compatibility with Elasticsearch.
+Both the API and the implementation are quirky; they are detailed in this document.
+
+## API description
+
+You can find information about the scroll API here:
+https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results
+https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
+
+The user runs a regular search request with a `scroll` param.
+The search result then contains the normal response, but a `_scroll_id` property is added to the search body.
+
+That id is then meant to be sent to a dedicated scroll REST API.
+Successive calls to this API return pages incrementally.
+
+## Quirks and difficulties
+
+The scrolled results should be consistent with the state of the original index.
+For this reason, we need to capture the state of the index at the point of the original request.
+
+The Elasticsearch API is needlessly broken in that it returns the same scroll_id over and over.
+If a network error happens between the client and the server at page N, there is no way for the client to ask for the re-emission of page N.
+Page N+1 will be returned on the next call.
+
+## Implementation
+
+The scroll context contains:
+- the details about the original query (we need to be able to re-emit paginated queries)
+- the list of split metadata used for the query
+- a cached list of partial docs (= not the doc content, just its address and its score), to avoid
+searching at every single scroll request
+- the total number of results, in order to append that information to our response.
+
+We use a simple leaderless KV store to keep the state required to run the scroll API.
+We generate a scroll ULID and use it to get a list of the servers with the best affinity according
+to rendezvous hashing. We then go through them in order and attempt to put that key on up to 2 servers.
+Failures for these PUTs are silent.
+
+For each call to scroll, one of two things can happen:
+- the partial docs for the requested page are in the partial doc cache. We just run the fetch_docs phase,
+and update the context with the new `start_offset`.
+- the partial docs for the requested page are not in the partial doc cache. We then run a new search query.
+
+We attempt to fetch `SCROLL_CACHE_CAPACITY` partial docs in order to fill the partial doc address cache for subsequent calls.
+
+## A strange `scroll_id`
+
+For more robustness, the scroll id is the concatenation of:
+- a ULID, used as the address for the search context;
+- the `start_offset`.
+
+The idea here is that if the put request failed, we can still return the right results even if we have an obsolete version of the `ScrollContext`.
+We indeed take the max of the `start_offset` supplied in the `scroll_id` and the one present in the `ScrollContext`.
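+
+To make this concrete, here is a simplified sketch of the encoding implemented
+in `quickwit-search/src/scroll_context.rs` (the actual `ScrollKeyAndStartOffset`
+also appends a `max_hits: u32` to the payload in the same way):
+
+```rust
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use ulid::Ulid;
+
+/// Concatenates the 16 ULID bytes and the little-endian start offset,
+/// then base64-encodes the result.
+fn make_scroll_id(scroll_ulid: Ulid, start_offset: u64) -> String {
+    let mut payload = [0u8; 24];
+    payload[..16].copy_from_slice(&u128::from(scroll_ulid).to_le_bytes());
+    payload[16..24].copy_from_slice(&start_offset.to_le_bytes());
+    BASE64_STANDARD.encode(payload)
+}
+```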
+
+## Quickwit-specific quirks
+
+Our state is pretty volatile. Some scrolls may end up being broken if we were to remove 2 servers within 30 minutes.
+
+The scroll lifetime does not extend the life of a scroll context.
+We do not do anything to prevent splits from being GCed, and we only rely on the grace period to make sure this does not happen.
+
+The ES API does not always update the `_scroll_id`; it does not seem to change across calls.
+A misimplemented client might therefore appear to work correctly on one shard with Elasticsearch and yet not work with Quickwit.
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index c67b6bd30e9..64a8ab2442e 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5472,6 +5472,7 @@ dependencies = [
 "anyhow",
 "assert-json-diff 2.0.2",
 "async-trait",
+ "base64 0.21.2",
 "bytes",
 "chitchat",
 "fnv",
@@ -5510,6 +5511,8 @@ dependencies = [
 "tower",
 "tracing",
 "tracing-opentelemetry",
+ "ttl_cache",
+ "ulid",
 "utoipa",
 ]
 
@@ -5527,6 +5530,7 @@ dependencies = [
 "futures",
 "futures-util",
 "http-serde",
+ "humantime",
 "hyper",
 "itertools 0.11.0",
 "mime_guess",
@@ -7733,6 +7737,15 @@ version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
 
+[[package]]
+name = "ttl_cache"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a"
+dependencies = [
+ "linked-hash-map",
+]
+
 [[package]]
 name = "tungstenite"
 version = "0.18.0"
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 1b567cdd32a..cdeab4a6fed 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -174,6 +174,7 @@ tracing-subscriber = { version = "0.3.16", features = [
   "std",
   "env-filter",
 ] }
+ttl_cache = "0.5"
 typetag = "0.2"
 ulid = "1.0"
 username = "0.2"
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 5b390432416..9b1fe9a4a9d 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -32,6 +32,7 @@ pub mod pubsub;
 pub mod rand;
 pub mod rendezvous_hasher;
 pub mod runtimes;
+pub mod shared_consts;
 pub mod sorted_iter;
 pub mod stream_utils;
diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs
new file mode 100644
index 00000000000..9ead79f9570
--- /dev/null
+++ b/quickwit/quickwit-common/src/shared_consts.rs
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::time::Duration;
+
+/// We cannot safely delete splits right away because:
+/// - in-flight queries could actually have selected this split,
+/// - scroll queries may also have a point in time on these splits.
+///
+/// We deal with this problem by introducing a grace period. A split is first marked as deleted,
+/// and hence won't be selected for search. After a few minutes, once it is reasonably safe to
+/// assume that all queries involving this split have terminated, we effectively delete the split.
+/// This duration is controlled by `DELETION_GRACE_PERIOD`.
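+///
+/// A hypothetical sketch of how a caller could gate deletion on this grace
+/// period (`marked_for_deletion_at` is an illustrative parameter, not an
+/// actual metastore field):
+///
+/// ```rust,ignore
+/// use std::time::SystemTime;
+///
+/// fn is_past_grace_period(marked_for_deletion_at: SystemTime) -> bool {
+///     marked_for_deletion_at
+///         .elapsed()
+///         .map(|elapsed| elapsed >= DELETION_GRACE_PERIOD)
+///         .unwrap_or(false)
+/// }
+/// ```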
+pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32 min
diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
index ddfe2b79504..47090cc4228 100644
--- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
+++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use futures::{stream, StreamExt};
 use itertools::Itertools;
 use quickwit_actors::{Actor, ActorContext, Handler};
+use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
 use quickwit_metastore::Metastore;
 use quickwit_storage::StorageResolver;
 use serde::Serialize;
@@ -40,14 +41,6 @@ const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
 /// the grace period strategy should do the job for the moment.
 const STAGED_GRACE_PERIOD: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
 
-/// We cannot safely delete splits right away as a in-flight queries could actually
-/// have selected this split.
-/// We deal this probably by introducing a grace period. A split is first marked as delete,
-/// and hence won't be selected for search. After a few minutes, once it reasonably safe to assume
-/// that all queries involving this split have terminated, we effectively delete the split.
-/// This duration is controlled by `DELETION_GRACE_PERIOD`.
-const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(120); // 2 min
-
 const MAX_CONCURRENT_GC_TASKS: usize = if cfg!(test) { 2 } else { 10 };
 
 #[derive(Clone, Debug, Default, Serialize)]
@@ -210,6 +203,7 @@ mod tests {
     use std::path::Path;
 
     use quickwit_actors::Universe;
+    use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
     use quickwit_metastore::{
         IndexMetadata, ListSplitsQuery, MetastoreError, MockMetastore, Split, SplitMetadata,
         SplitState,
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 6fac124a732..55ade13b0b6 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -52,13 +52,47 @@ service SearchService {
   // It is also in charge of merging back the results.
   rpc RootListTerms(ListTermsRequest) returns (ListTermsResponse);
 
-  // Perform a leaf list terms on a given set of splits.
+  // Performs a leaf list terms on a given set of splits.
   //
   // It is like a regular list term except that:
   // - the node should perform the listing locally instead of dispatching
   //   it to other nodes.
   // - it should be applied on the given subset of splits
   rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse);
+
+  // Performs a scroll request.
+  rpc Scroll(ScrollRequest) returns (SearchResponse);
+
+  // gRPC request used to store a key in the local storage of the targeted node.
+  // This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+  rpc PutKV(PutKVRequest) returns (PutKVResponse);
+
+  // Gets a key from the local storage of the targeted node.
+  // This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+  rpc GetKV(GetKVRequest) returns (GetKVResponse);
+}
+
+// Scroll request.
+message ScrollRequest {
+  // The `scroll_id` is given in the response of a search request that includes a scroll parameter.
+  string scroll_id = 1;
+  optional uint32 scroll_ttl_secs = 2;
+}
+
+message PutKVRequest {
+  bytes key = 1;
+  bytes payload = 2;
+  uint32 ttl_secs = 3;
+}
+
+message PutKVResponse {}
+
+message GetKVRequest {
+  bytes key = 1;
+}
+
+message GetKVResponse {
+  optional bytes payload = 1;
+}
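A sketch of how these KV messages are exercised against a single node, assuming the generated tonic stubs (the module path and error handling here are assumptions, not part of this diff):

```rust
use quickwit_proto::search_service_client::SearchServiceClient;
use quickwit_proto::{GetKvRequest, PutKvRequest};
use tonic::transport::Channel;

// Stores a payload on the targeted node, then reads it back from that same node.
async fn kv_round_trip(client: &mut SearchServiceClient<Channel>) -> anyhow::Result<()> {
    client
        .put_kv(PutKvRequest {
            key: b"scroll-ulid".to_vec(),
            payload: b"serialized scroll context".to_vec(),
            ttl_secs: 600,
        })
        .await?;
    let payload_opt = client
        .get_kv(GetKvRequest {
            key: b"scroll-ulid".to_vec(),
        })
        .await?
        .into_inner()
        .payload;
    assert_eq!(payload_opt.as_deref(), Some(&b"serialized scroll context"[..]));
    Ok(())
}
```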
 
 // -- Search -------------------
@@ -109,6 +143,11 @@ message SearchRequest {
 
   // Optional sort by one or more fields (limited to 2 at the moment).
   repeated SortField sort_fields = 14;
+
+  // If set, the search response will include a scroll id
+  // that will make it possible to paginate through the results
+  // in a consistent manner.
+  optional uint32 scroll_ttl_secs = 15;
 }
 
 message SortField {
@@ -117,9 +156,9 @@
 }
 
 enum SortOrder {
-  /// Ascending order.
+  // Ascending order.
   ASC = 0;
-  /// Descending order.
+  // Descending order.
   DESC = 1; //< This will be the default value;
 }
@@ -138,6 +177,8 @@ message SearchResponse {
 
   // Serialized aggregation response
   optional string aggregation = 5;
+
+  // Scroll id (only set if scroll_ttl_secs was set in the request)
+  optional string scroll_id = 6;
 }
 
 message SplitSearchError {
@@ -183,19 +224,19 @@ message SplitIdAndFooterOffsets {
   optional int64 timestamp_end = 5;
 }
 
-/// Hits returned by a FetchDocRequest.
-///
-/// The json that is joined is the raw tantivy json doc.
-/// It is very different from a quickwit json doc.
-///
-/// For instance:
-/// - it may contain a _source and a _dynamic field.
-/// - since tantivy has no notion of cardinality,
-///   all fields is are arrays.
-/// - since tantivy has no notion of object, the object is
-///   flattened by concatenating the path to the root.
-///
-/// See `quickwit_search::convert_leaf_hit`
+// Hits returned by a FetchDocRequest.
+//
+// The json that is joined is the raw tantivy json doc.
+// It is very different from a quickwit json doc.
+//
+// For instance:
+// - it may contain a _source and a _dynamic field.
+// - since tantivy has no notion of cardinality,
+//   all fields are arrays.
+// - since tantivy has no notion of object, the object is
+//   flattened by concatenating the path to the root.
+//
+// See `quickwit_search::convert_leaf_hit`
 message LeafHit {
   // The actual content of the hit/
   string leaf_json = 1;
@@ -280,6 +321,11 @@ message LeafSearchResponse {
   optional bytes intermediate_aggregation_result = 6;
 }
 
+message SnippetRequest {
+  repeated string snippet_fields = 1;
+  string query_ast_resolved = 2;
+}
+
 message FetchDocsRequest {
   // Request fetching the content of a given list of partial_hits.
   repeated PartialHit partial_hits = 1;
@@ -296,12 +342,12 @@ message FetchDocsRequest {
   // split files.
   string index_uri = 4;
 
-  // Search request. This is a perfect copy of the original search request,
-  // that was sent to root apart from the start_offset & max_hits params.
-  SearchRequest search_request = 5;
+  optional SnippetRequest snippet_request = 7;
 
   // `DocMapper` as json serialized trait.
   string doc_mapper = 6;
+
+  reserved 5;
 }
 
 message FetchDocsResponse {
@@ -372,11 +418,11 @@ message LeafListTermsResponse {
 
 // -- Stream -------------------
 
 enum OutputFormat {
-  /// Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
-  /// The delimiter is `,`.
+  // Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
+  // The delimiter is `,`.
   CSV = 0; //< This will be the default value
-  /// Format data by row in ClickHouse binary format.
- /// https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary + // Format data by row in ClickHouse binary format. + // https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary CLICK_HOUSE_ROW_BINARY = 1; } @@ -409,7 +455,7 @@ message SearchStreamRequest { optional string partition_by_field = 9; // Fields to extract snippet on. - repeated string snippet_fields = 10; + repeated string snippet_fields = 10; } message LeafSearchStreamRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index d9c6acd532f..86a51d0c614 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -1,3 +1,43 @@ +/// / Scroll Request +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScrollRequest { + /// / The `scroll_id` is the given in the response of a search request including a scroll. + #[prost(string, tag = "1")] + pub scroll_id: ::prost::alloc::string::String, + #[prost(uint32, optional, tag = "2")] + pub scroll_ttl_secs: ::core::option::Option, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PutKvRequest { + #[prost(bytes = "vec", tag = "1")] + pub key: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub payload: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "3")] + pub ttl_secs: u32, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PutKvResponse {} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKvRequest { + #[prost(bytes = "vec", tag = "1")] + pub key: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKvResponse { + #[prost(bytes = "vec", optional, tag = "1")] + pub payload: ::core::option::Option<::prost::alloc::vec::Vec>, +} #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -35,6 +75,11 @@ pub struct SearchRequest { /// Optional sort by one or more fields (limited to 2 at the moment). #[prost(message, repeated, tag = "14")] pub sort_fields: ::prost::alloc::vec::Vec, + /// If set, the search response will include a search id + /// that will make it possible to paginate through the results + /// in a consistent manner. 
+ #[prost(uint32, optional, tag = "15")] + pub scroll_ttl_secs: ::core::option::Option, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] @@ -66,6 +111,9 @@ pub struct SearchResponse { /// Serialized aggregation response #[prost(string, optional, tag = "5")] pub aggregation: ::core::option::Option<::prost::alloc::string::String>, + /// Scroll Id (only set if scroll_secs was set in the request) + #[prost(string, optional, tag = "6")] + pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -122,19 +170,19 @@ pub struct SplitIdAndFooterOffsets { #[prost(int64, optional, tag = "5")] pub timestamp_end: ::core::option::Option, } -/// / Hits returned by a FetchDocRequest. -/// / -/// / The json that is joined is the raw tantivy json doc. -/// / It is very different from a quickwit json doc. -/// / -/// / For instance: -/// / - it may contain a _source and a _dynamic field. -/// / - since tantivy has no notion of cardinality, -/// / all fields is are arrays. -/// / - since tantivy has no notion of object, the object is -/// / flattened by concatenating the path to the root. -/// / -/// / See `quickwit_search::convert_leaf_hit` +/// Hits returned by a FetchDocRequest. +/// +/// The json that is joined is the raw tantivy json doc. +/// It is very different from a quickwit json doc. +/// +/// For instance: +/// - it may contain a _source and a _dynamic field. +/// - since tantivy has no notion of cardinality, +/// all fields is are arrays. +/// - since tantivy has no notion of object, the object is +/// flattened by concatenating the path to the root. +/// +/// See `quickwit_search::convert_leaf_hit` #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -246,6 +294,15 @@ pub struct LeafSearchResponse { #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct SnippetRequest { + #[prost(string, repeated, tag = "1")] + pub snippet_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, tag = "2")] + pub query_ast_resolved: ::prost::alloc::string::String, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchDocsRequest { /// Request fetching the content of a given list of partial_hits. #[prost(message, repeated, tag = "1")] @@ -262,10 +319,8 @@ pub struct FetchDocsRequest { /// split files. #[prost(string, tag = "4")] pub index_uri: ::prost::alloc::string::String, - /// Search request. This is a perfect copy of the original search request, - /// that was sent to root apart from the start_offset & max_hits params. - #[prost(message, optional, tag = "5")] - pub search_request: ::core::option::Option, + #[prost(message, optional, tag = "7")] + pub snippet_request: ::core::option::Option, /// `DocMapper` as json serialized trait. #[prost(string, tag = "6")] pub doc_mapper: ::prost::alloc::string::String, @@ -418,9 +473,9 @@ pub struct LeafSearchStreamResponse { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum SortOrder { - /// / Ascending order. + /// Ascending order. Asc = 0, - /// / Descending order. + /// Descending order. 
/// /// < This will be the default value; Desc = 1, @@ -450,13 +505,13 @@ impl SortOrder { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum OutputFormat { - /// / Comma Separated Values format (). - /// / The delimiter is `,`. + /// Comma Separated Values format (). + /// The delimiter is `,`. /// /// < This will be the default value Csv = 0, - /// / Format data by row in ClickHouse binary format. - /// / + /// Format data by row in ClickHouse binary format. + /// ClickHouseRowBinary = 1, } impl OutputFormat { @@ -710,7 +765,7 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } - /// Perform a leaf list terms on a given set of splits. + /// Performs a leaf list terms on a given set of splits. /// /// It is like a regular list term except that: /// - the node should perform the listing locally instead of dispatching @@ -743,6 +798,77 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } + /// Performs a scroll request. + pub async fn scroll( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/Scroll", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.search.SearchService", "Scroll")); + self.inner.unary(req, path, codec).await + } + /// gRPC request used to store a key in the local storage of the targetted node. + /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. + pub async fn put_kv( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/PutKV", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.search.SearchService", "PutKV")); + self.inner.unary(req, path, codec).await + } + /// Gets a key from the local storage of the targetted node. + /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. + pub async fn get_kv( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/GetKV", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.search.SearchService", "GetKV")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -813,7 +939,7 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; - /// Perform a leaf list terms on a given set of splits. + /// Performs a leaf list terms on a given set of splits. 
/// /// It is like a regular list term except that: /// - the node should perform the listing locally instead of dispatching @@ -826,6 +952,23 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + /// Performs a scroll request. + async fn scroll( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// gRPC request used to store a key in the local storage of the targetted node. + /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. + async fn put_kv( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Gets a key from the local storage of the targetted node. + /// This RPC is used in the mini distributed immutable KV store embedded in quickwit. + async fn get_kv( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1178,6 +1321,136 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/Scroll" => { + #[allow(non_camel_case_types)] + struct ScrollSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for ScrollSvc { + type Response = super::SearchResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).scroll(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ScrollSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.search.SearchService/PutKV" => { + #[allow(non_camel_case_types)] + struct PutKVSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService for PutKVSvc { + type Response = super::PutKvResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).put_kv(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PutKVSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.search.SearchService/GetKV" => { + #[allow(non_camel_case_types)] + 
struct GetKVSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService for GetKVSvc { + type Response = super::GetKvResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).get_kv(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetKVSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index 51708d295a2..727bbf33925 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } fnv = { workspace = true } futures = { workspace = true } @@ -33,9 +34,11 @@ tokio = { workspace = true } tokio-stream = { workspace = true } tokio-util = { workspace = true } tower = { workspace = true } +ttl_cache = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } opentelemetry = { workspace = true } +ulid = { workspace = true } utoipa = { workspace = true } quickwit-cluster = { workspace = true } diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 10ca144fd39..9b2aa32e4ea 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -26,7 +26,9 @@ use futures::{StreamExt, TryStreamExt}; use http::Uri; use quickwit_proto::tonic::codegen::InterceptedService; use quickwit_proto::tonic::transport::Endpoint; -use quickwit_proto::{tonic, LeafSearchStreamResponse, SpanContextInterceptor}; +use quickwit_proto::{ + tonic, GetKvRequest, LeafSearchStreamResponse, PutKvRequest, SpanContextInterceptor, +}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Channel; use tonic::Request; @@ -227,6 +229,44 @@ impl SearchServiceClient { SearchServiceClientImpl::Local(service) => service.leaf_list_terms(request).await, } } + + /// Gets the value associated to a key stored locally in the targetted node. + /// This call is not "distributed". + /// If the key is not present on the targetted search `None` is simply returned. 
+    pub async fn get_kv(&mut self, get_kv_req: GetKvRequest) -> crate::Result<Option<Vec<u8>>> {
+        match &mut self.client_impl {
+            SearchServiceClientImpl::Local(service) => {
+                let search_after_context_opt = service.get_kv(get_kv_req).await;
+                Ok(search_after_context_opt)
+            }
+            SearchServiceClientImpl::Grpc(grpc_client) => {
+                let grpc_resp: tonic::Response<quickwit_proto::GetKvResponse> = grpc_client
+                    .get_kv(get_kv_req)
+                    .await
+                    .map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
+                let get_search_after_context_resp = grpc_resp.into_inner();
+                Ok(get_search_after_context_resp.payload)
+            }
+        }
+    }
+
+    /// Puts the given (key, value) pair in the local storage of the targeted node.
+    /// This call is not "distributed". It is up to the caller to put the (key, value) pair
+    /// on several nodes.
+    pub async fn put_kv(&mut self, put_kv_req: PutKvRequest) -> crate::Result<()> {
+        match &mut self.client_impl {
+            SearchServiceClientImpl::Local(service) => {
+                service.put_kv(put_kv_req).await;
+            }
+            SearchServiceClientImpl::Grpc(grpc_client) => {
+                grpc_client
+                    .put_kv(put_kv_req)
+                    .await
+                    .map_err(|tonic_error| parse_grpc_error(&tonic_error))?;
+            }
+        }
+        Ok(())
+    }
 }
 
 /// Creates a [`SearchServiceClient`] from a socket address.
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs
index 4c3f78c838b..d17099cd4e3 100644
--- a/quickwit/quickwit-search/src/cluster_client.rs
+++ b/quickwit/quickwit-search/src/cluster_client.rs
@@ -17,22 +17,35 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::time::Duration;
+
+use futures::future::ready;
 use futures::StreamExt;
 use quickwit_proto::{
-    FetchDocsRequest, FetchDocsResponse, LeafListTermsRequest, LeafListTermsResponse,
+    FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse,
     LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse,
+    PutKvRequest,
 };
 use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
 use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
 use tokio_stream::wrappers::UnboundedReceiverStream;
-use tracing::debug;
+use tracing::{debug, error};
 
 use crate::retry::search::LeafSearchRetryPolicy;
 use crate::retry::search_stream::{LeafSearchStreamRetryPolicy, SuccessfulSplitIds};
 use crate::retry::{retry_client, DefaultRetryPolicy, RetryPolicy};
 use crate::{SearchError, SearchJobPlacer, SearchServiceClient};
 
+/// Maximum number of PUT requests emitted to perform a given replicated PUT KV.
+const MAX_PUT_KV_ATTEMPTS: usize = 6;
+
+/// Maximum number of GET requests emitted to perform a GET KV request.
+const MAX_GET_KV_ATTEMPTS: usize = 6;
+
+/// We attempt to store our KVs on two nodes.
+const TARGET_NUM_REPLICATION: usize = 2;
+
 /// Client that executes placed requests (Request, `SearchServiceClient`) and provides
 /// retry policies for `FetchDocsRequest`, `LeafSearchRequest` and `LeafSearchStreamRequest`
 /// to retry on other `SearchServiceClient`.
@@ -157,6 +170,57 @@
         // TODO: implement retry
         client.leaf_list_terms(request.clone()).await
     }
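The `put_kv` method below implements a "stop after the first `TARGET_NUM_REPLICATION` successes" stream. A self-contained sketch of that shape, with a purely illustrative `fake_put` standing in for the RPC:

```rust
use futures::future::ready;
use futures::StreamExt;

// Illustrative stand-in for a per-node PUT: even-numbered nodes accept it.
async fn fake_put(node_id: usize) -> Result<(), ()> {
    if node_id % 2 == 0 { Ok(()) } else { Err(()) }
}

#[tokio::main]
async fn main() {
    let successful = futures::stream::iter(0..6usize) // up to MAX_PUT_KV_ATTEMPTS candidates
        .map(fake_put)
        .buffer_unordered(2) // at most TARGET_NUM_REPLICATION puts in flight
        .filter(|put_res| ready(put_res.is_ok()))
        .take(2) // stop as soon as TARGET_NUM_REPLICATION puts succeeded
        .count()
        .await;
    assert_eq!(successful, 2);
}
```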
+
+    /// Attempts to store a given search context within the cluster.
+    ///
+    /// This function may fail silently if no client was available.
+    pub async fn put_kv(&self, key: &[u8], payload: &[u8], ttl: Duration) {
+        let clients = self
+            .search_job_placer
+            .best_nodes_per_affinity(&key)
+            .await
+            .take(MAX_PUT_KV_ATTEMPTS);
+
+        // We run the put requests concurrently.
+        // Our target is replication over TARGET_NUM_REPLICATION nodes, so we try to avoid
+        // having more than TARGET_NUM_REPLICATION puts in flight at the same time. Of
+        // course, this may still result in replication over more nodes, but this is not
+        // a problem.
+        //
+        // The requests are made concurrently, up to two at a time. As soon as 2 requests
+        // are successful, we stop.
+        let successful_replication = futures::stream::iter(clients)
+            .map(|mut client| {
+                let put_kv_request = PutKvRequest {
+                    key: key.to_vec(),
+                    payload: payload.to_vec(),
+                    ttl_secs: ttl.as_secs() as u32,
+                };
+                async move { client.put_kv(put_kv_request).await }
+            })
+            .buffer_unordered(TARGET_NUM_REPLICATION)
+            .filter(|put_kv_successful| ready(put_kv_successful.is_ok()))
+            .take(TARGET_NUM_REPLICATION)
+            .count()
+            .await;
+
+        if successful_replication == 0 {
+            error!(successful_replication=%successful_replication, "failed-to-replicate-scroll-context");
+        }
+    }
+
+    /// Gets the payload stored under the given key, trying the nodes in affinity order.
+    pub async fn get_kv(&self, key: &[u8]) -> Option<Vec<u8>> {
+        let clients = self.search_job_placer.best_nodes_per_affinity(&key).await;
+        // On the read side, we attempt to contact up to MAX_GET_KV_ATTEMPTS nodes.
+        for mut client in clients.take(MAX_GET_KV_ATTEMPTS) {
+            let get_request = GetKvRequest { key: key.to_vec() };
+            if let Ok(Some(search_after_resp)) = client.get_kv(get_request.clone()).await {
+                return Some(search_after_resp);
+            }
+        }
+        None
+    }
 }
 
 /// Takes two intermediate aggregation results serialized using postcard,
@@ -605,4 +669,97 @@ mod tests {
         assert_eq!(results.len(), 2);
         assert!(results[0].is_ok());
     }
+
+    #[tokio::test]
+    async fn test_put_kv_happy_path() {
+        // 3 servers 1, 2, 3.
+        // Targeted key has affinity [1, 2, 3].
+        //
+        // Put on 1 and 2 is successful.
+        // Get succeeds on 1.
+        let mut mock_search_service_1 = MockSearchService::new();
+        mock_search_service_1
+            .expect_put_kv()
+            .once()
+            .returning(|_put_req: quickwit_proto::PutKvRequest| {});
+        mock_search_service_1
+            .expect_get_kv()
+            .once()
+            .returning(|_get_req: quickwit_proto::GetKvRequest| Some(b"my_payload".to_vec()));
+        let mut mock_search_service_2 = MockSearchService::new();
+        mock_search_service_2
+            .expect_put_kv()
+            .once()
+            .returning(|_put_req: quickwit_proto::PutKvRequest| {});
+        let mut mock_search_service_3 = MockSearchService::new();
+        // Due to the buffered call it is possible for the
+        // put request to 3 to be emitted too.
+        mock_search_service_3
+            .expect_put_kv()
+            .returning(|_put_req: quickwit_proto::PutKvRequest| {});
+        let searcher_pool = searcher_pool_for_test([
+            ("127.0.0.1:1001", mock_search_service_1),
+            ("127.0.0.1:1002", mock_search_service_2),
+            ("127.0.0.1:1003", mock_search_service_3),
+        ]);
+        let search_job_placer = SearchJobPlacer::new(searcher_pool);
+        let cluster_client = ClusterClient::new(search_job_placer);
+        cluster_client
+            .put_kv(
+                &b"my_key"[..],
+                &b"my_payload"[..],
+                Duration::from_secs(10 * 60),
+            )
+            .await;
+        let result = cluster_client.get_kv(&b"my_key"[..]).await;
+        assert_eq!(result, Some(b"my_payload".to_vec()))
+    }
+
+    #[tokio::test]
+    async fn test_put_kv_failing_get() {
+        // 3 servers 1, 2, 3.
+        // Targeted key has affinity [1, 2, 3].
+        //
+        // Put on 1 and 2 is successful.
+        // Get fails on 1.
+        // Get succeeds on 2.
+ let mut mock_search_service_1 = MockSearchService::new(); + mock_search_service_1 + .expect_put_kv() + .once() + .returning(|_put_req: quickwit_proto::PutKvRequest| {}); + mock_search_service_1 + .expect_get_kv() + .once() + .returning(|_get_req: quickwit_proto::GetKvRequest| None); + let mut mock_search_service_2 = MockSearchService::new(); + mock_search_service_2 + .expect_put_kv() + .once() + .returning(|_put_req: quickwit_proto::PutKvRequest| {}); + mock_search_service_2 + .expect_get_kv() + .once() + .returning(|_get_req: quickwit_proto::GetKvRequest| Some(b"my_payload".to_vec())); + let mut mock_search_service_3 = MockSearchService::new(); + mock_search_service_3 + .expect_put_kv() + .returning(|_leaf_search_req: quickwit_proto::PutKvRequest| {}); + let searcher_pool = searcher_pool_for_test([ + ("127.0.0.1:1001", mock_search_service_1), + ("127.0.0.1:1002", mock_search_service_2), + ("127.0.0.1:1003", mock_search_service_3), + ]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer); + cluster_client + .put_kv( + &b"my_key"[..], + &b"my_payload"[..], + Duration::from_secs(10 * 60), + ) + .await; + let result = cluster_client.get_kv(&b"my_key"[..]).await; + assert_eq!(result, Some(b"my_payload".to_vec())) + } } diff --git a/quickwit/quickwit-search/src/fetch_docs.rs b/quickwit/quickwit-search/src/fetch_docs.rs index 8124a0fe89a..ef7a9a891e9 100644 --- a/quickwit/quickwit-search/src/fetch_docs.rs +++ b/quickwit/quickwit-search/src/fetch_docs.rs @@ -24,7 +24,7 @@ use anyhow::{Context, Ok}; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::{FetchDocsResponse, PartialHit, SearchRequest, SplitIdAndFooterOffsets}; +use quickwit_proto::{FetchDocsResponse, PartialHit, SnippetRequest, SplitIdAndFooterOffsets}; use quickwit_storage::Storage; use tantivy::query::Query; use tantivy::schema::{Field, Value}; @@ -45,7 +45,7 @@ async fn fetch_docs_to_map( index_storage: Arc, splits: &[SplitIdAndFooterOffsets], doc_mapper: Arc, - search_request_opt: Option<&SearchRequest>, + snippet_request_opt: Option<&SnippetRequest>, ) -> anyhow::Result> { let mut split_fetch_docs_futures = Vec::new(); @@ -72,7 +72,7 @@ async fn fetch_docs_to_map( index_storage.clone(), split_and_offset, doc_mapper.clone(), - search_request_opt, + snippet_request_opt, )); } @@ -112,7 +112,7 @@ pub async fn fetch_docs( index_storage: Arc, splits: &[SplitIdAndFooterOffsets], doc_mapper: Arc, - search_request_opt: Option<&SearchRequest>, + snippet_request_opt: Option<&SnippetRequest>, ) -> anyhow::Result { let global_doc_addrs: Vec = partial_hits .iter() @@ -125,7 +125,7 @@ pub async fn fetch_docs( index_storage, splits, doc_mapper, - search_request_opt, + snippet_request_opt, ) .await?; @@ -159,14 +159,13 @@ struct Document { } /// Fetching docs from a specific split. 
-#[tracing::instrument(skip(global_doc_addrs, index_storage, split, searcher_context))] async fn fetch_docs_in_split( searcher_context: Arc, mut global_doc_addrs: Vec, index_storage: Arc, split: &SplitIdAndFooterOffsets, doc_mapper: Arc, - search_request_opt: Option<&SearchRequest>, + snippet_request_opt: Option<&SnippetRequest>, ) -> anyhow::Result> { global_doc_addrs.sort_by_key(|doc| doc.doc_addr); // Opens the index without the ephemeral unbounded cache, this cache is indeed not useful @@ -187,8 +186,11 @@ async fn fetch_docs_in_split( .reload_policy(ReloadPolicy::Manual) .try_into()?; let searcher = Arc::new(index_reader.searcher()); - let fields_snippet_generator_opt = if let Some(search_request) = search_request_opt { - Some(create_fields_snippet_generator(&searcher, doc_mapper.clone(), search_request).await?) + let fields_snippet_generator_opt = if let Some(snippet_request) = snippet_request_opt { + Some( + create_fields_snippet_generator(&searcher, doc_mapper.clone(), &snippet_request) + .await?, + ) } else { None }; @@ -296,14 +298,14 @@ impl FieldsSnippetGenerator { async fn create_fields_snippet_generator( searcher: &Searcher, doc_mapper: Arc, - search_request: &SearchRequest, + snippet_request: &SnippetRequest, ) -> anyhow::Result { let schema = searcher.schema(); - let query_ast = - serde_json::from_str(&search_request.query_ast).context("Invalid query ast Json")?; - let (query, _) = doc_mapper.query(schema.clone(), &query_ast, false)?; + let query_ast_resolved = serde_json::from_str(&snippet_request.query_ast_resolved) + .context("Failed to deserialize QueryAst.")?; + let (query, _) = doc_mapper.query(schema.clone(), &query_ast_resolved, false)?; let mut snippet_generators = HashMap::new(); - for field_name in &search_request.snippet_fields { + for field_name in &snippet_request.snippet_fields { let field = schema.get_field(field_name)?; let snippet_generator = create_snippet_generator(searcher, &query, field).await?; snippet_generators.insert(field_name.clone(), snippet_generator); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index cf3da3404bd..9c25cb2f834 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -33,6 +33,7 @@ mod leaf; mod leaf_cache; mod retry; mod root; +mod scroll_context; mod search_job_placer; mod search_response_rest; mod search_stream; @@ -66,7 +67,8 @@ use quickwit_config::{build_doc_mapper, SearcherConfig}; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState}; use quickwit_proto::{ - Hit, IndexUid, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, + Hit, IndexUid, PartialHit, SearchRequest, SearchResponse, SnippetRequest, + SplitIdAndFooterOffsets, }; use quickwit_storage::StorageResolver; use tantivy::DocAddress; @@ -218,8 +220,11 @@ pub async fn single_node_search( .await .context("Failed to perform leaf search.")?; - let search_request_opt = if !search_request.snippet_fields.is_empty() { - Some(&search_request) + let snippet_request_opt: Option = if !search_request.snippet_fields.is_empty() { + Some(SnippetRequest { + snippet_fields: search_request.snippet_fields.clone(), + query_ast_resolved: search_request.query_ast.clone(), + }) } else { None }; @@ -230,7 +235,7 @@ pub async fn single_node_search( index_storage, &split_metadata, doc_mapper, - search_request_opt, + snippet_request_opt.as_ref(), ) .await .context("Failed to perform fetch docs.")?; @@ -266,6 
+271,7 @@ pub async fn single_node_search(
             .iter()
             .map(|error| format!("{error:?}"))
             .collect_vec(),
+        scroll_id: None,
     })
 }
 
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index 2d8d475ea89..746a93933e3 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -18,17 +18,21 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::Context;
 use futures::future::try_join_all;
 use itertools::Itertools;
+use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
+use quickwit_common::uri::Uri;
 use quickwit_config::{build_doc_mapper, IndexConfig};
 use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME};
 use quickwit_metastore::{Metastore, SplitMetadata};
 use quickwit_proto::{
     FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafListTermsRequest, LeafListTermsResponse,
     LeafSearchRequest, LeafSearchResponse, ListTermsRequest, ListTermsResponse, PartialHit,
-    SearchRequest, SearchResponse, SortField, SplitIdAndFooterOffsets,
+    SearchRequest, SearchResponse, SnippetRequest, SortField, SplitIdAndFooterOffsets,
 };
 use quickwit_query::query_ast::{
     BoolQuery, QueryAst, QueryAstVisitor, RangeQuery, TermQuery, TermSetQuery,
@@ -43,6 +47,7 @@ use tracing::{debug, error, info_span, instrument};
 use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
 use crate::find_trace_ids_collector::Span;
+use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset};
 use crate::search_job_placer::Job;
 use crate::service::SearcherContext;
 use crate::{
@@ -50,6 +55,9 @@ use crate::{
     SearchServiceClient,
 };
 
+/// Maximum accepted scroll TTL.
+const MAX_SCROLL_TTL: Duration = Duration::from_secs(DELETION_GRACE_PERIOD.as_secs() - 60 * 2);
+
 /// SearchJob to be assigned to search clients by the [`SearchJobPlacer`].
 #[derive(Debug, Clone, PartialEq)]
 pub struct SearchJob {
@@ -161,6 +169,27 @@ fn validate_sort_by_fields(sort_fields: &[SortField], schema: &Schema) -> crate::Result<()> {
     Ok(())
 }
 
+fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> SearchRequest {
+    // We do not mutate the original request: we build a simplified copy to store
+    // in the scroll context, so that paginated queries can be re-emitted.
+    SearchRequest {
+        index_id: req.index_id.clone(),
+        query_ast: req.query_ast.clone(),
+        start_timestamp: req.start_timestamp,
+        end_timestamp: req.end_timestamp,
+        max_hits: req.max_hits,
+        start_offset: req.start_offset,
+        sort_fields: req.sort_fields.clone(),
+        // We remove all aggregation requests.
+        // Aggregations are not recomputed for each scroll request.
+        aggregation_request: None,
+        // We remove the snippet fields. This feature is not supported for scroll requests.
+        snippet_fields: Vec::new(),
+        // We remove the scroll TTL parameter. It is irrelevant when processing later requests.
+        scroll_ttl_secs: None,
+    }
+}
+
 fn validate_sort_by_field(field_name: &str, schema: &Schema) -> crate::Result<()> {
     if field_name == "_score" {
         return Ok(());
@@ -221,70 +250,15 @@ pub(crate) fn validate_request(
     Ok(())
 }
 
-/// Performs a distributed search.
-/// 1. Sends leaf request over gRPC to multiple leaf nodes.
-/// 2. Merges the search results.
-/// 3. Sends fetch docs requests to multiple leaf nodes.
-/// 4. Builds the response with docs and returns.
-#[instrument(skip(search_request, cluster_client, metastore))] -pub async fn root_search( +#[instrument(skip(search_request, cluster_client))] +pub async fn search_partial_hits_round( searcher_context: &SearcherContext, - mut search_request: SearchRequest, - metastore: &dyn Metastore, + search_request: &SearchRequest, + index_uri: &Uri, + doc_mapper_str: &str, + split_metadatas: &[SplitMetadata], cluster_client: &ClusterClient, -) -> crate::Result { - let start_instant = tokio::time::Instant::now(); - - let index_metadata = metastore.index_metadata(&search_request.index_id).await?; - let index_uid = index_metadata.index_uid.clone(); - let index_config = index_metadata.into_index_config(); - - let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) - .map_err(|err| { - SearchError::InternalError(format!("Failed to build doc mapper. Cause: {err}")) - })?; - - validate_request(&*doc_mapper, &search_request)?; - - let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?; - - if let Some(timestamp_field) = doc_mapper.timestamp_field_name() { - refine_start_end_timestamp_from_ast( - &query_ast_resolved, - timestamp_field, - &mut search_request.start_timestamp, - &mut search_request.end_timestamp, - ); - } - - // Validates the query by effectively building it against the current schema. - doc_mapper.query(doc_mapper.schema(), &query_ast_resolved, true)?; - - search_request.query_ast = serde_json::to_string(&query_ast_resolved).map_err(|err| { - SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}")) - })?; - - let doc_mapper_str = serde_json::to_string(&doc_mapper).map_err(|err| { - SearchError::InternalError(format!("Failed to serialize doc mapper: Cause {err}")) - })?; - - let split_metadatas: Vec = - list_relevant_splits(index_uid, &search_request, metastore).await?; - - let split_offsets_map: HashMap = split_metadatas - .iter() - .map(|metadata| { - ( - metadata.split_id().to_string(), - extract_split_and_footer_offsets(metadata), - ) - }) - .collect(); - - let index_uri = &index_config.index_uri; - +) -> crate::Result { let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); let assigned_leaf_search_jobs = cluster_client @@ -294,7 +268,7 @@ pub async fn root_search( let leaf_search_responses: Vec = try_join_all(assigned_leaf_search_jobs.map(|(client, client_jobs)| { let leaf_request = jobs_to_leaf_request( - &search_request, + search_request, &doc_mapper_str, index_uri.as_ref(), client_jobs, @@ -306,7 +280,6 @@ pub async fn root_search( // Creates a collector which merges responses into one let merge_collector = make_merge_collector(&search_request, &searcher_context.get_aggregation_limits())?; - let aggregations = merge_collector.aggregation.clone(); // Merging is a cpu-bound task. // It should be executed by Tokio's blocking threads. 
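The comment above parks the merge on Tokio's blocking pool. A minimal sketch of that pattern, with a toy `merge` standing in for the collector merge (all names here are illustrative, not the actual helper):

```rust
use anyhow::Context;

// Toy stand-in for the CPU-bound collector merge.
fn merge(leaf_values: Vec<u64>) -> u64 {
    leaf_values.into_iter().sum()
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let leaf_values = vec![1, 2, 3];
    // Offloading keeps the async runtime threads free to serve other queries.
    let merged = tokio::task::spawn_blocking(move || merge(leaf_values))
        .await
        .context("merge task panicked")?;
    assert_eq!(merged, 6);
    Ok(())
}
```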
@@ -336,9 +309,29 @@ pub async fn root_search( .join(", "); return Err(SearchError::InternalError(errors)); } + Ok(leaf_search_response) +} + +pub(crate) fn get_snippet_request(search_request: &SearchRequest) -> Option { + if search_request.snippet_fields.is_empty() { + return None; + } + Some(SnippetRequest { + snippet_fields: search_request.snippet_fields.clone(), + query_ast_resolved: search_request.query_ast.clone(), + }) +} - let hit_order: HashMap<(String, u32, u32), usize> = leaf_search_response - .partial_hits +pub async fn fetch_docs_round( + partial_hits: &[PartialHit], + split_metadatas: &[SplitMetadata], + index_id: &str, + index_uri: &Uri, + doc_mapper_str: &str, + snippet_request_opt: Option, + cluster_client: &ClusterClient, +) -> crate::Result> { + let hit_order: HashMap<(String, u32, u32), usize> = partial_hits .iter() .enumerate() .map(|(position, partial_hit)| { @@ -353,8 +346,8 @@ pub async fn root_search( let client_fetch_docs_task: Vec<(SearchServiceClient, Vec)> = assign_client_fetch_doc_tasks( - &leaf_search_response.partial_hits, - &split_offsets_map, + partial_hits, + &split_metadatas[..], &cluster_client.search_job_placer, ) .await?; @@ -372,18 +365,13 @@ pub async fn root_search( .map(|fetch_doc_job| fetch_doc_job.into()) .collect(); - let search_request_opt = if search_request.snippet_fields.is_empty() { - None - } else { - Some(search_request.clone()) - }; let fetch_docs_req = FetchDocsRequest { partial_hits, - index_id: search_request.index_id.to_string(), + index_id: index_id.to_string(), split_offsets, index_uri: index_uri.to_string(), - search_request: search_request_opt, - doc_mapper: doc_mapper_str.clone(), + snippet_request: snippet_request_opt.clone(), + doc_mapper: doc_mapper_str.to_string(), }; cluster_client.fetch_docs(fetch_docs_req, client) }); @@ -416,28 +404,173 @@ pub async fn root_search( .collect(); hits_with_position.sort_by_key(|(position, _)| *position); - let hits = hits_with_position + let hits: Vec = hits_with_position .into_iter() .map(|(_position, hit)| hit) .collect(); - let elapsed = start_instant.elapsed(); + Ok(hits) +} + +/// Performs a distributed search. +/// 1. Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Sends fetch docs requests to multiple leaf nodes. +/// 4. Builds the response with docs and returns. 
+#[instrument(skip(search_request, cluster_client))] +pub async fn root_search_aux( + searcher_context: &SearcherContext, + search_request: SearchRequest, + index_uri: &Uri, + doc_mapper: Arc, + query_ast_resolved: QueryAst, + split_metadatas: Vec, + cluster_client: &ClusterClient, +) -> crate::Result { + let aggregations: Option = match &search_request.aggregation_request { + Some(aggregation) => Some(serde_json::from_str(aggregation)?), + None => None, + }; + + let doc_mapper_str = serde_json::to_string(&*doc_mapper).map_err(|err| { + SearchError::InternalError(format!("Failed to serialize doc mapper: Cause {err}")) + })?; + + let first_round_result: LeafSearchResponse = search_partial_hits_round( + searcher_context, + &search_request, + index_uri, + &doc_mapper_str, + &split_metadatas[..], + cluster_client, + ) + .await?; + + let snippet_request: Option = get_snippet_request(&search_request); + let hits = fetch_docs_round( + &first_round_result.partial_hits, + &split_metadatas[..], + &search_request.index_id, + index_uri, + &doc_mapper_str, + snippet_request, + cluster_client, + ) + .await?; + + let mut scroll_context_search_request = simplify_search_request_for_scroll_api(&search_request); + scroll_context_search_request.start_offset += hits.len() as u64; + let scroll_id = if let Some(ttl_secs) = search_request.scroll_ttl_secs.clone() { + let ttl = Duration::from_secs(ttl_secs as u64); + if ttl > MAX_SCROLL_TTL { + return Err(SearchError::InvalidArgument(format!( + "Quickwit only supports scroll TTL period up to {} secs.", + MAX_SCROLL_TTL.as_secs() + ))); + } + let scroll_ctx = ScrollContext { + index_uri: index_uri.clone(), + doc_mapper_str: doc_mapper_str.to_string(), + query_ast: query_ast_resolved, + split_metadatas: split_metadatas.to_vec(), + search_request: scroll_context_search_request, + cached_partial_hits_start_offset: search_request.start_offset, + cached_partial_hits: Vec::new(), + total_num_hits: first_round_result.num_hits, + max_hits_per_page: search_request.max_hits, + }; + let scroll_key_and_start_offset: ScrollKeyAndStartOffset = + ScrollKeyAndStartOffset::new_with_start_offset( + scroll_ctx.search_request.start_offset, + scroll_ctx.search_request.max_hits as u32, + ); + let payload: Vec = scroll_ctx.serialize(); + let scroll_key = scroll_key_and_start_offset.scroll_key(); + cluster_client.put_kv(&scroll_key, &payload, ttl).await; + let scroll_id = scroll_key_and_start_offset.to_string(); + Some(scroll_id) + } else { + None + }; let aggregation: Option = finalize_aggregation( - leaf_search_response.intermediate_aggregation_result, + first_round_result.intermediate_aggregation_result, aggregations, searcher_context, )?; Ok(SearchResponse { aggregation, - num_hits: leaf_search_response.num_hits, + num_hits: first_round_result.num_hits, hits, - elapsed_time_micros: elapsed.as_micros() as u64, + elapsed_time_micros: 0u64, errors: Vec::new(), + scroll_id, }) } +/// Performs a distributed search. +/// 1. Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Sends fetch docs requests to multiple leaf nodes. +/// 4. Builds the response with docs and returns. 
+#[instrument(skip(search_request, cluster_client, metastore))] +pub async fn root_search( + searcher_context: &SearcherContext, + mut search_request: SearchRequest, + metastore: &dyn Metastore, + cluster_client: &ClusterClient, +) -> crate::Result { + let start_instant = tokio::time::Instant::now(); + let index_metadata = metastore.index_metadata(&search_request.index_id).await?; + let index_uid = index_metadata.index_uid.clone(); + let index_config = index_metadata.into_index_config(); + + let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) + .map_err(|err| { + SearchError::InternalError(format!("Failed to build doc mapper. Cause: {err}")) + })?; + + validate_request(&*doc_mapper, &search_request)?; + + let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast) + .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; + let query_ast_resolved = query_ast.parse_user_query(doc_mapper.default_search_fields())?; + + if let Some(timestamp_field) = doc_mapper.timestamp_field_name() { + refine_start_end_timestamp_from_ast( + &query_ast_resolved, + timestamp_field, + &mut search_request.start_timestamp, + &mut search_request.end_timestamp, + ); + } + + // Validates the query by effectively building it against the current schema. + doc_mapper.query(doc_mapper.schema(), &query_ast_resolved, true)?; + + search_request.query_ast = serde_json::to_string(&query_ast_resolved).map_err(|err| { + SearchError::InternalError(format!("Failed to serialize query ast: Cause {err}")) + })?; + + let split_metadatas: Vec = + list_relevant_splits(index_uid, &search_request, metastore).await?; + + let mut search_response = root_search_aux( + searcher_context, + search_request, + &index_config.index_uri, + doc_mapper.clone(), + query_ast_resolved, + split_metadatas, + cluster_client, + ) + .await?; + + search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; + Ok(search_response) +} + pub(crate) fn refine_start_end_timestamp_from_ast( query_ast: &QueryAst, timestamp_field: &str, @@ -717,9 +850,19 @@ pub async fn root_list_terms( async fn assign_client_fetch_doc_tasks( partial_hits: &[PartialHit], - split_offsets_map: &HashMap, + split_metadatas: &[SplitMetadata], client_pool: &SearchJobPlacer, ) -> crate::Result)>> { + let split_offsets_map: HashMap = split_metadatas + .iter() + .map(|metadata| { + ( + metadata.split_id().to_string(), + extract_split_and_footer_offsets(metadata), + ) + }) + .collect(); + // Group the partial hits per split let mut partial_hits_map: HashMap> = HashMap::new(); for partial_hit in partial_hits.iter() { diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs new file mode 100644 index 00000000000..b7c279fd32b --- /dev/null +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -0,0 +1,165 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Context;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use quickwit_common::uri::Uri;
+use quickwit_metastore::SplitMetadata;
+use quickwit_proto::{PartialHit, SearchRequest};
+use quickwit_query::query_ast::QueryAst;
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+use ttl_cache::TtlCache;
+use ulid::Ulid;
+
+/// Maximum capacity of the `search_after` cache.
+///
+/// For the moment this value is hardcoded.
+/// TODO make configurable.
+///
+/// Assuming a search context of 1MB, this can
+/// amount to up to 1GB.
+const SCROLL_CACHE_CAPACITY: usize = 1_000;
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct ScrollContext {
+    pub split_metadatas: Vec<SplitMetadata>,
+    pub search_request: SearchRequest,
+    pub index_uri: Uri,
+    pub doc_mapper_str: String,
+    pub query_ast: QueryAst,
+    pub total_num_hits: u64,
+    pub max_hits_per_page: u64,
+    pub cached_partial_hits_start_offset: u64,
+    pub cached_partial_hits: Vec<PartialHit>,
+}
+
+impl ScrollContext {
+    pub fn serialize(&self) -> Vec<u8> {
+        let uncompressed_payload = serde_json::to_string(self).unwrap();
+        uncompressed_payload.as_bytes().to_vec()
+    }
+
+    pub fn load(payload: &[u8]) -> anyhow::Result<ScrollContext> {
+        let scroll_context =
+            serde_json::from_slice(payload).context("Failed to deserialize context")?;
+        Ok(scroll_context)
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct MiniKV {
+    ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, Vec<u8>>>>,
+}
+
+impl Default for MiniKV {
+    fn default() -> MiniKV {
+        MiniKV {
+            ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SCROLL_CACHE_CAPACITY))),
+        }
+    }
+}
+
+impl MiniKV {
+    pub async fn put(&self, key: Vec<u8>, payload: Vec<u8>, ttl: Duration) {
+        let mut cache_lock = self.ttl_with_cache.write().await;
+        cache_lock.insert(key, payload, ttl);
+    }
+
+    pub async fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
+        let cache_lock = self.ttl_with_cache.read().await;
+        let search_after_context_bytes = cache_lock.get(key)?;
+        Some(search_after_context_bytes.clone())
+    }
+}
+
+#[derive(Serialize, Deserialize, Copy, Clone, Debug)]
+pub struct ScrollKeyAndStartOffset {
+    scroll_ulid: Ulid,
+    pub start_offset: u64,
+    pub max_hits: u32,
+}
+
+impl ScrollKeyAndStartOffset {
+    pub fn new_with_start_offset(start_offset: u64, max_hits: u32) -> ScrollKeyAndStartOffset {
+        let scroll_ulid: Ulid = Ulid::new();
+        ScrollKeyAndStartOffset {
+            scroll_ulid,
+            start_offset,
+            max_hits,
+        }
+    }
+
+    pub fn next_page(mut self) -> ScrollKeyAndStartOffset {
+        self.start_offset += self.max_hits as u64;
+        self
+    }
+
+    pub fn scroll_key(&self) -> [u8; 16] {
+        u128::from(self.scroll_ulid).to_le_bytes()
+    }
+}
+
+impl ToString for ScrollKeyAndStartOffset {
+    fn to_string(&self) -> String {
+        let mut payload = [0u8; 28];
+        payload[..16].copy_from_slice(&u128::from(self.scroll_ulid).to_le_bytes());
+        payload[16..24].copy_from_slice(&self.start_offset.to_le_bytes());
+        payload[24..28].copy_from_slice(&self.max_hits.to_le_bytes());
+        BASE64_STANDARD.encode(&payload)
+    }
+}
+
+impl FromStr for ScrollKeyAndStartOffset {
+    type Err = &'static str;
+
+    fn from_str(scroll_id_str: &str) -> Result<Self, Self::Err> {
+        let base64_decoded: Vec<u8> = BASE64_STANDARD
+            .decode(scroll_id_str)
+            .map_err(|_| "scroll id is invalid base64.")?;
+        if base64_decoded.len() != 16 + 8 + 4 {
+            return Err("scroll id payload is not 28 bytes long.");
+        }
+        let
(scroll_ulid_bytes, from_bytes, max_hits_bytes) = (
+            &base64_decoded[..16],
+            &base64_decoded[16..24],
+            &base64_decoded[24..28],
+        );
+        let scroll_ulid = u128::from_le_bytes(scroll_ulid_bytes.try_into().unwrap()).into();
+        let from = u64::from_le_bytes(from_bytes.try_into().unwrap());
+        let max_hits = u32::from_le_bytes(max_hits_bytes.try_into().unwrap());
+        Ok(ScrollKeyAndStartOffset {
+            scroll_ulid,
+            start_offset: from,
+            max_hits,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    #[test]
+    fn test_scroll_id() {}
+}
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index 3b28033958b..702670e51ef 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -66,8 +66,40 @@ impl SearchJobPlacer {
     }
 }
 
+struct SocketAddrAndClient {
+    socket_addr: SocketAddr,
+    client: SearchServiceClient,
+}
+
+impl Hash for SocketAddrAndClient {
+    fn hash<H: Hasher>(&self, hasher: &mut H) {
+        self.socket_addr.hash(hasher);
+    }
+}
+
 impl SearchJobPlacer {
-    /// Assign the given job to the clients.
+    /// Returns an iterator over the clients sorted by affinity with the given key, best node first.
+    pub async fn best_nodes_per_affinity(
+        &self,
+        affinity_key: &[u8],
+    ) -> impl Iterator<Item = SearchServiceClient> {
+        let mut nodes: Vec<SocketAddrAndClient> = self
+            .searcher_pool
+            .all()
+            .await
+            .into_iter()
+            .map(|(socket_addr, client)| SocketAddrAndClient {
+                socket_addr,
+                client,
+            })
+            .collect();
+        sort_by_rendez_vous_hash(&mut nodes[..], affinity_key);
+        nodes
+            .into_iter()
+            .map(|socket_addr_and_client| socket_addr_and_client.client)
+    }
+
+    /// Assign the given job to the clients
     /// Returns a list of pair (SocketAddr, `Vec<Job>`)
     ///
     /// When exclude_addresses filters all clients it is ignored.
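`best_nodes_per_affinity` above sorts the searcher pool by rendezvous hashing, so every node derives the same preference order for a given scroll key without coordination. A minimal stand-alone sketch of the idea, an illustration only and not Quickwit's actual `sort_by_rendez_vous_hash` implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Score a (node, key) pair; the node with the highest score "wins the rendezvous".
fn score(node: &str, affinity_key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    node.hash(&mut hasher);
    affinity_key.hash(&mut hasher);
    hasher.finish()
}

// Sort nodes best-first for the given key.
fn sort_by_rendezvous_hash(nodes: &mut [&str], affinity_key: &[u8]) {
    nodes.sort_by_key(|node| std::cmp::Reverse(score(node, affinity_key)));
}

fn main() {
    let mut nodes = ["10.0.0.1:7280", "10.0.0.2:7280", "10.0.0.3:7280"];
    sort_by_rendezvous_hash(&mut nodes, b"scroll-key-bytes");
    // Any node running this against the same membership list computes
    // the same preferred order for this key.
    println!("{nodes:?}");
}
```

diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index fc0a8683505..86f9f0219e7 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -18,7 +18,9 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.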
use std::pin::Pin; +use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; @@ -27,9 +29,10 @@ use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::Metastore; use quickwit_proto::{ - FetchDocsRequest, FetchDocsResponse, LeafListTermsRequest, LeafListTermsResponse, - LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, - ListTermsRequest, ListTermsResponse, SearchRequest, SearchResponse, SearchStreamRequest, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListTermsRequest, + LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, + LeafSearchStreamResponse, ListTermsRequest, ListTermsResponse, PartialHit, PutKvRequest, + ScrollRequest, SearchRequest, SearchResponse, SearchStreamRequest, SnippetRequest, }; use quickwit_storage::{Cache, MemorySizedCache, QuickwitCache, StorageResolver}; use tantivy::aggregation::AggregationLimits; @@ -38,12 +41,16 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::info; use crate::leaf_cache::LeafSearchCache; +use crate::root::{fetch_docs_round, get_snippet_request}; +use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_stream::{leaf_search_stream, root_search_stream}; use crate::{ fetch_docs, leaf_list_terms, leaf_search, root_list_terms, root_search, ClusterClient, SearchError, }; +const SCROLL_CACHE_CAPACITY: usize = 1_000; + #[derive(Clone)] /// The search service implementation. pub struct SearchServiceImpl { @@ -51,6 +58,7 @@ pub struct SearchServiceImpl { storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, + search_after_cache: MiniKV, } /// Trait representing a search service. @@ -112,6 +120,18 @@ pub trait SearchService: 'static + Send + Sync { &self, request: LeafListTermsRequest, ) -> crate::Result; + + /// Performs a scroll request. + async fn scroll(&self, scroll_request: ScrollRequest) -> crate::Result; + + /// Stores a Key value in the local cache. + /// This operation is not distributed. The distribution logic lives in + /// the `ClusterClient`. + async fn put_kv(&self, put_kv: PutKvRequest); + + /// Gets the payload associated to a key in the local cache. + /// See also `put_kv(..)`. 
+ async fn get_kv(&self, get_kv: GetKvRequest) -> Option>; } impl SearchServiceImpl { @@ -128,6 +148,7 @@ impl SearchServiceImpl { storage_resolver, cluster_client, searcher_context, + search_after_cache: MiniKV::default(), } } } @@ -149,7 +170,6 @@ impl SearchService for SearchServiceImpl { &self.cluster_client, ) .await?; - Ok(search_result) } @@ -188,7 +208,8 @@ impl SearchService for SearchServiceImpl { .storage_resolver .resolve(&Uri::from_well_formed(fetch_docs_request.index_uri)) .await?; - let search_request_opt = fetch_docs_request.search_request.as_ref(); + let snippet_request_opt: Option<&SnippetRequest> = + fetch_docs_request.snippet_request.as_ref(); let doc_mapper = deserialize_doc_mapper(&fetch_docs_request.doc_mapper)?; let fetch_docs_response = fetch_docs( self.searcher_context.clone(), @@ -196,7 +217,7 @@ impl SearchService for SearchServiceImpl { storage, &fetch_docs_request.split_offsets, doc_mapper, - search_request_opt, + snippet_request_opt, ) .await?; @@ -279,6 +300,109 @@ impl SearchService for SearchServiceImpl { Ok(leaf_search_response) } + + async fn scroll(&self, scroll_request: ScrollRequest) -> crate::Result { + let start = Instant::now(); + let current_scroll = ScrollKeyAndStartOffset::from_str(&scroll_request.scroll_id) + .map_err(|msg| SearchError::InvalidArgument(msg.to_string()))?; + let next_scroll = current_scroll.next_page(); + let start_offset = current_scroll.start_offset; + let scroll_id_bytes = current_scroll.scroll_key(); + let payload = self.cluster_client.get_kv(&scroll_id_bytes[..]).await; + let payload = payload + .ok_or_else(|| SearchError::InternalError("scroll key not found.".to_string()))?; + let ScrollContext { + index_uri, + doc_mapper_str, + query_ast, + split_metadatas, + mut search_request, + cached_partial_hits, + cached_partial_hits_start_offset, + total_num_hits, + max_hits_per_page, + } = ScrollContext::load(&payload) + .map_err(|_| SearchError::InternalError("Corrupted scroll context.".to_string()))?; + let index_id = search_request.index_id.clone(); + let snippet_request: Option = get_snippet_request(&search_request); + let partial_hits: Vec = if cached_partial_hits_start_offset <= start_offset + && cached_partial_hits_start_offset + cached_partial_hits.len() as u64 + >= start_offset + search_request.max_hits + { + let truncated_partial_hits = + &cached_partial_hits[(start_offset - cached_partial_hits_start_offset) as usize..]; + let num_partial_hits = truncated_partial_hits + .len() + .min(search_request.max_hits as usize); + truncated_partial_hits[..num_partial_hits].to_vec() + } else { + search_request.max_hits = SCROLL_CACHE_CAPACITY as u64; + search_request.start_offset = start_offset; + let leaf_search_response: LeafSearchResponse = crate::root::search_partial_hits_round( + &*self.searcher_context, + &search_request, + &index_uri, + &doc_mapper_str, + &split_metadatas[..], + &self.cluster_client, + ) + .await?; + let new_scroll_context = ScrollContext { + index_uri: index_uri.clone(), + doc_mapper_str: doc_mapper_str.clone(), + query_ast, + split_metadatas: split_metadatas.clone(), + search_request, + cached_partial_hits: leaf_search_response.partial_hits.clone(), + cached_partial_hits_start_offset, + total_num_hits, + max_hits_per_page, + }; + if let Some(scroll_ttl_secs) = scroll_request.scroll_ttl_secs { + let payload = new_scroll_context.serialize(); + let scroll_ttl = Duration::from_secs(scroll_ttl_secs as u64); + self.cluster_client + .put_kv(&next_scroll.scroll_key(), &payload, scroll_ttl) + .await; + } + // We 
fetch more hits than necessary, in order to amortize those search requests. + // Now that we have cached these partial results, we can truncate our list of partial + // hits. + let mut partial_hits = leaf_search_response.partial_hits; + partial_hits.truncate(max_hits_per_page as usize); + partial_hits + }; + let hits: Vec = fetch_docs_round( + &partial_hits[..], + &split_metadatas[..], + &index_id, + &index_uri, + &doc_mapper_str, + snippet_request, + &self.cluster_client, + ) + .await?; + Ok(SearchResponse { + hits, + num_hits: total_num_hits, + elapsed_time_micros: start.elapsed().as_micros() as u64, + scroll_id: Some(next_scroll.to_string()), + errors: Vec::new(), + aggregation: None, + }) + } + + async fn put_kv(&self, put_request: PutKvRequest) { + let ttl = Duration::from_secs(put_request.ttl_secs as u64); + self.search_after_cache + .put(put_request.key, put_request.payload, ttl) + .await; + } + + async fn get_kv(&self, get_request: GetKvRequest) -> Option> { + let payload: Vec = self.search_after_cache.get(&get_request.key).await?; + Some(payload) + } } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index cd46e8e946c..a80a1deec8c 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -14,9 +14,10 @@ anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } byte-unit = { workspace = true } -elasticsearch-dsl = "0.4.14" +elasticsearch-dsl = "0.4.15" futures = { workspace = true } futures-util = { workspace = true } +humantime = { workspace = true } http-serde = { workspace = true } hyper = { workspace = true } itertools = { workspace = true } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs index bcc65242ee2..a113105a717 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs @@ -24,7 +24,9 @@ use warp::reject::LengthRequired; use warp::{Filter, Rejection}; use super::model::MultiSearchQueryParams; -use crate::elastic_search_api::model::{ElasticIngestOptions, SearchBody, SearchQueryParams}; +use crate::elastic_search_api::model::{ + ElasticIngestOptions, ScrollQueryParams, SearchBody, SearchQueryParams, +}; const BODY_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(1_000_000); const CONTENT_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(10 * 1024 * 1024); // 10MiB @@ -157,3 +159,30 @@ pub(crate) fn elastic_multi_search_filter( .and(warp::post()) .and(serde_qs::warp::query(serde_qs::Config::default())) } + +fn merge_scroll_body_params( + from_query_string: ScrollQueryParams, + from_body: ScrollQueryParams, +) -> ScrollQueryParams { + ScrollQueryParams { + scroll: from_query_string.scroll.or(from_body.scroll), + scroll_id: from_query_string.scroll_id.or(from_body.scroll_id), + } +} + +#[utoipa::path(post, tag = "Search", path = "/_search/scroll")] +pub(crate) fn elastic_scroll_filter( +) -> impl Filter + Clone { + warp::path!("_elastic" / "_search" / "scroll") + .and(warp::body::content_length_limit( + BODY_LENGTH_LIMIT.get_bytes(), + )) + .and(warp::get().or(warp::post()).unify()) + .and(serde_qs::warp::query(serde_qs::Config::default())) + .and(json_or_empty()) + .map( + |scroll_query_params: ScrollQueryParams, scroll_body: ScrollQueryParams| { + merge_scroll_body_params(scroll_query_params, scroll_body) + }, + ) +} diff --git 
a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs
index 35f7faf5e8c..ad51542e2b1 100644
--- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs
+++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs
@@ -31,7 +31,7 @@ use quickwit_ingest::IngestServiceClient;
 use quickwit_search::SearchService;
 use rest_handler::{
     es_compat_cluster_info_handler, es_compat_index_multi_search_handler,
-    es_compat_index_search_handler, es_compat_search_handler,
+    es_compat_index_search_handler, es_compat_scroll_handler, es_compat_search_handler,
 };
 use serde::{Deserialize, Serialize};
 use warp::{Filter, Rejection};
@@ -50,6 +50,7 @@ pub fn elastic_api_handlers(
     es_compat_cluster_info_handler(node_config, BuildInfo::get())
         .or(es_compat_search_handler(search_service.clone()))
         .or(es_compat_index_search_handler(search_service.clone()))
+        .or(es_compat_scroll_handler(search_service.clone()))
         .or(es_compat_index_multi_search_handler(search_service))
         .or(es_compat_bulk_handler(ingest_service.clone()))
         .or(es_compat_index_bulk_handler(ingest_service))
diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs
index 3f1313df3e8..9a64c3e9358 100644
--- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs
+++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs
@@ -21,6 +21,7 @@ mod bulk_body;
 mod bulk_query_params;
 mod error;
 mod multi_search;
+mod scroll;
 mod search_body;
 mod search_query_params;
@@ -31,6 +32,7 @@ pub use multi_search::{
     MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
 };
 use quickwit_proto::SortOrder;
+pub use scroll::ScrollQueryParams;
 pub use search_body::SearchBody;
 pub use search_query_params::SearchQueryParams;
diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs
new file mode 100644
index 00000000000..56649b7057f
--- /dev/null
+++ b/quickwit/quickwit-serve/src/elastic_search_api/model/scroll.rs
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Default)]
+pub struct ScrollQueryParams {
+    pub scroll: Option<String>,
+    pub scroll_id: Option<String>,
+}
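The `ScrollQueryParams` model just defined is deserialized from both the query string and the JSON body; `merge_scroll_body_params` in filter.rs above gives the query string precedence. A small self-contained illustration of that precedence rule (the scroll id value is a made-up placeholder):

```rust
use serde::Deserialize;

#[derive(Deserialize, Default, Debug)]
struct ScrollQueryParams {
    scroll: Option<String>,
    scroll_id: Option<String>,
}

// Mirrors `merge_scroll_body_params`: a value set in the query string
// wins over the same field set in the JSON body.
fn merge(from_query_string: ScrollQueryParams, from_body: ScrollQueryParams) -> ScrollQueryParams {
    ScrollQueryParams {
        scroll: from_query_string.scroll.or(from_body.scroll),
        scroll_id: from_query_string.scroll_id.or(from_body.scroll_id),
    }
}

fn main() {
    let from_qs = ScrollQueryParams { scroll: Some("30m".to_string()), scroll_id: None };
    let from_body: ScrollQueryParams =
        serde_json::from_str(r#"{"scroll": "5m", "scroll_id": "dummy-scroll-id"}"#).unwrap();
    let merged = merge(from_qs, from_body);
    assert_eq!(merged.scroll.as_deref(), Some("30m")); // query string wins
    assert_eq!(merged.scroll_id.as_deref(), Some("dummy-scroll-id")); // body fills the gap
}
```

diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs
index 2afad504afd..c108f055fe1 100644
--- a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs
+++ b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs
@@ -18,6 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.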
use std::str::FromStr; +use std::time::Duration; use quickwit_proto::SortOrder; use quickwit_query::BooleanOperand; @@ -189,6 +190,20 @@ impl SearchQueryParams { } Ok(Some(sort_fields)) } + + /// Returns the scroll duration supplied by the user. + /// + /// This function returns an error if the scroll duration is not in the expected format. (`40s` + /// etc.) + pub fn parse_scroll_ttl(&self) -> Result, SearchError> { + let Some(scroll_str) = self.scroll.as_ref() else { + return Ok(None); + }; + let duration: Duration = humantime::parse_duration(scroll_str).map_err(|_err| { + SearchError::InvalidArgument(format!("Invalid scroll duration: `{scroll_str}`")) + })?; + Ok(Some(duration)) + } } #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index a21a9062c7f..bec9ba5e5cf 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -20,7 +20,7 @@ use std::collections::BTreeMap; use std::str::from_utf8; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use bytes::Bytes; use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticSearchResponse}; @@ -30,7 +30,7 @@ use hyper::StatusCode; use itertools::Itertools; use quickwit_common::truncate_str; use quickwit_config::NodeConfig; -use quickwit_proto::{SearchResponse, ServiceErrorCode}; +use quickwit_proto::{ScrollRequest, SearchResponse, ServiceErrorCode}; use quickwit_query::query_ast::{QueryAst, UserInputQuery}; use quickwit_query::BooleanOperand; use quickwit_search::{SearchError, SearchService}; @@ -39,11 +39,11 @@ use warp::{Filter, Rejection}; use super::filter::{ elastic_cluster_info_filter, elastic_index_search_filter, elastic_multi_search_filter, - elastic_search_filter, + elastic_scroll_filter, elastic_search_filter, }; use super::model::{ ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, - MultiSearchSingleResponse, SearchBody, SearchQueryParams, + MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; use crate::format::BodyFormat; use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse}; @@ -99,7 +99,17 @@ pub fn es_compat_index_search_handler( .map(make_elastic_api_response) } -/// POST _elastic/_msearch +/// GET or POST _elastic/_search/scroll +pub fn es_compat_scroll_handler( + search_service: Arc, +) -> impl Filter + Clone { + elastic_scroll_filter() + .and(with_arg(search_service)) + .then(es_scroll) + .map(make_elastic_api_response) +} + +/// POST _elastic/_search pub fn es_compat_index_multi_search_handler( search_service: Arc, ) -> impl Filter + Clone { @@ -162,6 +172,9 @@ fn build_request_for_es_api( ))); } + let scroll_duration: Option = search_params.parse_scroll_ttl()?; + let scroll_ttl_secs: Option = scroll_duration.map(|duration| duration.as_secs() as u32); + Ok(quickwit_proto::SearchRequest { index_id, query_ast: serde_json::to_string(&query_ast).expect("Failed to serialize QueryAst"), @@ -169,7 +182,10 @@ fn build_request_for_es_api( start_offset, aggregation_request, sort_fields, - ..Default::default() + start_timestamp: None, + end_timestamp: None, + snippet_fields: Vec::new(), + scroll_ttl_secs, }) } @@ -285,6 +301,32 @@ async fn es_compat_index_multi_search( Ok(multi_search_response) } +async fn es_scroll( + 
scroll_query_params: ScrollQueryParams, + search_service: Arc, +) -> Result { + let start_instant = Instant::now(); + let Some(scroll_id) = scroll_query_params.scroll_id.clone() else { + return Err(SearchError::InvalidArgument("Missing scroll_id".to_string()).into()); + }; + let scroll_ttl_secs: Option = if let Some(scroll_ttl) = scroll_query_params.scroll { + let scroll_ttl_duration = humantime::parse_duration(&scroll_ttl) + .map_err(|_| SearchError::InvalidArgument(format!("Scroll invalid: {}", scroll_ttl)))?; + Some(scroll_ttl_duration.as_secs() as u32) + } else { + None + }; + let scroll_request = ScrollRequest { + scroll_id, + scroll_ttl_secs, + }; + let search_response: SearchResponse = search_service.scroll(scroll_request).await?; + let mut search_response_rest: ElasticSearchResponse = + convert_to_es_search_response(search_response); + search_response_rest.took = start_instant.elapsed().as_millis() as u32; + Ok(search_response_rest) +} + fn convert_to_es_search_response(resp: SearchResponse) -> ElasticSearchResponse { let hits: Vec = resp.hits.into_iter().map(convert_hit).collect(); let aggregations: Option = if let Some(aggregation_json) = resp.aggregation { @@ -303,6 +345,7 @@ fn convert_to_es_search_response(resp: SearchResponse) -> ElasticSearchResponse hits, }, aggregations, + scroll_id: resp.scroll_id, ..Default::default() } } diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 86fbe34bad5..fc2f609bdae 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -23,7 +23,8 @@ use async_trait::async_trait; use futures::TryStreamExt; use quickwit_proto::{ convert_to_grpc_result, search_service_server as grpc, set_parent_span_from_request_metadata, - tonic, LeafSearchStreamRequest, LeafSearchStreamResponse, ServiceError, + tonic, GetKvRequest, GetKvResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, + ServiceError, }; use quickwit_search::SearchService; use tracing::instrument; @@ -46,8 +47,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let search_request = request.into_inner(); - let search_res = self.0.root_search(search_request).await; - convert_to_grpc_result(search_res) + let search_result = self.0.root_search(search_request).await; + convert_to_grpc_result(search_result) } #[instrument(skip(self, request))] @@ -57,8 +58,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let leaf_search_request = request.into_inner(); - let leaf_search_res = self.0.leaf_search(leaf_search_request).await; - convert_to_grpc_result(leaf_search_res) + let leaf_search_result = self.0.leaf_search(leaf_search_request).await; + convert_to_grpc_result(leaf_search_result) } #[instrument(skip(self, request))] @@ -68,8 +69,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let fetch_docs_request = request.into_inner(); - let fetch_docs_res = self.0.fetch_docs(fetch_docs_request).await; - convert_to_grpc_result(fetch_docs_res) + let fetch_docs_result = self.0.fetch_docs(fetch_docs_request).await; + convert_to_grpc_result(fetch_docs_result) } type LeafSearchStreamStream = std::pin::Pin< @@ -102,8 +103,8 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, 
tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let search_request = request.into_inner(); - let search_res = self.0.root_list_terms(search_request).await; - convert_to_grpc_result(search_res) + let search_result = self.0.root_list_terms(search_request).await; + convert_to_grpc_result(search_result) } #[instrument(skip(self, request))] @@ -113,7 +114,39 @@ impl grpc::SearchService for GrpcSearchAdapter { ) -> Result, tonic::Status> { set_parent_span_from_request_metadata(request.metadata()); let leaf_search_request = request.into_inner(); - let leaf_search_res = self.0.leaf_list_terms(leaf_search_request).await; - convert_to_grpc_result(leaf_search_res) + let leaf_search_result = self.0.leaf_list_terms(leaf_search_request).await; + convert_to_grpc_result(leaf_search_result) + } + + async fn scroll( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let scroll_request = request.into_inner(); + let scroll_result = self.0.scroll(scroll_request).await; + convert_to_grpc_result(scroll_result) + } + + #[instrument(skip(self, request))] + async fn put_kv( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let put_request = request.into_inner(); + self.0.put_kv(put_request).await; + Ok(tonic::Response::new(quickwit_proto::PutKvResponse {})) + } + + #[instrument(skip(self, request))] + async fn get_kv( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let get_search_after_context_request = request.into_inner(); + let payload = self.0.get_kv(get_search_after_context_request).await; + let get_response = GetKvResponse { payload }; + Ok(tonic::Response::new(get_response)) } } diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index 566658d25e7..8721ac1fbb1 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -221,6 +221,7 @@ async fn search_endpoint( .aggs .map(|agg| serde_json::to_string(&agg).expect("could not serialize JsonValue")), sort_fields: search_request.sort_by.sort_fields, + scroll_ttl_secs: None, }; let search_response = search_service.root_search(search_request).await?; let search_response_rest = SearchResponseRest::try_from(search_response)?; diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 7874f6fa395..52460c90264 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -40,13 +40,15 @@ def open_scenario(scenario_filepath): if type(step_dict) == dict: yield step_dict -def run_step(step): +def run_step(step, previous_result): + result = {} if "method" in step: methods = step["method"] if type(methods) != list: methods = [methods] for method in methods: - run_request_step(method, step) + result = run_request_step(method, step, previous_result) + return result def load_data(path): if path.endswith("gz"): @@ -66,7 +68,24 @@ def run_request_with_retry(run_req, expected_status_code=None, num_retries=10, w raise Exception("Wrong status code. 
Got %s, expected %s" % (r.status_code, expected_status_code))
 
-def run_request_step(method, step):
+def resolve_previous_result(c, previous_result):
+    if type(c) == dict:
+        result = {}
+        if len(c) == 1 and "$previous" in c:
+            return eval(c["$previous"], None, {"val": previous_result})
+        for (k, v) in c.items():
+            result[k] = resolve_previous_result(v, previous_result)
+        return result
+    if type(c) == list:
+        return [
+            resolve_previous_result(v, previous_result)
+            for v in c
+        ]
+    return c
+
+def run_request_step(method, step, previous_result):
     assert method in {"GET", "POST", "PUT", "DELETE"}
     if "headers" not in step:
         step["headers"] = {'user-agent': 'my-app/0.0.1'}
@@ -82,6 +101,7 @@ def run_request_step(method, step):
     if body_from_file is not None:
         body_from_file = osp.join(step["cwd"], body_from_file)
         kvargs["data"] = load_data(body_from_file)
+    kvargs = resolve_previous_result(kvargs, previous_result)
     ndjson = step.get("ndjson", None)
     if ndjson is not None:
         kvargs["data"] = "\n".join([json.dumps(doc) for doc in ndjson])
@@ -91,12 +111,14 @@ def run_request_step(method, step):
     run_req = lambda : method_req(url, **kvargs)
     r = run_request_with_retry(run_req, expected_status_code, num_retries)
     expected_resp = step.get("expected", None)
+    json_res = r.json()
     if expected_resp is not None:
         try:
-            check_result(r.json(), expected_resp)
+            check_result(json_res, expected_resp, context_path="")
         except Exception as e:
-            print(json.dumps(r.json(), indent=2))
+            print(json.dumps(json_res, indent=2))
             raise e
+    return json_res
 
 def check_result(result, expected, context_path = ""):
     if type(expected) == dict and "$expect" in expected:
@@ -211,6 +233,7 @@ def run_scenario(self, path, script):
     steps = list(open_scenario(scenario_path))
     num_steps_executed = 0
     num_steps_skipped = 0
+    previous_result = {}
     for (i, step) in enumerate(steps, 1):
         step = stack_dicts(self.context, step)
         applicable_engine = step.get("engines", None)
@@ -219,7 +242,7 @@
             num_steps_skipped += 1
             continue
         try:
-            run_step(step)
+            previous_result = run_step(step, previous_result)
             num_steps_executed += 1
         except Exception as e:
             print("🔴 %s" % scenario_path)
diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml
new file mode 100644
index 00000000000..3e9703564af
--- /dev/null
+++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0011-scroll-api.yaml
@@ -0,0 +1,74 @@
+params:
+  size: 1
+  scroll: 30m
+json:
+  query:
+    match_all: {}
+  sort:
+    - actor.id:
+        order: desc
+  aggs:
+    mytypeagg:
+      terms:
+        field: type
+        size: 5
+store:
+  scroll_id: _scroll_id
+expected:
+  _scroll_id:
+    $expect: "len(val) > 4"
+  aggregations:
+    mytypeagg: {}
+  hits:
+    hits:
+      - _source: {actor: {login: "miyuotsuki"}}
+    total:
+      value: 100
+      relation: "eq"
+---
+method: GET
+endpoint: "_search/scroll"
+params:
+  scroll: 30m
+json:
+  scroll_id:
+    $previous: "val[\"_scroll_id\"]"
+expected:
+  _scroll_id:
+    $expect: "len(val) > 4"
+  hits:
+    hits:
+      - _source: {actor: {login: "ScottThiessen"}}
+    total:
+      value: 100
+---
+method: GET
+endpoint: "_search/scroll"
+params:
+  scroll: 30m
+json:
+  scroll_id:
+    $previous: "val[\"_scroll_id\"]"
+expected:
+  _scroll_id:
+    $expect: "len(val) > 4"
+  hits:
+    hits:
+      - _source: {actor: {login: "seenajon"}}
+    total:
+      value: 100
+---
+params:
+  size: 1
+  scroll: 31m
+json:
+  query:
+    match_all: {}
+  sort:
+    - actor.id:
+        order: desc
+status_code: 400
+expected:
+  status: 400
+  error:
+    reason: "Invalid argument: Quickwit only supports scroll TTL period up to 1800 secs."
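The final step above exercises the TTL cap end to end: `scroll: 31m` parses successfully but exceeds the 30-minute maximum, so the API answers with a 400 carrying the exact message checked here. For reference, a hypothetical free-standing version of the duration parsing done by `parse_scroll_ttl` and `es_scroll`, using the `humantime` crate added to quickwit-serve in this change:

```rust
use std::time::Duration;

// Sketch of the parsing step only; the 30-minute cap is enforced later, in root_search_aux.
fn parse_scroll_ttl(scroll: &str) -> Result<Duration, String> {
    humantime::parse_duration(scroll)
        .map_err(|_| format!("Invalid scroll duration: `{scroll}`"))
}

fn main() {
    assert_eq!(parse_scroll_ttl("30m").unwrap(), Duration::from_secs(1_800));
    // "31m" parses fine; it is the TTL cap that later rejects it with a 400.
    assert_eq!(parse_scroll_ttl("31m").unwrap(), Duration::from_secs(1_860));
    assert!(parse_scroll_ttl("forever").is_err());
}
```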