Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add es _stats API #4442

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub use collector::QuickwitAggregations;
use metrics::SEARCH_METRICS;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::metastore::{
ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient,
};
use tantivy::schema::NamedFieldDocument;

/// Refer to this as `crate::Result<T>`.
Expand All @@ -65,8 +67,8 @@ pub use find_trace_ids_collector::FindTraceIdsCollector;
use quickwit_config::SearcherConfig;
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
SplitState,
IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState,
};
use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
use quickwit_proto::types::IndexUid;
Expand All @@ -81,7 +83,10 @@ pub use crate::cluster_client::ClusterClient;
pub use crate::error::{parse_grpc_error, SearchError};
use crate::fetch_docs::fetch_docs;
use crate::leaf::leaf_search;
pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob};
pub use crate::root::{
check_all_index_metadata_found, jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch,
SearchJob,
};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
pub use crate::search_stream::root_search_stream;
Expand Down Expand Up @@ -167,8 +172,16 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn
}
}

/// Get all splits of given index ids
pub async fn list_all_splits(
index_uids: Vec<IndexUid>,
metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<SplitMetadata>> {
list_relevant_splits(index_uids, None, None, None, metastore).await
}

/// Extract the list of relevant splits for a given request.
async fn list_relevant_splits(
pub async fn list_relevant_splits(
index_uids: Vec<IndexUid>,
start_timestamp: Option<i64>,
end_timestamp: Option<i64>,
Expand Down Expand Up @@ -196,6 +209,30 @@ async fn list_relevant_splits(
Ok(splits_metadata)
}

/// Resolve index patterns and returns IndexMetadata for found indices.
/// Patterns follow the elastic search patterns.
pub async fn resolve_index_patterns(
index_id_patterns: &[String],
metastore: &mut MetastoreServiceClient,
) -> crate::Result<Vec<IndexMetadata>> {
let list_indexes_metadata_request = if index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
index_id_patterns: index_id_patterns.to_owned(),
}
};

// Get the index ids from the request
let indexes_metadata = metastore
.clone()
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
check_all_index_metadata_found(&indexes_metadata, index_id_patterns)?;
Ok(indexes_metadata)
}

/// Converts a Tantivy `NamedFieldDocument` into a json string using the
/// schema defined by the DocMapper.
///
Expand Down
29 changes: 5 additions & 24 deletions quickwit/quickwit-search/src/list_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use futures::future::try_join_all;
use itertools::Itertools;
use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME;
use quickwit_common::uri::Uri;
use quickwit_metastore::{ListIndexesMetadataResponseExt, SplitMetadata};
use quickwit_proto::metastore::{
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_metastore::SplitMetadata;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::search::{
deserialize_split_fields, LeafListFieldsRequest, ListFields, ListFieldsEntryResponse,
ListFieldsRequest, ListFieldsResponse, SplitIdAndFooterOffsets,
Expand All @@ -40,9 +38,8 @@ use quickwit_proto::types::IndexUid;
use quickwit_storage::Storage;

use crate::leaf::open_split_bundle;
use crate::root::check_all_index_metadata_found;
use crate::service::SearcherContext;
use crate::{list_relevant_splits, ClusterClient, SearchError, SearchJob};
use crate::{list_relevant_splits, resolve_index_patterns, ClusterClient, SearchError, SearchJob};

/// Get the list of splits for the request which we need to scan.
pub async fn get_fields_from_split<'a>(
Expand Down Expand Up @@ -290,24 +287,8 @@ pub async fn root_list_fields(
cluster_client: &ClusterClient,
mut metastore: MetastoreServiceClient,
) -> crate::Result<ListFieldsResponse> {
let list_indexes_metadata_request = if list_fields_req.index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
index_id_patterns: list_fields_req.index_id_patterns.clone(),
}
};

// Get the index ids from the request
let indexes_metadata = metastore
.clone()
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
check_all_index_metadata_found(
&indexes_metadata[..],
&list_fields_req.index_id_patterns[..],
)?;
let indexes_metadata =
resolve_index_patterns(&list_fields_req.index_id_patterns[..], &mut metastore).await?;
// The request contains a wildcard, but couldn't find any index.
if indexes_metadata.is_empty() {
return Ok(ListFieldsResponse { fields: vec![] });
Expand Down
31 changes: 5 additions & 26 deletions quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@ use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::PrettySample;
use quickwit_config::build_doc_mapper;
use quickwit_metastore::{
ListIndexesMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt,
SplitMetadata,
};
use quickwit_proto::metastore::{
ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata};
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{
LeafListTermsRequest, LeafListTermsResponse, ListTermsRequest, ListTermsResponse,
SplitIdAndFooterOffsets, SplitSearchError,
Expand All @@ -44,8 +39,7 @@ use tantivy::{ReloadPolicy, Term};
use tracing::{debug, error, info, instrument};

use crate::leaf::open_index_with_caches;
use crate::root::check_all_index_metadata_found;
use crate::{ClusterClient, SearchError, SearchJob, SearcherContext};
use crate::{resolve_index_patterns, ClusterClient, SearchError, SearchJob, SearcherContext};

/// Performs a distributed list terms.
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
Expand All @@ -59,23 +53,8 @@ pub async fn root_list_terms(
cluster_client: &ClusterClient,
) -> crate::Result<ListTermsResponse> {
let start_instant = tokio::time::Instant::now();
let list_indexes_metadata_request = if list_terms_request.index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
index_id_patterns: list_terms_request.index_id_patterns.clone(),
}
};

// Get the index ids from the request
let indexes_metadata = metastore
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
check_all_index_metadata_found(
&indexes_metadata[..],
&list_terms_request.index_id_patterns[..],
)?;
let indexes_metadata =
resolve_index_patterns(&list_terms_request.index_id_patterns, &mut metastore).await?;
// The request contains a wildcard, but couldn't find any index.
if indexes_metadata.is_empty() {
return Ok(ListTermsResponse {
Expand Down
71 changes: 57 additions & 14 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ mod tests {
use quickwit_config::{IngestApiConfig, NodeConfig};
use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;

use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
Expand All @@ -161,11 +162,17 @@ mod tests {
async fn test_bulk_api_returns_404_if_index_id_does_not_exist() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -185,11 +192,17 @@ mod tests {
async fn test_bulk_api_returns_200() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -213,11 +226,17 @@ mod tests {
async fn test_bulk_api_returns_200_if_payload_has_blank_lines() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = "
{\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}}
\u{20}\u{20}\u{20}\u{20}\n
Expand All @@ -238,11 +257,17 @@ mod tests {
async fn test_bulk_index_api_returns_200() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand All @@ -266,11 +291,17 @@ mod tests {
async fn test_bulk_api_blocks_when_refresh_wait_for_is_specified() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -345,11 +376,17 @@ mod tests {
async fn test_bulk_api_blocks_when_refresh_true_is_specified() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
{"id": 1, "message": "push"}
Expand Down Expand Up @@ -423,10 +460,16 @@ mod tests {
async fn test_bulk_ingest_request_returns_400_if_action_is_malformed() {
let config = Arc::new(NodeConfig::for_test());
let search_service = Arc::new(MockSearchService::new());
let metastore_service = MetastoreServiceClient::mock();
let ingest_service = IngestServiceClient::from(IngestServiceClient::mock());
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let elastic_api_handlers =
elastic_api_handlers(config, search_service, ingest_service, ingest_router);
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
{"id": 1, "message": "my-doc"}"#;
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,20 @@ pub(crate) fn elastic_index_count_filter(
.and(json_or_empty())
}

// No support for any query parameters for now.
#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")]
pub(crate) fn elastic_index_stats_filter(
) -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_stats")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
}

#[utoipa::path(get, tag = "Search", path = "/_stats")]
pub(crate) fn elastic_stats_filter() -> impl Filter<Extract = (), Error = Rejection> + Clone {
warp::path!("_elastic" / "_stats").and(warp::get())
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone
Expand Down
Loading