Skip to content

Commit

Permalink
Serialize service errors in dedicated gRPC header (#4678)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Mar 8, 2024
1 parent f37d8b6 commit 9cc3311
Show file tree
Hide file tree
Showing 56 changed files with 835 additions and 654 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_uid: PipelineUid::from_u128(0u128),
pipeline_uid: PipelineUid::new(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -615,7 +615,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::from_u128(0u128),
pipeline_uid: PipelineUid::new(),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
22 changes: 11 additions & 11 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,13 +929,13 @@ mod tests {
.unwrap();
let index_uid: IndexUid = IndexUid::for_test("index-1", 1);
let indexing_task1 = IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
};
let indexing_task2 = IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
let index_id = random_generator.gen_range(0..=10_000);
let source_id = random_generator.gen_range(0..=100);
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(pipeline_id as u128)),
pipeline_uid: Some(PipelineUid::for_test(pipeline_id as u128)),
index_uid: Some(
format!("index-{index_id}:11111111111111111111111111")
.parse()
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod tests {
test_serialize_indexing_tasks_aux(&[], &mut node_state);
test_serialize_indexing_tasks_aux(
&[IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
Expand All @@ -1256,7 +1256,7 @@ mod tests {
// change in the set of shards
test_serialize_indexing_tasks_aux(
&[IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)],
Expand All @@ -1266,13 +1266,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand All @@ -1284,13 +1284,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(IndexUid::for_test("test-index2", 0)),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand All @@ -1302,13 +1302,13 @@ mod tests {
test_serialize_indexing_tasks_aux(
&[
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
pipeline_uid: Some(PipelineUid::for_test(1u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source2".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-codegen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

1. Describe your service in a proto file.

2. Define an error and a result type for your service. The error type must implement `From<tonic::Status>` and `Into<tonic::Status>`.
2. Define an error and a result type for your service. The error type must implement `quickwit_proto::error::GrpcServiceError` and have at least the three following variants: `Internal`, `Timeout`, and `Unavailable`.

3. Add the following dependencies to your project:

Expand All @@ -25,6 +25,7 @@ tower = { workspace = true }
utoipa = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-proto = { workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ utoipa = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-common = { workspace = true }
quickwit-macros = { workspace = true }
quickwit-proto ={ workspace = true }

[dev-dependencies]
mockall = { workspace = true }
Expand Down
18 changes: 10 additions & 8 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 34 additions & 11 deletions quickwit/quickwit-codegen/example/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,53 @@
use std::fmt;

use quickwit_actors::AskError;
use quickwit_proto::error::GrpcServiceError;
pub use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::{ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};

// Service errors have to be handwritten before codegen.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
pub enum HelloError {
#[error("internal error: {0}")]
InternalError(String),
#[error("transport error: {0}")]
TransportError(#[from] tonic::Status),
Internal(String),
#[error("invalid argument: {0}")]
InvalidArgument(String),
#[error("request timed out: {0}")]
Timeout(String),
#[error("service unavailable: {0}")]
Unavailable(String),
}

// Service errors must implement `From<tonic::Status>` and `Into<tonic::Status>`.
impl From<HelloError> for tonic::Status {
fn from(error: HelloError) -> Self {
match error {
HelloError::InternalError(message) => tonic::Status::internal(message),
HelloError::TransportError(status) => status,
impl ServiceError for HelloError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::InvalidArgument(_) => ServiceErrorCode::BadRequest,
Self::Timeout(_) => ServiceErrorCode::Timeout,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
}

impl GrpcServiceError for HelloError {
fn new_internal(message: String) -> Self {
Self::Internal(message)
}

fn new_timeout(message: String) -> Self {
Self::Timeout(message)
}

fn new_unavailable(message: String) -> Self {
Self::Unavailable(message)
}
}

impl<E> From<AskError<E>> for HelloError
where E: fmt::Debug
{
fn from(error: AskError<E>) -> Self {
HelloError::InternalError(format!("{error:?}"))
HelloError::Internal(format!("{error:?}"))
}
}
74 changes: 68 additions & 6 deletions quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,27 @@ fn spawn_ping_response_stream(
service_stream
}

#[derive(Debug, Clone)]
struct HelloImpl;
#[derive(Debug, Clone, Default)]
struct HelloImpl {
delay: Duration,
}

#[async_trait]
impl Hello for HelloImpl {
async fn hello(&mut self, request: HelloRequest) -> HelloResult<HelloResponse> {
tokio::time::sleep(self.delay).await;

if request.name.is_empty() {
return Err(HelloError::InvalidArgument("name is empty".to_string()));
}
Ok(HelloResponse {
message: format!("Hello, {}!", request.name),
})
}

async fn goodbye(&mut self, request: GoodbyeRequest) -> HelloResult<GoodbyeResponse> {
tokio::time::sleep(self.delay).await;

Ok(GoodbyeResponse {
message: format!("Goodbye, {}!", request.name),
})
Expand Down Expand Up @@ -169,7 +178,7 @@ mod tests {

#[tokio::test]
async fn test_hello_codegen() {
let mut hello = HelloImpl;
let mut hello = HelloImpl::default();

assert_eq!(
hello
Expand Down Expand Up @@ -255,7 +264,7 @@ mod tests {

#[tokio::test]
async fn test_hello_codegen_grpc() {
let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl);
let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl::default());
let grpc_server: HelloGrpcServer<HelloGrpcServerAdapter> =
HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:6666".parse().unwrap();
Expand Down Expand Up @@ -287,6 +296,16 @@ mod tests {
}
);

assert!(matches!(
grpc_client
.hello(HelloRequest {
name: "".to_string()
})
.await
.unwrap_err(),
HelloError::InvalidArgument(_)
));

let (ping_stream_tx, ping_stream) = ServiceStream::new_bounded(1);
let mut pong_stream = grpc_client.ping(ping_stream).await.unwrap();

Expand Down Expand Up @@ -484,7 +503,7 @@ mod tests {
.stack_hello_layer(hello_layer.clone())
.stack_goodbye_layer(goodbye_layer.clone())
.stack_ping_layer(ping_layer.clone())
.build(HelloImpl);
.build(HelloImpl::default());

hello_tower
.hello(HelloRequest {
Expand Down Expand Up @@ -530,7 +549,7 @@ mod tests {

#[tokio::test]
async fn test_balance_channel() {
let hello = HelloImpl;
let hello = HelloImpl::default();
let grpc_server_adapter = HelloGrpcServerAdapter::new(hello);
let grpc_server = HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:8888".parse().unwrap();
Expand Down Expand Up @@ -602,4 +621,47 @@ mod tests {
);
hello.check_connectivity().await.unwrap();
}

#[tokio::test]
async fn test_transport_errors_handling() {
let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap();
let channel = Endpoint::from_static("http://127.0.0.1:9999")
.timeout(Duration::from_millis(100))
.connect_lazy();
let max_message_size = ByteSize::mib(1);
let mut grpc_client = HelloClient::from_channel(addr, channel, max_message_size);

let error = grpc_client
.hello(HelloRequest {
name: "Client".to_string(),
})
.await
.unwrap_err();
assert!(matches!(error, HelloError::Unavailable(_)));

let hello = HelloImpl {
delay: Duration::from_secs(1),
};
let grpc_server_adapter = HelloGrpcServerAdapter::new(hello);
let grpc_server: HelloGrpcServer<HelloGrpcServerAdapter> =
HelloGrpcServer::new(grpc_server_adapter);
let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap();

tokio::spawn({
async move {
Server::builder()
.add_service(grpc_server)
.serve(addr)
.await
.unwrap();
}
});
let error = grpc_client
.hello(HelloRequest {
name: "Client".to_string(),
})
.await
.unwrap_err();
assert!(matches!(error, HelloError::Timeout(_)));
}
}
Loading

0 comments on commit 9cc3311

Please sign in to comment.