Skip to content

Commit

Permalink
Added Scroll API
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 6, 2023
1 parent a026084 commit 5a7fa16
Show file tree
Hide file tree
Showing 23 changed files with 993 additions and 62 deletions.
13 changes: 13 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ tracing-subscriber = { version = "0.3.16", features = [
"std",
"env-filter",
] }
ttl_cache = "0.5"
typetag = "0.2"
ulid = "1.0"
username = "0.2"
Expand Down
31 changes: 31 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,30 @@ service SearchService {
// it to other nodes.
// - it should be applied on the given subset of splits
rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse);

rpc Scroll(ScrollRequest) returns (SearchResponse);
rpc PutKV(PutKVRequest) returns (PutKVResponse);
rpc GetKV(GetKVRequest) returns (GetKVResponse);
}

message ScrollRequest {
string scroll_id = 1;
}

message PutKVRequest {
bytes key = 1;
bytes payload = 2;
uint32 ttl_secs = 3;
}

message PutKVResponse {}

message GetKVRequest {
bytes key = 1;
}

message GetKVResponse {
optional bytes payload = 1;
}

// -- Search -------------------
Expand Down Expand Up @@ -110,6 +134,11 @@ message SearchRequest {

// Fields to extract snippet on
repeated string snippet_fields = 12;

// If set, the search response will include a search id
// that will make it possible to paginate through the results
// in a consistent manner.
optional uint32 scroll_ttl_secs = 14;
}

enum SortOrder {
Expand All @@ -134,6 +163,8 @@ message SearchResponse {
// Serialized aggregation response
optional string aggregation = 5;

// Scroll Id (only set if scroll_secs was set in the request)
optional string scroll_id = 6;
}

message SplitSearchError {
Expand Down
252 changes: 252 additions & 0 deletions quickwit/quickwit-proto/src/quickwit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,40 @@
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScrollRequest {
#[prost(string, tag = "1")]
pub scroll_id: ::prost::alloc::string::String,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PutKvRequest {
#[prost(bytes = "vec", tag = "1")]
pub key: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes = "vec", tag = "2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
#[prost(uint32, tag = "3")]
pub ttl_secs: u32,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PutKvResponse {}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKvRequest {
#[prost(bytes = "vec", tag = "1")]
pub key: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKvResponse {
#[prost(bytes = "vec", optional, tag = "1")]
pub payload: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -42,6 +78,11 @@ pub struct SearchRequest {
/// Fields to extract snippet on
#[prost(string, repeated, tag = "12")]
pub snippet_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// If set, the search response will include a search id
/// that will make it possible to paginate through the results
/// in a consistent manner.
#[prost(uint32, optional, tag = "14")]
pub scroll_ttl_secs: ::core::option::Option<u32>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -63,6 +104,9 @@ pub struct SearchResponse {
/// Serialized aggregation response
#[prost(string, optional, tag = "5")]
pub aggregation: ::core::option::Option<::prost::alloc::string::String>,
/// Scroll Id (only set if scroll_secs was set in the request)
#[prost(string, optional, tag = "6")]
pub scroll_id: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -734,6 +778,72 @@ pub mod search_service_client {
.insert(GrpcMethod::new("quickwit.SearchService", "LeafListTerms"));
self.inner.unary(req, path, codec).await
}
pub async fn scroll(
&mut self,
request: impl tonic::IntoRequest<super::ScrollRequest>,
) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/quickwit.SearchService/Scroll",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("quickwit.SearchService", "Scroll"));
self.inner.unary(req, path, codec).await
}
pub async fn put_kv(
&mut self,
request: impl tonic::IntoRequest<super::PutKvRequest>,
) -> std::result::Result<tonic::Response<super::PutKvResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/quickwit.SearchService/PutKV",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("quickwit.SearchService", "PutKV"));
self.inner.unary(req, path, codec).await
}
pub async fn get_kv(
&mut self,
request: impl tonic::IntoRequest<super::GetKvRequest>,
) -> std::result::Result<tonic::Response<super::GetKvResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/quickwit.SearchService/GetKV",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("quickwit.SearchService", "GetKV"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
Expand Down Expand Up @@ -817,6 +927,18 @@ pub mod search_service_server {
tonic::Response<super::LeafListTermsResponse>,
tonic::Status,
>;
async fn scroll(
&self,
request: tonic::Request<super::ScrollRequest>,
) -> std::result::Result<tonic::Response<super::SearchResponse>, tonic::Status>;
async fn put_kv(
&self,
request: tonic::Request<super::PutKvRequest>,
) -> std::result::Result<tonic::Response<super::PutKvResponse>, tonic::Status>;
async fn get_kv(
&self,
request: tonic::Request<super::GetKvRequest>,
) -> std::result::Result<tonic::Response<super::GetKvResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct SearchServiceServer<T: SearchService> {
Expand Down Expand Up @@ -1169,6 +1291,136 @@ pub mod search_service_server {
};
Box::pin(fut)
}
"/quickwit.SearchService/Scroll" => {
#[allow(non_camel_case_types)]
struct ScrollSvc<T: SearchService>(pub Arc<T>);
impl<
T: SearchService,
> tonic::server::UnaryService<super::ScrollRequest>
for ScrollSvc<T> {
type Response = super::SearchResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ScrollRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).scroll(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = ScrollSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/quickwit.SearchService/PutKV" => {
#[allow(non_camel_case_types)]
struct PutKVSvc<T: SearchService>(pub Arc<T>);
impl<
T: SearchService,
> tonic::server::UnaryService<super::PutKvRequest> for PutKVSvc<T> {
type Response = super::PutKvResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::PutKvRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).put_kv(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = PutKVSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/quickwit.SearchService/GetKV" => {
#[allow(non_camel_case_types)]
struct GetKVSvc<T: SearchService>(pub Arc<T>);
impl<
T: SearchService,
> tonic::server::UnaryService<super::GetKvRequest> for GetKVSvc<T> {
type Response = super::GetKvResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetKvRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).get_kv(request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetKVSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
Expand Down
Loading

0 comments on commit 5a7fa16

Please sign in to comment.