From 99a92dd1efac7efe6f2d3cf46e48e85cd54432b3 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 30 Jan 2024 05:46:50 +0100 Subject: [PATCH] add cat indices API (#4465) * add cat indices API * add AddAssign for Status * handle status, add index pattern test --- .../src/elasticsearch_api/filter.rs | 23 +- .../src/elasticsearch_api/mod.rs | 5 +- .../elasticsearch_api/model/cat_indices.rs | 285 ++++++++++++++++++ .../src/elasticsearch_api/model/mod.rs | 2 + .../src/elasticsearch_api/rest_handler.rs | 113 ++++++- .../es_compatibility/0021-cat-indices.yaml | 89 ++++++ 6 files changed, 506 insertions(+), 11 deletions(-) create mode 100644 quickwit/quickwit-serve/src/elasticsearch_api/model/cat_indices.rs create mode 100644 quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index bc383d155c5..4ea8e12e757 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -24,8 +24,8 @@ use warp::reject::LengthRequired; use warp::{Filter, Rejection}; use super::model::{ - FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, - SearchQueryParamsCount, + CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody, + MultiSearchQueryParams, SearchQueryParamsCount, }; use crate::elasticsearch_api::model::{ ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams, @@ -177,7 +177,7 @@ pub(crate) fn elastic_index_stats_filter( ) -> impl Filter,), Error = Rejection> + Clone { warp::path!("_elastic" / String / "_stats") .and_then(extract_index_id_patterns) - .and(warp::get().or(warp::post()).unify()) + .and(warp::get()) } #[utoipa::path(get, tag = "Search", path = "/_stats")] @@ -185,6 +185,23 @@ pub(crate) fn elastic_stats_filter() -> impl Filter impl Filter, CatIndexQueryParams), Error = Rejection> + Clone { + warp::path!("_elastic" / "_cat" / "indices" / String) + .and_then(extract_index_id_patterns) + .and(warp::get()) + .and(serde_qs::warp::query(serde_qs::Config::default())) +} + +#[utoipa::path(get, tag = "Search", path = "/_cat/indices")] +pub(crate) fn elastic_cat_indices_filter( +) -> impl Filter + Clone { + warp::path!("_elastic" / "_cat" / "indices") + .and(warp::get()) + .and(serde_qs::warp::query(serde_qs::Config::default())) +} + #[utoipa::path(get, tag = "Search", path = "/{index}/_search")] pub(crate) fn elastic_index_search_filter( ) -> impl Filter, SearchQueryParams, SearchBody), Error = Rejection> + Clone diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 808b1891971..5b846cb27fb 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -41,6 +41,7 @@ use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; use self::rest_handler::{ + es_compat_cat_indices_handler, es_compat_index_cat_indices_handler, es_compat_index_count_handler, es_compat_index_field_capabilities_handler, es_compat_index_stats_handler, es_compat_stats_handler, }; @@ -74,7 +75,9 @@ pub fn elastic_api_handlers( )) .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) .or(es_compat_index_stats_handler(metastore.clone())) - .or(es_compat_stats_handler(metastore)) + .or(es_compat_stats_handler(metastore.clone())) + .or(es_compat_index_cat_indices_handler(metastore.clone())) + .or(es_compat_cat_indices_handler(metastore.clone())) // Register newly created handlers here. } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/cat_indices.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/cat_indices.rs new file mode 100644 index 00000000000..e7b98166d97 --- /dev/null +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/cat_indices.rs @@ -0,0 +1,285 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashSet; +use std::ops::AddAssign; + +use hyper::StatusCode; +use quickwit_metastore::SplitMetadata; +use serde::{Deserialize, Serialize, Serializer}; + +use super::ElasticsearchError; +use crate::simple_list::{from_simple_list, to_simple_list}; + +#[serde_with::skip_serializing_none] +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct CatIndexQueryParams { + #[serde(default)] + /// Only JSON supported for now. + pub format: Option, + /// Comma-separated list of column names to display. + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub h: Option>, + #[serde(default)] + /// Filter for health: green, yellow, or red + pub health: Option, + /// Unit used to display byte values. + /// Unsupported for now. + #[serde(default)] + pub bytes: Option, + /// Comma-separated list of column names or column aliases used to sort the response. + /// Unsupported for now. + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub s: Option>, + /// If true, the response includes column headings. Defaults to false. + /// Unsupported for now. + #[serde(default)] + pub v: Option, +} +impl CatIndexQueryParams { + pub fn validate(&self) -> Result<(), ElasticsearchError> { + if let Some(format) = &self.format { + if format.to_lowercase() != "json" { + return Err(ElasticsearchError::new( + StatusCode::BAD_REQUEST, + format!( + "Format {:?} is not supported. Only format=json is supported.", + format + ), + )); + } + } else { + return Err(ElasticsearchError::new( + StatusCode::BAD_REQUEST, + "Only format=json is supported.".to_string(), + )); + } + let unsupported_parameter_error = |field: &str| { + ElasticsearchError::new( + StatusCode::BAD_REQUEST, + format!("Parameter {:?} is not supported.", field), + ) + }; + if self.bytes.is_some() { + return Err(unsupported_parameter_error("bytes")); + } + if self.v.is_some() { + return Err(unsupported_parameter_error("v")); + } + if self.s.is_some() { + return Err(unsupported_parameter_error("s")); + } + + Ok(()) + } +} + +#[derive(Debug, Clone, Default, Serialize)] +pub struct ElasticsearchCatIndexResponse { + pub health: Health, + status: Status, + pub index: String, + uuid: String, + pri: String, + rep: String, + #[serde(rename = "docs.count", serialize_with = "serialize_u64_as_string")] + docs_count: u64, + #[serde(rename = "docs.deleted", serialize_with = "serialize_u64_as_string")] + docs_deleted: u64, + #[serde(rename = "store.size", serialize_with = "ser_es_format")] + store_size: u64, + #[serde(rename = "pri.store.size", serialize_with = "ser_es_format")] + pri_store_size: u64, + #[serde(rename = "dataset.size", serialize_with = "ser_es_format")] + dataset_size: u64, +} + +impl ElasticsearchCatIndexResponse { + pub fn serialize_filtered( + &self, + fields: &Option>, + ) -> serde_json::Result { + let mut value = serde_json::to_value(self)?; + + if let Some(fields) = fields { + let fields: HashSet = fields.iter().cloned().collect(); + // If fields are specified, retain only those fields + if let serde_json::Value::Object(ref mut map) = value { + map.retain(|key, _| fields.contains(key)); + } + } + + Ok(value) + } +} +impl AddAssign for ElasticsearchCatIndexResponse { + fn add_assign(&mut self, rhs: Self) { + // pri and rep are always 1, so we can just overwrite them + self.pri = rhs.pri; + self.rep = rhs.rep; + // Set index, since this may be a default entry + self.index = rhs.index; + self.uuid = rhs.uuid; + + self.health += rhs.health; + self.status += rhs.status; + self.docs_count += rhs.docs_count; + self.docs_deleted += rhs.docs_deleted; + self.store_size += rhs.store_size; + self.pri_store_size += rhs.pri_store_size; + self.dataset_size += rhs.dataset_size; + } +} + +impl From for ElasticsearchCatIndexResponse { + fn from(split_metadata: SplitMetadata) -> Self { + ElasticsearchCatIndexResponse { + store_size: split_metadata.as_split_info().file_size_bytes.as_u64(), + pri_store_size: split_metadata.as_split_info().file_size_bytes.as_u64(), + dataset_size: split_metadata + .as_split_info() + .uncompressed_docs_size_bytes + .as_u64(), + uuid: split_metadata.index_uid.to_string(), + pri: "1".to_string(), + rep: "1".to_string(), + docs_count: split_metadata.as_split_info().num_docs as u64, + ..Default::default() + } + } +} + +fn serialize_u64_as_string(value: &u64, serializer: S) -> Result +where S: Serializer { + serializer.serialize_str(&value.to_string()) +} + +fn ser_es_format(bytes: &u64, serializer: S) -> Result +where S: Serializer { + serializer.serialize_str(&format_byte_size(*bytes)) +} + +fn format_byte_size(bytes: u64) -> String { + const KILOBYTE: u64 = 1024; + const MEGABYTE: u64 = KILOBYTE * 1024; + const GIGABYTE: u64 = MEGABYTE * 1024; + const TERABYTE: u64 = GIGABYTE * 1024; + if bytes < KILOBYTE { + format!("{}b", bytes) + } else if bytes < MEGABYTE { + format!("{:.1}kb", bytes as f64 / KILOBYTE as f64) + } else if bytes < GIGABYTE { + format!("{:.1}mb", bytes as f64 / MEGABYTE as f64) + } else if bytes < TERABYTE { + format!("{:.1}gb", bytes as f64 / GIGABYTE as f64) + } else { + format!("{:.1}tb", bytes as f64 / TERABYTE as f64) + } +} + +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum Health { + #[default] + Green = 1, + Yellow = 2, + Red = 3, +} +impl AddAssign for Health { + fn add_assign(&mut self, other: Self) { + *self = match std::cmp::max(*self as u8, other as u8) { + 1 => Health::Green, + 2 => Health::Yellow, + _ => Health::Red, + }; + } +} + +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Status { + #[default] + Open = 1, +} +impl AddAssign for Status { + fn add_assign(&mut self, other: Self) { + *self = match std::cmp::max(*self as u8, other as u8) { + 1 => Status::Open, + _ => Status::Open, + }; + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_serialize_filtered() { + let response = ElasticsearchCatIndexResponse { + health: Health::Green, + status: Status::Open, + index: "test_index".to_string(), + uuid: "test_uuid".to_string(), + pri: "1".to_string(), + rep: "2".to_string(), + docs_count: 100, + docs_deleted: 10, + store_size: 1000, + pri_store_size: 500, + dataset_size: 1500, + }; + + // Test serialization with all fields + let all_fields = response.serialize_filtered(&None).unwrap(); + let expected_all_fields = json!({ + "health": "green", + "status": "open", + "index": "test_index", + "uuid": "test_uuid", + "pri": "1", + "rep": "2", + "docs.count": "100", + "docs.deleted": "10", + "store.size": "1000b", // Assuming ser_es_format formats size to kb + "pri.store.size": "500b", // Example format + "dataset.size": "1.5kb", // Example format + }); + assert_eq!(all_fields, expected_all_fields); + + // Test serialization with selected fields + let selected_fields = response + .serialize_filtered(&Some(vec!["index".to_string(), "uuid".to_string()])) + .unwrap(); + let expected_selected_fields = json!({ + "index": "test_index", + "uuid": "test_uuid" + }); + assert_eq!(selected_fields, expected_selected_fields); + + // Add more test cases as needed + } +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs index aba6f04dce9..694e9fc74c5 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs @@ -19,6 +19,7 @@ mod bulk_body; mod bulk_query_params; +mod cat_indices; mod error; mod field_capability; mod multi_search; @@ -29,6 +30,7 @@ mod stats; pub use bulk_body::BulkAction; pub use bulk_query_params::ElasticBulkOptions; +pub use cat_indices::{CatIndexQueryParams, ElasticsearchCatIndexResponse}; pub use error::ElasticsearchError; pub use field_capability::{ build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 87a7c0043b3..c2201674422 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -46,17 +46,19 @@ use serde_json::json; use warp::{Filter, Rejection}; use super::filter::{ - elastic_cluster_info_filter, elastic_field_capabilities_filter, elastic_index_count_filter, + elastic_cat_indices_filter, elastic_cluster_info_filter, elastic_field_capabilities_filter, + elastic_index_cat_indices_filter, elastic_index_count_filter, elastic_index_field_capabilities_filter, elastic_index_search_filter, elastic_index_stats_filter, elastic_multi_search_filter, elastic_scroll_filter, elastic_stats_filter, elasticsearch_filter, }; use super::model::{ build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, - ElasticsearchError, ElasticsearchStatsResponse, FieldCapabilityQueryParams, - FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, - MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, - SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry, + CatIndexQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError, + ElasticsearchStatsResponse, FieldCapabilityQueryParams, FieldCapabilityRequestBody, + FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, + MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, + SearchQueryParamsCount, StatsResponseEntry, }; use super::{make_elastic_api_response, TrackTotalHits}; use crate::format::BodyFormat; @@ -115,7 +117,7 @@ pub fn es_compat_index_field_capabilities_handler( .map(|result| make_elastic_api_response(result, BodyFormat::default())) } -/// GET or POST _elastic/_stats +/// GET _elastic/_stats pub fn es_compat_stats_handler( search_service: MetastoreServiceClient, ) -> impl Filter + Clone { @@ -124,7 +126,7 @@ pub fn es_compat_stats_handler( .then(es_compat_stats) .map(|result| make_elastic_api_response(result, BodyFormat::default())) } -/// GET or POST _elastic/{index}/_stats +/// GET _elastic/{index}/_stats pub fn es_compat_index_stats_handler( search_service: MetastoreServiceClient, ) -> impl Filter + Clone { @@ -134,6 +136,26 @@ pub fn es_compat_index_stats_handler( .map(|result| make_elastic_api_response(result, BodyFormat::default())) } +/// GET _elastic/_cat/indices +pub fn es_compat_cat_indices_handler( + search_service: MetastoreServiceClient, +) -> impl Filter + Clone { + elastic_cat_indices_filter() + .and(with_arg(search_service)) + .then(es_compat_cat_indices) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + +/// GET _elastic/_cat/indices/{index} +pub fn es_compat_index_cat_indices_handler( + search_service: MetastoreServiceClient, +) -> impl Filter + Clone { + elastic_index_cat_indices_filter() + .and(with_arg(search_service)) + .then(es_compat_index_cat_indices) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + /// GET or POST _elastic/{index}/_search pub fn es_compat_index_search_handler( search_service: Arc, @@ -390,6 +412,57 @@ async fn es_compat_index_stats( Ok(search_response_rest) } +async fn es_compat_cat_indices( + query_params: CatIndexQueryParams, + metastore: MetastoreServiceClient, +) -> Result, ElasticsearchError> { + es_compat_index_cat_indices(vec!["*".to_string()], query_params, metastore).await +} + +async fn es_compat_index_cat_indices( + index_id_patterns: Vec, + query_params: CatIndexQueryParams, + mut metastore: MetastoreServiceClient, +) -> Result, ElasticsearchError> { + query_params.validate()?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + // Index id to index uid mapping + let index_uid_to_index_id: HashMap = indexes_metadata + .iter() + .map(|metadata| (metadata.index_uid.clone(), metadata.index_id().to_owned())) + .collect(); + + let index_uids = indexes_metadata + .into_iter() + .map(|index_metadata| index_metadata.index_uid) + .collect_vec(); + // calling into the search module is not necessary, but reuses established patterns + let splits_metadata = list_all_splits(index_uids, &mut metastore).await?; + + let search_response_rest: Vec = + convert_to_es_cat_indices_response(index_uid_to_index_id, splits_metadata); + + let search_response_rest = search_response_rest + .into_iter() + .filter(|resp| { + if let Some(health) = query_params.health { + resp.health == health + } else { + true + } + }) + .map(|cat_index| cat_index.serialize_filtered(&query_params.h)) + .collect::, serde_json::Error>>() + .map_err(|serde_errror| { + ElasticsearchError::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to serialize cat indices response: {}", serde_errror), + ) + })?; + + Ok(search_response_rest) +} + async fn es_compat_index_field_capabilities( index_id_patterns: Vec, search_params: FieldCapabilityQueryParams, @@ -551,6 +624,32 @@ async fn es_scroll( Ok(search_response_rest) } +fn convert_to_es_cat_indices_response( + index_uid_to_index_id: HashMap, + splits: Vec, +) -> Vec { + let mut per_index: HashMap = HashMap::new(); + + for split_metadata in splits { + let index_id = index_uid_to_index_id + .get(&split_metadata.index_uid) + .unwrap_or_else(|| { + panic!( + "index_uid {} not found in index_uid_to_index_id", + split_metadata.index_uid + ) + }); + let mut cat_index_entry: ElasticsearchCatIndexResponse = split_metadata.into(); + cat_index_entry.index = index_id.to_owned(); + + let index_stats_entry = per_index.entry(index_id.to_owned()).or_default(); + *index_stats_entry += cat_index_entry.clone(); + } + let indices: Vec = per_index.values().cloned().collect(); + + indices +} + fn convert_to_es_stats_response( index_uid_to_index_id: HashMap, splits: Vec, diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml new file mode 100644 index 00000000000..b56f6c30053 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml @@ -0,0 +1,89 @@ +method: [GET] +engines: + - quickwit +endpoint: "_cat/indices?format=json" +expected: +- dataset.size: 222.8kb + docs.count: '100' + docs.deleted: '0' + health: green + index: gharchive + pri: '1' + pri.store.size: 271.8kb + rep: '1' + status: open + store.size: 271.8kb + #uuid: gharchive:01HN2SDANHDN6WFAFNH7BBMQ8C +--- +method: [GET] +engines: + - quickwit +endpoint: "_cat/indices/gharchive?format=json" +expected: +- dataset.size: 222.8kb + docs.count: '100' + docs.deleted: '0' + health: green + index: gharchive + pri: '1' + pri.store.size: 271.8kb + rep: '1' + status: open + store.size: 271.8kb + #uuid: gharchive:01HN2SDANHDN6WFAFNH7BBMQ8C +--- +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive?format=json&h=docs.count,index" +expected: +- docs.count: '100' + index: gharchive +--- # Wildcard test +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharc*?format=json&h=docs.count,index" +expected: +- docs.count: '100' + index: gharchive +--- # health green test +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive?format=json&health=green" +expected: +- docs.count: '100' + index: gharchive +--- # health red test +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive?format=json&health=red" +expected: [] +--- +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive" # missing format=json +status_code: 400 +--- +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive?format=json&v=true" # invalid h=true +status_code: 400 +--- +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: "_cat/indices/gharchive?format=json&b=b" # unsupported bytes parameter +status_code: 400 +