Skip to content

Commit

Permalink
Allowing failed splits in root search.
Browse files Browse the repository at this point in the history
This PR adds a query string parameter to decide whether
encounterring an error on a subset of the splits, should fail a
search query or if a response (reflecting the partial set of successful
splits) should be returned.

It changes the behavior of the root search gRPC call, that will not return
an error in presence of partial split errors, but instead returns the list of
splits that failed.

This PR also changes the default behavior of the elasticsearch rest API: following
elasticsearch behavior on failing shards, quickwit will allow split errors.

The Quickwit API default behavior, on the other hand, is unchanged.

Closes #5411
  • Loading branch information
fulmicoton committed Sep 20, 2024
1 parent 7d357fa commit 0dfec8f
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 64 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
format: BodyFormat::Json,
sort_by,
count_all: CountHits::CountAll,
allow_failed_splits: false,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
Expand Down
23 changes: 15 additions & 8 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ message ListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 2;

// Time filter, expressed in seconds since epoch.
// That filter is to be interpreted as the semi-open interval:
// [start_timestamp, end_timestamp).
optional int64 start_timestamp = 3;
optional int64 end_timestamp = 4;

// Control if the the request will fail if split_ids contains a split that does not exist.
// Control if the the request will fail if split_ids contains a split that does not exist.
// optional bool fail_on_missing_index = 6;
}

Expand All @@ -149,7 +149,7 @@ message LeafListFieldsRequest {
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 4;

}

