diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs
index c7ab1911205..db22ff1743e 100644
--- a/quickwit/quickwit-cli/src/tool.rs
+++ b/quickwit/quickwit-cli/src/tool.rs
@@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
         format: BodyFormat::Json,
         sort_by,
         count_all: CountHits::CountAll,
+        allow_failed_splits: false,
     };
     let search_request =
         search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 50ad8aec3c1..63d681f6945 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -126,14 +126,14 @@ message ListFieldsRequest {
   // Optional limit query to a list of fields
   // Wildcard expressions are supported.
   repeated string fields = 2;
-  
+
   // Time filter, expressed in seconds since epoch.
   // That filter is to be interpreted as the semi-open interval:
   // [start_timestamp, end_timestamp).
   optional int64 start_timestamp = 3;
   optional int64 end_timestamp = 4;

-  // Control if the the request will fail if split_ids contains a split that does not exist.
+  // Control if the the request will fail if split_ids contains a split that does not exist.
   // optional bool fail_on_missing_index = 6;
 }
@@ -149,7 +149,7 @@ message LeafListFieldsRequest {
   // Optional limit query to a list of fields
   // Wildcard expressions are supported.
   repeated string fields = 4;
-  
+
 }

 message ListFieldsResponse {
@@ -299,6 +299,14 @@ message SearchResponse {
   // Scroll Id (only set if scroll_secs was set in the request)
   optional string scroll_id = 6;
+
+  // Returns the list of splits for which search failed.
+  // For the moment, the cause is unknown.
+  //
+  // It is up to the caller to decide whether to interpret
+  // this as an overall failure or to present the partial results
+  // to the end user.
+  repeated SplitSearchError failed_splits = 7;
 }

 message SearchPlanResponse {
@@ -340,7 +348,7 @@ message LeafSearchRequest {
 message LeafRequestRef {
   // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
   uint32 doc_mapper_ord = 1;
-  
+
   // The ordinal of the index uri in LeafSearchRequest.index_uris
   uint32 index_uri_ord = 2;
@@ -453,8 +461,8 @@ message LeafSearchResponse {
   // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
   repeated SplitSearchError failed_splits = 3;

-  // Total number of splits the leaf(s) were in charge of.
-  // num_attempted_splits = num_successful_splits + num_failed_splits.
+  // Total number of attempts to search into splits.
+  // num_attempted_splits = num_successful_splits + num_failed_splits.
   uint64 num_attempted_splits = 4;

   // Deprecated json serialized intermediate aggregation_result.
@@ -550,8 +558,7 @@ message LeafListTermsResponse {
   // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
   repeated SplitSearchError failed_splits = 3;

-  // Total number of splits the leaf(s) were in charge of.
-  // num_attempted_splits = num_successful_splits + num_failed_splits.
+  // Total number of single split searches attempted.
   uint64 num_attempted_splits = 4;
 }
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index ed0219d0a7f..fe92a7e2ffd 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -230,6 +230,14 @@ pub struct SearchResponse {
     /// Scroll Id (only set if scroll_secs was set in the request)
     #[prost(string, optional, tag = "6")]
     pub scroll_id: ::core::option::Option<::prost::alloc::string::String>,
+    /// Returns the list of splits for which search failed.
+    /// For the moment, the cause is unknown.
+    ///
+    /// It is up to the caller to decide whether to interpret
+    /// this as an overall failure or to present the partial results
+    /// to the end user.
+    #[prost(message, repeated, tag = "7")]
+    pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>,
 }
 #[derive(Serialize, Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -431,8 +439,8 @@ pub struct LeafSearchResponse {
     /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
     #[prost(message, repeated, tag = "3")]
     pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>,
-    /// Total number of splits the leaf(s) were in charge of.
-    /// num_attempted_splits = num_successful_splits + num_failed_splits.
+    /// Total number of attempts to search into splits.
+    /// num_attempted_splits = num_successful_splits + num_failed_splits.
     #[prost(uint64, tag = "4")]
     pub num_attempted_splits: u64,
     /// postcard serialized intermediate aggregation_result.
@@ -551,8 +559,7 @@ pub struct LeafListTermsResponse {
     /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
     #[prost(message, repeated, tag = "3")]
     pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>,
-    /// Total number of splits the leaf(s) were in charge of.
-    /// num_attempted_splits = num_successful_splits + num_failed_splits.
+    /// Total number of single split searches attempted.
     #[prost(uint64, tag = "4")]
     pub num_attempted_splits: u64,
 }
diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs
index 25d53ca7554..720e30bd653 100644
--- a/quickwit/quickwit-search/src/cluster_client.rs
+++ b/quickwit/quickwit-search/src/cluster_client.rs
@@ -94,21 +94,36 @@ impl ClusterClient {
     ) -> crate::Result<LeafSearchResponse> {
         let mut response_res = client.leaf_search(request.clone()).await;
         let retry_policy = LeafSearchRetryPolicy {};
-        if let Some(retry_request) = retry_policy.retry_request(request, &response_res) {
-            assert!(!retry_request.leaf_requests.is_empty());
-            client = retry_client(
-                &self.search_job_placer,
-                client.grpc_addr(),
-                &retry_request.leaf_requests[0].split_offsets[0].split_id,
-            )
-            .await?;
-            debug!(
-                "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
-                response_res, retry_request, client
+        // We retry only once.
+        let Some(retry_request) = retry_policy.retry_request(request, &response_res) else {
+            return response_res;
+        };
+        let Some(first_split) = retry_request
+            .leaf_requests
+            .iter()
+            .flat_map(|leaf_req| leaf_req.split_offsets.iter())
+            .next()
+        else {
+            warn!(
+                "the retry request did not contain any split to retry; this should never happen, \
+                 please report"
             );
-            let retry_result = client.leaf_search(retry_request).await;
-            response_res = merge_leaf_search_results(response_res, retry_result);
-        }
+            return response_res;
+        };
+        // There could be more than one split in the retry request. We pick a single client
+        // arbitrarily, considering only the affinity of the first split.
+        client = retry_client(
+            &self.search_job_placer,
+            client.grpc_addr(),
+            &first_split.split_id,
+        )
+        .await?;
+        debug!(
+            "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
+            response_res, retry_request, client
+        );
+        let retry_result = client.leaf_search(retry_request).await;
+        response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result);
         response_res
     }
@@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result<Vec<u8>> {
-fn merge_leaf_search_response(
-    mut left_response: LeafSearchResponse,
-    right_response: LeafSearchResponse,
+fn merge_original_with_retry_leaf_search_response(
+    mut original_response: LeafSearchResponse,
+    retry_response: LeafSearchResponse,
 ) -> crate::Result<LeafSearchResponse> {
-    left_response
+    original_response
         .partial_hits
-        .extend(right_response.partial_hits);
+        .extend(retry_response.partial_hits);
     let intermediate_aggregation_result: Option<Vec<u8>> = match (
-        left_response.intermediate_aggregation_result,
-        right_response.intermediate_aggregation_result,
+        original_response.intermediate_aggregation_result,
+        retry_response.intermediate_aggregation_result,
     ) {
         (Some(left_agg_bytes), Some(right_agg_bytes)) => {
             let intermediate_aggregation_bytes: Vec<u8> =
@@ -296,22 +319,22 @@ fn merge_leaf_search_response(
     };
     Ok(LeafSearchResponse {
         intermediate_aggregation_result,
-        num_hits: left_response.num_hits + right_response.num_hits,
-        num_attempted_splits: left_response.num_attempted_splits
-            + right_response.num_attempted_splits,
-        failed_splits: right_response.failed_splits,
-        partial_hits: left_response.partial_hits,
+        num_hits: original_response.num_hits + retry_response.num_hits,
+        num_attempted_splits: original_response.num_attempted_splits
+            + retry_response.num_attempted_splits,
+        failed_splits: retry_response.failed_splits,
+        partial_hits: original_response.partial_hits,
     })
 }

 // Merge initial leaf search results with results obtained from a retry.
-fn merge_leaf_search_results(
+fn merge_original_with_retry_leaf_search_results(
     left_search_response_result: crate::Result<LeafSearchResponse>,
     right_search_response_result: crate::Result<LeafSearchResponse>,
 ) -> crate::Result<LeafSearchResponse> {
     match (left_search_response_result, right_search_response_result) {
         (Ok(left_response), Ok(right_response)) => {
-            merge_leaf_search_response(left_response, right_response)
+            merge_original_with_retry_leaf_search_response(left_response, right_response)
         }
         (Ok(single_valid_response), Err(_)) => Ok(single_valid_response),
         (Err(_), Ok(single_valid_response)) => Ok(single_valid_response),
@@ -626,8 +649,11 @@ mod tests {
             num_attempted_splits: 1,
             ..Default::default()
         };
-        let merged_leaf_search_response =
-            merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap();
+        let merged_leaf_search_response = merge_original_with_retry_leaf_search_results(
+            Ok(leaf_response),
+            Ok(leaf_response_retry),
+        )
+        .unwrap();
         assert_eq!(merged_leaf_search_response.num_attempted_splits, 2);
         assert_eq!(merged_leaf_search_response.num_hits, 2);
         assert_eq!(merged_leaf_search_response.partial_hits.len(), 2);
@@ -649,7 +675,7 @@ mod tests {
             num_attempted_splits: 1,
             ..Default::default()
         };
-        let merged_result = merge_leaf_search_results(
+        let merged_result = merge_original_with_retry_leaf_search_results(
             Err(SearchError::Internal("error".to_string())),
             Ok(leaf_response),
         )
@@ -663,7 +689,7 @@ mod tests {

     #[test]
     fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> {
-        let merge_error = merge_leaf_search_results(
+        let merge_error = merge_original_with_retry_leaf_search_results(
             Err(SearchError::Internal("error".to_string())),
             Err(SearchError::Internal("retry error".to_string())),
         )
diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs
index 17f44f49e1e..76671545e71 100644
--- a/quickwit/quickwit-search/src/error.rs
+++ b/quickwit/quickwit-search/src/error.rs
@@ -17,10 +17,12 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.

+use itertools::Itertools;
 use quickwit_common::rate_limited_error;
 use quickwit_doc_mapper::QueryParserError;
 use quickwit_proto::error::grpc_error_to_grpc_status;
 use quickwit_proto::metastore::{EntityKind, MetastoreError};
+use quickwit_proto::search::SplitSearchError;
 use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
 use quickwit_storage::StorageResolverError;
 use serde::{Deserialize, Serialize};
@@ -53,6 +55,23 @@ pub enum SearchError {
     Unavailable(String),
 }

+impl SearchError {
+    /// Creates an internal `SearchError` from a list of split search errors.
+    pub fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option<SearchError> {
+        let first_failing_split = failed_splits.first()?;
+        let failed_splits = failed_splits
+            .iter()
+            .map(|failed_split| &failed_split.split_id)
+            .join(", ");
+        let error_msg = format!(
+            "search failed for the following splits: {failed_splits:}. For instance, split {} \
+             failed with the following error message: {}",
+            first_failing_split.split_id, first_failing_split.error,
+        );
+        Some(SearchError::Internal(error_msg))
+    }
+}
+
 impl ServiceError for SearchError {
     fn error_code(&self) -> ServiceErrorCode {
         match self {
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index f7aa7395172..b28391b7a1f 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -49,7 +49,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult
 use tantivy::collector::Collector;
 use tantivy::schema::{FieldEntry, FieldType, Schema};
 use tantivy::TantivyError;
-use tracing::{debug, error, info, info_span, instrument};
+use tracing::{debug, info, info_span, instrument};

 use crate::cluster_client::ClusterClient;
 use crate::collector::{make_merge_collector, QuickwitAggregations};
@@ -575,6 +575,7 @@ async fn search_partial_hits_phase_with_scroll(
         max_hits_per_page: max_hits,
         cached_partial_hits_start_offset: search_request.start_offset,
         cached_partial_hits,
+        failed_splits: leaf_search_resp.failed_splits.clone(),
     };
     let scroll_key_and_start_offset: ScrollKeyAndStartOffset =
         ScrollKeyAndStartOffset::new_with_start_offset(
@@ -655,6 +656,8 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
+    let leaf_search_results: Vec<Result<LeafSearchResponse, TantivyError>> =
+        leaf_search_responses.into_iter().map(Ok).collect_vec();
     let span = info_span!("merge_fruits");
     let leaf_search_response = crate::search_thread_pool()
         .run_cpu_intensive(move || {
             let _span_guard = span.enter();
-            merge_collector.merge_fruits(leaf_search_responses)
+            merge_collector.merge_fruits(leaf_search_results)
         })
         .await
         .context("failed to merge leaf search responses")?
@@ -711,9 +714,7 @@ pub(crate) async fn search_partial_hits_phase(
         "Merged leaf search response."
     );
     if !leaf_search_response.failed_splits.is_empty() {
-        error!(failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split");
-        let errors: String = leaf_search_response.failed_splits.iter().join(", ");
-        return Err(SearchError::Internal(errors));
+        quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split");
     }
     Ok(leaf_search_response)
 }
@@ -931,6 +932,7 @@ async fn root_search_aux(
         scroll_id: scroll_key_and_start_offset_opt
             .as_ref()
            .map(ToString::to_string),
+        failed_splits: first_phase_result.failed_splits,
     })
 }

@@ -4765,7 +4767,11 @@ mod tests {
                         + leaf_search_req.leaf_requests[1].split_offsets.len() as u64,
                     partial_hits,
                     failed_splits: Vec::new(),
-                    num_attempted_splits: 1,
+                    num_attempted_splits: leaf_search_req
+                        .leaf_requests
+                        .iter()
+                        .map(|leaf_req| leaf_req.split_offsets.len() as u64)
+                        .sum::<u64>(),
                     ..Default::default()
                 })
             },
@@ -4807,4 +4813,118 @@ mod tests {
         );
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_root_search_split_failures() -> anyhow::Result<()> {
+        let search_request = quickwit_proto::search::SearchRequest {
+            index_id_patterns: vec!["test-index-1".to_string()],
+            query_ast: qast_json_helper("test", &["body"]),
+            max_hits: 10,
+            ..Default::default()
+        };
+        let mut mock_metastore = MockMetastoreService::new();
+        let index_metadata_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1");
+        let index_uid_1 = index_metadata_1.index_uid.clone();
+        mock_metastore.expect_list_indexes_metadata().return_once(
+            move |_list_indexes_metadata_request: ListIndexesMetadataRequest| {
+                Ok(ListIndexesMetadataResponse::for_test(vec![
+                    index_metadata_1,
+                ]))
+            },
+        );
+        mock_metastore
+            .expect_list_splits()
+            .return_once(move |list_splits_request| {
+                let list_splits_query =
+                    list_splits_request.deserialize_list_splits_query().unwrap();
+                assert!(list_splits_query.index_uids == vec![index_uid_1.clone(),]);
+                let splits = vec![
+                    MockSplitBuilder::new("index-1-split-1")
+                        .with_index_uid(&index_uid_1)
+                        .build(),
+                    MockSplitBuilder::new("index-1-split-2")
+                        .with_index_uid(&index_uid_1)
+                        .build(),
+                ];
+                let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap();
+                Ok(ServiceStream::from(vec![Ok(splits_response)]))
+            });
+        let mut mock_search_service_1 = MockSearchService::new();
+        mock_search_service_1
+            .expect_leaf_search()
+            .withf(
+                |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| {
+                    leaf_search_req.leaf_requests.len() == 1
+                        && leaf_search_req.leaf_requests[0].split_offsets.len() == 2
+                },
+            )
+            .times(1)
+            .returning(
+                |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| {
+                    let partial_hits = vec![mock_partial_hit("index-1-split-1", 0u64, 1u32)];
+                    Ok(quickwit_proto::search::LeafSearchResponse {
+                        num_hits: 1,
+                        partial_hits,
+                        failed_splits: vec![{
+                            SplitSearchError {
+                                error: "some error".to_string(),
+                                split_id: "index-1-split-1".to_string(),
+                                retryable_error: true,
+                            }
+                        }],
+                        num_attempted_splits: 3,
+                        ..Default::default()
+                    })
+                },
+            );
+        mock_search_service_1
+            .expect_leaf_search()
+            .withf(
+                |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| {
+                    leaf_search_req.leaf_requests.len() == 1
+                        && leaf_search_req.leaf_requests[0].split_offsets.len() == 1
+                },
+            )
+            .times(1)
+            .returning(
+                |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| {
+                    Ok(quickwit_proto::search::LeafSearchResponse {
+                        num_hits: 0,
+                        partial_hits: Vec::new(),
+                        failed_splits: vec![{
+                            SplitSearchError {
+                                error: "some error".to_string(),
+                                split_id: "index-1-split-1".to_string(),
+                                retryable_error: true,
+                            }
+                        }],
+                        num_attempted_splits: 1,
+                        ..Default::default()
+                    })
+                },
+            );
+        mock_search_service_1
+            .expect_fetch_docs()
+            .times(1)
+            .returning(|fetch_docs_req| {
+                Ok(quickwit_proto::search::FetchDocsResponse {
+                    hits: get_doc_for_fetch_req(fetch_docs_req),
+                })
+            });
+        let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service_1)]);
+        let search_job_placer = SearchJobPlacer::new(searcher_pool);
+        let cluster_client = ClusterClient::new(search_job_placer.clone());
+        let search_response = root_search(
+            &SearcherContext::for_test(),
+            search_request,
+            MetastoreServiceClient::from_mock(mock_metastore),
+            &cluster_client,
+        )
+        .await
+        .unwrap();
+        assert_eq!(search_response.num_hits, 1);
+        assert_eq!(search_response.hits.len(), 1);
+        assert_eq!(search_response.failed_splits.len(), 1);
+        Ok(())
+    }
 }
diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs
index bb21cf6db9b..a446e592400 100644
--- a/quickwit/quickwit-search/src/scroll_context.rs
+++ b/quickwit/quickwit-search/src/scroll_context.rs
@@ -28,7 +28,7 @@ use anyhow::Context;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use quickwit_metastore::SplitMetadata;
-use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest};
+use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError};
 use quickwit_proto::types::IndexUid;
 use serde::{Deserialize, Serialize};
 use tokio::sync::RwLock;
@@ -57,6 +57,7 @@ pub(crate) struct ScrollContext {
     pub max_hits_per_page: u64,
     pub cached_partial_hits_start_offset: u64,
     pub cached_partial_hits: Vec<PartialHit>,
+    pub failed_splits: Vec<SplitSearchError>,
 }

 impl ScrollContext {
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index 1fb529c3d42..fec30c5b894 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -436,6 +436,7 @@ pub(crate) async fn scroll(
         scroll_id: Some(next_scroll_id.to_string()),
         errors: Vec::new(),
         aggregation: None,
+        failed_splits: scroll_context.failed_splits,
     })
 }
 /// [`SearcherContext`] provides a common set of variables
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs
index 70409754353..903dfd4ed9c 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs
@@ -287,6 +287,11 @@ impl SearchQueryParams {
         })?;
         Ok(Some(duration))
     }
+
+    pub fn allow_partial_search_results(&self) -> bool {
+        // By default, Elasticsearch allows partial results.
+        self.allow_partial_search_results.unwrap_or(true)
+    }
 }

 #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."]
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
index e5caa8703bc..f037feb43cc 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
@@ -444,6 +444,7 @@ async fn es_compat_index_search(
     let _source_excludes = search_params._source_excludes.clone();
     let _source_includes = search_params._source_includes.clone();
     let start_instant = Instant::now();
+    let allow_partial_search_results = search_params.allow_partial_search_results();
     let (search_request, append_shard_doc) =
         build_request_for_es_api(index_id_patterns, search_params, search_body)?;
     let search_response: SearchResponse = search_service.root_search(search_request).await?;
@@ -453,7 +454,8 @@ async fn es_compat_index_search(
         append_shard_doc,
         _source_excludes,
         _source_includes,
-    );
+        allow_partial_search_results,
+    )?;
     search_response_rest.took = elapsed.as_millis() as u32;
     Ok(search_response_rest)
 }
@@ -791,6 +793,7 @@ async fn es_compat_index_multi_search(
             build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?;
         search_requests.push(es_request);
     }
+
     // TODO: forced to do weird referencing to work around https://github.com/rust-lang/rust/issues/100905
     // otherwise append_shard_doc is captured by ref, and we get lifetime issues
     let futures = search_requests
@@ -804,12 +807,14 @@ async fn es_compat_index_multi_search(
             let search_response: SearchResponse =
                 search_service.clone().root_search(search_request).await?;
             let elapsed = start_instant.elapsed();
-            let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response(
-                search_response,
-                append_shard_doc,
-                _source_excludes,
-                _source_includes,
-            );
+            let mut search_response_rest: ElasticsearchResponse =
+                convert_to_es_search_response(
+                    search_response,
+                    append_shard_doc,
+                    _source_excludes,
+                    _source_includes,
+                    true, //< allow_partial_results. Set to true to match ES's behavior.
+                )?;
             search_response_rest.took = elapsed.as_millis() as u32;
             Ok::<_, ElasticsearchError>(search_response_rest)
         }
@@ -853,7 +858,7 @@ async fn es_scroll(
     let search_response: SearchResponse = search_service.scroll(scroll_request).await?;
     // TODO append_shard_doc depends on the initial request, but we don't have access to it
     let mut search_response_rest: ElasticsearchResponse =
-        convert_to_es_search_response(search_response, false, None, None);
+        convert_to_es_search_response(search_response, false, None, None, true)?;
     search_response_rest.took = start_instant.elapsed().as_millis() as u32;
     Ok(search_response_rest)
 }
@@ -918,7 +923,13 @@ fn convert_to_es_search_response(
     append_shard_doc: bool,
     _source_excludes: Option<Vec<String>>,
     _source_includes: Option<Vec<String>>,
-) -> ElasticsearchResponse {
+    allow_partial_results: bool,
+) -> Result<ElasticsearchResponse, ElasticsearchError> {
+    if !allow_partial_results {
+        if let Some(search_error) = SearchError::from_split_errors(&resp.failed_splits) {
+            return Err(ElasticsearchError::from(search_error));
+        }
+    }
     let hits: Vec<Hit> = resp
         .hits
         .into_iter()
@@ -929,7 +940,7 @@ fn convert_to_es_search_response(
     } else {
         None
     };
-    ElasticsearchResponse {
+    Ok(ElasticsearchResponse {
         timed_out: false,
         hits: HitsMetadata {
             total: Some(TotalHits {
@@ -942,7 +953,7 @@ fn convert_to_es_search_response(
         aggregations,
         scroll_id: resp.scroll_id,
         ..Default::default()
-    }
+    })
 }

 pub(crate) fn str_lines(body: &str) -> impl Iterator<Item = &str> {
diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
index 1525c4d2b8b..01fd4a412a4 100644
--- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
@@ -463,6 +463,7 @@ mod tests {
                     errors: Vec::new(),
                     aggregation: None,
                     scroll_id: None,
+                    failed_splits: Vec::new(),
                 })
             });
         let mock_search_service = Arc::new(mock_search_service);
@@ -494,6 +495,7 @@ mod tests {
                     errors: Vec::new(),
                     aggregation: None,
                     scroll_id: None,
+                    failed_splits: Vec::new(),
                 })
             });
         let mock_search_service = Arc::new(mock_search_service);
diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs
index cf9b5d40c84..7cb60ecb222 100644
--- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs
@@ -239,6 +239,10 @@ pub struct SearchRequestQueryString {
     #[serde(with = "count_hits_from_bool")]
     #[serde(default = "count_hits_from_bool::default")]
     pub count_all: CountHits,
+    #[param(value_type = bool)]
+    #[schema(value_type = bool)]
+    #[serde(default)]
+    pub allow_failed_splits: bool,
 }

 mod count_hits_from_bool {
@@ -302,8 +306,23 @@ async fn search_endpoint(
     search_request: SearchRequestQueryString,
     search_service: &dyn SearchService,
 ) -> Result<SearchResponseRest, SearchError> {
+    let allow_failed_splits = search_request.allow_failed_splits;
     let search_request = search_request_from_api_request(index_id_patterns, search_request)?;
-    let search_response = search_service.root_search(search_request).await?;
+    let search_response =
+        search_service
+            .root_search(search_request)
+            .await
+            .and_then(|search_response| {
+                // We error out on failed splits unless the caller explicitly allows them.
+                if !allow_failed_splits {
+                    if let Some(search_error) =
+                        SearchError::from_split_errors(&search_response.failed_splits[..])
+                    {
+                        return Err(search_error);
+                    }
+                }
+                Ok(search_response)
+            })?;
     let search_response_rest = SearchResponseRest::try_from(search_response)?;
     Ok(search_response_rest)
 }
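
Note (editorial addition, not part of the patch): the sketch below illustrates the partial-results gating that this diff introduces. `SearchError::from_split_errors` collapses a non-empty `failed_splits` list into a single internal error, and a boolean flag (`allow_failed_splits` on the Quickwit REST API, `allow_partial_search_results` on the Elasticsearch-compatible API) decides whether that error is surfaced or the partial response is returned. The struct, enum, and `finalize_response` helper below are simplified, hypothetical stand-ins rather than the actual quickwit-proto / quickwit-search definitions, so the snippet compiles on its own.

// Sketch only: `SplitSearchError`, `SearchResponse`, and `SearchError` are simplified
// stand-ins for the types touched in the diff above.
#[derive(Debug, Clone)]
struct SplitSearchError {
    split_id: String,
    error: String,
    retryable_error: bool,
}

#[derive(Debug, Default)]
struct SearchResponse {
    num_hits: u64,
    failed_splits: Vec<SplitSearchError>,
}

#[derive(Debug)]
enum SearchError {
    Internal(String),
}

impl SearchError {
    // Mirrors the helper added in error.rs: summarize all failed splits into a
    // single internal error, or return `None` when every split succeeded.
    fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option<SearchError> {
        let first_failing_split = failed_splits.first()?;
        let split_ids: Vec<&str> = failed_splits
            .iter()
            .map(|failed_split| failed_split.split_id.as_str())
            .collect();
        Some(SearchError::Internal(format!(
            "search failed for the following splits: {}. For instance, split {} failed with: {}",
            split_ids.join(", "),
            first_failing_split.split_id,
            first_failing_split.error
        )))
    }
}

// Gating pattern used by the REST handlers: with `allow_failed_splits = false`,
// any failed split escalates to an overall error; with `true`, partial results
// are returned and `failed_splits` is left for the caller to inspect.
fn finalize_response(
    response: SearchResponse,
    allow_failed_splits: bool,
) -> Result<SearchResponse, SearchError> {
    if !allow_failed_splits {
        if let Some(search_error) = SearchError::from_split_errors(&response.failed_splits) {
            return Err(search_error);
        }
    }
    Ok(response)
}

fn main() {
    let response = SearchResponse {
        num_hits: 1,
        failed_splits: vec![SplitSearchError {
            split_id: "index-1-split-1".to_string(),
            error: "some error".to_string(),
            retryable_error: true,
        }],
    };
    // Default Quickwit REST behavior: the failed split becomes a hard error.
    assert!(finalize_response(response, false).is_err());
}

Under this patch, the Quickwit REST parameter defaults to false via `#[serde(default)]` (fail on any failed split, matching the previous behavior), while the Elasticsearch-compatible endpoints default to true via `allow_partial_search_results.unwrap_or(true)`.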