diff --git a/docs/internals/scroll.md b/docs/internals/scroll.md
new file mode 100644
index 00000000000..524f0549f20
--- /dev/null
+++ b/docs/internals/scroll.md
@@ -0,0 +1,66 @@
+# Scroll API
+
+The scroll API is implemented to offer compatibility with Elasticsearch.
+Both the API and the implementation are quirky; this document details them.
+
+## API description
+
+You can find information about the scroll API here:
+https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results
+https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
+
+The user runs a regular search request with a `scroll` query parameter.
+The search result then contains the normal response, with an extra `_scroll_id` property added to the response body.
+
+That id is then meant to be sent to the scroll REST API.
+Successive calls to this API return the result pages incrementally.
+
+## Quirks and difficulties
+
+The scrolled results should be consistent with the state of the original index.
+For this reason, we need to capture the state of the index at the time of the original request.
+
+The Elasticsearch API is needlessly broken, as it returns the same `scroll_id` over and over.
+If a network error happens between the client and the server at page N, there is no way for the client to ask for the re-emission of page N:
+page N+1 will be returned on the next call.
+
+
+## Implementation
+
+The scroll context contains:
+- the details of the original query (we need to be able to re-emit paginated queries)
+- the list of split metadata used for the query
+- a cached list of partial docs (= not the doc content, just its address and its score), to avoid
+searching at every single scroll request
+- the total number of results, in order to append that information to our response.
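A minimal Rust sketch of such a context follows; all names here are illustrative assumptions, not Quickwit's actual types:

```rust
/// Address and score of a hit, without the document content.
#[derive(Clone, Debug)]
struct PartialDoc {
    split_id: String,
    doc_id: u32,
    score: f32,
}

/// Illustrative sketch of the state kept per scroll.
#[derive(Clone, Debug)]
struct ScrollContext {
    /// Original query, so that paginated queries can be re-emitted.
    query_json: String,
    /// Split metadata captured at the time of the original request.
    split_ids: Vec<String>,
    /// Cached partial docs, to avoid searching on every scroll call.
    cached_partial_docs: Vec<PartialDoc>,
    /// Total number of results, appended to every response.
    total_num_hits: u64,
    /// Offset of the next page to serve.
    start_offset: u64,
}
```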
+
+We use a simple leaderless KV store to keep the state required to run the scroll API.
+We generate a scroll ULID and use it to get the list of servers with the best affinity according
+to rendezvous hashing. We then go through them in order and attempt to PUT that key on up to 2 servers.
+Failures for these PUTs are silent.
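The server selection above can be sketched with rendezvous (highest-random-weight) hashing. This is a simplified illustration using the std hasher; Quickwit's actual hasher and node types differ:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Rank servers by affinity for a key. Every node computes the same
// (key, server) scores, so they all agree on the ranking.
fn sorted_by_affinity(key: &str, servers: &[String]) -> Vec<String> {
    let mut scored: Vec<(u64, String)> = servers
        .iter()
        .map(|server| {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            server.hash(&mut hasher);
            (hasher.finish(), server.clone())
        })
        .collect();
    // Highest score first; tie-break on the name for determinism.
    scored.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| a.1.cmp(&b.1)));
    scored.into_iter().map(|(_, server)| server).collect()
}

// The scroll context is PUT on up to the 2 best-ranked servers;
// in the real implementation, failures of these PUTs are silent.
fn put_targets(key: &str, servers: &[String]) -> Vec<String> {
    sorted_by_affinity(key, servers).into_iter().take(2).collect()
}
```

A useful property of rendezvous hashing, visible in this sketch, is that removing a lower-ranked server never changes the best-ranked server for a key.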
+
+For each call to scroll, one of two things can happen:
+- the partial docs for the requested page are in the partial doc cache. We just run the `fetch_docs` phase,
+and update the context with the new `start_offset`.
+- the partial docs for the requested page are not in the partial doc cache. We then run a new search query.
+
+We attempt to fetch `SCROLL_CACHE_CAPACITY` partial docs in order to fill the partial doc address cache for subsequent calls.
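The cache decision made on each scroll call can be sketched as follows. The flat `Vec<u64>` of partial-doc addresses and the function names are illustrative, not Quickwit's actual types:

```rust
// Illustrative cache capacity; the real constant lives in quickwit-search.
const SCROLL_CACHE_CAPACITY: usize = 1_000;

/// Returns the requested page if it is fully contained in the cached
/// partial docs; `None` means a cache miss, in which case a new search
/// query is run, fetching up to `SCROLL_CACHE_CAPACITY` partial docs to
/// refill the cache.
fn page_from_cache(
    cached: &[u64],     // cached partial-doc addresses
    cache_start: usize, // offset of `cached[0]` in the full result set
    start_offset: usize,
    page_size: usize,
) -> Option<Vec<u64>> {
    // The page must start at or after the first cached doc...
    let begin = start_offset.checked_sub(cache_start)?;
    let end = begin.checked_add(page_size)?;
    // ...and end within the cached range.
    if end <= cached.len() {
        Some(cached[begin..end].to_vec())
    } else {
        None
    }
}
```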
+
+## A strange `scroll_id`
+
+For more robustness, the scroll id is the concatenation of:
+- a ULID, used as the address of the scroll context;
+- the `start_offset`.
+
+The idea here is that if the PUT request failed, we can still return the right results even if we fetch an obsolete version of the `ScrollContext`:
+we take the max of the `start_offset` supplied in the `scroll_id` and the one present in the `ScrollContext`.
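The composite id and the max-of-offsets recovery can be sketched as below. The `"<ulid>:<offset>"` textual layout and helper names are assumptions for illustration, not the actual encoding:

```rust
// Build the composite scroll_id from the ULID and the start offset.
fn make_scroll_id(ulid: &str, start_offset: u64) -> String {
    format!("{ulid}:{start_offset}")
}

// Split the composite scroll_id back into its two parts.
fn parse_scroll_id(scroll_id: &str) -> Option<(&str, u64)> {
    let (ulid, offset) = scroll_id.rsplit_once(':')?;
    Some((ulid, offset.parse().ok()?))
}

/// A stale `ScrollContext` (e.g. the PUT of its latest version failed) can
/// only lag behind the offset embedded in the `scroll_id`, so taking the
/// max of the two yields the correct position.
fn effective_start_offset(offset_from_id: u64, offset_from_context: u64) -> u64 {
    offset_from_id.max(offset_from_context)
}
```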
+
+## Quickwit-specific quirks
+
+Our state is pretty volatile. Some scrolls may end up broken if 2 servers were removed within 30 minutes.
+
+The scroll TTL does not extend the life of a scroll context.
+We do not do anything to prevent splits from being GCed, and rely only on the grace period to make sure this does not happen.
+
+The ES API does not always update the `_scroll_id`: it does not seem to change across calls.
+A misimplemented client might therefore appear to work correctly on a single shard with Elasticsearch, yet not work with Quickwit.
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index c67b6bd30e9..64a8ab2442e 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5472,6 +5472,7 @@ dependencies = [
"anyhow",
"assert-json-diff 2.0.2",
"async-trait",
+ "base64 0.21.2",
"bytes",
"chitchat",
"fnv",
@@ -5510,6 +5511,8 @@ dependencies = [
"tower",
"tracing",
"tracing-opentelemetry",
+ "ttl_cache",
+ "ulid",
"utoipa",
]
@@ -5527,6 +5530,7 @@ dependencies = [
"futures",
"futures-util",
"http-serde",
+ "humantime",
"hyper",
"itertools 0.11.0",
"mime_guess",
@@ -7733,6 +7737,15 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
+[[package]]
+name = "ttl_cache"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a"
+dependencies = [
+ "linked-hash-map",
+]
+
[[package]]
name = "tungstenite"
version = "0.18.0"
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 1b567cdd32a..cdeab4a6fed 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -174,6 +174,7 @@ tracing-subscriber = { version = "0.3.16", features = [
"std",
"env-filter",
] }
+ttl_cache = "0.5"
typetag = "0.2"
ulid = "1.0"
username = "0.2"
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 5b390432416..9b1fe9a4a9d 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -32,6 +32,7 @@ pub mod pubsub;
pub mod rand;
pub mod rendezvous_hasher;
pub mod runtimes;
+pub mod shared_consts;
pub mod sorted_iter;
pub mod stream_utils;
diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs
new file mode 100644
index 00000000000..9ead79f9570
--- /dev/null
+++ b/quickwit/quickwit-common/src/shared_consts.rs
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::time::Duration;
+
+/// We cannot safely delete splits right away as:
+/// - in-flight queries could actually have selected this split,
+/// - scroll queries may also have a point in time on these splits.
+///
+/// We deal with this problem by introducing a grace period. A split is first marked as deleted,
+/// and hence won't be selected for search. After a few minutes, once it is reasonably safe to assume
+/// that all queries involving this split have terminated, we effectively delete the split.
+/// This duration is controlled by `DELETION_GRACE_PERIOD`.
+pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32 min
diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
index ddfe2b79504..47090cc4228 100644
--- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
+++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
use futures::{stream, StreamExt};
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, Handler};
+use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
use quickwit_metastore::Metastore;
use quickwit_storage::StorageResolver;
use serde::Serialize;
@@ -40,14 +41,6 @@ const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
/// the grace period strategy should do the job for the moment.
const STAGED_GRACE_PERIOD: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
-/// We cannot safely delete splits right away as a in-flight queries could actually
-/// have selected this split.
-/// We deal this probably by introducing a grace period. A split is first marked as delete,
-/// and hence won't be selected for search. After a few minutes, once it reasonably safe to assume
-/// that all queries involving this split have terminated, we effectively delete the split.
-/// This duration is controlled by `DELETION_GRACE_PERIOD`.
-const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(120); // 2 min
-
const MAX_CONCURRENT_GC_TASKS: usize = if cfg!(test) { 2 } else { 10 };
#[derive(Clone, Debug, Default, Serialize)]
@@ -210,6 +203,7 @@ mod tests {
use std::path::Path;
use quickwit_actors::Universe;
+ use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, MetastoreError, MockMetastore, Split, SplitMetadata,
SplitState,
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 6fac124a732..55ade13b0b6 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -52,13 +52,47 @@ service SearchService {
// It is also in charge of merging back the results.
rpc RootListTerms(ListTermsRequest) returns (ListTermsResponse);
- // Perform a leaf list terms on a given set of splits.
+ // Performs a leaf list terms on a given set of splits.
//
// It is like a regular list term except that:
// - the node should perform the listing locally instead of dispatching
// it to other nodes.
// - it should be applied on the given subset of splits
rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse);
+
+ // Performs a scroll request.
+ rpc Scroll(ScrollRequest) returns (SearchResponse);
+
+ // Stores a key in the local storage of the targeted node.
+ // This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ rpc PutKV(PutKVRequest) returns (PutKVResponse);
+
+ // Gets a key from the local storage of the targeted node.
+ // This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ rpc GetKV(GetKVRequest) returns (GetKVResponse);
+}
+
+// Scroll Request
+message ScrollRequest {
+ // The `scroll_id` is returned in the response of a search request including a scroll.
+ string scroll_id = 1;
+ optional uint32 scroll_ttl_secs = 2;
+}
+
+message PutKVRequest {
+ bytes key = 1;
+ bytes payload = 2;
+ uint32 ttl_secs = 3;
+}
+
+message PutKVResponse {}
+
+message GetKVRequest {
+ bytes key = 1;
+}
+
+message GetKVResponse {
+ optional bytes payload = 1;
}
// -- Search -------------------
@@ -109,6 +143,11 @@ message SearchRequest {
// Optional sort by one or more fields (limited to 2 at the moment).
repeated SortField sort_fields = 14;
+
+ // If set, the search response will include a search id
+ // that will make it possible to paginate through the results
+ // in a consistent manner.
+ optional uint32 scroll_ttl_secs = 15;
}
message SortField {
@@ -117,9 +156,9 @@ message SortField {
}
enum SortOrder {
- /// Ascending order.
+ // Ascending order.
ASC = 0;
- /// Descending order.
+ // Descending order.
DESC = 1; //< This will be the default value;
}
@@ -138,6 +177,8 @@ message SearchResponse {
// Serialized aggregation response
optional string aggregation = 5;
+ // Scroll Id (only set if scroll_ttl_secs was set in the request)
+ optional string scroll_id = 6;
}
message SplitSearchError {
@@ -183,19 +224,19 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_end = 5;
}
-/// Hits returned by a FetchDocRequest.
-///
-/// The json that is joined is the raw tantivy json doc.
-/// It is very different from a quickwit json doc.
-///
-/// For instance:
-/// - it may contain a _source and a _dynamic field.
-/// - since tantivy has no notion of cardinality,
-/// all fields is are arrays.
-/// - since tantivy has no notion of object, the object is
-/// flattened by concatenating the path to the root.
-///
-/// See `quickwit_search::convert_leaf_hit`
+// Hits returned by a FetchDocRequest.
+//
+// The json that is joined is the raw tantivy json doc.
+// It is very different from a quickwit json doc.
+//
+// For instance:
+// - it may contain a _source and a _dynamic field.
+// - since tantivy has no notion of cardinality,
+// all fields are arrays.
+// - since tantivy has no notion of object, the object is
+// flattened by concatenating the path to the root.
+//
+// See `quickwit_search::convert_leaf_hit`
message LeafHit {
// The actual content of the hit/
string leaf_json = 1;
@@ -280,6 +321,11 @@ message LeafSearchResponse {
optional bytes intermediate_aggregation_result = 6;
}
+message SnippetRequest {
+ repeated string snippet_fields = 1;
+ string query_ast_resolved = 2;
+}
+
message FetchDocsRequest {
// Request fetching the content of a given list of partial_hits.
repeated PartialHit partial_hits = 1;
@@ -296,12 +342,12 @@ message FetchDocsRequest {
// split files.
string index_uri = 4;
- // Search request. This is a perfect copy of the original search request,
- // that was sent to root apart from the start_offset & max_hits params.
- SearchRequest search_request = 5;
+ optional SnippetRequest snippet_request = 7;
// `DocMapper` as json serialized trait.
string doc_mapper = 6;
+
+ reserved 5;
}
message FetchDocsResponse {
@@ -372,11 +418,11 @@ message LeafListTermsResponse {
// -- Stream -------------------
enum OutputFormat {
- /// Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
- /// The delimiter is `,`.
+ // Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
+ // The delimiter is `,`.
CSV = 0; //< This will be the default value
- /// Format data by row in ClickHouse binary format.
- /// https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary
+ // Format data by row in ClickHouse binary format.
+ // https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary
CLICK_HOUSE_ROW_BINARY = 1;
}
@@ -409,7 +455,7 @@ message SearchStreamRequest {
optional string partition_by_field = 9;
// Fields to extract snippet on.
- repeated string snippet_fields = 10;
+ repeated string snippet_fields = 10;
}
message LeafSearchStreamRequest {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index d9c6acd532f..86a51d0c614 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -1,3 +1,43 @@
+/// Scroll Request
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ScrollRequest {
+ /// The `scroll_id` is returned in the response of a search request including a scroll.
+ #[prost(string, tag = "1")]
+ pub scroll_id: ::prost::alloc::string::String,
+ #[prost(uint32, optional, tag = "2")]
+ pub scroll_ttl_secs: ::core::option::Option<u32>,
+}
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PutKvRequest {
+ #[prost(bytes = "vec", tag = "1")]
+ pub key: ::prost::alloc::vec::Vec<u8>,
+ #[prost(bytes = "vec", tag = "2")]
+ pub payload: ::prost::alloc::vec::Vec<u8>,
+ #[prost(uint32, tag = "3")]
+ pub ttl_secs: u32,
+}
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PutKvResponse {}
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetKvRequest {
+ #[prost(bytes = "vec", tag = "1")]
+ pub key: ::prost::alloc::vec::Vec<u8>,
+}
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetKvResponse {
+ #[prost(bytes = "vec", optional, tag = "1")]
+ pub payload: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
+}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -35,6 +75,11 @@ pub struct SearchRequest {
/// Optional sort by one or more fields (limited to 2 at the moment).
#[prost(message, repeated, tag = "14")]
pub sort_fields: ::prost::alloc::vec::Vec<SortField>,
+ /// If set, the search response will include a search id
+ /// that will make it possible to paginate through the results
+ /// in a consistent manner.
+ #[prost(uint32, optional, tag = "15")]
+ pub scroll_ttl_secs: ::core::option::Option<u32>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
@@ -66,6 +111,9 @@ pub struct SearchResponse {
/// Serialized aggregation response
#[prost(string, optional, tag = "5")]
pub aggregation: ::core::option::Option<::prost::alloc::string::String>,
+ /// Scroll Id (only set if scroll_ttl_secs was set in the request)
+ #[prost(string, optional, tag = "6")]
+ pub scroll_id: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -122,19 +170,19 @@ pub struct SplitIdAndFooterOffsets {
#[prost(int64, optional, tag = "5")]
pub timestamp_end: ::core::option::Option<i64>,
}
-/// / Hits returned by a FetchDocRequest.
-/// /
-/// / The json that is joined is the raw tantivy json doc.
-/// / It is very different from a quickwit json doc.
-/// /
-/// / For instance:
-/// / - it may contain a _source and a _dynamic field.
-/// / - since tantivy has no notion of cardinality,
-/// / all fields is are arrays.
-/// / - since tantivy has no notion of object, the object is
-/// / flattened by concatenating the path to the root.
-/// /
-/// / See `quickwit_search::convert_leaf_hit`
+/// Hits returned by a FetchDocRequest.
+///
+/// The json that is joined is the raw tantivy json doc.
+/// It is very different from a quickwit json doc.
+///
+/// For instance:
+/// - it may contain a _source and a _dynamic field.
+/// - since tantivy has no notion of cardinality,
+/// all fields are arrays.
+/// - since tantivy has no notion of object, the object is
+/// flattened by concatenating the path to the root.
+///
+/// See `quickwit_search::convert_leaf_hit`
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -246,6 +294,15 @@ pub struct LeafSearchResponse {
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SnippetRequest {
+ #[prost(string, repeated, tag = "1")]
+ pub snippet_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(string, tag = "2")]
+ pub query_ast_resolved: ::prost::alloc::string::String,
+}
+#[derive(Serialize, Deserialize, utoipa::ToSchema)]
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FetchDocsRequest {
/// Request fetching the content of a given list of partial_hits.
#[prost(message, repeated, tag = "1")]
@@ -262,10 +319,8 @@ pub struct FetchDocsRequest {
/// split files.
#[prost(string, tag = "4")]
pub index_uri: ::prost::alloc::string::String,
- /// Search request. This is a perfect copy of the original search request,
- /// that was sent to root apart from the start_offset & max_hits params.
- #[prost(message, optional, tag = "5")]
- pub search_request: ::core::option::Option<SearchRequest>,
+ #[prost(message, optional, tag = "7")]
+ pub snippet_request: ::core::option::Option<SnippetRequest>,
/// `DocMapper` as json serialized trait.
#[prost(string, tag = "6")]
pub doc_mapper: ::prost::alloc::string::String,
@@ -418,9 +473,9 @@ pub struct LeafSearchStreamResponse {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SortOrder {
- /// / Ascending order.
+ /// Ascending order.
Asc = 0,
- /// / Descending order.
+ /// Descending order.
///
/// < This will be the default value;
Desc = 1,
@@ -450,13 +505,13 @@ impl SortOrder {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum OutputFormat {
- /// / Comma Separated Values format ().
- /// / The delimiter is `,`.
+ /// Comma Separated Values format (<https://datatracker.ietf.org/doc/html/rfc4180>).
+ /// The delimiter is `,`.
///
/// < This will be the default value
Csv = 0,
- /// / Format data by row in ClickHouse binary format.
- /// /
+ /// Format data by row in ClickHouse binary format.
+ /// <https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary>
ClickHouseRowBinary = 1,
}
impl OutputFormat {
@@ -710,7 +765,7 @@ pub mod search_service_client {
);
self.inner.unary(req, path, codec).await
}
- /// Perform a leaf list terms on a given set of splits.
+ /// Performs a leaf list terms on a given set of splits.
///
/// It is like a regular list term except that:
/// - the node should perform the listing locally instead of dispatching
@@ -743,6 +798,77 @@ pub mod search_service_client {
);
self.inner.unary(req, path, codec).await
}
+ /// Performs a scroll request.
+ pub async fn scroll(
+ &mut self,
+ request: impl tonic::IntoRequest<super::ScrollRequest>,
+ ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/quickwit.search.SearchService/Scroll",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("quickwit.search.SearchService", "Scroll"));
+ self.inner.unary(req, path, codec).await
+ }
+ /// Stores a key in the local storage of the targeted node.
+ /// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ pub async fn put_kv(
+ &mut self,
+ request: impl tonic::IntoRequest<super::PutKvRequest>,
+ ) -> std::result::Result<tonic::Response<super::PutKvResponse>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/quickwit.search.SearchService/PutKV",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("quickwit.search.SearchService", "PutKV"));
+ self.inner.unary(req, path, codec).await
+ }
+ /// Gets a key from the local storage of the targeted node.
+ /// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ pub async fn get_kv(
+ &mut self,
+ request: impl tonic::IntoRequest<super::GetKvRequest>,
+ ) -> std::result::Result<tonic::Response<super::GetKvResponse>, tonic::Status> {
+ self.inner
+ .ready()
+ .await
+ .map_err(|e| {
+ tonic::Status::new(
+ tonic::Code::Unknown,
+ format!("Service was not ready: {}", e.into()),
+ )
+ })?;
+ let codec = tonic::codec::ProstCodec::default();
+ let path = http::uri::PathAndQuery::from_static(
+ "/quickwit.search.SearchService/GetKV",
+ );
+ let mut req = request.into_request();
+ req.extensions_mut()
+ .insert(GrpcMethod::new("quickwit.search.SearchService", "GetKV"));
+ self.inner.unary(req, path, codec).await
+ }
}
}
/// Generated server implementations.
@@ -813,7 +939,7 @@ pub mod search_service_server {
tonic::Response<super::ListTermsResponse>,
tonic::Status,
>;
- /// Perform a leaf list terms on a given set of splits.
+ /// Performs a leaf list terms on a given set of splits.
///
/// It is like a regular list term except that:
/// - the node should perform the listing locally instead of dispatching
@@ -826,6 +952,23 @@ pub mod search_service_server {
tonic::Response<super::LeafListTermsResponse>,
tonic::Status,
>;
+ /// Performs a scroll request.
+ async fn scroll(
+ &self,
+ request: tonic::Request<super::ScrollRequest>,
+ ) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status>;
+ /// Stores a key in the local storage of the targeted node.
+ /// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ async fn put_kv(
+ &self,
+ request: tonic::Request<super::PutKvRequest>,
+ ) -> std::result::Result<tonic::Response<super::PutKvResponse>, tonic::Status>;
+ /// Gets a key from the local storage of the targeted node.
+ /// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
+ async fn get_kv(
+ &self,
+ request: tonic::Request<super::GetKvRequest>,
+ ) -> std::result::Result<tonic::Response<super::GetKvResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct SearchServiceServer {
@@ -1178,6 +1321,136 @@ pub mod search_service_server {
};
Box::pin(fut)
}
+ "/quickwit.search.SearchService/Scroll" => {
+ #[allow(non_camel_case_types)]
+ struct ScrollSvc<T: SearchService>(pub Arc<T>);
+ impl<
+ T: SearchService,
+ > tonic::server::UnaryService<super::ScrollRequest>
+ for ScrollSvc<T> {
+ type Response = super::SearchResponse;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::ScrollRequest>,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move { (*inner).scroll(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = ScrollSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/quickwit.search.SearchService/PutKV" => {
+ #[allow(non_camel_case_types)]
+ struct PutKVSvc<T: SearchService>(pub Arc<T>);
+ impl<
+ T: SearchService,
+ > tonic::server::UnaryService<super::PutKvRequest> for PutKVSvc<T> {
+ type Response = super::PutKvResponse;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::PutKvRequest>,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move { (*inner).put_kv(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = PutKVSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
+ "/quickwit.search.SearchService/GetKV" => {
+ #[allow(non_camel_case_types)]
+ struct GetKVSvc<T: SearchService>(pub Arc<T>);
+ impl<
+ T: SearchService,
+ > tonic::server::UnaryService<super::GetKvRequest> for GetKVSvc<T> {
+ type Response = super::GetKvResponse;
+ type Future = BoxFuture<
+ tonic::Response<Self::Response>,
+ tonic::Status,
+ >;
+ fn call(
+ &mut self,
+ request: tonic::Request<super::GetKvRequest>,
+ ) -> Self::Future {
+ let inner = Arc::clone(&self.0);
+ let fut = async move { (*inner).get_kv(request).await };
+ Box::pin(fut)
+ }
+ }
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
+ let max_decoding_message_size = self.max_decoding_message_size;
+ let max_encoding_message_size = self.max_encoding_message_size;
+ let inner = self.inner.clone();
+ let fut = async move {
+ let inner = inner.0;
+ let method = GetKVSvc(inner);
+ let codec = tonic::codec::ProstCodec::default();
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ )
+ .apply_max_message_size_config(
+ max_decoding_message_size,
+ max_encoding_message_size,
+ );
+ let res = grpc.unary(method, req).await;
+ Ok(res)
+ };
+ Box::pin(fut)
+ }
_ => {
Box::pin(async move {
Ok(
diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml
index 51708d295a2..727bbf33925 100644
--- a/quickwit/quickwit-search/Cargo.toml
+++ b/quickwit/quickwit-search/Cargo.toml
@@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
+base64 = { workspace = true }
bytes = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
@@ -33,9 +34,11 @@ tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tower = { workspace = true }
+ttl_cache = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
opentelemetry = { workspace = true }
+ulid = { workspace = true }
utoipa = { workspace = true }
quickwit-cluster = { workspace = true }
diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs
index 10ca144fd39..9b2aa32e4ea 100644
--- a/quickwit/quickwit-search/src/client.rs
+++ b/quickwit/quickwit-search/src/client.rs
@@ -26,7 +26,9 @@ use futures::{StreamExt, TryStreamExt};
use http::Uri;
use quickwit_proto::tonic::codegen::InterceptedService;
use quickwit_proto::tonic::transport::Endpoint;
-use quickwit_proto::{tonic, LeafSearchStreamResponse, SpanContextInterceptor};
+use quickwit_proto::{
+ tonic, GetKvRequest, LeafSearchStreamResponse, PutKvRequest, SpanContextInterceptor,
+};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Channel;
use tonic::Request;
@@ -227,6 +229,44 @@ impl SearchServiceClient {
SearchServiceClientImpl::Local(service) => service.leaf_list_terms(request).await,
}
}
+
+ /// Gets the value associated with a key stored locally on the targeted node.
+ /// This call is not "distributed".
+ /// If the key is not present on the targeted node, `None` is simply returned.
+ pub async fn get_kv(&mut self, get_kv_req: GetKvRequest) -> crate::Result<Option<Vec<u8>>>