diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 716d014e75e..4b187033862 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -576,6 +576,49 @@ async fn search_partial_hits_phase_with_scroll( } } +/// Check if the request is a count request without any filters, so we can just return the split +/// metadata count. +/// +/// This is done by exclusion, so we will need to keep it up to date if fields are added. +fn is_metadata_count_request(request: &SearchRequest) -> bool { + // A `query_ast` that fails to deserialize must not panic here: treat the request as a regular + // search and fall through to the normal leaf search path. + let query_ast: Option<QueryAst> = serde_json::from_str(&request.query_ast).ok(); + if query_ast != Some(QueryAst::MatchAll) { + return false; + } + if request.max_hits != 0 { + return false; + } + + // TODO: if the start and end timestamps encompass the whole split, it is still a count query, + // so some of these requests could still be answered from split metadata. + if request.start_timestamp.is_some() || request.end_timestamp.is_some() { + return false; + } + if request.aggregation_request.is_some() + || !request.snippet_fields.is_empty() + || request.search_after.is_some() + { + return false; + } + true +} + +/// Get a leaf search response that returns the num_docs of the split +fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec { + split_metadatas + .iter() + .map(|metadata| LeafSearchResponse { + num_hits: metadata.num_docs as u64, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + intermediate_aggregation_result: None, + }) + .collect() +} + #[instrument(level = "debug", skip_all)] pub(crate) async fn search_partial_hits_phase( searcher_context: &SearcherContext, @@ -584,20 +627,29 @@ pub(crate) async fn search_partial_hits_phase( split_metadatas: &[SplitMetadata], cluster_client: &ClusterClient, ) -> crate::Result { - let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); - let assigned_leaf_search_jobs = cluster_client - .search_job_placer - .assign_jobs(jobs, 
&HashSet::default()) - .await?; - let mut leaf_request_tasks = Vec::new(); - for (client, client_jobs) in assigned_leaf_search_jobs { - let leaf_requests = - jobs_to_leaf_requests(search_request, indexes_metas_for_leaf_search, client_jobs)?; - for leaf_request in leaf_requests { - leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone())); - } - } - let leaf_search_responses: Vec = try_join_all(leaf_request_tasks).await?; + let leaf_search_responses: Vec = + if is_metadata_count_request(search_request) { + get_count_from_metadata(split_metadatas) + } else { + let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(jobs, &HashSet::default()) + .await?; + let mut leaf_request_tasks = Vec::new(); + for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_requests = jobs_to_leaf_requests( + search_request, + indexes_metas_for_leaf_search, + client_jobs, + )?; + for leaf_request in leaf_requests { + leaf_request_tasks + .push(cluster_client.leaf_search(leaf_request, client.clone())); + } + } + try_join_all(leaf_request_tasks).await? 
+ }; // Creates a collector which merges responses into one let merge_collector = diff --git a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs index 818dad5455a..2e41c6d013b 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs @@ -25,6 +25,7 @@ use warp::{Filter, Rejection}; use super::model::{ FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, + SearchQueryParamsCount, }; use crate::elastic_search_api::model::{ ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams, @@ -159,6 +160,17 @@ pub(crate) fn elastic_field_capabilities_filter() -> impl Filter< .and(json_or_empty()) } +#[utoipa::path(get, tag = "Count", path = "/{index}/_count")] +pub(crate) fn elastic_index_count_filter( +) -> impl Filter, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone +{ + warp::path!("_elastic" / String / "_count") + .and_then(extract_index_id_patterns) + .and(warp::get().or(warp::post()).unify()) + .and(serde_qs::warp::query(serde_qs::Config::default())) + .and(json_or_empty()) +} + #[utoipa::path(get, tag = "Search", path = "/{index}/_search")] pub(crate) fn elastic_index_search_filter( ) -> impl Filter, SearchQueryParams, SearchBody), Error = Rejection> + Clone diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index 4e36184013d..99fa2605911 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -39,7 +39,9 @@ use rest_handler::{ use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; -use self::rest_handler::es_compat_index_field_capabilities_handler; +use self::rest_handler::{ + es_compat_index_count_handler, es_compat_index_field_capabilities_handler, +}; use crate::elastic_search_api::model::ElasticSearchError; use 
crate::json_api_response::JsonApiResponse; use crate::{BodyFormat, BuildInfo}; @@ -57,6 +59,7 @@ pub fn elastic_api_handlers( es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) .or(es_compat_index_search_handler(search_service.clone())) + .or(es_compat_index_count_handler(search_service.clone())) .or(es_compat_scroll_handler(search_service.clone())) .or(es_compat_index_multi_search_handler(search_service.clone())) .or(es_compat_index_field_capabilities_handler( diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs index d5731ff8806..33776c5a097 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs @@ -39,7 +39,7 @@ pub use multi_search::{ use quickwit_proto::search::{SortDatetimeFormat, SortOrder}; pub use scroll::ScrollQueryParams; pub use search_body::SearchBody; -pub use search_query_params::SearchQueryParams; +pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs index 87b53c9bfd4..443c190ad66 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/search_query_params.rs @@ -152,6 +152,64 @@ pub struct SearchQueryParams { pub version: Option, } +#[serde_with::skip_serializing_none] +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct SearchQueryParamsCount { + #[serde(default)] + pub allow_no_indices: Option, + #[serde(default)] + pub analyze_wildcard: Option, + #[serde(default)] + pub analyzer: Option, + #[serde(default)] + pub default_operator: 
Option, + #[serde(default)] + pub df: Option, + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub expand_wildcards: Option>, + #[serde(default)] + pub ignore_throttled: Option, + #[serde(default)] + pub ignore_unavailable: Option, + #[serde(default)] + pub lenient: Option, + #[serde(default)] + pub max_concurrent_shard_requests: Option, + #[serde(default)] + pub preference: Option, + #[serde(default)] + pub q: Option, + #[serde(default)] + pub request_cache: Option, + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub routing: Option>, +} +impl From for SearchQueryParams { + fn from(value: SearchQueryParamsCount) -> Self { + SearchQueryParams { + allow_no_indices: value.allow_no_indices, + analyze_wildcard: value.analyze_wildcard, + analyzer: value.analyzer, + default_operator: value.default_operator, + df: value.df, + expand_wildcards: value.expand_wildcards, + ignore_throttled: value.ignore_throttled, + ignore_unavailable: value.ignore_unavailable, + preference: value.preference, + q: value.q, + request_cache: value.request_cache, + routing: value.routing, + size: Some(0), + ..Default::default() + } + } +} + // Parse a single sort field parameter from ES sort query string parameter. 
fn parse_sort_field_str(sort_field_str: &str) -> Result { if let Some((field, order_str)) = sort_field_str.split_once(':') { diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 48ebc53bdea..1ea2e600a12 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -38,11 +38,12 @@ use quickwit_proto::ServiceErrorCode; use quickwit_query::query_ast::{QueryAst, UserInputQuery}; use quickwit_query::BooleanOperand; use quickwit_search::{SearchError, SearchService}; +use serde::{Deserialize, Serialize}; use serde_json::json; use warp::{Filter, Rejection}; use super::filter::{ - elastic_cluster_info_filter, elastic_field_capabilities_filter, + elastic_cluster_info_filter, elastic_field_capabilities_filter, elastic_index_count_filter, elastic_index_field_capabilities_filter, elastic_index_search_filter, elastic_multi_search_filter, elastic_scroll_filter, elastic_search_filter, }; @@ -51,6 +52,7 @@ use super::model::{ ElasticSearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, + SearchQueryParamsCount, }; use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; @@ -119,6 +121,16 @@ pub fn es_compat_index_search_handler( .map(|result| make_elastic_api_response(result, BodyFormat::default())) } +/// GET or POST _elastic/{index}/_count +pub fn es_compat_index_count_handler( + search_service: Arc, +) -> impl Filter + Clone { + elastic_index_count_filter() + .and(with_arg(search_service)) + .then(es_compat_index_count) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + /// POST _elastic/_search pub fn es_compat_index_multi_search_handler( search_service: Arc, @@ 
-287,6 +299,27 @@ fn partial_hit_from_search_after_param( Ok(Some(parsed_search_after)) } +#[derive(Debug, Serialize, Deserialize)] +struct ElasticSearchCountResponse { + count: u64, +} + +async fn es_compat_index_count( + index_id_patterns: Vec, + search_params: SearchQueryParamsCount, + search_body: SearchBody, + search_service: Arc, +) -> Result { + let search_params: SearchQueryParams = search_params.into(); + let (search_request, _append_shard_doc) = + build_request_for_es_api(index_id_patterns, search_params, search_body)?; + let search_response: SearchResponse = search_service.root_search(search_request).await?; + let search_response_rest: ElasticSearchCountResponse = ElasticSearchCountResponse { + count: search_response.num_hits, + }; + Ok(search_response_rest) +} + async fn es_compat_index_search( index_id_patterns: Vec, search_params: SearchQueryParams, diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0019-count.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0019-count.yaml new file mode 100644 index 00000000000..4b0d62c51ea --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0019-count.yaml @@ -0,0 +1,10 @@ +endpoint: "gharchive/_count" +params: + q: type:PushEvent +expected: + count: 60 +--- +endpoint: "gharchive/_count" +expected: + count: 100 +