Skip to content

Commit

Permalink
Add cluster info endpoint for /_elastic (#3594)
Browse files Browse the repository at this point in the history
* implements elastic compatible cluster info endpoint

* cargo fmt

* remove commented code

* mounting all es endpoint at the same location

* added es_compat test for cluster info endpoint
  • Loading branch information
evanxg852000 authored Jul 5, 2023
1 parent 50a6e71 commit ae768bf
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 29 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/build_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use once_cell::sync::OnceCell;
use quickwit_common::runtimes::RuntimesConfig;
use serde::Serialize;

#[derive(Debug, Eq, PartialEq, Serialize)]
#[derive(Debug, Eq, PartialEq, Serialize, utoipa::ToSchema)]
pub struct BuildInfo {
pub build_date: &'static str,
pub build_profile: &'static str,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl BuildInfo {
}
}

#[derive(Debug, Eq, PartialEq, Serialize)]
#[derive(Debug, Eq, PartialEq, Serialize, utoipa::ToSchema)]
pub struct RuntimeInfo {
pub num_cpus_logical: usize,
pub num_cpus_physical: usize,
Expand Down
20 changes: 13 additions & 7 deletions quickwit/quickwit-serve/src/elastic_search_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use quickwit_config::IngestApiConfig;
use quickwit_config::{IngestApiConfig, QuickwitConfig};
use quickwit_ingest::{
FetchRequest, IngestResponse, IngestServiceClient, SuggestTruncateRequest,
};
Expand All @@ -142,10 +142,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_returns_404_if_index_id_does_not_exist() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -163,10 +164,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_returns_200() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -188,10 +190,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_index_api_returns_200() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -213,10 +216,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_blocks_when_refresh_wait_for_is_specified() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -289,10 +293,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_blocks_when_refresh_true_is_specified() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -364,9 +369,10 @@ mod tests {

#[tokio::test]
async fn test_bulk_ingest_request_returns_400_if_action_is_malformed() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let ingest_service = IngestServiceClient::new(IngestServiceClient::mock());
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
{"id": 1, "message": "my-doc"}"#;
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ use crate::elastic_search_api::model::{ElasticIngestOptions, SearchBody, SearchQ
const BODY_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(1_000_000);
const CONTENT_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(10 * 1024 * 1024); // 10MiB

// TODO: Make all elastic endpoint models `utoipa` compatible
// and register them here.
#[derive(utoipa::OpenApi)]
#[openapi(paths(elastic_cluster_info_filter,))]
pub struct ElasticCompatibleApi;

#[utoipa::path(get, tag = "Cluster Info", path = "/_elastic")]
pub(crate) fn elastic_cluster_info_filter() -> impl Filter<Extract = (), Error = Rejection> + Clone
{
warp::path!("_elastic")
.and(warp::get())
.and(warp::path::end())
}

#[utoipa::path(get, tag = "Search", path = "/_search")]
pub(crate) fn elastic_search_filter(
) -> impl Filter<Extract = (SearchQueryParams,), Error = Rejection> + Clone {
Expand Down
99 changes: 83 additions & 16 deletions quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,30 @@ mod rest_handler;
use std::sync::Arc;

use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler};
pub use filter::ElasticCompatibleApi;
use quickwit_config::QuickwitConfig;
use quickwit_ingest::IngestServiceClient;
use quickwit_search::SearchService;
use rest_handler::{
es_compat_index_multi_search_handler, es_compat_index_search_handler, es_compat_search_handler,
es_compat_cluster_info_handler, es_compat_index_multi_search_handler,
es_compat_index_search_handler, es_compat_search_handler,
};
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use crate::BuildInfo;

/// Setup Elasticsearch API handlers
///
/// This is where all newly supported Elasticsearch handlers
/// should be registered.
pub fn elastic_api_handlers(
quickwit_config: Arc<QuickwitConfig>,
search_service: Arc<dyn SearchService>,
ingest_service: IngestServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_search_handler(search_service.clone())
es_compat_cluster_info_handler(quickwit_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service))
.or(es_compat_bulk_handler(ingest_service.clone()))
Expand Down Expand Up @@ -82,12 +89,19 @@ impl From<i64> for TrackTotalHits {
mod tests {
use std::sync::Arc;

use assert_json_diff::assert_json_include;
use mockall::predicate;
use quickwit_config::QuickwitConfig;
use quickwit_ingest::{IngestApiService, IngestServiceClient};
use quickwit_search::MockSearchService;
use serde_json::Value as JsonValue;
use warp::Filter;

use super::model::ElasticSearchError;
use crate::elastic_search_api::model::MultiSearchResponse;
use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;

fn ingest_service_client() -> IngestServiceClient {
let universe = quickwit_actors::Universe::new();
Expand All @@ -97,6 +111,7 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_200_responses() {
let config = Arc::new(QuickwitConfig::for_test());
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
Expand All @@ -111,8 +126,11 @@ mod tests {
},
))
.returning(|_| Ok(Default::default()));
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 20}
Expand All @@ -138,6 +156,7 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_one_500_and_one_200_responses() {
let config = Arc::new(QuickwitConfig::for_test());
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
Expand All @@ -150,8 +169,11 @@ mod tests {
))
}
});
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 10}
Expand Down Expand Up @@ -180,9 +202,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_header() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"
{"query":{"query_string":{"query":"test"}}}
Expand All @@ -204,9 +230,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_body() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -228,9 +258,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_only_a_header_request() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
"#;
Expand All @@ -251,9 +285,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_no_index() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -273,9 +311,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_multiple_indexes() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index": ["index-1", "index-2"]}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -294,4 +336,29 @@ mod tests {
.unwrap()
.starts_with("Invalid argument: Searching only one index is supported for now."));
}

