From 7dd42758e69ab96a91c73290493fe129b8ab426b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 25 Jul 2024 18:51:04 +0900 Subject: [PATCH] Adding logs whenever we emit an internal error. (#5246) --- .../quickwit-index-management/src/index.rs | 22 +++++++-- quickwit/quickwit-ingest/src/error.rs | 19 +++++-- quickwit/quickwit-janitor/src/error.rs | 6 ++- quickwit/quickwit-proto/src/cluster/mod.rs | 6 ++- .../quickwit-proto/src/control_plane/mod.rs | 9 +++- quickwit/quickwit-proto/src/indexing/mod.rs | 6 ++- quickwit/quickwit-proto/src/ingest/mod.rs | 6 ++- quickwit/quickwit-proto/src/metastore/mod.rs | 49 ++++++++++++++++--- quickwit/quickwit-search/src/error.rs | 14 +++++- .../src/elasticsearch_api/bulk_v2.rs | 31 ++++++++++-- .../src/otlp_api/rest_handler.rs | 6 ++- 11 files changed, 149 insertions(+), 25 deletions(-) diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 51377720594..8195a515b3d 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -25,6 +25,7 @@ use futures_util::StreamExt; use itertools::Itertools; use quickwit_common::fs::{empty_dir, get_cache_directory_path}; use quickwit_common::pretty::PrettySample; +use quickwit_common::rate_limited_error; use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_metastore::{ @@ -70,13 +71,28 @@ pub enum IndexServiceError { impl ServiceError for IndexServiceError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, err_msg); + ServiceErrorCode::Internal + } Self::InvalidConfig(_) => ServiceErrorCode::BadRequest, Self::InvalidIdentifier(_) => ServiceErrorCode::BadRequest, Self::Metastore(error) => error.error_code(), Self::OperationNotAllowed(_) => ServiceErrorCode::Forbidden, - Self::SplitDeletion(_) => ServiceErrorCode::Internal, - Self::Storage(_) => ServiceErrorCode::Internal, + Self::SplitDeletion(delete_splits_error) => { + rate_limited_error!( + limit_per_min = 6, + "index service internal error/split deletion: {delete_splits_error:?}" + ); + ServiceErrorCode::Internal + } + Self::Storage(storage_error) => { + rate_limited_error!( + limit_per_min = 6, + "index service internal error/storage {storage_error:?}" + ); + ServiceErrorCode::Internal + } } } } diff --git a/quickwit/quickwit-ingest/src/error.rs b/quickwit/quickwit-ingest/src/error.rs index a919a46f666..189a94d8186 100644 --- a/quickwit/quickwit-ingest/src/error.rs +++ b/quickwit/quickwit-ingest/src/error.rs @@ -21,6 +21,7 @@ use std::io; use mrecordlog::error::*; use quickwit_actors::AskError; +use quickwit_common::rate_limited_error; use quickwit_common::tower::BufferError; pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error}; use quickwit_proto::ingest::IngestV2Error; @@ -96,12 +97,24 @@ impl From for IngestServiceError { impl ServiceError for IngestServiceError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Corruption { .. } => ServiceErrorCode::Internal, + Self::Corruption(err_msg) => { + rate_limited_error!( + limit_per_min = 6, + "ingest/corruption internal error: {err_msg}" + ); + ServiceErrorCode::Internal + } Self::IndexAlreadyExists { .. } => ServiceErrorCode::AlreadyExists, Self::IndexNotFound { .. } => ServiceErrorCode::NotFound, - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, "ingest internal error: {err_msg}"); + ServiceErrorCode::Internal + } Self::InvalidPosition(_) => ServiceErrorCode::BadRequest, - Self::IoError { .. } => ServiceErrorCode::Internal, + Self::IoError(io_err) => { + rate_limited_error!(limit_per_min = 6, "ingest/io internal error: {io_err}"); + ServiceErrorCode::Internal + } Self::RateLimited => ServiceErrorCode::TooManyRequests, Self::Unavailable(_) => ServiceErrorCode::Unavailable, } diff --git a/quickwit/quickwit-janitor/src/error.rs b/quickwit/quickwit-janitor/src/error.rs index 3eed5dbd2fd..ea715b047a0 100644 --- a/quickwit/quickwit-janitor/src/error.rs +++ b/quickwit/quickwit-janitor/src/error.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use quickwit_common::rate_limited_error; use quickwit_proto::metastore::MetastoreError; use quickwit_proto::{ServiceError, ServiceErrorCode}; use serde::{Deserialize, Serialize}; @@ -37,7 +38,10 @@ pub enum JanitorError { impl ServiceError for JanitorError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, "janitor internal error {err_msg}"); + ServiceErrorCode::Internal + } Self::InvalidDeleteQuery(_) => ServiceErrorCode::BadRequest, Self::Metastore(metastore_error) => metastore_error.error_code(), } diff --git a/quickwit/quickwit-proto/src/cluster/mod.rs b/quickwit/quickwit-proto/src/cluster/mod.rs index d16a61c5c3b..48ee9dc0554 100644 --- a/quickwit/quickwit-proto/src/cluster/mod.rs +++ b/quickwit/quickwit-proto/src/cluster/mod.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use quickwit_common::rate_limited_error; use quickwit_common::tower::MakeLoadShedError; use thiserror; @@ -43,7 +44,10 @@ pub enum ClusterError { impl ServiceError for ClusterError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, "cluster internal error: {err_msg}"); + ServiceErrorCode::Internal + } Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, Self::Unavailable(_) => ServiceErrorCode::Unavailable, diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index 85caadfdb28..5a1becaf186 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use quickwit_actors::AskError; +use quickwit_common::rate_limited_error; use quickwit_common::tower::{MakeLoadShedError, RpcName}; use thiserror; @@ -52,7 +53,13 @@ impl From for ControlPlaneError { impl ServiceError for ControlPlaneError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(error_msg) => { + rate_limited_error!( + limit_per_min = 6, + "control plane internal error: {error_msg}" + ); + ServiceErrorCode::Internal + } Self::Metastore(metastore_error) => metastore_error.error_code(), Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 968c575aee5..ae8666978ab 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -25,6 +25,7 @@ use std::ops::{Add, Mul, Sub}; use bytesize::ByteSize; use quickwit_actors::AskError; use quickwit_common::pubsub::Event; +use quickwit_common::rate_limited_error; use quickwit_common::tower::{MakeLoadShedError, RpcName}; use serde::{Deserialize, Serialize}; use thiserror; @@ -55,7 +56,10 @@ pub enum IndexingError { impl ServiceError for IndexingError { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(err_msg) => { + rate_limited_error!(limit_per_min = 6, "indexing error: {err_msg}"); + ServiceErrorCode::Internal + } Self::Metastore(metastore_error) => metastore_error.error_code(), Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index f83f4f42c86..773d2236fc6 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -21,6 +21,7 @@ use std::iter::zip; use bytes::Bytes; use bytesize::ByteSize; +use quickwit_common::rate_limited_error; use quickwit_common::tower::MakeLoadShedError; use self::ingester::{PersistFailureReason, ReplicateFailureReason}; @@ -59,7 +60,10 @@ impl From for IngestV2Error { impl ServiceError for IngestV2Error { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(error_msg) => { + rate_limited_error!(limit_per_min = 6, "ingest internal error: {error_msg}"); + ServiceErrorCode::Internal + } Self::ShardNotFound { .. } => ServiceErrorCode::NotFound, Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index 06ffbd644a4..6caba6f7f1d 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -19,6 +19,7 @@ use std::fmt; +use quickwit_common::rate_limited_error; use quickwit_common::retry::Retryable; use quickwit_common::tower::MakeLoadShedError; use serde::{Deserialize, Serialize}; @@ -192,15 +193,51 @@ impl ServiceError for MetastoreError { fn error_code(&self) -> ServiceErrorCode { match self { Self::AlreadyExists(_) => ServiceErrorCode::AlreadyExists, - Self::Connection { .. } => ServiceErrorCode::Internal, - Self::Db { .. } => ServiceErrorCode::Internal, + Self::Connection { message } => { + rate_limited_error!( + limit_per_min = 6, + "metastore/connection internal error: {message}" + ); + ServiceErrorCode::Internal + } + Self::Db { message } => { + rate_limited_error!(limit_per_min = 6, "metastore/db internal error: {message}"); + ServiceErrorCode::Internal + } Self::FailedPrecondition { .. } => ServiceErrorCode::BadRequest, Self::Forbidden { .. } => ServiceErrorCode::Forbidden, - Self::Internal { .. } => ServiceErrorCode::Internal, + Self::Internal { message, cause } => { + rate_limited_error!( + limit_per_min = 6, + "metastore internal error: {message} cause: {cause}" + ); + ServiceErrorCode::Internal + } Self::InvalidArgument { .. } => ServiceErrorCode::BadRequest, - Self::Io { .. } => ServiceErrorCode::Internal, - Self::JsonDeserializeError { .. } => ServiceErrorCode::Internal, - Self::JsonSerializeError { .. } => ServiceErrorCode::Internal, + Self::Io { message } => { + rate_limited_error!(limit_per_min = 6, "metastore/io internal error: {message}"); + ServiceErrorCode::Internal + } + Self::JsonDeserializeError { + struct_name, + message, + } => { + rate_limited_error!( + limit_per_min = 6, + "metastore/jsondeser internal error: [{struct_name}] {message}" + ); + ServiceErrorCode::Internal + } + Self::JsonSerializeError { + struct_name, + message, + } => { + rate_limited_error!( + limit_per_min = 6, + "metastore/jsonser internal error: [{struct_name}] {message}" + ); + ServiceErrorCode::Internal + } Self::NotFound(_) => ServiceErrorCode::NotFound, Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index 53304b7247e..0dbdff10e41 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use quickwit_common::rate_limited_error; use quickwit_doc_mapper::QueryParserError; use quickwit_proto::error::grpc_error_to_grpc_status; use quickwit_proto::metastore::{EntityKind, MetastoreError}; @@ -56,11 +57,20 @@ impl ServiceError for SearchError { fn error_code(&self) -> ServiceErrorCode { match self { Self::IndexesNotFound { .. } => ServiceErrorCode::NotFound, - Self::Internal(_) => ServiceErrorCode::Internal, + Self::Internal(error_msg) => { + rate_limited_error!(limit_per_min = 6, "search internal error: {error_msg}"); + ServiceErrorCode::Internal + } Self::InvalidAggregationRequest(_) => ServiceErrorCode::BadRequest, Self::InvalidArgument(_) => ServiceErrorCode::BadRequest, Self::InvalidQuery(_) => ServiceErrorCode::BadRequest, - Self::StorageResolver(_) => ServiceErrorCode::Internal, + Self::StorageResolver(storage_err) => { + rate_limited_error!( + limit_per_min = 6, + "search's storager resolver internal error: {storage_err}" + ); + ServiceErrorCode::Internal + } Self::Timeout(_) => ServiceErrorCode::Timeout, Self::TooManyRequests => ServiceErrorCode::TooManyRequests, Self::Unavailable(_) => ServiceErrorCode::Unavailable, diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index c33bbb1e041..09b01acd1c6 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::time::Instant; use hyper::StatusCode; +use quickwit_common::rate_limited_error; use quickwit_config::INGEST_V2_SOURCE_ID; use quickwit_ingest::IngestRequestV2Builder; use quickwit_proto::ingest::router::{ @@ -147,7 +148,10 @@ pub(crate) async fn elastic_bulk_ingest_v2( let Some(ingest_request) = ingest_request_opt else { return Ok(ElasticBulkResponse::default()); }; - let ingest_response = ingest_router.ingest(ingest_request).await?; + let ingest_response = ingest_router.ingest(ingest_request).await.map_err(|err| { + rate_limited_error!(limit_per_min=6, err=?err, "router error"); + err + })?; make_elastic_bulk_response_v2(ingest_response, per_subrequest_doc_handles, now) } @@ -168,8 +172,14 @@ fn make_elastic_bulk_response_v2( .expect("`index_uid` should be a required field"); // Find the doc handles for the subresponse. - let mut doc_handles = - remove_doc_handles(&mut per_subrequest_doc_handles, success.subrequest_id)?; + let mut doc_handles = remove_doc_handles( + &mut per_subrequest_doc_handles, + success.subrequest_id, + ) + .map_err(|err| { + rate_limited_error!(limit_per_min=6, index_id=%index_id, "could not find subrequest id"); + err + })?; doc_handles.sort_unstable_by(|left, right| left.doc_uid.cmp(&right.doc_uid)); // Populate the response items with one error per parse failure. @@ -178,9 +188,11 @@ fn make_elastic_bulk_response_v2( // Since the generated doc UIDs are monotonically increasing, and inserted in order, we // can find doc handles using binary search. + let failed_doc_uid = parse_failure.doc_uid(); let doc_handle_idx = doc_handles - .binary_search_by_key(&parse_failure.doc_uid(), |doc_handle| doc_handle.doc_uid) + .binary_search_by_key(&failed_doc_uid, |doc_handle| doc_handle.doc_uid) .map_err(|_| { + rate_limited_error!(limit_per_min=6, doc_uid=%failed_doc_uid, "could not find doc_uid from parse failure"); ElasticsearchError::new( StatusCode::INTERNAL_SERVER_ERROR, format!( @@ -228,7 +240,16 @@ fn make_elastic_bulk_response_v2( // Find the doc handles for the subrequest. let doc_handles = - remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id)?; + remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id).map_err( + |err| { + rate_limited_error!( + limit_per_min = 6, + subrequest = failure.subrequest_id, + "failed to find error subrequest" + ); + err + }, + )?; // Populate the response items with one error per doc handle. let (exception, reason, status) = match failure.reason() { diff --git a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs index 0b61f835e4c..bce7df4ba79 100644 --- a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use bytes::Bytes; +use quickwit_common::rate_limited_error; use quickwit_opentelemetry::otlp::{ OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID, }; @@ -154,7 +155,10 @@ impl ServiceError for OtlpApiError { fn error_code(&self) -> ServiceErrorCode { match self { OtlpApiError::InvalidPayload(_) => ServiceErrorCode::BadRequest, - OtlpApiError::Ingest(_) => ServiceErrorCode::Internal, + OtlpApiError::Ingest(err_msg) => { + rate_limited_error!(limit_per_min = 6, "otlp internal error: {err_msg}"); + ServiceErrorCode::Internal + } } } }