Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster info endpoint for /_elastic #3594

Merged
merged 6 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/build_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use once_cell::sync::OnceCell;
use quickwit_common::runtimes::RuntimesConfig;
use serde::Serialize;

#[derive(Debug, Eq, PartialEq, Serialize)]
#[derive(Debug, Eq, PartialEq, Serialize, utoipa::ToSchema)]
pub struct BuildInfo {
pub build_date: &'static str,
pub build_profile: &'static str,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl BuildInfo {
}
}

#[derive(Debug, Eq, PartialEq, Serialize)]
#[derive(Debug, Eq, PartialEq, Serialize, utoipa::ToSchema)]
pub struct RuntimeInfo {
pub num_cpus_logical: usize,
pub num_cpus_physical: usize,
Expand Down
20 changes: 13 additions & 7 deletions quickwit/quickwit-serve/src/elastic_search_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

use quickwit_config::IngestApiConfig;
use quickwit_config::{IngestApiConfig, QuickwitConfig};
use quickwit_ingest::{
FetchRequest, IngestResponse, IngestServiceClient, SuggestTruncateRequest,
};
Expand All @@ -142,10 +142,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_returns_404_if_index_id_does_not_exist() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -163,10 +164,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_returns_200() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -188,10 +190,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_index_api_returns_200() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -213,10 +216,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_blocks_when_refresh_wait_for_is_specified() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -289,10 +293,11 @@ mod tests {

#[tokio::test]
async fn test_bulk_api_blocks_when_refresh_true_is_specified() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -364,9 +369,10 @@ mod tests {

#[tokio::test]
async fn test_bulk_ingest_request_returns_400_if_action_is_malformed() {
let config = Arc::new(QuickwitConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let ingest_service = IngestServiceClient::new(IngestServiceClient::mock());
let elastic_api_handlers = elastic_api_handlers(search_service, ingest_service);
let elastic_api_handlers = elastic_api_handlers(config, search_service, ingest_service);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
{"id": 1, "message": "my-doc"}"#;
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ use crate::elastic_search_api::model::{ElasticIngestOptions, SearchBody, SearchQ
const BODY_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(1_000_000);
const CONTENT_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(10 * 1024 * 1024); // 10MiB

// TODO: Make all elastic endpoint models `utoipa` compatible
// and register them here.
#[derive(utoipa::OpenApi)]
#[openapi(paths(elastic_cluster_info_filter,))]
pub struct ElasticCompatibleApi;

#[utoipa::path(get, tag = "Cluster Info", path = "/_elastic")]
pub(crate) fn elastic_cluster_info_filter() -> impl Filter<Extract = (), Error = Rejection> + Clone
{
warp::path!("_elastic")
.and(warp::get())
.and(warp::path::end())
}

#[utoipa::path(get, tag = "Search", path = "/_search")]
pub(crate) fn elastic_search_filter(
) -> impl Filter<Extract = (SearchQueryParams,), Error = Rejection> + Clone {
Expand Down
99 changes: 83 additions & 16 deletions quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,30 @@ mod rest_handler;
use std::sync::Arc;

use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler};
pub use filter::ElasticCompatibleApi;
use quickwit_config::QuickwitConfig;
use quickwit_ingest::IngestServiceClient;
use quickwit_search::SearchService;
use rest_handler::{
es_compat_index_multi_search_handler, es_compat_index_search_handler, es_compat_search_handler,
es_compat_cluster_info_handler, es_compat_index_multi_search_handler,
es_compat_index_search_handler, es_compat_search_handler,
};
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

use crate::BuildInfo;

/// Setup Elasticsearch API handlers
///
/// This is where all newly supported Elasticsearch handlers
/// should be registered.
pub fn elastic_api_handlers(
quickwit_config: Arc<QuickwitConfig>,
search_service: Arc<dyn SearchService>,
ingest_service: IngestServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_search_handler(search_service.clone())
es_compat_cluster_info_handler(quickwit_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service))
.or(es_compat_bulk_handler(ingest_service.clone()))
Expand Down Expand Up @@ -82,12 +89,19 @@ impl From<i64> for TrackTotalHits {
mod tests {
use std::sync::Arc;

use assert_json_diff::assert_json_include;
use mockall::predicate;
use quickwit_config::QuickwitConfig;
use quickwit_ingest::{IngestApiService, IngestServiceClient};
use quickwit_search::MockSearchService;
use serde_json::Value as JsonValue;
use warp::Filter;

use super::model::ElasticSearchError;
use crate::elastic_search_api::model::MultiSearchResponse;
use crate::elastic_search_api::rest_handler::es_compat_cluster_info_handler;
use crate::rest::recover_fn;
use crate::BuildInfo;

fn ingest_service_client() -> IngestServiceClient {
let universe = quickwit_actors::Universe::new();
Expand All @@ -97,6 +111,7 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_200_responses() {
let config = Arc::new(QuickwitConfig::for_test());
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
Expand All @@ -111,8 +126,11 @@ mod tests {
},
))
.returning(|_| Ok(Default::default()));
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 20}
Expand All @@ -138,6 +156,7 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_one_500_and_one_200_responses() {
let config = Arc::new(QuickwitConfig::for_test());
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
Expand All @@ -150,8 +169,11 @@ mod tests {
))
}
});
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 10}
Expand Down Expand Up @@ -180,9 +202,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_header() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"
{"query":{"query_string":{"query":"test"}}}
Expand All @@ -204,9 +230,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_body() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -228,9 +258,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_only_a_header_request() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index":"index-1"}
"#;
Expand All @@ -251,9 +285,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_no_index() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -273,9 +311,13 @@ mod tests {

#[tokio::test]
async fn test_msearch_api_return_400_with_multiple_indexes() {
let config = Arc::new(QuickwitConfig::for_test());
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let es_search_api_handler = super::elastic_api_handlers(
config,
Arc::new(mock_search_service),
ingest_service_client(),
);
let msearch_payload = r#"
{"index": ["index-1", "index-2"]}
{"query":{"query_string":{"bad":"test"}}}
Expand All @@ -294,4 +336,29 @@ mod tests {
.unwrap()
.starts_with("Invalid argument: Searching only one index is supported for now."));
}

#[tokio::test]
async fn test_es_compat_cluster_info_handler() {
let build_info = BuildInfo::get();
let config = Arc::new(QuickwitConfig::for_test());
let handler =
es_compat_cluster_info_handler(config.clone(), build_info).recover(recover_fn);
let resp = warp::test::request()
.path("/_elastic")
.reply(&handler)
.await;
assert_eq!(resp.status(), 200);
let resp_json: JsonValue = serde_json::from_slice(resp.body()).unwrap();
let expected_response_json = serde_json::json!({
"name" : config.node_id,
"cluster_name" : config.cluster_id,
"version" : {
"distribution" : "quickwit",
"number" : build_info.version,
"build_hash" : build_info.commit_hash,
"build_date" : build_info.build_date,
}
});
assert_json_include!(actual: resp_json, expected: expected_response_json);
}
}
36 changes: 32 additions & 4 deletions quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,56 @@ use futures_util::StreamExt;
use hyper::StatusCode;
use itertools::Itertools;
use quickwit_common::truncate_str;
use quickwit_config::QuickwitConfig;
use quickwit_proto::{SearchResponse, ServiceErrorCode};
use quickwit_query::query_ast::{QueryAst, UserInputQuery};
use quickwit_query::BooleanOperand;
use quickwit_search::{SearchError, SearchService};
use serde_json::json;
use warp::{Filter, Rejection};

use super::filter::elastic_multi_search_filter;
use super::filter::{
elastic_cluster_info_filter, elastic_index_search_filter, elastic_multi_search_filter,
elastic_search_filter,
};
use super::model::{
ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, SearchBody, SearchQueryParams,
};
use crate::elastic_search_api::filter::elastic_index_search_filter;
use crate::elastic_search_api::model::SortField;
use crate::format::BodyFormat;
use crate::json_api_response::{make_json_api_response, ApiError, JsonApiResponse};
use crate::with_arg;
use crate::{with_arg, BuildInfo};

/// Elastic compatible cluster info handler.
pub fn es_compat_cluster_info_handler(
quickwit_config: Arc<QuickwitConfig>,
build_info: &'static BuildInfo,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_cluster_info_filter()
.and(with_arg(quickwit_config))
.and(with_arg(build_info))
.then(
|config: Arc<QuickwitConfig>, build_info: &'static BuildInfo| async move {
warp::reply::json(&json!({
"name" : config.node_id,
"cluster_name" : config.cluster_id,
"version" : {
"distribution" : "quickwit",
"number" : build_info.version,
"build_hash" : build_info.commit_hash,
"build_date" : build_info.build_date,
}
}))
},
)
}

/// GET or POST _elastic/_search
pub fn es_compat_search_handler(
_search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
super::filter::elastic_search_filter().then(|_params: SearchQueryParams| async move {
elastic_search_filter().then(|_params: SearchQueryParams| async move {
// TODO
let api_error = ApiError {
service_code: ServiceErrorCode::NotSupportedYet,
Expand Down
Loading
Loading