message ListFieldsResponse {
Expand Down Expand Up @@ -299,6 +299,14 @@ message SearchResponse {

// Scroll Id (only set if scroll_secs was set in the request)
optional string scroll_id = 6;

// Returns the list of splits for which search failed.
// For the moment, the cause is unknown.
//
// It is up to the caller to decide whether to interpret
// this as an overall failure or to present the partial results
// to the end user.
repeated SplitSearchError failed_splits = 7;
}

message SearchPlanResponse {
Expand Down Expand Up @@ -340,7 +348,7 @@ message LeafSearchRequest {
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
uint32 doc_mapper_ord = 1;

// The ordinal of the index uri in LeafSearchRequest.index_uris
uint32 index_uri_ord = 2;

Expand Down Expand Up @@ -453,8 +461,8 @@ message LeafSearchResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of attempt to search into splits.
// num_attempted_splits = num_successful_splits + number of .
uint64 num_attempted_splits = 4;

// Deprecated json serialized intermediate aggregation_result.
Expand Down Expand Up @@ -550,8 +558,7 @@ message LeafListTermsResponse {
// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple.
repeated SplitSearchError failed_splits = 3;

// Total number of splits the leaf(s) were in charge of.
// num_attempted_splits = num_successful_splits + num_failed_splits.
// Total number of single split search attempted.
uint64 num_attempted_splits = 4;
}

Expand Down
15 changes: 11 additions & 4 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 58 additions & 32 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,36 @@ impl ClusterClient {
) -> crate::Result<LeafSearchResponse> {
let mut response_res = client.leaf_search(request.clone()).await;
let retry_policy = LeafSearchRetryPolicy {};
if let Some(retry_request) = retry_policy.retry_request(request, &response_res) {
assert!(!retry_request.leaf_requests.is_empty());
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&retry_request.leaf_requests[0].split_offsets[0].split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
// We retry only once.
let Some(retry_request) = retry_policy.retry_request(request, &response_res) else {
return response_res;
};
let Some(first_split) = retry_request
.leaf_requests
.iter()
.flat_map(|leaf_req| leaf_req.split_offsets.iter())
.next()
else {
warn!(
"the retry request did not contain any split to retry. this should never happen, \
please report"
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_leaf_search_results(response_res, retry_result);
}
return response_res;
};
// There could be more than one split in the retry request. We pick a single client
// arbitrarily only considering the affinity of the first split.
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&first_split.split_id,
)
.await?;
debug!(
"Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}",
response_res, retry_request, client
);
let retry_result = client.leaf_search(retry_request).await;
response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result);
response_res
}

Expand Down Expand Up @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result<Ve
Ok(serialized)
}

fn merge_leaf_search_response(
mut left_response: LeafSearchResponse,
right_response: LeafSearchResponse,
/// Merge two leaf search response.
///
/// # Quirk
///
/// This is implemented for a retries.
/// For instance, the set of attempted splits of right is supposed to be the set of failed
/// list of the left one, so that the list of the overal failed splits is the list of splits on the
/// `right_response`.
fn merge_original_with_retry_leaf_search_response(
mut original_response: LeafSearchResponse,
retry_response: LeafSearchResponse,
) -> crate::Result<LeafSearchResponse> {
left_response
original_response
.partial_hits
.extend(right_response.partial_hits);
.extend(retry_response.partial_hits);
let intermediate_aggregation_result: Option<Vec<u8>> = match (
left_response.intermediate_aggregation_result,
right_response.intermediate_aggregation_result,
original_response.intermediate_aggregation_result,
retry_response.intermediate_aggregation_result,
) {
(Some(left_agg_bytes), Some(right_agg_bytes)) => {
let intermediate_aggregation_bytes: Vec<u8> =
Expand All @@ -296,22 +319,22 @@ fn merge_leaf_search_response(
};
Ok(LeafSearchResponse {
intermediate_aggregation_result,
num_hits: left_response.num_hits + right_response.num_hits,
num_attempted_splits: left_response.num_attempted_splits
+ right_response.num_attempted_splits,
failed_splits: right_response.failed_splits,
partial_hits: left_response.partial_hits,
num_hits: original_response.num_hits + retry_response.num_hits,
num_attempted_splits: original_response.num_attempted_splits
+ retry_response.num_attempted_splits,
failed_splits: retry_response.failed_splits,
partial_hits: original_response.partial_hits,
})
}

// Merge initial leaf search results with results obtained from a retry.
fn merge_leaf_search_results(
fn merge_original_with_retry_leaf_search_results(
left_search_response_result: crate::Result<LeafSearchResponse>,
right_search_response_result: crate::Result<LeafSearchResponse>,
) -> crate::Result<LeafSearchResponse> {
match (left_search_response_result, right_search_response_result) {
(Ok(left_response), Ok(right_response)) => {
merge_leaf_search_response(left_response, right_response)
merge_original_with_retry_leaf_search_response(left_response, right_response)
}
(Ok(single_valid_response), Err(_)) => Ok(single_valid_response),
(Err(_), Ok(single_valid_response)) => Ok(single_valid_response),
Expand Down Expand Up @@ -626,8 +649,11 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_leaf_search_response =
merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap();
let merged_leaf_search_response = merge_original_with_retry_leaf_search_results(
Ok(leaf_response),
Ok(leaf_response_retry),
)
.unwrap();
assert_eq!(merged_leaf_search_response.num_attempted_splits, 2);
assert_eq!(merged_leaf_search_response.num_hits, 2);
assert_eq!(merged_leaf_search_response.partial_hits.len(), 2);
Expand All @@ -649,7 +675,7 @@ mod tests {
num_attempted_splits: 1,
..Default::default()
};
let merged_result = merge_leaf_search_results(
let merged_result = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Ok(leaf_response),
)
Expand All @@ -663,7 +689,7 @@ mod tests {

#[test]
fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> {
let merge_error = merge_leaf_search_results(
let merge_error = merge_original_with_retry_leaf_search_results(
Err(SearchError::Internal("error".to_string())),
Err(SearchError::Internal("retry error".to_string())),
)
Expand Down
19 changes: 19 additions & 0 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use itertools::Itertools;
use quickwit_common::rate_limited_error;
use quickwit_doc_mapper::QueryParserError;
use quickwit_proto::error::grpc_error_to_grpc_status;
use quickwit_proto::metastore::{EntityKind, MetastoreError};
use quickwit_proto::search::SplitSearchError;
use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
use quickwit_storage::StorageResolverError;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,6 +55,23 @@ pub enum SearchError {
Unavailable(String),
}

impl SearchError {
/// Creates an internal `SearchError` from a list of split search errors.
pub fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option<SearchError> {
let first_failing_split = failed_splits.first()?;
let failed_splits = failed_splits
.iter()
.map(|failed_split| &failed_split.split_id)
.join(", ");
let error_msg = format!(
"search failed for the following splits: {failed_splits:}. For instance, split {} \
failed with the following error message: {}",
first_failing_split.split_id, first_failing_split.error,
);
Some(SearchError::Internal(error_msg))
}
}

impl ServiceError for SearchError {
fn error_code(&self) -> ServiceErrorCode {
match self {
Expand Down
Loading

0 comments on commit 0dfec8f

Please sign in to comment.