diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 8c2f8f985c5..5ac1217cb4c 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5605,6 +5605,7 @@ dependencies = [ "quickwit-codegen", "quickwit-common", "quickwit-macros", + "quickwit-proto", "serde", "thiserror", "tokio", diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index e0dca7e2bc1..d9826c2ea56 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -475,7 +475,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< .ask_for_res(SpawnPipeline { index_id: args.index_id.clone(), source_config, - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::new(), }) .await?; let merge_pipeline_handle = indexing_server_mailbox @@ -615,7 +615,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { transform_config: None, input_format: SourceInputFormat::Json, }, - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::new(), }) .await?; let pipeline_handle: ActorHandle = indexing_service_mailbox diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 7223f522175..5c0ef7f4fca 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -929,13 +929,13 @@ mod tests { .unwrap(); let index_uid: IndexUid = IndexUid::for_test("index-1", 1); let indexing_task1 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let indexing_task2 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), @@ -1013,7 +1013,7 @@ mod tests { let index_id = random_generator.gen_range(0..=10_000); let source_id = random_generator.gen_range(0..=100); IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(pipeline_id as u128)), + pipeline_uid: Some(PipelineUid::for_test(pipeline_id as u128)), index_uid: Some( format!("index-{index_id}:11111111111111111111111111") .parse() @@ -1246,7 +1246,7 @@ mod tests { test_serialize_indexing_tasks_aux(&[], &mut node_state); test_serialize_indexing_tasks_aux( &[IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], @@ -1256,7 +1256,7 @@ mod tests { // change in the set of shards test_serialize_indexing_tasks_aux( &[IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)], @@ -1266,13 +1266,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], }, IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(3), ShardId::from(4)], @@ -1284,13 +1284,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], }, IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(IndexUid::for_test("test-index2", 0)), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(3), ShardId::from(4)], @@ -1302,13 +1302,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], }, IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "my-source2".to_string(), shard_ids: vec![ShardId::from(3), ShardId::from(4)], diff --git a/quickwit/quickwit-codegen/README.md b/quickwit/quickwit-codegen/README.md index 2eb94be55b4..8553eb22b6a 100644 --- a/quickwit/quickwit-codegen/README.md +++ b/quickwit/quickwit-codegen/README.md @@ -4,7 +4,7 @@ 1. Describe your service in a proto file. -2. Define an error and a result type for your service. The error type must implement `From` and `Into`. +2. Define an error and a result type for your service. The error type must implement `quickwit_proto::error::GrpcServiceError` and have at least the three following variants: `Internal`, `Timeout`, and `Unavailable`. 3. Add the following dependencies to your project: @@ -25,6 +25,7 @@ tower = { workspace = true } utoipa = { workspace = true } quickwit-actors = { workspace = true } +quickwit-proto = { workspace = true } [dev-dependencies] mockall = { workspace = true } diff --git a/quickwit/quickwit-codegen/example/Cargo.toml b/quickwit/quickwit-codegen/example/Cargo.toml index eaa3d358709..50b183aff0a 100644 --- a/quickwit/quickwit-codegen/example/Cargo.toml +++ b/quickwit/quickwit-codegen/example/Cargo.toml @@ -30,6 +30,7 @@ utoipa = { workspace = true } quickwit-actors = { workspace = true } quickwit-common = { workspace = true } quickwit-macros = { workspace = true } +quickwit-proto ={ workspace = true } [dev-dependencies] mockall = { workspace = true } diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index cd799b68fdb..8aa69f74e1f 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -744,7 +744,7 @@ where .hello(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn goodbye( &mut self, @@ -754,7 +754,7 @@ where .goodbye(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn ping( &mut self, @@ -766,9 +766,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) }) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { @@ -809,7 +809,7 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { .hello(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn goodbye( &self, @@ -820,7 +820,7 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { .goodbye(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } type PingStream = quickwit_common::ServiceStream>; async fn ping( @@ -834,8 +834,10 @@ impl hello_grpc_server::HelloGrpc for HelloGrpcServerAdapter { quickwit_common::ServiceStream::from(streaming) }) .await - .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) - .map_err(|error| error.into()) + .map(|stream| tonic::Response::new( + stream.map_err(crate::error::grpc_error_to_grpc_status), + )) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-codegen/example/src/error.rs b/quickwit/quickwit-codegen/example/src/error.rs index 1ff55133e8c..b27af941d9d 100644 --- a/quickwit/quickwit-codegen/example/src/error.rs +++ b/quickwit/quickwit-codegen/example/src/error.rs @@ -20,30 +20,53 @@ use std::fmt; use quickwit_actors::AskError; +use quickwit_proto::error::GrpcServiceError; +pub use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error}; +use quickwit_proto::{ServiceError, ServiceErrorCode}; +use serde::{Deserialize, Serialize}; // Service errors have to be handwritten before codegen. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Serialize, Deserialize)] pub enum HelloError { #[error("internal error: {0}")] - InternalError(String), - #[error("transport error: {0}")] - TransportError(#[from] tonic::Status), + Internal(String), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] + Unavailable(String), } -// Service errors must implement `From` and `Into`. -impl From for tonic::Status { - fn from(error: HelloError) -> Self { - match error { - HelloError::InternalError(message) => tonic::Status::internal(message), - HelloError::TransportError(status) => status, +impl ServiceError for HelloError { + fn error_code(&self) -> ServiceErrorCode { + match self { + Self::Internal(_) => ServiceErrorCode::Internal, + Self::InvalidArgument(_) => ServiceErrorCode::BadRequest, + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } +impl GrpcServiceError for HelloError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) + } +} + impl From> for HelloError where E: fmt::Debug { fn from(error: AskError) -> Self { - HelloError::InternalError(format!("{error:?}")) + HelloError::Internal(format!("{error:?}")) } } diff --git a/quickwit/quickwit-codegen/example/src/lib.rs b/quickwit/quickwit-codegen/example/src/lib.rs index 5dccece2e01..de69c577b18 100644 --- a/quickwit/quickwit-codegen/example/src/lib.rs +++ b/quickwit/quickwit-codegen/example/src/lib.rs @@ -113,18 +113,27 @@ fn spawn_ping_response_stream( service_stream } -#[derive(Debug, Clone)] -struct HelloImpl; +#[derive(Debug, Clone, Default)] +struct HelloImpl { + delay: Duration, +} #[async_trait] impl Hello for HelloImpl { async fn hello(&mut self, request: HelloRequest) -> HelloResult { + tokio::time::sleep(self.delay).await; + + if request.name.is_empty() { + return Err(HelloError::InvalidArgument("name is empty".to_string())); + } Ok(HelloResponse { message: format!("Hello, {}!", request.name), }) } async fn goodbye(&mut self, request: GoodbyeRequest) -> HelloResult { + tokio::time::sleep(self.delay).await; + Ok(GoodbyeResponse { message: format!("Goodbye, {}!", request.name), }) @@ -169,7 +178,7 @@ mod tests { #[tokio::test] async fn test_hello_codegen() { - let mut hello = HelloImpl; + let mut hello = HelloImpl::default(); assert_eq!( hello @@ -255,7 +264,7 @@ mod tests { #[tokio::test] async fn test_hello_codegen_grpc() { - let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl); + let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl::default()); let grpc_server: HelloGrpcServer = HelloGrpcServer::new(grpc_server_adapter); let addr: SocketAddr = "127.0.0.1:6666".parse().unwrap(); @@ -287,6 +296,16 @@ mod tests { } ); + assert!(matches!( + grpc_client + .hello(HelloRequest { + name: "".to_string() + }) + .await + .unwrap_err(), + HelloError::InvalidArgument(_) + )); + let (ping_stream_tx, ping_stream) = ServiceStream::new_bounded(1); let mut pong_stream = grpc_client.ping(ping_stream).await.unwrap(); @@ -484,7 +503,7 @@ mod tests { .stack_hello_layer(hello_layer.clone()) .stack_goodbye_layer(goodbye_layer.clone()) .stack_ping_layer(ping_layer.clone()) - .build(HelloImpl); + .build(HelloImpl::default()); hello_tower .hello(HelloRequest { @@ -530,7 +549,7 @@ mod tests { #[tokio::test] async fn test_balance_channel() { - let hello = HelloImpl; + let hello = HelloImpl::default(); let grpc_server_adapter = HelloGrpcServerAdapter::new(hello); let grpc_server = HelloGrpcServer::new(grpc_server_adapter); let addr: SocketAddr = "127.0.0.1:8888".parse().unwrap(); @@ -602,4 +621,47 @@ mod tests { ); hello.check_connectivity().await.unwrap(); } + + #[tokio::test] + async fn test_transport_errors_handling() { + let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap(); + let channel = Endpoint::from_static("http://127.0.0.1:9999") + .timeout(Duration::from_millis(100)) + .connect_lazy(); + let max_message_size = ByteSize::mib(1); + let mut grpc_client = HelloClient::from_channel(addr, channel, max_message_size); + + let error = grpc_client + .hello(HelloRequest { + name: "Client".to_string(), + }) + .await + .unwrap_err(); + assert!(matches!(error, HelloError::Unavailable(_))); + + let hello = HelloImpl { + delay: Duration::from_secs(1), + }; + let grpc_server_adapter = HelloGrpcServerAdapter::new(hello); + let grpc_server: HelloGrpcServer = + HelloGrpcServer::new(grpc_server_adapter); + let addr: SocketAddr = "127.0.0.1:9999".parse().unwrap(); + + tokio::spawn({ + async move { + Server::builder() + .add_service(grpc_server) + .serve(addr) + .await + .unwrap(); + } + }); + let error = grpc_client + .hello(HelloRequest { + name: "Client".to_string(), + }) + .await + .unwrap_err(); + assert!(matches!(error, HelloError::Timeout(_))); + } } diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index 0425a798e7f..abbc9b655dc 100644 --- a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -1165,7 +1165,7 @@ fn generate_grpc_client_adapter_methods(context: &CodegenContext) -> TokenStream { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) } } } else { @@ -1177,7 +1177,7 @@ fn generate_grpc_client_adapter_methods(context: &CodegenContext) -> TokenStream .#method_name(request) .await .map(#into_response_type) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } }; stream.extend(method); @@ -1250,7 +1250,7 @@ fn generate_grpc_server_adapter_methods(context: &CodegenContext) -> TokenStream }; let into_response_type = if syn_method.server_streaming { quote! { - |stream| tonic::Response::new(stream.map_err(|error| error.into())) + |stream| tonic::Response::new(stream.map_err(crate::error::grpc_error_to_grpc_status)) } } else { quote! { tonic::Response::new } @@ -1264,7 +1264,7 @@ fn generate_grpc_server_adapter_methods(context: &CodegenContext) -> TokenStream .#method_name(#method_arg) .await .map(#into_response_type) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } }; stream.extend(method); diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 61876986d99..e2cc9b2e8a4 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -412,10 +412,12 @@ fn convert_metastore_error( | MetastoreError::JsonDeserializeError { .. } | MetastoreError::JsonSerializeError { .. } | MetastoreError::NotFound(_) => true, + MetastoreError::Connection { .. } | MetastoreError::Db { .. } | MetastoreError::Internal { .. } | MetastoreError::Io { .. } + | MetastoreError::Timeout { .. } | MetastoreError::Unavailable(_) => false, }; if is_transaction_certainly_aborted { diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 474e1ec7344..ca9bef08160 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -516,19 +516,19 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(10u128)), + pipeline_uid: Some(PipelineUid::for_test(10u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_1b = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(11u128)), + pipeline_uid: Some(PipelineUid::for_test(11u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(20u128)), + pipeline_uid: Some(PipelineUid::for_test(20u128)), index_uid: Some(index_uid.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), @@ -548,13 +548,13 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), @@ -580,13 +580,13 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid2.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), @@ -620,19 +620,19 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1a = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(10u128)), + pipeline_uid: Some(PipelineUid::for_test(10u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_1b = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(11u128)), + pipeline_uid: Some(PipelineUid::for_test(11u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_1c = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(12u128)), + pipeline_uid: Some(PipelineUid::for_test(12u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 0addeff8f56..96390b23c8c 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -747,11 +747,11 @@ mod tests { &source_uid, &[ ( - PipelineUid::from_u128(1u128), + PipelineUid::for_test(1u128), &[ShardId::from(1), ShardId::from(2)], ), ( - PipelineUid::from_u128(2u128), + PipelineUid::for_test(2u128), &[ShardId::from(3), ShardId::from(4), ShardId::from(5)], ), ], @@ -950,7 +950,7 @@ mod tests { #[test] fn test_group_shards_into_pipeline_single_small_pipeline() { let source_uid = source_id(); - let pipeline_uid = PipelineUid::from_u128(1u128); + let pipeline_uid = PipelineUid::for_test(1u128); let indexing_tasks = group_shards_into_pipelines_aux( &source_uid, &[12], diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index dd466b1e267..5609c8b5526 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -74,7 +74,7 @@ impl ServiceError for IndexServiceError { Self::InvalidConfig(_) => ServiceErrorCode::BadRequest, Self::InvalidIdentifier(_) => ServiceErrorCode::BadRequest, Self::Metastore(error) => error.error_code(), - Self::OperationNotAllowed(_) => ServiceErrorCode::MethodNotAllowed, + Self::OperationNotAllowed(_) => ServiceErrorCode::Forbidden, Self::SplitDeletion(_) => ServiceErrorCode::Internal, Self::Storage(_) => ServiceErrorCode::Internal, } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index ed010e419a7..ca3c911bb9e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -658,9 +658,7 @@ mod tests { ); } num_fails -= 1; - Err(MetastoreError::Connection { - message: "MetastoreError Alarm".to_string(), - }) + Err(MetastoreError::Timeout("timeout error".to_string())) }); metastore .expect_last_delete_opstamp() @@ -700,7 +698,7 @@ mod tests { index_uid: IndexUid::for_test("test-index", 2), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -808,7 +806,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -885,7 +883,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -1004,7 +1002,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 133265bee0e..b9ca436dc3d 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -205,7 +205,10 @@ impl IndexingService { let pipeline_handle = self .indexing_pipelines .remove(&pipeline_uid) - .ok_or(IndexingError::MissingPipeline { pipeline_uid })?; + .ok_or_else(|| { + let message = format!("indexing pipeline `{pipeline_uid}` not found"); + IndexingError::Internal(message) + })?; self.counters.num_running_pipelines -= 1; Ok(pipeline_handle.handle) } @@ -217,8 +220,9 @@ impl IndexingService { let pipeline_handle = self .merge_pipeline_handles .remove(pipeline_id) - .ok_or_else(|| IndexingError::MissingMergePipeline { - merge_pipeline_id: pipeline_id.to_string(), + .ok_or_else(|| { + let message = format!("merge pipeline `{pipeline_id}` not found"); + IndexingError::Internal(message) })?; self.counters.num_running_merge_pipelines -= 1; Ok(pipeline_handle.handle) @@ -231,7 +235,10 @@ impl IndexingService { let pipeline_handle = &self .indexing_pipelines .get(&pipeline_uid) - .ok_or(IndexingError::MissingPipeline { pipeline_uid })? + .ok_or_else(|| { + let message = format!("indexing pipeline `{pipeline_uid}` not found"); + IndexingError::Internal(message) + })? .handle; let observation = pipeline_handle.observe().await; Ok(observation) @@ -268,11 +275,8 @@ impl IndexingService { .indexing_pipelines .contains_key(&pipeline_id.pipeline_uid) { - return Err(IndexingError::PipelineAlreadyExists { - index_id: pipeline_id.index_uid.index_id, - source_id: pipeline_id.source_id, - pipeline_uid: pipeline_id.pipeline_uid, - }); + let message = format!("pipeline `{pipeline_id}` already exists"); + return Err(IndexingError::Internal(message)); } let pipeline_uid_str = pipeline_id.pipeline_uid.to_string(); let indexing_directory = temp_dir::Builder::default() @@ -281,18 +285,24 @@ impl IndexingService { .join(&pipeline_id.source_id) .join(&pipeline_uid_str) .tempdir_in(&self.indexing_root_directory) - .map_err(IndexingError::Io)?; + .map_err(|error| { + let message = format!("failed to create indexing directory: {error}"); + IndexingError::Internal(message) + })?; let storage = self .storage_resolver .resolve(&index_config.index_uri) .await - .map_err(|err| IndexingError::StorageResolverError(err.to_string()))?; + .map_err(|error| { + let message = format!("failed to spawn indexing pipeline: {error}"); + IndexingError::Internal(message) + })?; let merge_policy = crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) - .map_err(IndexingError::InvalidParams)?; + .map_err(|error| IndexingError::Internal(error.to_string()))?; let merge_pipeline_params = MergePipelineParams { pipeline_id: pipeline_id.clone(), @@ -364,11 +374,8 @@ impl IndexingService { .metastore .clone() .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) - .await - .map_err(|err| IndexingError::MetastoreError(err.to_string()))?; - let index_metadata = index_metadata_response - .deserialize_index_metadata() - .map_err(|err| IndexingError::MetastoreError(err.to_string()))?; + .await?; + let index_metadata = index_metadata_response.deserialize_index_metadata()?; Ok(index_metadata) } @@ -380,7 +387,7 @@ impl IndexingService { ActorState::Success => { info!( pipeline_uid=%pipeline_uid, - "Indexing pipeline exited successfully." + "indexing pipeline exited successfully" ); self.counters.num_successful_pipelines += 1; self.counters.num_running_pipelines -= 1; @@ -556,9 +563,11 @@ impl IndexingService { self.update_cluster_running_indexing_tasks_in_chitchat() .await; if !failed_spawning_pipeline_ids.is_empty() { - return Err(IndexingError::SpawnPipelinesError { - pipeline_ids: failed_spawning_pipeline_ids, - }); + let message = format!( + "failed to spawn indexing pipelines: {:?}", + failed_spawning_pipeline_ids + ); + return Err(IndexingError::Internal(message)); } Ok(()) } @@ -964,7 +973,7 @@ mod tests { }; let spawn_pipeline_msg = SpawnPipeline { index_id: index_id.clone(), - pipeline_uid: PipelineUid::from_u128(1111u128), + pipeline_uid: PipelineUid::for_test(1111u128), source_config: source_config_0.clone(), }; let pipeline_id: IndexingPipelineId = indexing_service @@ -978,7 +987,7 @@ mod tests { assert_eq!(pipeline_id.index_uid.index_id, index_id); assert_eq!(pipeline_id.source_id, source_config_0.source_id); assert_eq!(pipeline_id.node_id, "test-node"); - assert_eq!(pipeline_id.pipeline_uid, PipelineUid::from_u128(1111u128)); + assert_eq!(pipeline_id.pipeline_uid, PipelineUid::for_test(1111u128)); assert_eq!( indexing_service_handle .observe() @@ -1135,13 +1144,13 @@ mod tests { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(0u128)), + pipeline_uid: Some(PipelineUid::for_test(0u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), }, ]; indexing_service @@ -1180,25 +1189,25 @@ mod tests { index_uid: Some(metadata.index_uid.clone()), source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(3u128)), + pipeline_uid: Some(PipelineUid::for_test(3u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(2u128)), + pipeline_uid: Some(PipelineUid::for_test(2u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(4u128)), + pipeline_uid: Some(PipelineUid::for_test(4u128)), }, ]; indexing_service @@ -1238,19 +1247,19 @@ mod tests { index_uid: Some(metadata.index_uid.clone()), source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(3u128)), + pipeline_uid: Some(PipelineUid::for_test(3u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(1u128)), + pipeline_uid: Some(PipelineUid::for_test(1u128)), }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), - pipeline_uid: Some(PipelineUid::from_u128(4u128)), + pipeline_uid: Some(PipelineUid::for_test(4u128)), }, ]; indexing_service diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index f14aeac1368..85f338270db 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -592,7 +592,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; for split_id in 0..4 { let single_doc = std::iter::once( @@ -716,7 +716,7 @@ mod tests { let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), node_id: "unknown".to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), source_id: "unknown".to_string(), }; test_sandbox.add_documents(docs).await?; diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 06d63fb41f1..b16914f3060 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -377,7 +377,7 @@ pub mod tests { index_uid: IndexUid::new_with_random_ulid("test_index"), source_id: "test_source".to_string(), node_id: "test_node".to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; let split_attrs = merge_split_attrs(merged_split_id, &pipeline_id, splits); create_split_metadata(merge_policy, &split_attrs, tags, 0..0) diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index ee7597694e5..754681f5c2b 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -172,7 +172,7 @@ impl SourceRuntimeArgs { node_id: "test-node".to_string(), index_uid, source_id: source_config.source_id.clone(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }; Arc::new(SourceRuntimeArgs { pipeline_id, diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 18525540036..5953e1e8674 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -174,7 +174,7 @@ impl TestSandbox { .ask_for_res(SpawnPipeline { index_id: self.index_uid.index_id.to_string(), source_config, - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::for_test(0u128), }) .await?; let pipeline_handle = self diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index c56424bf524..48449e20f74 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -803,21 +803,21 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn fetch(&mut self, request: FetchRequest) -> crate::Result { self.inner .fetch(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn tail(&mut self, request: TailRequest) -> crate::Result { self.inner .tail(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -843,7 +843,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .ingest(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn fetch( &self, @@ -854,7 +854,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .fetch(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn tail( &self, @@ -865,7 +865,7 @@ impl ingest_service_grpc_server::IngestServiceGrpc for IngestServiceGrpcServerAd .tail(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-ingest/src/errors.rs b/quickwit/quickwit-ingest/src/error.rs similarity index 70% rename from quickwit/quickwit-ingest/src/errors.rs rename to quickwit/quickwit-ingest/src/error.rs index 015d7279a9e..4eb0479b73c 100644 --- a/quickwit/quickwit-ingest/src/errors.rs +++ b/quickwit/quickwit-ingest/src/error.rs @@ -22,11 +22,12 @@ use std::io; use mrecordlog::error::*; use quickwit_actors::AskError; use quickwit_common::tower::BufferError; +pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error}; use quickwit_proto::ingest::IngestV2Error; -use quickwit_proto::{tonic, ServiceError, ServiceErrorCode}; -use serde::Serialize; +use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; +use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, thiserror::Error, Serialize)] +#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)] pub enum IngestServiceError { #[error("data corruption: {0}")] Corruption(String), @@ -70,40 +71,18 @@ impl From for IngestServiceError { IngestServiceError::IoError(io_error.to_string()) } } + impl From for IngestServiceError { - fn from(err: IngestV2Error) -> Self { - match err { - IngestV2Error::Internal(_) => IngestServiceError::Internal(err.to_string()), - IngestV2Error::IngesterUnavailable { ingester_id } => { - IngestServiceError::Internal(format!("ingester {} is unavailable", ingester_id)) + fn from(error: IngestV2Error) -> Self { + match error { + IngestV2Error::IngesterUnavailable { .. } + | IngestV2Error::Timeout(_) + | IngestV2Error::Unavailable(_) => IngestServiceError::Unavailable, + IngestV2Error::Internal(message) => IngestServiceError::Internal(message), + IngestV2Error::ShardNotFound { .. } => { + IngestServiceError::Internal("shard not found".to_string()) } - IngestV2Error::ShardNotFound { shard_id } => IngestServiceError::Internal(format!( - "shard {} is not found in the ingester", - shard_id - )), - IngestV2Error::Timeout => IngestServiceError::Unavailable, IngestV2Error::TooManyRequests => IngestServiceError::RateLimited, - IngestV2Error::Transport(msg) => IngestServiceError::Internal(msg.to_string()), - } - } -} - -impl From for IngestServiceError { - fn from(status: tonic::Status) -> Self { - // TODO: Use status.details() #2859. - match status.code() { - tonic::Code::AlreadyExists => IngestServiceError::IndexAlreadyExists { - index_id: status.message().to_string(), - }, - tonic::Code::NotFound => IngestServiceError::IndexNotFound { - index_id: status.message().to_string(), - }, - tonic::Code::InvalidArgument => { - IngestServiceError::InvalidPosition(status.message().to_string()) - } - tonic::Code::ResourceExhausted => IngestServiceError::RateLimited, - tonic::Code::Unavailable => IngestServiceError::Unavailable, - _ => IngestServiceError::Internal(status.message().to_string()), } } } @@ -111,18 +90,32 @@ impl From for IngestServiceError { impl ServiceError for IngestServiceError { fn error_code(&self) -> ServiceErrorCode { match self { - IngestServiceError::Corruption(_) => ServiceErrorCode::Internal, - IngestServiceError::IndexAlreadyExists { .. } => ServiceErrorCode::BadRequest, - IngestServiceError::IndexNotFound { .. } => ServiceErrorCode::NotFound, - IngestServiceError::Internal { .. } => ServiceErrorCode::Internal, - IngestServiceError::InvalidPosition(_) => ServiceErrorCode::BadRequest, - IngestServiceError::IoError { .. } => ServiceErrorCode::Internal, - IngestServiceError::RateLimited => ServiceErrorCode::RateLimited, - IngestServiceError::Unavailable => ServiceErrorCode::Internal, + Self::Corruption { .. } => ServiceErrorCode::Internal, + Self::IndexAlreadyExists { .. } => ServiceErrorCode::AlreadyExists, + Self::IndexNotFound { .. } => ServiceErrorCode::NotFound, + Self::Internal(_) => ServiceErrorCode::Internal, + Self::InvalidPosition(_) => ServiceErrorCode::BadRequest, + Self::IoError { .. } => ServiceErrorCode::Internal, + Self::RateLimited => ServiceErrorCode::TooManyRequests, + Self::Unavailable => ServiceErrorCode::Unavailable, } } } +impl GrpcServiceError for IngestServiceError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + + fn new_timeout(message: String) -> Self { + Self::Internal(message) + } + + fn new_unavailable(_: String) -> Self { + Self::Unavailable + } +} + #[derive(Debug, thiserror::Error)] #[error("key should contain 16 bytes, got {0}")] pub struct CorruptedKey(pub usize); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index e43dcfe27e2..dcbfe2cf0b1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -479,7 +479,7 @@ async fn fault_tolerant_fetch_stream( }; let mut fetch_stream = match ingester.open_fetch_stream(open_fetch_stream_request).await { Ok(fetch_stream) => fetch_stream, - Err(shard_not_found_error @ IngestV2Error::ShardNotFound { .. }) => { + Err(not_found_error @ IngestV2Error::ShardNotFound { .. }) => { error!( client_id=%client_id, index_uid=%index_uid, @@ -491,7 +491,7 @@ async fn fault_tolerant_fetch_stream( index_uid, source_id, shard_id, - ingest_error: shard_not_found_error, + ingest_error: not_found_error, }; let _ = fetch_message_tx.send(Err(fetch_stream_error)).await; from_position_exclusive.to_eof(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index cbaa30581fc..c2adce0c59d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -855,7 +855,7 @@ impl Ingester { .await? .shards .get(&queue_id) - .ok_or(IngestV2Error::ShardNotFound { + .ok_or_else(|| IngestV2Error::ShardNotFound { shard_id: open_fetch_stream_request.shard_id().clone(), })? .shard_status_rx diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 291873232ce..254f2bfa512 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -292,7 +292,7 @@ impl IngestRouter { ); } match persist_error { - IngestV2Error::Transport(_) => { + IngestV2Error::Unavailable(_) => { workbench .unavailable_leaders .insert(persist_summary.leader_id); @@ -300,11 +300,11 @@ impl IngestRouter { workbench.record_transport_error(subrequest_id); } } - IngestV2Error::TooManyRequests + IngestV2Error::IngesterUnavailable { .. } | IngestV2Error::Internal(_) | IngestV2Error::ShardNotFound { .. } - | IngestV2Error::IngesterUnavailable { .. } - | IngestV2Error::Timeout => { + | IngestV2Error::Timeout(_) + | IngestV2Error::TooManyRequests => { for subrequest_id in persist_summary.subrequest_ids { workbench.record_internal_error( subrequest_id, @@ -400,7 +400,13 @@ impl IngestRouter { ingester.persist(persist_request), ) .await - .unwrap_or_else(|_| Err(IngestV2Error::Timeout)); + .unwrap_or_else(|_| { + let message = format!( + "persist request timed out after {} seconds", + PERSIST_REQUEST_TIMEOUT.as_secs() + ); + Err(IngestV2Error::Timeout(message)) + }); (persist_summary, persist_result) }; persist_futures.push(persist_future); @@ -438,7 +444,13 @@ impl IngestRouter { self.retry_batch_persist(ingest_request, MAX_PERSIST_ATTEMPTS), ) .await - .map_err(|_| IngestV2Error::Timeout) + .map_err(|_| { + let message = format!( + "ingest request timed out after {} seconds", + INGEST_REQUEST_TIMEOUT.as_secs() + ); + IngestV2Error::Timeout(message) + }) } } @@ -1118,7 +1130,8 @@ mod tests { leader_id: "test-ingester-0".into(), subrequest_ids: vec![0], }; - let persist_result = Err::<_, IngestV2Error>(IngestV2Error::Timeout); + let persist_result = + Err::<_, IngestV2Error>(IngestV2Error::Timeout("timeout error".to_string())); (persist_summary, persist_result) }); router @@ -1141,7 +1154,7 @@ mod tests { subrequest_ids: vec![1], }; let persist_result = - Err::<_, IngestV2Error>(IngestV2Error::Transport("transport error".to_string())); + Err::<_, IngestV2Error>(IngestV2Error::Unavailable("connection error".to_string())); (persist_summary, persist_result) }); router diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index eaccd16f5ea..93d605f96a6 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -20,7 +20,7 @@ #![deny(clippy::disallowed_methods)] mod doc_batch; -mod errors; +pub mod error; mod ingest_api_service; #[path = "codegen/ingest_service.rs"] mod ingest_service; @@ -37,7 +37,7 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, Context}; pub use doc_batch::*; -pub use errors::IngestServiceError; +pub use error::IngestServiceError; pub use ingest_api_service::{GetMemoryCapacity, GetPartitionId, IngestApiService}; pub use ingest_service::*; pub use ingest_v2::*; diff --git a/quickwit/quickwit-ingest/src/position.rs b/quickwit/quickwit-ingest/src/position.rs index 351a40c44ed..4b9779eb20d 100644 --- a/quickwit/quickwit-ingest/src/position.rs +++ b/quickwit/quickwit-ingest/src/position.rs @@ -19,7 +19,7 @@ use std::fmt; -use crate::errors::CorruptedKey; +use crate::error::CorruptedKey; #[derive(Clone, Copy, Default, Ord, PartialOrd, Eq, PartialEq)] pub struct Position([u8; 8]); diff --git a/quickwit/quickwit-ingest/src/queue.rs b/quickwit/quickwit-ingest/src/queue.rs index 50f35a1a127..70b9cfdedb3 100644 --- a/quickwit/quickwit-ingest/src/queue.rs +++ b/quickwit/quickwit-ingest/src/queue.rs @@ -230,7 +230,7 @@ mod tests { use tokio::sync::watch; use super::Queues; - use crate::errors::IngestServiceError; + use crate::error::IngestServiceError; use crate::IngestApiService; const TEST_QUEUE_ID: &str = "my-queue"; diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 2304a99cfd6..bd6edffa8fc 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -196,7 +196,7 @@ impl DeleteTaskPipeline { let index_pipeline_id = IndexingPipelineId { index_uid: self.index_uid.clone(), node_id: "unknown".to_string(), - pipeline_uid: PipelineUid::from_u128(0u128), + pipeline_uid: PipelineUid::new(), source_id: "unknown".to_string(), }; diff --git a/quickwit/quickwit-janitor/src/error.rs b/quickwit/quickwit-janitor/src/error.rs index e81b31abd2f..3eed5dbd2fd 100644 --- a/quickwit/quickwit-janitor/src/error.rs +++ b/quickwit/quickwit-janitor/src/error.rs @@ -26,10 +26,10 @@ use thiserror::Error; #[allow(missing_docs)] #[derive(Error, Debug, Serialize, Deserialize)] pub enum JanitorError { - #[error("invalid delete query: `{0}`")] - InvalidDeleteQuery(String), #[error("internal error: `{0}`")] Internal(String), + #[error("invalid delete query: `{0}`")] + InvalidDeleteQuery(String), #[error("metastore error: `{0}`")] Metastore(#[from] MetastoreError), } @@ -37,9 +37,9 @@ pub enum JanitorError { impl ServiceError for JanitorError { fn error_code(&self) -> ServiceErrorCode { match self { - JanitorError::InvalidDeleteQuery(_) => ServiceErrorCode::BadRequest, - JanitorError::Internal(_) => ServiceErrorCode::Internal, - JanitorError::Metastore(error) => error.error_code(), + Self::Internal(_) => ServiceErrorCode::Internal, + Self::InvalidDeleteQuery(_) => ServiceErrorCode::BadRequest, + Self::Metastore(metastore_error) => metastore_error.error_code(), } } } diff --git a/quickwit/quickwit-metastore/Cargo.toml b/quickwit/quickwit-metastore/Cargo.toml index 559ef5c3f48..e183fc108e3 100644 --- a/quickwit/quickwit-metastore/Cargo.toml +++ b/quickwit/quickwit-metastore/Cargo.toml @@ -58,6 +58,7 @@ tracing-subscriber = { workspace = true } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-doc-mapper = { workspace = true, features = ["testsuite"] } +quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } [features] diff --git a/quickwit/quickwit-proto/src/cluster/mod.rs b/quickwit/quickwit-proto/src/cluster/mod.rs index e62bb1a1616..ea384c1154c 100644 --- a/quickwit/quickwit-proto/src/cluster/mod.rs +++ b/quickwit/quickwit-proto/src/cluster/mod.rs @@ -19,24 +19,44 @@ use thiserror; +use crate::error::{ServiceError, ServiceErrorCode}; +use crate::GrpcServiceError; + include!("../codegen/quickwit/quickwit.cluster.rs"); pub type ClusterResult = std::result::Result; #[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum ClusterError { - #[error("an internal error occurred: {0}")] + #[error("internal error: {0}")] Internal(String), + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] + Unavailable(String), } -impl From for tonic::Status { - fn from(cluster_error: ClusterError) -> Self { - tonic::Status::internal(cluster_error.to_string()) +impl ServiceError for ClusterError { + fn error_code(&self) -> ServiceErrorCode { + match self { + Self::Internal(_) => ServiceErrorCode::Internal, + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, + } } } -impl From for ClusterError { - fn from(status: tonic::Status) -> Self { - ClusterError::Internal(status.message().to_string()) +impl GrpcServiceError for ClusterError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) } } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs index 85a74768d83..ddaafef1fe4 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.cluster.rs @@ -486,7 +486,7 @@ where .fetch_cluster_state(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -513,7 +513,7 @@ for ClusterServiceGrpcServerAdapter { .fetch_cluster_state(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 5c19c1df0ae..b6c4989e976 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -1519,7 +1519,7 @@ where .create_index(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_index( &mut self, @@ -1529,7 +1529,7 @@ where .delete_index(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn add_source( &mut self, @@ -1539,7 +1539,7 @@ where .add_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn toggle_source( &mut self, @@ -1549,7 +1549,7 @@ where .toggle_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_source( &mut self, @@ -1559,7 +1559,7 @@ where .delete_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn get_or_create_open_shards( &mut self, @@ -1569,7 +1569,7 @@ where .get_or_create_open_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn advise_reset_shards( &mut self, @@ -1579,7 +1579,7 @@ where .advise_reset_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn get_debug_state( &mut self, @@ -1589,7 +1589,7 @@ where .get_debug_state(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -1616,7 +1616,7 @@ for ControlPlaneServiceGrpcServerAdapter { .create_index(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_index( &self, @@ -1627,7 +1627,7 @@ for ControlPlaneServiceGrpcServerAdapter { .delete_index(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn add_source( &self, @@ -1638,7 +1638,7 @@ for ControlPlaneServiceGrpcServerAdapter { .add_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn toggle_source( &self, @@ -1649,7 +1649,7 @@ for ControlPlaneServiceGrpcServerAdapter { .toggle_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_source( &self, @@ -1660,7 +1660,7 @@ for ControlPlaneServiceGrpcServerAdapter { .delete_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn get_or_create_open_shards( &self, @@ -1671,7 +1671,7 @@ for ControlPlaneServiceGrpcServerAdapter { .get_or_create_open_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn advise_reset_shards( &self, @@ -1682,7 +1682,7 @@ for ControlPlaneServiceGrpcServerAdapter { .advise_reset_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn get_debug_state( &self, @@ -1693,7 +1693,7 @@ for ControlPlaneServiceGrpcServerAdapter { .get_debug_state(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index 0bc5a281264..eeb22a6d866 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -460,7 +460,7 @@ where .apply_indexing_plan(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -487,7 +487,7 @@ for IndexingServiceGrpcServerAdapter { .apply_indexing_plan(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index cca100efe84..6fe0a8c4fd0 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -2147,7 +2147,7 @@ where .persist(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn open_replication_stream( &mut self, @@ -2159,9 +2159,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) }) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn open_fetch_stream( &mut self, @@ -2173,9 +2173,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) }) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn open_observation_stream( &mut self, @@ -2187,9 +2187,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) }) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn init_shards( &mut self, @@ -2199,7 +2199,7 @@ where .init_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn retain_shards( &mut self, @@ -2209,7 +2209,7 @@ where .retain_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn truncate_shards( &mut self, @@ -2219,7 +2219,7 @@ where .truncate_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn close_shards( &mut self, @@ -2229,7 +2229,7 @@ where .close_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn ping( &mut self, @@ -2239,7 +2239,7 @@ where .ping(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn decommission( &mut self, @@ -2249,7 +2249,7 @@ where .decommission(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -2276,7 +2276,7 @@ for IngesterServiceGrpcServerAdapter { .persist(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } type OpenReplicationStreamStream = quickwit_common::ServiceStream< tonic::Result, @@ -2292,8 +2292,10 @@ for IngesterServiceGrpcServerAdapter { quickwit_common::ServiceStream::from(streaming) }) .await - .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) - .map_err(|error| error.into()) + .map(|stream| tonic::Response::new( + stream.map_err(crate::error::grpc_error_to_grpc_status), + )) + .map_err(crate::error::grpc_error_to_grpc_status) } type OpenFetchStreamStream = quickwit_common::ServiceStream< tonic::Result, @@ -2306,8 +2308,10 @@ for IngesterServiceGrpcServerAdapter { .clone() .open_fetch_stream(request.into_inner()) .await - .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) - .map_err(|error| error.into()) + .map(|stream| tonic::Response::new( + stream.map_err(crate::error::grpc_error_to_grpc_status), + )) + .map_err(crate::error::grpc_error_to_grpc_status) } type OpenObservationStreamStream = quickwit_common::ServiceStream< tonic::Result, @@ -2320,8 +2324,10 @@ for IngesterServiceGrpcServerAdapter { .clone() .open_observation_stream(request.into_inner()) .await - .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) - .map_err(|error| error.into()) + .map(|stream| tonic::Response::new( + stream.map_err(crate::error::grpc_error_to_grpc_status), + )) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn init_shards( &self, @@ -2332,7 +2338,7 @@ for IngesterServiceGrpcServerAdapter { .init_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn retain_shards( &self, @@ -2343,7 +2349,7 @@ for IngesterServiceGrpcServerAdapter { .retain_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn truncate_shards( &self, @@ -2354,7 +2360,7 @@ for IngesterServiceGrpcServerAdapter { .truncate_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn close_shards( &self, @@ -2365,7 +2371,7 @@ for IngesterServiceGrpcServerAdapter { .close_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn ping( &self, @@ -2376,7 +2382,7 @@ for IngesterServiceGrpcServerAdapter { .ping(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn decommission( &self, @@ -2387,7 +2393,7 @@ for IngesterServiceGrpcServerAdapter { .decommission(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 422ab658274..9161c8531d8 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -549,7 +549,7 @@ where .ingest(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } } #[derive(Debug)] @@ -576,7 +576,7 @@ for IngestRouterServiceGrpcServerAdapter { .ingest(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index e260b16dd6a..3540b8c0292 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -4376,7 +4376,7 @@ where .create_index(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn index_metadata( &mut self, @@ -4386,7 +4386,7 @@ where .index_metadata(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_indexes_metadata( &mut self, @@ -4396,7 +4396,7 @@ where .list_indexes_metadata(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_index( &mut self, @@ -4406,7 +4406,7 @@ where .delete_index(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_splits( &mut self, @@ -4418,9 +4418,9 @@ where .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); let stream = quickwit_common::ServiceStream::from(streaming); - stream.map_err(|error| error.into()) + stream.map_err(crate::error::grpc_status_to_service_error) }) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn stage_splits( &mut self, @@ -4430,7 +4430,7 @@ where .stage_splits(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn publish_splits( &mut self, @@ -4440,7 +4440,7 @@ where .publish_splits(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn mark_splits_for_deletion( &mut self, @@ -4450,7 +4450,7 @@ where .mark_splits_for_deletion(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_splits( &mut self, @@ -4460,7 +4460,7 @@ where .delete_splits(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn add_source( &mut self, @@ -4470,7 +4470,7 @@ where .add_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn toggle_source( &mut self, @@ -4480,7 +4480,7 @@ where .toggle_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_source( &mut self, @@ -4490,7 +4490,7 @@ where .delete_source(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn reset_source_checkpoint( &mut self, @@ -4500,7 +4500,7 @@ where .reset_source_checkpoint(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn last_delete_opstamp( &mut self, @@ -4510,7 +4510,7 @@ where .last_delete_opstamp(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn create_delete_task( &mut self, @@ -4520,7 +4520,7 @@ where .create_delete_task(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn update_splits_delete_opstamp( &mut self, @@ -4530,7 +4530,7 @@ where .update_splits_delete_opstamp(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_delete_tasks( &mut self, @@ -4540,7 +4540,7 @@ where .list_delete_tasks(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_stale_splits( &mut self, @@ -4550,7 +4550,7 @@ where .list_stale_splits(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn open_shards( &mut self, @@ -4560,7 +4560,7 @@ where .open_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn acquire_shards( &mut self, @@ -4570,7 +4570,7 @@ where .acquire_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_shards( &mut self, @@ -4580,7 +4580,7 @@ where .delete_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_shards( &mut self, @@ -4590,7 +4590,7 @@ where .list_shards(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn create_index_template( &mut self, @@ -4600,7 +4600,7 @@ where .create_index_template(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn get_index_template( &mut self, @@ -4610,7 +4610,7 @@ where .get_index_template(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn find_index_template_matches( &mut self, @@ -4620,7 +4620,7 @@ where .find_index_template_matches(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn list_index_templates( &mut self, @@ -4630,7 +4630,7 @@ where .list_index_templates(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn delete_index_templates( &mut self, @@ -4640,7 +4640,7 @@ where .delete_index_templates(request) .await .map(|response| response.into_inner()) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_status_to_service_error) } async fn check_connectivity(&mut self) -> anyhow::Result<()> { if self.connection_addrs_rx.borrow().len() == 0 { @@ -4682,7 +4682,7 @@ for MetastoreServiceGrpcServerAdapter { .create_index(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn index_metadata( &self, @@ -4693,7 +4693,7 @@ for MetastoreServiceGrpcServerAdapter { .index_metadata(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn list_indexes_metadata( &self, @@ -4704,7 +4704,7 @@ for MetastoreServiceGrpcServerAdapter { .list_indexes_metadata(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_index( &self, @@ -4715,7 +4715,7 @@ for MetastoreServiceGrpcServerAdapter { .delete_index(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } type ListSplitsStream = quickwit_common::ServiceStream< tonic::Result, @@ -4728,8 +4728,10 @@ for MetastoreServiceGrpcServerAdapter { .clone() .list_splits(request.into_inner()) .await - .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) - .map_err(|error| error.into()) + .map(|stream| tonic::Response::new( + stream.map_err(crate::error::grpc_error_to_grpc_status), + )) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn stage_splits( &self, @@ -4740,7 +4742,7 @@ for MetastoreServiceGrpcServerAdapter { .stage_splits(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn publish_splits( &self, @@ -4751,7 +4753,7 @@ for MetastoreServiceGrpcServerAdapter { .publish_splits(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn mark_splits_for_deletion( &self, @@ -4762,7 +4764,7 @@ for MetastoreServiceGrpcServerAdapter { .mark_splits_for_deletion(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_splits( &self, @@ -4773,7 +4775,7 @@ for MetastoreServiceGrpcServerAdapter { .delete_splits(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn add_source( &self, @@ -4784,7 +4786,7 @@ for MetastoreServiceGrpcServerAdapter { .add_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn toggle_source( &self, @@ -4795,7 +4797,7 @@ for MetastoreServiceGrpcServerAdapter { .toggle_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_source( &self, @@ -4806,7 +4808,7 @@ for MetastoreServiceGrpcServerAdapter { .delete_source(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn reset_source_checkpoint( &self, @@ -4817,7 +4819,7 @@ for MetastoreServiceGrpcServerAdapter { .reset_source_checkpoint(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn last_delete_opstamp( &self, @@ -4828,7 +4830,7 @@ for MetastoreServiceGrpcServerAdapter { .last_delete_opstamp(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn create_delete_task( &self, @@ -4839,7 +4841,7 @@ for MetastoreServiceGrpcServerAdapter { .create_delete_task(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn update_splits_delete_opstamp( &self, @@ -4850,7 +4852,7 @@ for MetastoreServiceGrpcServerAdapter { .update_splits_delete_opstamp(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn list_delete_tasks( &self, @@ -4861,7 +4863,7 @@ for MetastoreServiceGrpcServerAdapter { .list_delete_tasks(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn list_stale_splits( &self, @@ -4872,7 +4874,7 @@ for MetastoreServiceGrpcServerAdapter { .list_stale_splits(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn open_shards( &self, @@ -4883,7 +4885,7 @@ for MetastoreServiceGrpcServerAdapter { .open_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn acquire_shards( &self, @@ -4894,7 +4896,7 @@ for MetastoreServiceGrpcServerAdapter { .acquire_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_shards( &self, @@ -4905,7 +4907,7 @@ for MetastoreServiceGrpcServerAdapter { .delete_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn list_shards( &self, @@ -4916,7 +4918,7 @@ for MetastoreServiceGrpcServerAdapter { .list_shards(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn create_index_template( &self, @@ -4927,7 +4929,7 @@ for MetastoreServiceGrpcServerAdapter { .create_index_template(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn get_index_template( &self, @@ -4938,7 +4940,7 @@ for MetastoreServiceGrpcServerAdapter { .get_index_template(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn find_index_template_matches( &self, @@ -4949,7 +4951,7 @@ for MetastoreServiceGrpcServerAdapter { .find_index_template_matches(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn list_index_templates( &self, @@ -4960,7 +4962,7 @@ for MetastoreServiceGrpcServerAdapter { .list_index_templates(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } async fn delete_index_templates( &self, @@ -4971,7 +4973,7 @@ for MetastoreServiceGrpcServerAdapter { .delete_index_templates(request.into_inner()) .await .map(tonic::Response::new) - .map_err(|error| error.into()) + .map_err(crate::error::grpc_error_to_grpc_status) } } /// Generated client implementations. diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index b77ecaf851e..5b99de03ba4 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -22,25 +22,47 @@ use quickwit_common::tower::RpcName; use thiserror; use crate::metastore::MetastoreError; -use crate::{ServiceError, ServiceErrorCode}; +use crate::{GrpcServiceError, ServiceError, ServiceErrorCode}; include!("../codegen/quickwit/quickwit.control_plane.rs"); pub type ControlPlaneResult = std::result::Result; #[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum ControlPlaneError { - #[error("an internal error occurred: {0}")] + #[error("internal error: {0}")] Internal(String), - #[error("a metastore error occurred: {0}")] + #[error("metastore error: {0}")] Metastore(#[from] MetastoreError), - #[error("control plane is unavailable: {0}")] + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] Unavailable(String), } -impl ControlPlaneError { - pub fn label_value(&self) -> &'static str { - "error" +impl ServiceError for ControlPlaneError { + fn error_code(&self) -> ServiceErrorCode { + match self { + Self::Internal(_) => ServiceErrorCode::Internal, + Self::Metastore(metastore_error) => metastore_error.error_code(), + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, + } + } +} + +impl GrpcServiceError for ControlPlaneError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) } } @@ -52,51 +74,22 @@ impl From for MetastoreError { cause: message, }, ControlPlaneError::Metastore(error) => error, + ControlPlaneError::Timeout(message) => MetastoreError::Timeout(message), ControlPlaneError::Unavailable(message) => MetastoreError::Unavailable(message), } } } -impl From for tonic::Status { - fn from(control_plane_error: ControlPlaneError) -> Self { - let grpc_status_code = control_plane_error.error_code().to_grpc_status_code(); - let message_json = serde_json::to_string(&control_plane_error) - .unwrap_or_else(|_| format!("original control plane error: {control_plane_error}")); - tonic::Status::new(grpc_status_code, message_json) - } -} - -impl From for ControlPlaneError { - fn from(status: tonic::Status) -> Self { - serde_json::from_str(status.message()).unwrap_or_else(|_| { - ControlPlaneError::Internal(format!( - "failed to deserialize control plane error: `{}`", - status.message() - )) - }) - } -} - impl From> for ControlPlaneError { fn from(error: AskError) -> Self { match error { AskError::ErrorReply(error) => error, AskError::MessageNotDelivered => { - ControlPlaneError::Unavailable("request not delivered".to_string()) + Self::new_unavailable("request could not be delivered to actor".to_string()) + } + AskError::ProcessMessageError => { + Self::new_internal("an error occurred while processing the request".to_string()) } - AskError::ProcessMessageError => ControlPlaneError::Internal( - "an error occurred while processing the request".to_string(), - ), - } - } -} - -impl ServiceError for ControlPlaneError { - fn error_code(&self) -> ServiceErrorCode { - match self { - Self::Internal { .. } => ServiceErrorCode::Internal, - Self::Metastore(error) => error.error_code(), - Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } diff --git a/quickwit/quickwit-proto/src/error.rs b/quickwit/quickwit-proto/src/error.rs index 1c23d4302ec..4eff3ca1573 100644 --- a/quickwit/quickwit-proto/src/error.rs +++ b/quickwit/quickwit-proto/src/error.rs @@ -18,64 +18,68 @@ // along with this program. If not, see . use std::convert::Infallible; +use std::error::Error; +use std::fmt::Debug; -/// This enum serves as a Rosetta Stone of -/// gRPC and HTTP status code. +use anyhow::Context; +use quickwit_actors::AskError; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tonic::metadata::BinaryMetadataValue; +use tracing::{error, warn}; + +const QW_ERROR_HEADER_NAME: &str = "qw-error-bin"; + +/// This enum maps our internal error codes to +/// gRPC and HTTP status codes. /// -/// It is voluntarily a restricted subset. +/// It is voluntarily a restricted subset of gRPC status codes. Please introduce new variants +/// thoughtfully. #[derive(Clone, Copy)] pub enum ServiceErrorCode { AlreadyExists, BadRequest, + // Use `Unauthenticated` if the caller cannot be identified. + Forbidden, Internal, - MethodNotAllowed, NotFound, - // Used for APIs that are available in Elasticsearch but not available yet in Quickwit. - NotSupportedYet, - RateLimited, Timeout, + TooManyRequests, + Unauthenticated, Unavailable, - UnsupportedMediaType, } impl ServiceErrorCode { - pub fn to_grpc_status_code(self) -> tonic::Code { + fn grpc_status_code(&self) -> tonic::Code { match self { - ServiceErrorCode::AlreadyExists => tonic::Code::AlreadyExists, - ServiceErrorCode::BadRequest => tonic::Code::InvalidArgument, - ServiceErrorCode::Internal => tonic::Code::Internal, - ServiceErrorCode::MethodNotAllowed => tonic::Code::InvalidArgument, - ServiceErrorCode::NotFound => tonic::Code::NotFound, - ServiceErrorCode::NotSupportedYet => tonic::Code::Unimplemented, - ServiceErrorCode::RateLimited => tonic::Code::ResourceExhausted, - ServiceErrorCode::Timeout => tonic::Code::DeadlineExceeded, - ServiceErrorCode::Unavailable => tonic::Code::Unavailable, - ServiceErrorCode::UnsupportedMediaType => tonic::Code::InvalidArgument, + Self::AlreadyExists => tonic::Code::AlreadyExists, + Self::BadRequest => tonic::Code::InvalidArgument, + Self::Forbidden => tonic::Code::PermissionDenied, + Self::Internal => tonic::Code::Internal, + Self::NotFound => tonic::Code::NotFound, + Self::Timeout => tonic::Code::DeadlineExceeded, + Self::TooManyRequests => tonic::Code::ResourceExhausted, + Self::Unauthenticated => tonic::Code::Unauthenticated, + Self::Unavailable => tonic::Code::Unavailable, } } - pub fn to_http_status_code(self) -> http::StatusCode { + + pub fn http_status_code(&self) -> http::StatusCode { match self { - ServiceErrorCode::AlreadyExists => http::StatusCode::BAD_REQUEST, - ServiceErrorCode::BadRequest => http::StatusCode::BAD_REQUEST, - ServiceErrorCode::Internal => http::StatusCode::INTERNAL_SERVER_ERROR, - ServiceErrorCode::MethodNotAllowed => http::StatusCode::METHOD_NOT_ALLOWED, - ServiceErrorCode::NotFound => http::StatusCode::NOT_FOUND, - ServiceErrorCode::NotSupportedYet => http::StatusCode::NOT_IMPLEMENTED, - ServiceErrorCode::RateLimited => http::StatusCode::TOO_MANY_REQUESTS, - ServiceErrorCode::Unavailable => http::StatusCode::SERVICE_UNAVAILABLE, - ServiceErrorCode::UnsupportedMediaType => http::StatusCode::UNSUPPORTED_MEDIA_TYPE, - ServiceErrorCode::Timeout => http::StatusCode::REQUEST_TIMEOUT, + Self::AlreadyExists => http::StatusCode::BAD_REQUEST, + Self::BadRequest => http::StatusCode::BAD_REQUEST, + Self::Forbidden => http::StatusCode::FORBIDDEN, + Self::Internal => http::StatusCode::INTERNAL_SERVER_ERROR, + Self::NotFound => http::StatusCode::NOT_FOUND, + Self::Timeout => http::StatusCode::REQUEST_TIMEOUT, + Self::TooManyRequests => http::StatusCode::TOO_MANY_REQUESTS, + Self::Unauthenticated => http::StatusCode::UNAUTHORIZED, + Self::Unavailable => http::StatusCode::SERVICE_UNAVAILABLE, } } } -pub trait ServiceError: ToString { - fn grpc_error(&self) -> tonic::Status { - let grpc_code = self.error_code().to_grpc_status_code(); - let error_msg = self.to_string(); - tonic::Status::new(grpc_code, error_msg) - } - +pub trait ServiceError: Error + Debug + 'static { fn error_code(&self) -> ServiceErrorCode; } @@ -85,9 +89,159 @@ impl ServiceError for Infallible { } } -pub fn convert_to_grpc_result( - res: Result, -) -> Result, tonic::Status> { - res.map(tonic::Response::new) - .map_err(|error| error.grpc_error()) +impl ServiceError for AskError +where E: ServiceError +{ + fn error_code(&self) -> ServiceErrorCode { + match self { + AskError::ErrorReply(error) => error.error_code(), + AskError::MessageNotDelivered => ServiceErrorCode::Unavailable, + AskError::ProcessMessageError => ServiceErrorCode::Internal, + } + } +} + +/// A trait for encoding/decoding service errors to/from gRPC statuses. Errors are stored in JSON +/// in the gRPC header [`QW_ERROR_HEADER_NAME`]. This allows for propagating them transparently +/// between clients and servers over the network without being semantically limited to a status code +/// and a message. However, it also means that modifying the serialization format of existing errors +/// or introducing new ones is not backward compatible. +pub trait GrpcServiceError: ServiceError + Serialize + DeserializeOwned + Send + Sync { + fn into_grpc_status(self) -> tonic::Status { + grpc_error_to_grpc_status(self) + } + + fn new_internal(message: String) -> Self; + + fn new_timeout(message: String) -> Self; + + fn new_unavailable(message: String) -> Self; +} + +/// Converts a service error into a gRPC status. +pub fn grpc_error_to_grpc_status(service_error: E) -> tonic::Status +where E: GrpcServiceError { + let code = service_error.error_code().grpc_status_code(); + let message = service_error.to_string(); + let mut status = tonic::Status::new(code, message); + + match encode_error(&service_error) { + Ok(header_value) => { + status + .metadata_mut() + .insert_bin(QW_ERROR_HEADER_NAME, header_value); + } + Err(error) => { + warn!(%error, "failed to encode error `{service_error:?}`"); + } + } + status +} + +/// Converts a gRPC status into a service error. +pub fn grpc_status_to_service_error(status: tonic::Status) -> E +where E: GrpcServiceError { + if let Some(header_value) = status.metadata().get_bin(QW_ERROR_HEADER_NAME) { + let service_error = match decode_error(header_value) { + Ok(service_error) => service_error, + Err(error) => { + let message = format!( + "failed to deserialize error returned from server (this can happen during \ + rolling upgrades): {error}" + ); + E::new_internal(message) + } + }; + return service_error; + } + if let Some(source) = status.source() { + error!(error = %source, "transport error"); + } + let message = status.message().to_string(); + + match status.code() { + // `Cancelled` is a client timeout whereas `DeadlineExceeded` is a server timeout. At this + // stage, we don't distinguish them. + tonic::Code::Cancelled | tonic::Code::DeadlineExceeded => E::new_timeout(message), + tonic::Code::Unavailable => E::new_unavailable(message), + _ => E::new_internal(message), + } +} + +/// Encodes a service error into a gRPC header value. +fn encode_error(service_error: &E) -> anyhow::Result { + let service_error_json = serde_json::to_vec(&service_error)?; + let header_value = BinaryMetadataValue::from_bytes(&service_error_json); + Ok(header_value) +} + +/// Decodes a service error from a gRPC header value. +fn decode_error(header_value: &BinaryMetadataValue) -> anyhow::Result { + let service_error_json = header_value.to_bytes().context("invalid header value")?; + let service_error = serde_json::from_slice(&service_error_json).with_context(|| { + if let Ok(service_error_json_str) = std::str::from_utf8(&service_error_json) { + format!("invalid JSON `{service_error_json_str}`") + } else { + "invalid JSON".to_string() + } + })?; + Ok(service_error) +} + +pub fn convert_to_grpc_result( + result: Result, +) -> tonic::Result> { + result + .map(tonic::Response::new) + .map_err(|error| error.into_grpc_status()) +} + +#[cfg(test)] +mod tests { + use serde::Deserialize; + + use super::*; + + #[test] + fn test_grpc_service_error_roundtrip() { + #[derive(Clone, Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] + #[serde(rename_all = "snake_case")] + enum MyError { + #[error("internal error: {0}")] + Internal(String), + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] + Unavailable(String), + } + + impl ServiceError for MyError { + fn error_code(&self) -> ServiceErrorCode { + match self { + MyError::Internal(_) => ServiceErrorCode::Internal, + MyError::Timeout(_) => ServiceErrorCode::Timeout, + MyError::Unavailable(_) => ServiceErrorCode::Unavailable, + } + } + } + + impl GrpcServiceError for MyError { + fn new_internal(message: String) -> Self { + MyError::Internal(message) + } + + fn new_timeout(message: String) -> Self { + MyError::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + MyError::Unavailable(message) + } + } + + let service_error = MyError::new_internal("test".to_string()); + let status = grpc_error_to_grpc_status(service_error.clone()); + let expected_error: MyError = grpc_status_to_service_error(status); + assert_eq!(service_error, expected_error); + } } diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 4af856eb3ad..c400e53ba8e 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -17,124 +17,60 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::fmt; use std::fmt::{Display, Formatter}; use std::hash::Hash; use std::ops::{Add, Mul, Sub}; -use std::{fmt, io}; -use anyhow::anyhow; use quickwit_actors::AskError; use quickwit_common::pubsub::Event; use quickwit_common::tower::RpcName; use serde::{Deserialize, Serialize}; use thiserror; +use crate::metastore::MetastoreError; use crate::types::{IndexUid, PipelineUid, Position, ShardId, SourceId, SourceUid}; -use crate::{ServiceError, ServiceErrorCode}; +use crate::{GrpcServiceError, ServiceError, ServiceErrorCode}; include!("../codegen/quickwit/quickwit.indexing.rs"); pub type IndexingResult = std::result::Result; -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum IndexingError { - #[error("indexing pipeline `{pipeline_uid}` does not exist")] - MissingPipeline { pipeline_uid: PipelineUid }, - #[error("indexing merge pipeline `{merge_pipeline_id}` does not exist")] - MissingMergePipeline { merge_pipeline_id: String }, - #[error( - "pipeline #{pipeline_uid} for index `{index_id}` and source `{source_id}` already exists" - )] - PipelineAlreadyExists { - index_id: String, - source_id: SourceId, - pipeline_uid: PipelineUid, - }, - #[error("I/O error `{0}`")] - Io(io::Error), - #[error("invalid params `{0}`")] - InvalidParams(anyhow::Error), - #[error("Spanw pipelines errors `{pipeline_ids:?}`")] - SpawnPipelinesError { - pipeline_ids: Vec, - }, - #[error("a metastore error occurred: {0}")] - MetastoreError(String), - #[error("a storage resolver error occurred: {0}")] - StorageResolverError(String), - #[error("an internal error occurred: {0}")] + #[error("internal error: {0}")] Internal(String), - #[error("indexing service is unavailable")] - Unavailable, + #[error("metastore error: {0}")] + Metastore(#[from] MetastoreError), + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] + Unavailable(String), } -impl From for tonic::Status { - fn from(error: IndexingError) -> Self { - match error { - IndexingError::MissingPipeline { pipeline_uid } => { - tonic::Status::not_found(format!("missing pipeline `{pipeline_uid}`")) - } - IndexingError::MissingMergePipeline { merge_pipeline_id } => { - tonic::Status::not_found(format!("missing merge pipeline `{merge_pipeline_id}`")) - } - IndexingError::PipelineAlreadyExists { - index_id, - source_id, - pipeline_uid, - } => tonic::Status::already_exists(format!( - "pipeline {index_id}/{source_id} {pipeline_uid} already exists " - )), - IndexingError::Io(error) => tonic::Status::internal(error.to_string()), - IndexingError::InvalidParams(error) => { - tonic::Status::invalid_argument(error.to_string()) - } - IndexingError::SpawnPipelinesError { pipeline_ids } => { - tonic::Status::internal(format!("error spawning pipelines {:?}", pipeline_ids)) - } - IndexingError::Internal(string) => tonic::Status::internal(string), - IndexingError::MetastoreError(string) => tonic::Status::internal(string), - IndexingError::StorageResolverError(string) => tonic::Status::internal(string), - IndexingError::Unavailable => { - tonic::Status::unavailable("indexing service is unavailable") - } +impl ServiceError for IndexingError { + fn error_code(&self) -> ServiceErrorCode { + match self { + Self::Internal(_) => ServiceErrorCode::Internal, + Self::Metastore(metastore_error) => metastore_error.error_code(), + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } -impl From for IndexingError { - fn from(status: tonic::Status) -> Self { - match status.code() { - tonic::Code::InvalidArgument => { - IndexingError::InvalidParams(anyhow!(status.message().to_string())) - } - tonic::Code::NotFound => IndexingError::MissingPipeline { - pipeline_uid: PipelineUid::default(), - }, - tonic::Code::AlreadyExists => IndexingError::PipelineAlreadyExists { - index_id: "".to_string(), - source_id: "".to_string(), - pipeline_uid: PipelineUid::default(), - }, - tonic::Code::Unavailable => IndexingError::Unavailable, - _ => IndexingError::InvalidParams(anyhow!(status.message().to_string())), - } +impl GrpcServiceError for IndexingError { + fn new_internal(message: String) -> Self { + Self::Internal(message) } -} -impl ServiceError for IndexingError { - fn error_code(&self) -> ServiceErrorCode { - match self { - Self::MissingPipeline { .. } => ServiceErrorCode::NotFound, - Self::MissingMergePipeline { .. } => ServiceErrorCode::NotFound, - Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest, - Self::InvalidParams(_) => ServiceErrorCode::BadRequest, - Self::SpawnPipelinesError { .. } => ServiceErrorCode::Internal, - Self::Io(_) => ServiceErrorCode::Internal, - Self::Internal(_) => ServiceErrorCode::Internal, - Self::MetastoreError(_) => ServiceErrorCode::Internal, - Self::StorageResolverError(_) => ServiceErrorCode::Internal, - Self::Unavailable => ServiceErrorCode::Unavailable, - } + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) } } @@ -142,10 +78,12 @@ impl From> for IndexingError { fn from(error: AskError) -> Self { match error { AskError::ErrorReply(error) => error, - AskError::MessageNotDelivered => IndexingError::Unavailable, - AskError::ProcessMessageError => IndexingError::Internal( - "an error occurred while processing the request".to_string(), - ), + AskError::MessageNotDelivered => { + Self::new_unavailable("request could not be delivered to actor".to_string()) + } + AskError::ProcessMessageError => { + Self::new_internal("an error occurred while processing the request".to_string()) + } } } } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 4ba02ac4d14..bba1e82500e 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -22,9 +22,9 @@ use bytes::Bytes; use self::ingester::{PersistFailureReason, ReplicateFailureReason}; use self::router::IngestFailureReason; use super::types::NodeId; -use super::{ServiceError, ServiceErrorCode}; -use crate::control_plane::ControlPlaneError; +use super::GrpcServiceError; use crate::types::{queue_id, IndexUid, Position, QueueId, ShardId}; +use crate::{ServiceError, ServiceErrorCode}; pub mod ingester; pub mod router; @@ -33,79 +33,48 @@ include!("../codegen/quickwit/quickwit.ingest.rs"); pub type IngestV2Result = std::result::Result; -#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)] +#[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum IngestV2Error { - #[error("an internal error occurred: {0}")] - Internal(String), - /// Emitted when an ingester was not available for a given operation, - /// either directly or through replication. + // TODO: Get rid of this variant. #[error("failed to connect to ingester `{ingester_id}`")] IngesterUnavailable { ingester_id: NodeId }, + #[error("internal error: {0}")] + Internal(String), #[error("shard `{shard_id}` not found")] ShardNotFound { shard_id: ShardId }, - #[error("request timed out")] - Timeout, - // This error is provoked by semaphore located on the router. + #[error("request timed out: {0}")] + Timeout(String), #[error("too many requests")] TooManyRequests, - // TODO: Merge `Transport` and `IngesterUnavailable` into a single `Unavailable` error. - #[error("transport error: {0}")] - Transport(String), + #[error("service unavailable: {0}")] + Unavailable(String), } -impl IngestV2Error { - pub fn label_value(&self) -> &'static str { +impl ServiceError for IngestV2Error { + fn error_code(&self) -> ServiceErrorCode { match self { - Self::Timeout { .. } => "timeout", - _ => "error", + Self::IngesterUnavailable { .. } => ServiceErrorCode::Unavailable, + Self::Internal(_) => ServiceErrorCode::Internal, + Self::ShardNotFound { .. } => ServiceErrorCode::NotFound, + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::TooManyRequests => ServiceErrorCode::TooManyRequests, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } -impl From for IngestV2Error { - fn from(error: ControlPlaneError) -> Self { - Self::Internal(error.to_string()) +impl GrpcServiceError for IngestV2Error { + fn new_internal(message: String) -> Self { + Self::Internal(message) } -} -impl From for tonic::Status { - fn from(error: IngestV2Error) -> tonic::Status { - let code = match &error { - IngestV2Error::IngesterUnavailable { .. } => tonic::Code::Unavailable, - IngestV2Error::Internal(_) => tonic::Code::Internal, - IngestV2Error::ShardNotFound { .. } => tonic::Code::NotFound, - IngestV2Error::Timeout { .. } => tonic::Code::DeadlineExceeded, - IngestV2Error::TooManyRequests => tonic::Code::ResourceExhausted, - IngestV2Error::Transport { .. } => tonic::Code::Unavailable, - }; - let error_json: String = serde_json::to_string(&error).unwrap(); - tonic::Status::new(code, error_json) - } -} - -impl From for IngestV2Error { - fn from(status: tonic::Status) -> Self { - if let Ok(error_from_json) = serde_json::from_str(status.message()) { - return error_from_json; - } - match status.code() { - tonic::Code::Unavailable => IngestV2Error::Transport(status.message().to_string()), - tonic::Code::ResourceExhausted => IngestV2Error::TooManyRequests, - _ => IngestV2Error::Internal(status.message().to_string()), - } + fn new_timeout(message: String) -> Self { + Self::Timeout(message) } -} -impl ServiceError for IngestV2Error { - fn error_code(&self) -> ServiceErrorCode { - match self { - Self::IngesterUnavailable { .. } => ServiceErrorCode::Unavailable, - Self::Internal { .. } => ServiceErrorCode::Internal, - Self::ShardNotFound { .. } => ServiceErrorCode::NotFound, - Self::Timeout { .. } => ServiceErrorCode::Timeout, - Self::Transport { .. } => ServiceErrorCode::Unavailable, - Self::TooManyRequests => ServiceErrorCode::RateLimited, - } + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) } } @@ -350,15 +319,4 @@ mod tests { assert!(ShardState::from_json_str_name("unknown").is_none()); } - - #[test] - fn test_ingest_v2_error_grpc_conversion() { - let ingester_id = NodeId::from("test-ingester"); - let error: IngestV2Error = IngestV2Error::IngesterUnavailable { ingester_id }; - let grpc_status: tonic::Status = error.into(); - let error_serdeser = IngestV2Error::from(grpc_status); - assert!( - matches!(error_serdeser, IngestV2Error::IngesterUnavailable { ingester_id } if ingester_id == "test-ingester") - ); - } } diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 36b3cf96d0a..19f084763b9 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -22,7 +22,6 @@ #![allow(rustdoc::invalid_html_tags)] use std::cmp::Ordering; -use std::fmt; use ::opentelemetry::global; use ::opentelemetry::propagation::{Extractor, Injector}; @@ -42,7 +41,7 @@ pub mod metastore; pub mod search; pub mod types; -pub use error::{ServiceError, ServiceErrorCode}; +pub use error::{GrpcServiceError, ServiceError, ServiceErrorCode}; use crate::search::ReportSplitsRequest; @@ -67,6 +66,7 @@ pub mod opentelemetry { include!("codegen/opentelemetry/opentelemetry.proto.collector.logs.v1.rs"); } } + // One can dream. // pub mod metrics { // pub mod v1 { // include!("codegen/opentelemetry/opentelemetry.proto.collector.metrics.v1.rs" @@ -221,16 +221,6 @@ pub fn set_parent_span_from_request_metadata(request_metadata: &tonic::metadata: Span::current().set_parent(parent_cx); } -impl ServiceError for quickwit_actors::AskError { - fn error_code(&self) -> ServiceErrorCode { - match self { - quickwit_actors::AskError::MessageNotDelivered => ServiceErrorCode::Internal, - quickwit_actors::AskError::ProcessMessageError => ServiceErrorCode::Internal, - quickwit_actors::AskError::ErrorReply(err) => err.error_code(), - } - } -} - impl search::SortOrder { #[inline(always)] pub fn compare_opt(&self, this: &Option, other: &Option) -> Ordering { diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index 59f7b445698..16f9a420c76 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -23,7 +23,7 @@ use quickwit_common::retry::Retryable; use serde::{Deserialize, Serialize}; use crate::types::{IndexId, IndexUid, QueueId, ShardId, SourceId, SplitId}; -use crate::{ServiceError, ServiceErrorCode}; +use crate::{GrpcServiceError, ServiceError, ServiceErrorCode}; pub mod events; @@ -33,6 +33,7 @@ pub type MetastoreResult = Result; /// Lists the object types stored and managed by the metastore. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum EntityKind { /// A checkpoint delta. CheckpointDelta { @@ -141,10 +142,13 @@ pub enum MetastoreError { message: String, }, - #[error("{0} do(es) not exist")] + #[error("{0} not found")] NotFound(EntityKind), - #[error("metastore unavailable: {0}")] + #[error("request timed out: {0}")] + Timeout(String), + + #[error("service unavailable: {0}")] Unavailable(String), } @@ -157,51 +161,48 @@ impl From for MetastoreError { } } -impl From for MetastoreError { - fn from(status: tonic::Status) -> Self { - serde_json::from_str(status.message()).unwrap_or_else(|_| MetastoreError::Internal { - message: "failed to deserialize metastore error".to_string(), - cause: status.message().to_string(), - }) - } -} - -impl From for tonic::Status { - fn from(metastore_error: MetastoreError) -> Self { - let grpc_status_code = metastore_error.error_code().to_grpc_status_code(); - let message_json = serde_json::to_string(&metastore_error) - .unwrap_or_else(|_| format!("original metastore error: {metastore_error}")); - tonic::Status::new(grpc_status_code, message_json) - } -} - impl ServiceError for MetastoreError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::AlreadyExists { .. } => ServiceErrorCode::AlreadyExists, + Self::AlreadyExists(_) => ServiceErrorCode::AlreadyExists, Self::Connection { .. } => ServiceErrorCode::Internal, Self::Db { .. } => ServiceErrorCode::Internal, Self::FailedPrecondition { .. } => ServiceErrorCode::BadRequest, - Self::Forbidden { .. } => ServiceErrorCode::MethodNotAllowed, + Self::Forbidden { .. } => ServiceErrorCode::Forbidden, Self::Internal { .. } => ServiceErrorCode::Internal, Self::InvalidArgument { .. } => ServiceErrorCode::BadRequest, Self::Io { .. } => ServiceErrorCode::Internal, Self::JsonDeserializeError { .. } => ServiceErrorCode::Internal, Self::JsonSerializeError { .. } => ServiceErrorCode::Internal, - Self::NotFound { .. } => ServiceErrorCode::NotFound, + Self::NotFound(_) => ServiceErrorCode::NotFound, + Self::Timeout(_) => ServiceErrorCode::Timeout, Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } +impl GrpcServiceError for MetastoreError { + fn new_internal(message: String) -> Self { + Self::Internal { + message, + cause: "".to_string(), + } + } + + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) + } +} + impl Retryable for MetastoreError { fn is_retryable(&self) -> bool { matches!( self, - MetastoreError::Connection { .. } - | MetastoreError::Db { .. } - | MetastoreError::Io { .. } - | MetastoreError::Internal { .. } + Self::Connection { .. } | Self::Db { .. } | Self::Io { .. } | Self::Internal { .. } ) } } diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index fea8406aa51..114125901c2 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -44,14 +44,15 @@ impl Display for PipelineUid { } impl PipelineUid { - pub fn from_u128(ulid_u128: u128) -> PipelineUid { - PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes())) - } - /// Creates a new random pipeline uid. pub fn new() -> Self { Self(Ulid::new()) } + + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test(ulid_u128: u128) -> PipelineUid { + Self(Ulid::from(ulid_u128)) + } } impl FromStr for PipelineUid { diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index c9cabef6e72..b7803f36eb8 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -18,8 +18,9 @@ // along with this program. If not, see . use quickwit_doc_mapper::QueryParserError; +use quickwit_proto::error::grpc_error_to_grpc_status; use quickwit_proto::metastore::{EntityKind, MetastoreError}; -use quickwit_proto::{tonic, ServiceError, ServiceErrorCode}; +use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; use quickwit_storage::StorageResolverError; use serde::{Deserialize, Serialize}; use tantivy::TantivyError; @@ -29,6 +30,7 @@ use tokio::task::JoinError; /// Possible SearchError #[allow(missing_docs)] #[derive(Error, Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub enum SearchError { #[error("could not find indexes matching the IDs `{index_ids:?}`")] IndexesNotFound { index_ids: Vec }, @@ -42,24 +44,44 @@ pub enum SearchError { InvalidQuery(String), #[error("storage not found: `{0}`)")] StorageResolver(#[from] StorageResolverError), + #[error("request timed out: {0}")] + Timeout(String), + #[error("service unavailable: {0}")] + Unavailable(String), } impl ServiceError for SearchError { fn error_code(&self) -> ServiceErrorCode { match self { - SearchError::IndexesNotFound { .. } => ServiceErrorCode::NotFound, - SearchError::Internal(_) => ServiceErrorCode::Internal, - SearchError::InvalidAggregationRequest(_) => ServiceErrorCode::BadRequest, - SearchError::InvalidArgument(_) => ServiceErrorCode::BadRequest, - SearchError::InvalidQuery(_) => ServiceErrorCode::BadRequest, - SearchError::StorageResolver(_) => ServiceErrorCode::BadRequest, + Self::IndexesNotFound { .. } => ServiceErrorCode::NotFound, + Self::Internal(_) => ServiceErrorCode::Internal, + Self::InvalidAggregationRequest(_) => ServiceErrorCode::BadRequest, + Self::InvalidArgument(_) => ServiceErrorCode::BadRequest, + Self::InvalidQuery(_) => ServiceErrorCode::BadRequest, + Self::StorageResolver(_) => ServiceErrorCode::Internal, + Self::Timeout(_) => ServiceErrorCode::Timeout, + Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } } +impl GrpcServiceError for SearchError { + fn new_internal(message: String) -> Self { + Self::Internal(message) + } + + fn new_timeout(message: String) -> Self { + Self::Timeout(message) + } + + fn new_unavailable(message: String) -> Self { + Self::Unavailable(message) + } +} + impl From for tonic::Status { fn from(error: SearchError) -> Self { - error.grpc_error() + grpc_error_to_grpc_status(error) } } @@ -72,7 +94,7 @@ pub fn parse_grpc_error(grpc_error: &tonic::Status) -> SearchError { impl From for SearchError { fn from(tantivy_error: TantivyError) -> Self { - SearchError::Internal(format!("Tantivy error: {tantivy_error}")) + SearchError::Internal(format!("tantivy error: {tantivy_error}")) } } @@ -84,7 +106,7 @@ impl From for SearchError { impl From for SearchError { fn from(serde_error: serde_json::Error) -> Self { - SearchError::Internal(format!("Serde error: {serde_error}")) + SearchError::Internal(format!("serde error: {serde_error}")) } } diff --git a/quickwit/quickwit-serve/src/debugging_api.rs b/quickwit/quickwit-serve/src/debugging_api.rs index 5e9132d2c14..a6a61ed40ee 100644 --- a/quickwit/quickwit-serve/src/debugging_api.rs +++ b/quickwit/quickwit-serve/src/debugging_api.rs @@ -52,6 +52,6 @@ pub async fn debugging_handler( &debug_info, // TODO error code on error hyper::StatusCode::OK, - &crate::format::BodyFormat::PrettyJson, + crate::format::BodyFormat::PrettyJson, ) } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 94c8bedd8fb..a9a10a29761 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -122,7 +122,7 @@ fn make_elastic_api_response( Ok(_) => StatusCode::OK, Err(error) => error.status, }; - RestApiResponse::new(&elasticsearch_result, status_code, &body_format) + RestApiResponse::new(&elasticsearch_result, status_code, body_format) } #[cfg(test)] diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs index f6571e9ec70..3b19b0c2dae 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs @@ -52,7 +52,7 @@ impl ElasticsearchError { impl From for ElasticsearchError { fn from(search_error: SearchError) -> Self { - let status = search_error.error_code().to_http_status_code(); + let status = search_error.error_code().http_status_code(); // Fill only reason field to keep it simple. let reason = ErrorCause { reason: Some(search_error.to_string()), @@ -72,7 +72,7 @@ impl From for ElasticsearchError { impl From for ElasticsearchError { fn from(ingest_service_error: IngestServiceError) -> Self { - let status = ingest_service_error.error_code().to_http_status_code(); + let status = ingest_service_error.error_code().http_status_code(); let reason = ErrorCause { reason: Some(ingest_service_error.to_string()), @@ -92,7 +92,7 @@ impl From for ElasticsearchError { impl From for ElasticsearchError { fn from(ingest_error: IngestV2Error) -> Self { - let status = ingest_error.error_code().to_http_status_code(); + let status = ingest_error.error_code().http_status_code(); let reason = ErrorCause { reason: Some(ingest_error.to_string()), @@ -112,7 +112,7 @@ impl From for ElasticsearchError { impl From for ElasticsearchError { fn from(ingest_error: IndexServiceError) -> Self { - let status = ingest_error.error_code().to_http_status_code(); + let status = ingest_error.error_code().http_status_code(); let reason = ErrorCause { reason: Some(ingest_error.to_string()), diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index cd49290098f..2ba222238d1 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -38,7 +38,6 @@ use quickwit_proto::search::{ SortDatetimeFormat, }; use quickwit_proto::types::IndexUid; -use quickwit_proto::ServiceErrorCode; use quickwit_query::query_ast::{BoolQuery, QueryAst, UserInputQuery}; use quickwit_query::BooleanOperand; use quickwit_search::{list_all_splits, resolve_index_patterns, SearchError, SearchService}; @@ -63,7 +62,7 @@ use super::model::{ }; use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; -use crate::rest_api_response::{into_rest_api_response, RestApiError, RestApiResponse}; +use crate::rest_api_response::{RestApiError, RestApiResponse}; use crate::{with_arg, BuildInfo}; /// Elastic compatible cluster info handler. @@ -97,12 +96,16 @@ pub fn es_compat_search_handler( elasticsearch_filter().then(|_params: SearchQueryParams| async move { // TODO let api_error = RestApiError { - service_code: ServiceErrorCode::NotSupportedYet, + status_code: StatusCode::NOT_IMPLEMENTED, message: "_elastic/_search is not supported yet. Please try the index search endpoint \ (_elastic/{index}/search)" .to_string(), }; - into_rest_api_response::<(), _>(Err(api_error), BodyFormat::default()) + RestApiResponse::new::<(), _>( + &Err(api_error), + StatusCode::NOT_IMPLEMENTED, + BodyFormat::default(), + ) }) } @@ -200,7 +203,7 @@ pub fn es_compat_index_multi_search_handler( Ok(_) => StatusCode::OK, Err(err) => err.status, }; - RestApiResponse::new(&result, status_code, &BodyFormat::default()) + RestApiResponse::new(&result, status_code, BodyFormat::default()) }) } diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index a99c35eb922..dfdd63b96b4 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -1533,7 +1533,7 @@ mod tests { .body(source_config_body) .reply(&index_management_handler) .await; - assert_eq!(resp.status(), 405); + assert_eq!(resp.status(), 403); let resp = warp::test::request() .path(format!("/indexes/hdfs-logs/sources/{CLI_SOURCE_ID}").as_str()) @@ -1541,7 +1541,7 @@ mod tests { .body(source_config_body) .reply(&index_management_handler) .await; - assert_eq!(resp.status(), 405); + assert_eq!(resp.status(), 403); // Check get a non existing source returns 404. let resp = warp::test::request() @@ -1570,7 +1570,7 @@ mod tests { } #[tokio::test] - async fn test_create_file_source_returns_405() { + async fn test_create_file_source_returns_403() { let metastore = metastore_for_test(); let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured()); let mut node_config = NodeConfig::for_test(); @@ -1586,7 +1586,7 @@ mod tests { .body(source_config_body) .reply(&index_management_handler) .await; - assert_eq!(resp.status(), 405); + assert_eq!(resp.status(), 403); let response_body = std::str::from_utf8(resp.body()).unwrap(); assert!(response_body.contains("limited to a local usage")) } @@ -1920,7 +1920,7 @@ mod tests { .body(r#"{"enable": true}"#) .reply(&index_management_handler) .await; - assert_eq!(resp.status(), 405); + assert_eq!(resp.status(), 403); let resp = warp::test::request() .path(format!("/indexes/hdfs-logs/sources/{CLI_SOURCE_ID}/toggle").as_str()) @@ -1928,7 +1928,7 @@ mod tests { .body(r#"{"enable": true}"#) .reply(&index_management_handler) .await; - assert_eq!(resp.status(), 405); + assert_eq!(resp.status(), 403); Ok(()) } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 3079d97dbeb..238492d6b15 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -325,13 +325,13 @@ async fn jaeger_get_trace_by_id( fn make_jaeger_api_response( jaeger_result: Result, - format: BodyFormat, + body_format: BodyFormat, ) -> RestApiResponse { let status_code = match &jaeger_result { Ok(_) => StatusCode::OK, Err(err) => err.status, }; - RestApiResponse::new(&jaeger_result, status_code, &format) + RestApiResponse::new(&jaeger_result, status_code, body_format) } #[cfg(test)] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 7d3b57defac..a0fcc4d56b5 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -1293,7 +1293,7 @@ mod tests { assert!(new_indexer_node_info.indexing_tasks.is_empty()); let new_indexing_task = IndexingTask { - pipeline_uid: Some(PipelineUid::from_u128(0u128)), + pipeline_uid: Some(PipelineUid::for_test(0u128)), index_uid: Some(IndexUid::for_test("test-index", 0)), source_id: "test-source".to_string(), shard_ids: Vec::new(), diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index d41578e6935..8194acf3e7a 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -21,9 +21,8 @@ use std::net::SocketAddr; use std::sync::Arc; use hyper::http::HeaderValue; -use hyper::{http, Method}; +use hyper::{http, Method, StatusCode}; use quickwit_common::tower::BoxFutureInfaillible; -use quickwit_proto::ServiceErrorCode; use tower::make::Shared; use tower::ServiceBuilder; use tower_http::compression::predicate::{DefaultPredicate, Predicate, SizeAbove}; @@ -231,98 +230,98 @@ fn api_v1_routes( // More on this here: https://github.com/seanmonstar/warp/issues/388. // We may use this work on the PR is merged: https://github.com/seanmonstar/warp/pull/909. pub async fn recover_fn(rejection: Rejection) -> Result { - let err = get_status_with_error(rejection); - let status_code = err.service_code.to_http_status_code(); + let error = get_status_with_error(rejection); + let status_code = error.status_code; Ok(RestApiResponse::new::<(), _>( - &Err(err), + &Err(error), status_code, - &BodyFormat::default(), + BodyFormat::default(), )) } fn get_status_with_error(rejection: Rejection) -> RestApiError { if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::UnsupportedMediaType, + status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), } } else if rejection.is_not_found() { RestApiError { - service_code: ServiceErrorCode::NotFound, + status_code: StatusCode::NOT_FOUND, message: "Route not found".to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { // Happens when the request body could not be deserialized correctly. RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.0.to_string(), } } else if let Some(error) = rejection.find::() { // Happens when the url path or request body contains invalid argument(s). RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.0.to_string(), } } else if let Some(error) = rejection.find::() { // Happens when the request body could not be deserialized correctly. RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::UnsupportedMediaType, + status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::UnsupportedMediaType, + status_code: StatusCode::UNSUPPORTED_MEDIA_TYPE, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::LENGTH_REQUIRED, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::BAD_REQUEST, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::MethodNotAllowed, + status_code: StatusCode::METHOD_NOT_ALLOWED, message: error.to_string(), } } else if let Some(error) = rejection.find::() { RestApiError { - service_code: ServiceErrorCode::BadRequest, + status_code: StatusCode::PAYLOAD_TOO_LARGE, message: error.to_string(), } } else { error!("REST server error: {:?}", rejection); RestApiError { - service_code: ServiceErrorCode::Internal, + status_code: StatusCode::INTERNAL_SERVER_ERROR, message: "internal server error".to_string(), } } diff --git a/quickwit/quickwit-serve/src/rest_api_response.rs b/quickwit/quickwit-serve/src/rest_api_response.rs index 2f2e6de3c79..c8c64dbb1d2 100644 --- a/quickwit/quickwit-serve/src/rest_api_response.rs +++ b/quickwit/quickwit-serve/src/rest_api_response.rs @@ -18,9 +18,9 @@ // along with this program. If not, see . use hyper::header::CONTENT_TYPE; -use hyper::http::{status, HeaderValue}; -use hyper::{Body, Response}; -use quickwit_proto::{ServiceError, ServiceErrorCode}; +use hyper::http::HeaderValue; +use hyper::{Body, Response, StatusCode}; +use quickwit_proto::ServiceError; use serde::{self, Serialize}; use warp::Reply; @@ -33,22 +33,10 @@ pub(crate) struct RestApiError { // For now, we want to keep [`RestApiError`] as simple as possible // and return just a message. #[serde(skip_serializing)] - pub service_code: ServiceErrorCode, + pub status_code: StatusCode, pub message: String, } -impl ServiceError for RestApiError { - fn error_code(&self) -> ServiceErrorCode { - self.service_code - } -} - -impl ToString for RestApiError { - fn to_string(&self) -> String { - self.message.clone() - } -} - /// Makes a JSON API response from a result. /// The error is wrapped into an [`RestApiError`] to publicly expose /// a consistent error format. @@ -57,27 +45,27 @@ pub(crate) fn into_rest_api_response( body_format: BodyFormat, ) -> RestApiResponse { let rest_api_result = result.map_err(|error| RestApiError { - service_code: error.error_code(), + status_code: error.error_code().http_status_code(), message: error.to_string(), }); let status_code = match &rest_api_result { - Ok(_) => status::StatusCode::OK, - Err(error) => error.error_code().to_http_status_code(), + Ok(_) => StatusCode::OK, + Err(error) => error.status_code, }; - RestApiResponse::new(&rest_api_result, status_code, &body_format) + RestApiResponse::new(&rest_api_result, status_code, body_format) } /// A JSON reply for the REST API. pub struct RestApiResponse { - status_code: status::StatusCode, + status_code: StatusCode, inner: Result, ()>, } impl RestApiResponse { pub fn new( result: &Result, - status_code: status::StatusCode, - body_format: &BodyFormat, + status_code: StatusCode, + body_format: BodyFormat, ) -> Self { let inner = body_format.result_to_vec(result); RestApiResponse { status_code, inner } @@ -97,7 +85,7 @@ impl Reply for RestApiResponse { response } Err(()) => warp::reply::json(&RestApiError { - service_code: ServiceErrorCode::Internal, + status_code: StatusCode::INTERNAL_SERVER_ERROR, message: JSON_SERIALIZATION_ERROR.to_string(), }) .into_response(), diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 3a1d1a9ea9e..85297ca1848 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -27,7 +27,7 @@ use quickwit_proto::search::{ LeafSearchStreamRequest, LeafSearchStreamResponse, ListFieldsRequest, ListFieldsResponse, ReportSplitsRequest, ReportSplitsResponse, }; -use quickwit_proto::{set_parent_span_from_request_metadata, tonic, ServiceError}; +use quickwit_proto::{set_parent_span_from_request_metadata, tonic, GrpcServiceError}; use quickwit_search::SearchService; use tracing::instrument; @@ -93,8 +93,8 @@ impl grpc::SearchService for GrpcSearchAdapter { .0 .leaf_search_stream(leaf_search_request) .await - .map_err(|err| err.grpc_error())? - .map_err(|err| err.grpc_error()); + .map_err(|error| error.into_grpc_status())? + .map_err(|error| error.into_grpc_status()); Ok(tonic::Response::new(Box::pin(leaf_search_result))) } diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index 29e4fd9b0fe..705371f728e 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -489,9 +489,9 @@ fn make_streaming_reply(result: Result) -> impl Reply status_code = StatusCode::OK; warp::reply::Response::new(body) } - Err(err) => { - status_code = err.error_code().to_http_status_code(); - warp::reply::Response::new(hyper::Body::from(err.to_string())) + Err(error) => { + status_code = error.error_code().http_status_code(); + warp::reply::Response::new(hyper::Body::from(error.to_string())) } }; reply::with_status(body, status_code)