Skip to content

Commit

Permalink
Merge branch 'main' into self-schedule-not-async
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jan 18, 2024
2 parents 94cb865 + ffcd482 commit 4b5a6f6
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 17 deletions.
78 changes: 64 additions & 14 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,47 @@ async fn search_partial_hits_phase_with_scroll(
}
}

/// Check if the request is a count request without any filters, so we can just return the split
/// metadata count.
///
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
fn is_metadata_count_request(request: &SearchRequest) -> bool {
let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
if query_ast != QueryAst::MatchAll {
return false;
}
if request.max_hits != 0 {
return false;
}

// TODO: if the start and end timestamp encompass the whole split, it is still a count query
// So some could be checked on metadata
if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
return false;
}
if request.aggregation_request.is_some()
|| !request.snippet_fields.is_empty()
|| request.search_after.is_some()
{
return false;
}
true
}

/// Get a leaf search response that returns the num_docs of the split
fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
split_metadatas
.iter()
.map(|metadata| LeafSearchResponse {
num_hits: metadata.num_docs as u64,
partial_hits: Vec::new(),
failed_splits: Vec::new(),
num_attempted_splits: 1,
intermediate_aggregation_result: None,
})
.collect()
}

#[instrument(level = "debug", skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
Expand All @@ -584,20 +625,29 @@ pub(crate) async fn search_partial_hits_phase(
split_metadatas: &[SplitMetadata],
cluster_client: &ClusterClient,
) -> crate::Result<LeafSearchResponse> {
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
let assigned_leaf_search_jobs = cluster_client
.search_job_placer
.assign_jobs(jobs, &HashSet::default())
.await?;
let mut leaf_request_tasks = Vec::new();
for (client, client_jobs) in assigned_leaf_search_jobs {
let leaf_requests =
jobs_to_leaf_requests(search_request, indexes_metas_for_leaf_search, client_jobs)?;
for leaf_request in leaf_requests {
leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
}
}
let leaf_search_responses: Vec<LeafSearchResponse> = try_join_all(leaf_request_tasks).await?;
let leaf_search_responses: Vec<LeafSearchResponse> =
if is_metadata_count_request(search_request) {
get_count_from_metadata(split_metadatas)
} else {
let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
let assigned_leaf_search_jobs = cluster_client
.search_job_placer
.assign_jobs(jobs, &HashSet::default())
.await?;
let mut leaf_request_tasks = Vec::new();
for (client, client_jobs) in assigned_leaf_search_jobs {
let leaf_requests = jobs_to_leaf_requests(
search_request,
indexes_metas_for_leaf_search,
client_jobs,
)?;
for leaf_request in leaf_requests {
leaf_request_tasks
.push(cluster_client.leaf_search(leaf_request, client.clone()));
}
}
try_join_all(leaf_request_tasks).await?
};

// Creates a collector which merges responses into one
let merge_collector =
Expand Down
12 changes: 12 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use warp::{Filter, Rejection};

use super::model::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams,
SearchQueryParamsCount,
};
use crate::elastic_search_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
Expand Down Expand Up @@ -159,6 +160,17 @@ pub(crate) fn elastic_field_capabilities_filter() -> impl Filter<
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Count", path = "/{index}/_count")]
pub(crate) fn elastic_index_count_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone
{
warp::path!("_elastic" / String / "_count")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
.and(serde_qs::warp::query(serde_qs::Config::default()))
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use rest_handler::{
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use self::rest_handler::es_compat_index_field_capabilities_handler;
use self::rest_handler::{
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
};
use crate::elastic_search_api::model::ElasticSearchError;
use crate::json_api_response::JsonApiResponse;
use crate::{BodyFormat, BuildInfo};
Expand All @@ -57,6 +59,7 @@ pub fn elastic_api_handlers(
es_compat_cluster_info_handler(node_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_index_count_handler(search_service.clone()))
.or(es_compat_scroll_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service.clone()))
.or(es_compat_index_field_capabilities_handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use multi_search::{
use quickwit_proto::search::{SortDatetimeFormat, SortOrder};
pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::SearchQueryParams;
pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,64 @@ pub struct SearchQueryParams {
pub version: Option<bool>,
}

#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SearchQueryParamsCount {
#[serde(default)]
pub allow_no_indices: Option<bool>,
#[serde(default)]
pub analyze_wildcard: Option<bool>,
#[serde(default)]
pub analyzer: Option<String>,
#[serde(default)]
pub default_operator: Option<BooleanOperand>,
#[serde(default)]
pub df: Option<String>,
#[serde(serialize_with = "to_simple_list")]
#[serde(deserialize_with = "from_simple_list")]
#[serde(default)]
pub expand_wildcards: Option<Vec<ExpandWildcards>>,
#[serde(default)]
pub ignore_throttled: Option<bool>,
#[serde(default)]
pub ignore_unavailable: Option<bool>,
#[serde(default)]
pub lenient: Option<bool>,
#[serde(default)]
pub max_concurrent_shard_requests: Option<u64>,
#[serde(default)]
pub preference: Option<String>,
#[serde(default)]
pub q: Option<String>,
#[serde(default)]
pub request_cache: Option<bool>,
#[serde(serialize_with = "to_simple_list")]
#[serde(deserialize_with = "from_simple_list")]
#[serde(default)]
pub routing: Option<Vec<String>>,
}
impl From<SearchQueryParamsCount> for SearchQueryParams {
fn from(value: SearchQueryParamsCount) -> Self {
SearchQueryParams {
allow_no_indices: value.allow_no_indices,
analyze_wildcard: value.analyze_wildcard,
analyzer: value.analyzer,
default_operator: value.default_operator,
df: value.df,
expand_wildcards: value.expand_wildcards,
ignore_throttled: value.ignore_throttled,
ignore_unavailable: value.ignore_unavailable,
preference: value.preference,
q: value.q,
request_cache: value.request_cache,
routing: value.routing,
size: Some(0),
..Default::default()
}
}
}

// Parse a single sort field parameter from ES sort query string parameter.
fn parse_sort_field_str(sort_field_str: &str) -> Result<SortField, SearchError> {
if let Some((field, order_str)) = sort_field_str.split_once(':') {
Expand Down
35 changes: 34 additions & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ use quickwit_proto::ServiceErrorCode;
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
use quickwit_search::{SearchError, SearchService};
use serde::{Deserialize, Serialize};
use serde_json::json;
use warp::{Filter, Rejection};

use super::filter::{
elastic_cluster_info_filter, elastic_field_capabilities_filter,
elastic_cluster_info_filter, elastic_field_capabilities_filter, elastic_index_count_filter,
elastic_index_field_capabilities_filter, elastic_index_search_filter,
elastic_multi_search_filter, elastic_scroll_filter, elastic_search_filter,
};
Expand All @@ -51,6 +52,7 @@ use super::model::{
ElasticSearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
SearchQueryParamsCount,
};
use super::{make_elastic_api_response, TrackTotalHits};
use crate::format::BodyFormat;
Expand Down Expand Up @@ -119,6 +121,16 @@ pub fn es_compat_index_search_handler(
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET or POST _elastic/{index}/_count
pub fn es_compat_index_count_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_count_filter()
.and(with_arg(search_service))
.then(es_compat_index_count)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// POST _elastic/_search
pub fn es_compat_index_multi_search_handler(
search_service: Arc<dyn SearchService>,
Expand Down Expand Up @@ -287,6 +299,27 @@ fn partial_hit_from_search_after_param(
Ok(Some(parsed_search_after))
}

#[derive(Debug, Serialize, Deserialize)]
struct ElasticSearchCountResponse {
count: u64,
}

async fn es_compat_index_count(
index_id_patterns: Vec<String>,
search_params: SearchQueryParamsCount,
search_body: SearchBody,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticSearchCountResponse, ElasticSearchError> {
let search_params: SearchQueryParams = search_params.into();
let (search_request, _append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let search_response_rest: ElasticSearchCountResponse = ElasticSearchCountResponse {
count: search_response.num_hits,
};
Ok(search_response_rest)
}

async fn es_compat_index_search(
index_id_patterns: Vec<String>,
search_params: SearchQueryParams,
Expand Down
10 changes: 10 additions & 0 deletions quickwit/rest-api-tests/scenarii/es_compatibility/0019-count.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
endpoint: "gharchive/_count"
params:
q: type:PushEvent
expected:
count: 60
---
endpoint: "gharchive/_count"
expected:
count: 100

0 comments on commit 4b5a6f6

Please sign in to comment.