#[tokio::test]
async fn test_es_compat_cluster_info_handler() {
let build_info = BuildInfo::get();
let config = Arc::new(QuickwitConfig::for_test());
let handler =
es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn);
let resp = warp::test::request()
.path("/_elastic")
.reply(&handler)
.await;
assert_eq!(resp.status(), 200);
let resp_json: JsonValue = serde_json::from_slice(resp.body()).unwrap();
let expected_response_json = serde_json::json!({
"name" : config.node_id,
"cluster_name" : config.cluster_id,
"version" : {
"distribution" : "quickwit",
"number" : build_info.version,
"build_hash" : build_info.commit_hash,
"build_date" : build_info.build_date,
}
});
assert_json_include!(actual: resp_json, expected: expected_response_json);
}
}
36 changes: 32 additions & 4 deletions quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,56 @@ use futures_util::StreamExt;
use hyper::StatusCode;
use itertools::Itertools;
use quickwit_common::truncate_str;
use quickwit_config::QuickwitConfig;
use quickwit_proto::{SearchResponse, ServiceErrorCode};
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
use quickwit_search::{SearchError, SearchService};
use serde_json::json;
use warp::{Filter, Rejection};

use super::filter::elastic_multi_search_filter;
use super::filter::{
elastic_cluster_info_filter, elastic_index_search_filter, elastic_multi_search_filter,
elastic_search_filter,
};
use super::model::{
ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, SearchBody, SearchQueryParams,
};
use crate::elastic_search_api::filter::elastic_index_search_filter;
use crate::elastic_search_api::model::SortField;
use crate::format::BodyFormat;
use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse};
use crate::with_arg;
use crate::{with_arg, BuildInfo};

/// Elastic compatible cluster info handler.
pub fn es_compat_cluster_info_handler(
quickwit_config: Arc<QuickwitConfig>,
build_info: &'static BuildInfo,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_cluster_info_filter()
.and(with_arg(quickwit_config))
.and(with_arg(build_info))
.then(
|config: Arc<QuickwitConfig>, build_info: &'static BuildInfo| async move {
warp::reply::json(&json!({
"name" : config.node_id,
"cluster_name" : config.cluster_id,
"version" : {
"distribution" : "quickwit",
"number" : build_info.version,
"build_hash" : build_info.commit_hash,
"build_date" : build_info.build_date,
}
}))
},
)
}

/// GET or POST _elastic/_search
pub fn es_compat_search_handler(
_search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
super::filter::elastic_search_filter().then(|_params: SearchQueryParams| async move {
elastic_search_filter().then(|_params: SearchQueryParams| async move {
// TODO
let api_error = ApiError {
service_code: ServiceErrorCode::NotSupportedYet,
Expand Down
Loading

0 comments on commit ae768bf

Please sign in to comment.