Skip to content

Commit

Permalink
Add support for client-side streaming for code-generated clients
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 3, 2023
1 parent 9f600e1 commit 75de77f
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 240 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
async-trait = { workspace = true }
dyn-clone = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
prost = { workspace = true }
Expand Down
74 changes: 42 additions & 32 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub struct PingResponse {
pub message: ::prost::alloc::string::String,
}
/// BEGIN quickwit-codegen
type HelloStream<T> = quickwit_common::ServiceStream<T, crate::HelloError>;
use tower::{Layer, Service, ServiceExt};
pub type HelloStream<T> = quickwit_common::ServiceStream<crate::HelloResult<T>>;
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait::async_trait]
pub trait Hello: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
Expand All @@ -55,7 +56,7 @@ pub trait Hello: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
) -> crate::HelloResult<GoodbyeResponse>;
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>>;
}
dyn_clone::clone_trait_object!(Hello);
Expand Down Expand Up @@ -127,7 +128,7 @@ impl Hello for HelloClient {
}
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>> {
self.inner.ping(request).await
}
Expand Down Expand Up @@ -155,7 +156,7 @@ pub mod mock {
}
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>> {
self.inner.lock().await.ping(request).await
}
Expand Down Expand Up @@ -204,7 +205,7 @@ impl tower::Service<GoodbyeRequest> for Box<dyn Hello> {
Box::pin(fut)
}
}
impl tower::Service<PingRequest> for Box<dyn Hello> {
impl tower::Service<quickwit_common::ServiceStream<PingRequest>> for Box<dyn Hello> {
type Response = HelloStream<PingResponse>;
type Error = crate::HelloError;
type Future = BoxFuture<Self::Response, Self::Error>;
Expand All @@ -214,7 +215,10 @@ impl tower::Service<PingRequest> for Box<dyn Hello> {
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, request: PingRequest) -> Self::Future {
fn call(
&mut self,
request: quickwit_common::ServiceStream<PingRequest>,
) -> Self::Future {
let mut svc = self.clone();
let fut = async move { svc.ping(request).await };
Box::pin(fut)
Expand All @@ -234,7 +238,7 @@ struct HelloTowerBlock {
crate::HelloError,
>,
ping_svc: quickwit_common::tower::BoxService<
PingRequest,
quickwit_common::ServiceStream<PingRequest>,
HelloStream<PingResponse>,
crate::HelloError,
>,
Expand Down Expand Up @@ -264,7 +268,7 @@ impl Hello for HelloTowerBlock {
}
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>> {
self.ping_svc.ready().await?.call(request).await
}
Expand Down Expand Up @@ -293,7 +297,7 @@ pub struct HelloTowerBlockBuilder {
ping_layer: Option<
quickwit_common::tower::BoxLayer<
Box<dyn Hello>,
PingRequest,
quickwit_common::ServiceStream<PingRequest>,
HelloStream<PingResponse>,
crate::HelloError,
>,
Expand All @@ -316,11 +320,13 @@ impl HelloTowerBlockBuilder {
> + Clone + Send + Sync + 'static,
<L::Service as tower::Service<GoodbyeRequest>>::Future: Send + 'static,
L::Service: tower::Service<
PingRequest,
quickwit_common::ServiceStream<PingRequest>,
Response = HelloStream<PingResponse>,
Error = crate::HelloError,
> + Clone + Send + Sync + 'static,
<L::Service as tower::Service<PingRequest>>::Future: Send + 'static,
<L::Service as tower::Service<
quickwit_common::ServiceStream<PingRequest>,
>>::Future: Send + 'static,
{
self.hello_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone()));
self.goodbye_layer = Some(quickwit_common::tower::BoxLayer::new(layer.clone()));
Expand Down Expand Up @@ -357,11 +363,13 @@ impl HelloTowerBlockBuilder {
where
L: tower::Layer<Box<dyn Hello>> + Send + Sync + 'static,
L::Service: tower::Service<
PingRequest,
quickwit_common::ServiceStream<PingRequest>,
Response = HelloStream<PingResponse>,
Error = crate::HelloError,
> + Clone + Send + Sync + 'static,
<L::Service as tower::Service<PingRequest>>::Future: Send + 'static,
<L::Service as tower::Service<
quickwit_common::ServiceStream<PingRequest>,
>>::Future: Send + 'static,
{
self.ping_layer = Some(quickwit_common::tower::BoxLayer::new(layer));
self
Expand Down Expand Up @@ -460,13 +468,12 @@ impl<A: quickwit_actors::Actor> Clone for HelloMailbox<A> {
Self { inner }
}
}
use tower::{Layer, Service, ServiceExt};
impl<A, M, T, E> tower::Service<M> for HelloMailbox<A>
where
A: quickwit_actors::Actor
+ quickwit_actors::DeferableReplyHandler<M, Reply = Result<T, E>> + Send
+ 'static,
M: std::fmt::Debug + Send + Sync + 'static,
M: std::fmt::Debug + Send + 'static,
T: Send + 'static,
E: std::fmt::Debug + Send + 'static,
crate::HelloError: From<quickwit_actors::AskError<E>>,
Expand Down Expand Up @@ -494,7 +501,7 @@ where
#[async_trait::async_trait]
impl<A> Hello for HelloMailbox<A>
where
A: quickwit_actors::Actor + std::fmt::Debug + Send + Sync + 'static,
A: quickwit_actors::Actor + std::fmt::Debug,
HelloMailbox<
A,
>: tower::Service<
Expand All @@ -510,7 +517,7 @@ where
Future = BoxFuture<GoodbyeResponse, crate::HelloError>,
>
+ tower::Service<
PingRequest,
quickwit_common::ServiceStream<PingRequest>,
Response = HelloStream<PingResponse>,
Error = crate::HelloError,
Future = BoxFuture<HelloStream<PingResponse>, crate::HelloError>,
Expand All @@ -530,7 +537,7 @@ where
}
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>> {
self.call(request).await
}
Expand Down Expand Up @@ -576,15 +583,15 @@ where
}
async fn ping(
&mut self,
request: PingRequest,
request: quickwit_common::ServiceStream<PingRequest>,
) -> crate::HelloResult<HelloStream<PingResponse>> {
self.inner
.ping(request)
.await
.map(|response| {
let stream = response.into_inner();
let service_stream = quickwit_common::ServiceStream::from(stream);
service_stream.map_err(|error| error.into())
let streaming: tonic::Streaming<_> = response.into_inner();
let stream = quickwit_common::ServiceStream::from(streaming);
stream.map_err(|error| error.into())
})
.map_err(|error| error.into())
}
Expand Down Expand Up @@ -625,14 +632,17 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter {
.map(tonic::Response::new)
.map_err(|error| error.into())
}
type PingStream = quickwit_common::ServiceStream<PingResponse, tonic::Status>;
type PingStream = quickwit_common::ServiceStream<tonic::Result<PingResponse>>;
async fn ping(
&self,
request: tonic::Request<PingRequest>,
request: tonic::Request<tonic::Streaming<PingRequest>>,
) -> Result<tonic::Response<Self::PingStream>, tonic::Status> {
self.inner
.clone()
.ping(request.into_inner())
.ping({
let streaming: tonic::Streaming<_> = request.into_inner();
quickwit_common::ServiceStream::from(streaming)
})
.await
.map(|stream| tonic::Response::new(stream.map_err(|error| error.into())))
.map_err(|error| error.into())
Expand Down Expand Up @@ -766,7 +776,7 @@ pub mod hello_grpc_client {
}
pub async fn ping(
&mut self,
request: impl tonic::IntoRequest<super::PingRequest>,
request: impl tonic::IntoStreamingRequest<Message = super::PingRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::PingResponse>>,
tonic::Status,
Expand All @@ -782,9 +792,9 @@ pub mod hello_grpc_client {
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/hello.Hello/Ping");
let mut req = request.into_request();
let mut req = request.into_streaming_request();
req.extensions_mut().insert(GrpcMethod::new("hello.Hello", "Ping"));
self.inner.server_streaming(req, path, codec).await
self.inner.streaming(req, path, codec).await
}
}
}
Expand All @@ -811,7 +821,7 @@ pub mod hello_grpc_server {
+ 'static;
async fn ping(
&self,
request: tonic::Request<super::PingRequest>,
request: tonic::Request<tonic::Streaming<super::PingRequest>>,
) -> std::result::Result<tonic::Response<Self::PingStream>, tonic::Status>;
}
#[derive(Debug)]
Expand Down Expand Up @@ -982,7 +992,7 @@ pub mod hello_grpc_server {
struct PingSvc<T: HelloGrpc>(pub Arc<T>);
impl<
T: HelloGrpc,
> tonic::server::ServerStreamingService<super::PingRequest>
> tonic::server::StreamingService<super::PingRequest>
for PingSvc<T> {
type Response = super::PingResponse;
type ResponseStream = T::PingStream;
Expand All @@ -992,7 +1002,7 @@ pub mod hello_grpc_server {
>;
fn call(
&mut self,
request: tonic::Request<super::PingRequest>,
request: tonic::Request<tonic::Streaming<super::PingRequest>>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { (*inner).ping(request).await };
Expand All @@ -1017,7 +1027,7 @@ pub mod hello_grpc_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
let res = grpc.streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/src/hello.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ message PingResponse {
service Hello {
rpc Hello(HelloRequest) returns (HelloResponse);
rpc Goodbye(GoodbyeRequest) returns (GoodbyeResponse);
rpc Ping(PingRequest) returns (stream PingResponse);
rpc Ping(stream PingRequest) returns (stream PingResponse);
}
Loading

0 comments on commit 75de77f

Please sign in to comment.