From 74afe9dcc5c4a654d95a071ffd89ee348a372081 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 22 Jan 2024 23:16:59 +0800 Subject: [PATCH] add es _stats API add /{index}/_stats API add /_stats API --- quickwit/quickwit-search/src/lib.rs | 47 +++++++- .../src/elasticsearch_api/bulk.rs | 71 ++++++++--- .../src/elasticsearch_api/filter.rs | 14 +++ .../src/elasticsearch_api/mod.rs | 13 +++ .../src/elasticsearch_api/model/mod.rs | 2 + .../src/elasticsearch_api/model/stats.rs | 110 ++++++++++++++++++ .../src/elasticsearch_api/rest_handler.rs | 94 +++++++++++++-- quickwit/quickwit-serve/src/rest.rs | 1 + .../scenarii/es_compatibility/0020-stats.yaml | 67 +++++++++++ 9 files changed, 393 insertions(+), 26 deletions(-) create mode 100644 quickwit/quickwit-serve/src/elasticsearch_api/model/stats.rs create mode 100644 quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index af34dd681c1..653467f7001 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -52,7 +52,9 @@ pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; +use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, +}; use tantivy::schema::NamedFieldDocument; /// Refer to this as `crate::Result`. 
@@ -65,8 +67,8 @@ pub use find_trace_ids_collector::FindTraceIdsCollector; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, - SplitState, + IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, + MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, }; use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; use quickwit_proto::types::IndexUid; @@ -81,7 +83,10 @@ pub use crate::cluster_client::ClusterClient; pub use crate::error::{parse_grpc_error, SearchError}; use crate::fetch_docs::fetch_docs; use crate::leaf::leaf_search; -pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob}; +pub use crate::root::{ + check_all_index_metadata_found, jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, + SearchJob, +}; pub use crate::search_job_placer::{Job, SearchJobPlacer}; pub use crate::search_response_rest::SearchResponseRest; pub use crate::search_stream::root_search_stream; @@ -167,8 +172,16 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn } } +/// Get all splits of the given index uids +pub async fn list_all_splits( + index_uids: Vec, + metastore: &mut MetastoreServiceClient, +) -> crate::Result> { + list_relevant_splits(index_uids, None, None, None, metastore).await +} + /// Extract the list of relevant splits for a given request.
-async fn list_relevant_splits( +pub async fn list_relevant_splits( index_uids: Vec, start_timestamp: Option, end_timestamp: Option, @@ -196,6 +209,30 @@ async fn list_relevant_splits( Ok(splits_metadata) } +/// Resolves index id patterns to the metadata of the matching indexes. +/// An empty pattern list requests all indexes; fails if `check_all_index_metadata_found` fails. +pub async fn resolve_index_patterns( + index_id_patterns: Vec, + metastore: &mut MetastoreServiceClient, +) -> crate::Result> { + let list_indexes_metadata_request = if index_id_patterns.is_empty() { + ListIndexesMetadataRequest::all() + } else { + ListIndexesMetadataRequest { + index_id_patterns: index_id_patterns.clone(), + } + }; + + // Fetch the metadata of the indexes matching the request + let indexes_metadata = metastore + .clone() + .list_indexes_metadata(list_indexes_metadata_request) + .await? + .deserialize_indexes_metadata()?; + check_all_index_metadata_found(&indexes_metadata, &index_id_patterns)?; + Ok(indexes_metadata) +} + /// Converts a Tantivy `NamedFieldDocument` into a json string using the /// schema defined by the DocMapper.
/// diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs index 5566eaff2b9..be81ea32ad9 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs @@ -150,6 +150,7 @@ mod tests { use quickwit_config::{IngestApiConfig, NodeConfig}; use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest}; use quickwit_proto::ingest::router::IngestRouterServiceClient; + use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::MockSearchService; use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse; @@ -161,11 +162,17 @@ mod tests { async fn test_bulk_api_returns_404_if_index_id_does_not_exist() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" { "create" : { "_index" : "my-index", "_id" : "1"} } {"id": 1, "message": "push"} @@ -185,11 +192,17 @@ mod tests { async fn test_bulk_api_returns_200() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - 
elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -213,11 +226,17 @@ mod tests { async fn test_bulk_api_returns_200_if_payload_has_blank_lines() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = " {\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}} \u{20}\u{20}\u{20}\u{20}\n @@ -238,11 +257,17 @@ mod tests { async fn test_bulk_index_api_returns_200() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, _) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ 
-266,11 +291,17 @@ mod tests { async fn test_bulk_api_blocks_when_refresh_wait_for_is_specified() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, ingest_service_mailbox) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -345,11 +376,17 @@ mod tests { async fn test_bulk_api_blocks_when_refresh_true_is_specified() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let (universe, _temp_dir, ingest_service, ingest_service_mailbox) = setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await; let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" { "create" : { "_index" : "my-index-1", "_id" : "1"} } {"id": 1, "message": "push"} @@ -423,10 +460,16 @@ mod tests { async fn test_bulk_ingest_request_returns_400_if_action_is_malformed() { let config = Arc::new(NodeConfig::for_test()); let search_service = Arc::new(MockSearchService::new()); + let metastore_service = MetastoreServiceClient::mock(); let 
ingest_service = IngestServiceClient::from(IngestServiceClient::mock()); let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock()); - let elastic_api_handlers = - elastic_api_handlers(config, search_service, ingest_service, ingest_router); + let elastic_api_handlers = elastic_api_handlers( + config, + search_service, + ingest_service, + ingest_router, + metastore_service.into(), + ); let payload = r#" {"create": {"_index": "my-index", "_id": "1"},} {"id": 1, "message": "my-doc"}"#; diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index e20dc0e6b2f..47eb9754d35 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -171,6 +171,20 @@ pub(crate) fn elastic_index_count_filter( .and(json_or_empty()) } +// No support for any query parameters for now. +#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")] +pub(crate) fn elastic_index_stats_filter( +) -> impl Filter,), Error = Rejection> + Clone { + warp::path!("_elastic" / String / "_stats") + .and_then(extract_index_id_patterns) + .and(warp::get().or(warp::post()).unify()) +} + +#[utoipa::path(get, tag = "Search", path = "/_stats")] +pub(crate) fn elastic_stats_filter() -> impl Filter + Clone { + warp::path!("_elastic" / "_stats").and(warp::get().or(warp::post()).unify()) +} + #[utoipa::path(get, tag = "Search", path = "/{index}/_search")] pub(crate) fn elastic_index_search_filter( ) -> impl Filter, SearchQueryParams, SearchBody), Error = Rejection> + Clone diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 356d20b89c2..808b1891971 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -31,6 +31,7 @@ use hyper::StatusCode; use quickwit_config::NodeConfig; use 
quickwit_ingest::IngestServiceClient; use quickwit_proto::ingest::router::IngestRouterServiceClient; +use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::SearchService; use rest_handler::{ es_compat_cluster_info_handler, es_compat_index_multi_search_handler, @@ -41,6 +42,7 @@ use warp::{Filter, Rejection}; use self::rest_handler::{ es_compat_index_count_handler, es_compat_index_field_capabilities_handler, + es_compat_index_stats_handler, es_compat_stats_handler, }; use crate::elasticsearch_api::model::ElasticsearchError; use crate::json_api_response::JsonApiResponse; @@ -55,6 +57,7 @@ pub fn elastic_api_handlers( search_service: Arc, ingest_service: IngestServiceClient, ingest_router: IngestRouterServiceClient, + metastore: MetastoreServiceClient, ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) @@ -70,6 +73,8 @@ pub fn elastic_api_handlers( ingest_router.clone(), )) .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) + .or(es_compat_index_stats_handler(metastore.clone())) + .or(es_compat_stats_handler(metastore)) // Register newly created handlers here. 
} @@ -123,6 +128,7 @@ mod tests { use quickwit_config::NodeConfig; use quickwit_ingest::{IngestApiService, IngestServiceClient}; use quickwit_proto::ingest::router::IngestRouterServiceClient; + use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_search::MockSearchService; use serde_json::Value as JsonValue; use warp::Filter; @@ -163,6 +169,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index":"index-1"} @@ -213,6 +220,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index":"index-1"} @@ -251,6 +259,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index":"index-1" @@ -282,6 +291,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index":"index-1"} @@ -313,6 +323,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index":"index-1"} @@ -343,6 +354,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {} @@ -385,6 +397,7 @@ mod tests { Arc::new(mock_search_service), ingest_service_client(), ingest_router, + MetastoreServiceClient::mock().into(), ); let msearch_payload = r#" {"index": ["index-1", "index-2"]} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs index 6438256e6c8..aba6f04dce9 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs @@ -25,6 
+25,7 @@ mod multi_search; mod scroll; mod search_body; mod search_query_params; +mod stats; pub use bulk_body::BulkAction; pub use bulk_query_params::ElasticBulkOptions; @@ -41,6 +42,7 @@ pub use scroll::ScrollQueryParams; pub use search_body::SearchBody; pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount}; use serde::{Deserialize, Serialize}; +pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct SortField { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/stats.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/stats.rs new file mode 100644 index 00000000000..af0a2c6e2df --- /dev/null +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/stats.rs @@ -0,0 +1,110 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::collections::HashMap; +use std::ops::AddAssign; + +use quickwit_metastore::SplitMetadata; +use serde::{Deserialize, Serialize}; + +/// Returns JSON in the format: +/// +/// { +/// "_all": { +/// "primaries": { +/// "store": {"size_in_bytes": 123456789}, +/// "docs": {"count": 5000} +/// }, +/// "total": { +/// "segments": {"count": 100}, +/// "docs": {"count": 5000} +/// } +/// }, +/// "indices": { +/// "exampleIndex": { +/// "primaries": { +/// "store": {"size_in_bytes": 123456789}, +/// "docs": {"count": 5000} +/// }, +/// "total": { +/// "segments": {"count": 50}, +/// "docs": {"count": 5000} +/// } +/// } +/// } +/// } +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct ElasticsearchStatsResponse { + pub _all: StatsResponseEntry, + pub indices: HashMap, // String is the index id +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StatsResponseEntry { + primaries: StatsPrimariesResponse, + total: StatsTotalResponse, +} + +impl AddAssign for StatsResponseEntry { + fn add_assign(&mut self, rhs: Self) { + self.primaries.store.size_in_bytes += rhs.primaries.store.size_in_bytes; + self.primaries.docs.count += rhs.primaries.docs.count; + self.total.segments.count += rhs.total.segments.count; + self.total.docs.count += rhs.total.docs.count; + } +} + +impl From for StatsResponseEntry { + fn from(split_metadata: SplitMetadata) -> Self { + let mut stats_response_entry = StatsResponseEntry::default(); + stats_response_entry.primaries.store.size_in_bytes = + split_metadata.as_split_info().file_size_bytes.as_u64(); + stats_response_entry.primaries.docs.count = split_metadata.num_docs as u64; + stats_response_entry.total.docs.count = split_metadata.num_docs as u64; + stats_response_entry.total.segments.count = 1; + stats_response_entry + } +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StatsPrimariesResponse { + store: StatsStoreResponse, + docs: StatsDocsResponse, +} + +#[derive(Clone, Serialize,
Deserialize, Debug, Default)] +pub struct StatsStoreResponse { + size_in_bytes: u64, +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StatsDocsResponse { + count: u64, +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StatsTotalResponse { + segments: StatsTotalSegmentsResponse, + docs: StatsDocsResponse, +} + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct StatsTotalSegmentsResponse { + count: u64, +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 3be0a4549a7..01d85d76734 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::str::from_utf8; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -30,14 +30,17 @@ use hyper::StatusCode; use itertools::Itertools; use quickwit_common::truncate_str; use quickwit_config::{validate_index_id_pattern, NodeConfig}; +use quickwit_metastore::*; +use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::search::{ CountHits, ListFieldsResponse, PartialHit, ScrollRequest, SearchResponse, SortByValue, SortDatetimeFormat, }; +use quickwit_proto::types::IndexUid; use quickwit_proto::ServiceErrorCode; use quickwit_query::query_ast::{QueryAst, UserInputQuery}; use quickwit_query::BooleanOperand; -use quickwit_search::{SearchError, SearchService}; +use quickwit_search::{list_all_splits, resolve_index_patterns, SearchError, SearchService}; use serde::{Deserialize, Serialize}; use serde_json::json; use warp::{Filter, Rejection}; @@ -45,14 +48,15 @@ use warp::{Filter, Rejection}; use super::filter::{ elastic_cluster_info_filter, 
elastic_field_capabilities_filter, elastic_index_count_filter, elastic_index_field_capabilities_filter, elastic_index_search_filter, - elastic_multi_search_filter, elastic_scroll_filter, elasticsearch_filter, + elastic_index_stats_filter, elastic_multi_search_filter, elastic_scroll_filter, + elastic_stats_filter, elasticsearch_filter, }; use super::model::{ build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, - ElasticsearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody, - FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, - MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, - SearchQueryParamsCount, + ElasticsearchError, ElasticsearchStatsResponse, FieldCapabilityQueryParams, + FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, + MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, + SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, }; use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; @@ -111,6 +115,25 @@ pub fn es_compat_index_field_capabilities_handler( .map(|result| make_elastic_api_response(result, BodyFormat::default())) } +/// GET or POST _elastic/_stats +pub fn es_compat_stats_handler( + search_service: MetastoreServiceClient, +) -> impl Filter + Clone { + elastic_stats_filter() + .and(with_arg(search_service)) + .then(es_compat_stats) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} +/// GET or POST _elastic/{index}/_stats +pub fn es_compat_index_stats_handler( + search_service: MetastoreServiceClient, +) -> impl Filter + Clone { + elastic_index_stats_filter() + .and(with_arg(search_service)) + .then(es_compat_index_stats) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + /// GET or POST _elastic/{index}/_search pub fn es_compat_index_search_handler( search_service: Arc, @@ 
-337,6 +360,36 @@ async fn es_compat_index_search( Ok(search_response_rest) } +async fn es_compat_stats( + metastore: MetastoreServiceClient, +) -> Result { + es_compat_index_stats(vec!["*".to_string()], metastore).await +} + +async fn es_compat_index_stats( + index_id_patterns: Vec, + mut metastore: MetastoreServiceClient, +) -> Result { + let indexes_metadata = resolve_index_patterns(index_id_patterns, &mut metastore).await?; + // Index uid to index id mapping + let index_uid_to_index_id: HashMap = indexes_metadata + .iter() + .map(|metadata| (metadata.index_uid.clone(), metadata.index_id().to_owned())) + .collect(); + + let index_uids = indexes_metadata + .into_iter() + .map(|index_metadata| index_metadata.index_uid) + .collect_vec(); + // calling into the search module is not necessary, but reuses established patterns + let splits_metadata = list_all_splits(index_uids, &mut metastore).await?; + + let search_response_rest: ElasticsearchStatsResponse = + convert_to_es_stats_response(index_uid_to_index_id, splits_metadata); + + Ok(search_response_rest) +} + async fn es_compat_index_field_capabilities( index_id_patterns: Vec, search_params: FieldCapabilityQueryParams, @@ -498,6 +551,33 @@ async fn es_scroll( Ok(search_response_rest) } +fn convert_to_es_stats_response( + index_uid_to_index_id: HashMap, + splits: Vec, +) -> ElasticsearchStatsResponse { + let mut _all = StatsResponseEntry::default(); + let mut per_index: HashMap = HashMap::new(); + + for split_metadata in splits { + let index_id = index_uid_to_index_id + .get(&split_metadata.index_uid) + .unwrap_or_else(|| { + panic!( + "index_uid {} not found in index_uid_to_index_id", + split_metadata.index_uid + ) + }); + let stats_entry: StatsResponseEntry = split_metadata.into(); + _all += stats_entry.clone(); + let index_stats_entry = per_index.entry(index_id.to_owned()).or_default(); + *index_stats_entry += stats_entry.clone(); + } + ElasticsearchStatsResponse { + _all, + indices: per_index, + } +} + fn
convert_to_es_search_response( resp: SearchResponse, append_shard_doc: bool, diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 3e68614181e..508d03ce263 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -204,6 +204,7 @@ fn api_v1_routes( quickwit_services.search_service.clone(), quickwit_services.ingest_service.clone(), quickwit_services.ingest_router_service.clone(), + quickwit_services.metastore_client.clone(), )), ) } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml new file mode 100644 index 00000000000..e91d3e4d5f0 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0020-stats.yaml @@ -0,0 +1,67 @@ +endpoint: "gharchive/_stats" +expected: + _all: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 + indices: + gharchive: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 +--- +endpoint: "ghar*/_stats" +expected: + _all: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 + indices: + gharchive: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 +--- +endpoint: "_stats" +expected: + _all: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 + indices: + gharchive: + primaries: + docs: + count: 100 + total: + segments: + count: 1 + docs: + count: 100 + +