Skip to content

Commit

Permalink
add es _stats API
Browse files Browse the repository at this point in the history
add /{index}/_stats API
add /_stats API
  • Loading branch information
PSeitz committed Jan 23, 2024
1 parent 0c283ab commit 74afe9d
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 26 deletions.
47 changes: 42 additions & 5 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::metastore::{
ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient,
};
use tantivy::schema::NamedFieldDocument;

/// Refer to this as `crate::Result<T>`.
Expand All @@ -65,8 +67,8 @@ pub use find_trace_ids_collector::FindTraceIdsCollector;
use quickwit_config::SearcherConfig;
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
SplitState,
IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState,
};
use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
use quickwit_proto::types::IndexUid;
Expand All @@ -81,7 +83,10 @@ pub use crate::cluster_client::ClusterClient;
pub use crate::error::{parse_grpc_error, SearchError};
use crate::fetch_docs::fetch_docs;
use crate::leaf::leaf_search;
pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob};
pub use crate::root::{
check_all_index_metadata_found, jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch,
SearchJob,
};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
pub use crate::search_stream::root_search_stream;
Expand Down Expand Up @@ -167,8 +172,16 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn
}
}

/// Get all splits of given index ids
pub async fn list_all_splits(
index_uids: Vec<IndexUid>,
metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<SplitMetadata>> {
list_relevant_splits(index_uids, None, None, None, metastore).await
}

/// Extract the list of relevant splits for a given request.
async fn list_relevant_splits(
pub async fn list_relevant_splits(
index_uids: Vec<IndexUid>,
start_timestamp: Option<i64>,
end_timestamp: Option<i64>,
Expand Down Expand Up @@ -196,6 +209,30 @@ async fn list_relevant_splits(
Ok(splits_metadata)
}

/// List all splits of given index id patterns
/// Returns a mapping from index uid to index id and a list of splits
pub async fn resolve_index_patterns(
index_id_patterns: Vec<String>,
metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<IndexMetadata>> {
let list_indexes_metadata_request = if index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
index_id_patterns: index_id_patterns.clone(),
}
};

// Get the index ids from the request
let indexes_metadata = metastore
.clone()
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
check_all_index_metadata_found(&indexes_metadata, &index_id_patterns)?;
Ok(indexes_metadata)
}

/// Converts a Tantivy `NamedFieldDocument` into a json string using the
/// schema defined by the DocMapper.
///
Expand Down
71 changes: 57 additions & 14 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
use quickwit_config::{IngestApiConfig, NodeConfig};
use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;

use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
Expand All @@ -161,11 +162,17 @@ mod tests {
async fn test_bulk_api_returns_404_if_index_id_does_not_exist() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -185,11 +192,17 @@ mod tests {
async fn test_bulk_api_returns_200() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -213,11 +226,17 @@ mod tests {
async fn test_bulk_api_returns_200_if_payload_has_blank_lines() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = "
{\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}}
\u{20}\u{20}\u{20}\u{20}\n
Expand All @@ -238,11 +257,17 @@ mod tests {
async fn test_bulk_index_api_returns_200() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -266,11 +291,17 @@ mod tests {
async fn test_bulk_api_blocks_when_refresh_wait_for_is_specified() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -345,11 +376,17 @@ mod tests {
async fn test_bulk_api_blocks_when_refresh_true_is_specified() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -423,10 +460,16 @@ mod tests {
async fn test_bulk_ingest_request_returns_400_if_action_is_malformed() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let ingest_service = IngestServiceClient::from(IngestServiceClient::mock());
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
{"id": 1, "message": "my-doc"}"#;
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ pub(crate) fn elastic_index_count_filter(
.and(json_or_empty())
}

// No support for any query parameters for now.
#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")]
pub(crate) fn elastic_index_stats_filter(
) -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_stats")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
}

#[utoipa::path(get, tag = "Search", path = "/_stats")]
pub(crate) fn elastic_stats_filter() -> impl Filter<Extract = (), Error = Rejection> + Clone {
warp::path!("_elastic" / "_stats").and(warp::get().or(warp::post()).unify())
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone
Expand Down
13 changes: 13 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use hyper::StatusCode;
use quickwit_config::NodeConfig;
use quickwit_ingest::IngestServiceClient;
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::SearchService;
use rest_handler::{
es_compat_cluster_info_handler, es_compat_index_multi_search_handler,
Expand All @@ -41,6 +42,7 @@ use warp::{Filter, Rejection};

use self::rest_handler::{
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
es_compat_index_stats_handler, es_compat_stats_handler,
};
use crate::elasticsearch_api::model::ElasticsearchError;
use crate::json_api_response::JsonApiResponse;
Expand All @@ -55,6 +57,7 @@ pub fn elastic_api_handlers(
search_service: Arc<dyn SearchService>,
ingest_service: IngestServiceClient,
ingest_router: IngestRouterServiceClient,
metastore: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_cluster_info_handler(node_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
Expand All @@ -70,6 +73,8 @@ pub fn elastic_api_handlers(
ingest_router.clone(),
))
.or(es_compat_index_bulk_handler(ingest_service, ingest_router))
.or(es_compat_index_stats_handler(metastore.clone()))
.or(es_compat_stats_handler(metastore))
// Register newly created handlers here.
}

Expand Down Expand Up @@ -123,6 +128,7 @@ mod tests {
use quickwit_config::NodeConfig;
use quickwit_ingest::{IngestApiService, IngestServiceClient};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;
use serde_json::Value as JsonValue;
use warp::Filter;
Expand Down Expand Up @@ -163,6 +169,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -213,6 +220,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -251,6 +259,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index":"index-1"
Expand Down Expand Up @@ -282,6 +291,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -313,6 +323,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index":"index-1"}
Expand Down Expand Up @@ -343,6 +354,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{}
Expand Down Expand Up @@ -385,6 +397,7 @@ mod tests {
Arc::new(mock_search_service),
ingest_service_client(),
ingest_router,
MetastoreServiceClient::mock().into(),
);
let msearch_payload = r#"
{"index": ["index-1", "index-2"]}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod multi_search;
mod scroll;
mod search_body;
mod search_query_params;
mod stats;

pub use bulk_body::BulkAction;
pub use bulk_query_params::ElasticBulkOptions;
Expand All @@ -41,6 +42,7 @@ pub use scroll::ScrollQueryParams;
pub use search_body::SearchBody;
pub use search_query_params::{SearchQueryParams, SearchQueryParamsCount};
use serde::{Deserialize, Serialize};
pub use stats::{ElasticsearchStatsResponse, StatsResponseEntry};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SortField {
Expand Down
Loading

0 comments on commit 74afe9d

Please sign in to comment.