From bfc64e166655c1f9fe22f873f118173d20415885 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Wed, 27 Dec 2023 13:21:16 +0100 Subject: [PATCH] Allow index patterns in list terms. (#4322) * Allow index patterns in list terms. * Fix fmt/clippy. * Fix clippy.* * Add multi indexes field capabilities rest api tests. * Fix rest api tests. --- quickwit/quickwit-jaeger/src/lib.rs | 6 +- .../protos/quickwit/search.proto | 8 +- .../src/codegen/quickwit/quickwit.search.rs | 10 +- quickwit/quickwit-search/src/leaf.rs | 192 +-------- quickwit/quickwit-search/src/lib.rs | 7 +- quickwit/quickwit-search/src/list_fields.rs | 13 +- quickwit/quickwit-search/src/list_terms.rs | 407 ++++++++++++++++++ quickwit/quickwit-search/src/root.rs | 143 +----- quickwit/quickwit-search/src/service.rs | 6 +- quickwit/quickwit-search/src/tests.rs | 9 +- .../model/field_capability.rs | 2 +- .../src/jaeger_api/rest_handler.rs | 4 +- .../0001-field-capabilities.yaml | 25 +- .../_setup.elasticsearch.yaml | 24 +- .../_setup.quickwit.yaml | 38 +- .../_teardown.elasticsearch.yaml | 3 + .../_teardown.quickwit.yaml | 4 + 17 files changed, 537 insertions(+), 364 deletions(-) create mode 100644 quickwit/quickwit-search/src/list_terms.rs diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 0aa4b418833..eb67a024a5d 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -103,7 +103,7 @@ impl JaegerService { Some(OffsetDateTime::now_utc().unix_timestamp() - self.lookback_period_secs); let search_request = ListTermsRequest { - index_id, + index_id_patterns: vec![index_id], field: "service_name".to_string(), max_hits, start_timestamp, @@ -140,7 +140,7 @@ impl JaegerService { let end_key = SpanFingerprint::end_key(&request.service, span_kind_opt); let search_request = ListTermsRequest { - index_id, + index_id_patterns: vec![index_id], field: "span_fingerprint".to_string(), max_hits, start_timestamp, @@ -2379,7 +2379,7 @@ mod tests { service .expect_root_list_terms() .withf(|req| { - req.index_id == OTEL_TRACES_INDEX_ID + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID] && req.field == "service_name" && req.start_timestamp.is_some() }) diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 7f5b05231bc..ecf7fd05089 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -118,8 +118,8 @@ message ReportSplitsResponse {} // -- ListFields ------------------- message ListFieldsRequest { - // Optional limit query to a set of indexes. - repeated string index_ids = 1; + // Index ID patterns + repeated string index_id_patterns = 1; // Optional limit query to a list of fields // Wildcard expressions are supported. 
repeated string fields = 2; @@ -465,8 +465,8 @@ message FetchDocsResponse { } message ListTermsRequest { - // Index ID - string index_id = 1; + // Index ID patterns + repeated string index_id_patterns = 1; // Field to search on string field = 3; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 2c55a308fa5..ee4b50643e9 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -64,9 +64,9 @@ pub struct ReportSplitsResponse {} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListFieldsRequest { - /// Optional limit query to a set of indexes. + /// Index ID patterns #[prost(string, repeated, tag = "1")] - pub index_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Optional limit query to a list of fields /// Wildcard expressions are supported. #[prost(string, repeated, tag = "2")] @@ -448,9 +448,9 @@ pub struct FetchDocsResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListTermsRequest { - /// Index ID - #[prost(string, tag = "1")] - pub index_id: ::prost::alloc::string::String, + /// Index ID patterns + #[prost(string, repeated, tag = "1")] + pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Field to search on #[prost(string, tag = "3")] pub field: ::prost::alloc::string::String, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 3e371059629..d4b7f68482a 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -18,19 +18,17 @@ // along with this program. If not, see . use std::collections::{HashMap, HashSet}; -use std::ops::Bound; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use anyhow::Context; use futures::future::try_join_all; -use itertools::{Either, Itertools}; use quickwit_common::PrettySample; use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo}; use quickwit_proto::search::{ - CountHits, LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit, - SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError, + CountHits, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue, + SplitIdAndFooterOffsets, SplitSearchError, }; use quickwit_query::query_ast::QueryAst; use quickwit_query::tokenizers::TokenizerManager; @@ -39,7 +37,7 @@ use quickwit_storage::{ }; use tantivy::directory::FileSlice; use tantivy::fastfield::FastFieldReaders; -use tantivy::schema::{Field, FieldType}; +use tantivy::schema::Field; use tantivy::{Index, ReloadPolicy, Searcher, Term}; use tracing::*; @@ -697,187 +695,3 @@ async fn leaf_search_single_split_wrapper( .record_new_worst_hit(last_hit.as_ref()); } } - -/// Apply a leaf list terms on a single split. 
-#[instrument(skip_all, fields(split_id = split.split_id))] -async fn leaf_list_terms_single_split( - searcher_context: &SearcherContext, - search_request: &ListTermsRequest, - storage: Arc, - split: SplitIdAndFooterOffsets, -) -> crate::Result { - let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; - let split_schema = index.schema(); - let reader = index - .reader_builder() - .reload_policy(ReloadPolicy::Manual) - .try_into()?; - let searcher = reader.searcher(); - - let field = split_schema - .get_field(&search_request.field) - .with_context(|| { - format!( - "couldn't get field named {:?} from schema to list terms", - search_request.field - ) - })?; - - let field_type = split_schema.get_field_entry(field).field_type(); - let start_term: Option = search_request - .start_key - .as_ref() - .map(|data| term_from_data(field, field_type, data)); - let end_term: Option = search_request - .end_key - .as_ref() - .map(|data| term_from_data(field, field_type, data)); - - let mut segment_results = Vec::new(); - for segment_reader in searcher.segment_readers() { - let inverted_index = segment_reader.inverted_index(field)?.clone(); - let dict = inverted_index.terms(); - dict.file_slice_for_range( - ( - start_term - .as_ref() - .map(Term::serialized_value_bytes) - .map(Bound::Included) - .unwrap_or(Bound::Unbounded), - end_term - .as_ref() - .map(Term::serialized_value_bytes) - .map(Bound::Excluded) - .unwrap_or(Bound::Unbounded), - ), - search_request.max_hits, - ) - .read_bytes_async() - .await - .with_context(|| "failed to load sstable range")?; - - let mut range = dict.range(); - if let Some(limit) = search_request.max_hits { - range = range.limit(limit); - } - if let Some(start_term) = &start_term { - range = range.ge(start_term.serialized_value_bytes()) - } - if let Some(end_term) = &end_term { - range = range.lt(end_term.serialized_value_bytes()) - } - let mut stream = range - .into_stream() - .with_context(|| "failed to create stream over sstable")?; - let mut segment_result: Vec> = - Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize); - while stream.advance() { - segment_result.push(term_to_data(field, field_type, stream.key())); - } - segment_results.push(segment_result); - } - - let merged_iter = segment_results.into_iter().kmerge().dedup(); - let merged_results: Vec> = if let Some(limit) = search_request.max_hits { - merged_iter.take(limit as usize).collect() - } else { - merged_iter.collect() - }; - - Ok(LeafListTermsResponse { - num_hits: merged_results.len() as u64, - terms: merged_results, - num_attempted_splits: 1, - failed_splits: Vec::new(), - }) -} - -fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term { - let mut term = Term::from_field_bool(field, false); - term.clear_with_type(field_type.value_type()); - term.append_bytes(data); - term -} - -fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec { - let mut term = Term::from_field_bool(field, false); - term.clear_with_type(field_type.value_type()); - term.append_bytes(field_value); - term.serialized_term().to_vec() -} - -/// `leaf` step of list terms. 
-#[instrument(skip_all, fields(index = ?request.index_id))] -pub async fn leaf_list_terms( - searcher_context: Arc, - request: &ListTermsRequest, - index_storage: Arc, - splits: &[SplitIdAndFooterOffsets], -) -> Result { - info!(split_offsets = ?PrettySample::new(splits, 5)); - let leaf_search_single_split_futures: Vec<_> = splits - .iter() - .map(|split| { - let index_storage_clone = index_storage.clone(); - let searcher_context_clone = searcher_context.clone(); - async move { - let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone() - .acquire_owned() - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); - // TODO dedicated counter and timer? - crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); - let timer = crate::SEARCH_METRICS - .leaf_search_split_duration_secs - .start_timer(); - let leaf_search_single_split_res = leaf_list_terms_single_split( - &searcher_context_clone, - request, - index_storage_clone, - split.clone(), - ) - .await; - timer.observe_duration(); - leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err)) - } - }) - .collect(); - - let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await; - - let (split_search_responses, errors): (Vec, Vec<(String, SearchError)>) = - split_search_results - .into_iter() - .partition_map(|split_search_res| match split_search_res { - Ok(split_search_resp) => Either::Left(split_search_resp), - Err(err) => Either::Right(err), - }); - - let merged_iter = split_search_responses - .into_iter() - .map(|leaf_search_response| leaf_search_response.terms) - .kmerge() - .dedup(); - let terms: Vec> = if let Some(limit) = request.max_hits { - merged_iter.take(limit as usize).collect() - } else { - merged_iter.collect() - }; - - let failed_splits = errors - .into_iter() - .map(|(split_id, err)| SplitSearchError { - split_id, - error: err.to_string(), - retryable_error: true, - }) - .collect(); - let merged_search_response = LeafListTermsResponse { - num_hits: terms.len() as u64, - terms, - num_attempted_splits: splits.len() as u64, - failed_splits, - }; - - Ok(merged_search_response) -} diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 1223dbed578..54d606858bf 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -33,6 +33,7 @@ mod leaf; mod leaf_cache; mod list_fields; mod list_fields_cache; +mod list_terms; mod retry; mod root; mod scroll_context; @@ -79,10 +80,8 @@ pub use crate::client::{ pub use crate::cluster_client::ClusterClient; pub use crate::error::{parse_grpc_error, SearchError}; use crate::fetch_docs::fetch_docs; -use crate::leaf::{leaf_list_terms, leaf_search}; -pub use crate::root::{ - jobs_to_leaf_requests, root_list_terms, root_search, IndexMetasForLeafSearch, SearchJob, -}; +use crate::leaf::leaf_search; +pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob}; pub use crate::search_job_placer::{Job, SearchJobPlacer}; pub use crate::search_response_rest::SearchResponseRest; pub use crate::search_stream::root_search_stream; diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs index dc3a4a8cb7a..06d4c8784ca 100644 --- a/quickwit/quickwit-search/src/list_fields.rs +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -229,7 +229,8 @@ fn matches_pattern(field_pattern: &str, field_name: &str) -> 
bool { } } } -/// + +/// `leaf` step of list fields. pub async fn leaf_list_fields( index_id: String, index_storage: Arc, @@ -289,11 +290,11 @@ pub async fn root_list_fields( cluster_client: &ClusterClient, mut metastore: MetastoreServiceClient, ) -> crate::Result { - let list_indexes_metadata_request = if list_fields_req.index_ids.is_empty() { + let list_indexes_metadata_request = if list_fields_req.index_id_patterns.is_empty() { ListIndexesMetadataRequest::all() } else { ListIndexesMetadataRequest { - index_id_patterns: list_fields_req.index_ids.clone(), + index_id_patterns: list_fields_req.index_id_patterns.clone(), } }; @@ -303,7 +304,10 @@ pub async fn root_list_fields( .list_indexes_metadata(list_indexes_metadata_request) .await? .deserialize_indexes_metadata()?; - check_all_index_metadata_found(&indexes_metadata[..], &list_fields_req.index_ids[..])?; + check_all_index_metadata_found( + &indexes_metadata[..], + &list_fields_req.index_id_patterns[..], + )?; // The request contains a wildcard, but couldn't find any index. if indexes_metadata.is_empty() { return Ok(ListFieldsResponse { fields: vec![] }); @@ -326,7 +330,6 @@ pub async fn root_list_fields( .into_iter() .map(|index_metadata| index_metadata.index_uid) .collect(); - let split_metadatas: Vec = list_relevant_splits(index_uids, None, None, None, &mut metastore).await?; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs new file mode 100644 index 00000000000..0b610da1563 --- /dev/null +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -0,0 +1,407 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::{HashMap, HashSet}; +use std::ops::Bound; +use std::sync::Arc; + +use anyhow::Context; +use futures::future::try_join_all; +use itertools::{Either, Itertools}; +use quickwit_common::PrettySample; +use quickwit_config::build_doc_mapper; +use quickwit_metastore::{ + ListIndexesMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, + SplitMetadata, +}; +use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, +}; +use quickwit_proto::search::{ + LeafListTermsRequest, LeafListTermsResponse, ListTermsRequest, ListTermsResponse, + SplitIdAndFooterOffsets, SplitSearchError, +}; +use quickwit_proto::types::IndexUid; +use quickwit_storage::Storage; +use tantivy::schema::{Field, FieldType}; +use tantivy::{ReloadPolicy, Term}; +use tracing::{debug, error, info, instrument}; + +use crate::leaf::open_index_with_caches; +use crate::root::check_all_index_metadata_found; +use crate::{ClusterClient, SearchError, SearchJob, SearcherContext}; + +/// Performs a distributed list terms. +/// 1. 
Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Builds the response and returns. +/// this is much simpler than `root_search` as it doesn't need to get actual docs. +#[instrument(skip(list_terms_request, cluster_client, metastore))] +pub async fn root_list_terms( + list_terms_request: &ListTermsRequest, + mut metastore: MetastoreServiceClient, + cluster_client: &ClusterClient, +) -> crate::Result { + let start_instant = tokio::time::Instant::now(); + let list_indexes_metadata_request = if list_terms_request.index_id_patterns.is_empty() { + ListIndexesMetadataRequest::all() + } else { + ListIndexesMetadataRequest { + index_id_patterns: list_terms_request.index_id_patterns.clone(), + } + }; + + // Get the index ids from the request + let indexes_metadata = metastore + .list_indexes_metadata(list_indexes_metadata_request) + .await? + .deserialize_indexes_metadata()?; + check_all_index_metadata_found( + &indexes_metadata[..], + &list_terms_request.index_id_patterns[..], + )?; + // The request contains a wildcard, but couldn't find any index. + if indexes_metadata.is_empty() { + return Ok(ListTermsResponse { + num_hits: 0, + terms: Vec::new(), + elapsed_time_micros: 0, + errors: Vec::new(), + }); + } + + for index_metadata in indexes_metadata.iter() { + let index_config = &index_metadata.index_config; + let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) + .map_err(|err| { + SearchError::Internal(format!("failed to build doc mapper. cause: {err}")) + })?; + let schema = doc_mapper.schema(); + let field = schema.get_field(&list_terms_request.field).map_err(|_| { + SearchError::InvalidQuery(format!( + "failed to list terms in `{}`, field doesn't exist", + list_terms_request.field + )) + })?; + let field_entry = schema.get_field_entry(field); + if !field_entry.is_indexed() { + return Err(SearchError::InvalidQuery( + "trying to list terms on field which isn't indexed".to_string(), + )); + } + } + let index_uids: Vec = indexes_metadata + .iter() + .map(|index_metadata| index_metadata.index_uid.clone()) + .collect(); + let mut query = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids)? + .with_split_state(quickwit_metastore::SplitState::Published); + + if let Some(start_ts) = list_terms_request.start_timestamp { + query = query.with_time_range_start_gte(start_ts); + } + + if let Some(end_ts) = list_terms_request.end_timestamp { + query = query.with_time_range_end_lt(end_ts); + } + let index_uid_to_index_uri: HashMap = indexes_metadata + .iter() + .map(|index_metadata| { + ( + index_metadata.index_uid.clone(), + index_metadata.index_uri().to_string(), + ) + }) + .collect(); + let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; + let split_metadatas: Vec = metastore + .clone() + .list_splits(list_splits_request) + .await? + .collect_splits_metadata() + .await?; + + let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(jobs, &HashSet::default()) + .await?; + let mut leaf_request_tasks = Vec::new(); + // For each node, forward to a node with an affinity for that index id. 
+ for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_requests = + jobs_to_leaf_requests(list_terms_request, &index_uid_to_index_uri, client_jobs)?; + for leaf_request in leaf_requests { + leaf_request_tasks.push(cluster_client.leaf_list_terms(leaf_request, client.clone())); + } + } + let leaf_search_responses: Vec = + try_join_all(leaf_request_tasks).await?; + + let failed_splits: Vec<_> = leaf_search_responses + .iter() + .flat_map(|leaf_search_response| &leaf_search_response.failed_splits) + .collect(); + + if !failed_splits.is_empty() { + error!(failed_splits = ?failed_splits, "leaf search response contains at least one failed split"); + let errors: String = failed_splits + .iter() + .map(|splits| splits.to_string()) + .collect::>() + .join(", "); + return Err(SearchError::Internal(errors)); + } + + // Merging is a cpu-bound task, but probably fast enough to not require + // spawning it on a blocking thread. + let merged_iter = leaf_search_responses + .into_iter() + .map(|leaf_search_response| leaf_search_response.terms) + .kmerge() + .dedup(); + let leaf_list_terms_response: Vec> = if let Some(limit) = list_terms_request.max_hits { + merged_iter.take(limit as usize).collect() + } else { + merged_iter.collect() + }; + + debug!( + leaf_list_terms_response_count = leaf_list_terms_response.len(), + "Merged leaf search response." + ); + + let elapsed = start_instant.elapsed(); + + Ok(ListTermsResponse { + num_hits: leaf_list_terms_response.len() as u64, + terms: leaf_list_terms_response, + elapsed_time_micros: elapsed.as_micros() as u64, + errors: Vec::new(), + }) +} + +/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`]. +pub fn jobs_to_leaf_requests( + request: &ListTermsRequest, + index_uid_to_uri: &HashMap, + jobs: Vec, +) -> crate::Result> { + let search_request_for_leaf = request.clone(); + let mut leaf_search_requests = Vec::new(); + // Group jobs by index uid. + for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) { + let index_uri = index_uid_to_uri.get(&index_uid).ok_or_else(|| { + SearchError::Internal(format!( + "received list fields job for an unknown index {index_uid}. it should never happen" + )) + })?; + let leaf_search_request = LeafListTermsRequest { + list_terms_request: Some(search_request_for_leaf.clone()), + index_uri: index_uri.to_string(), + split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), + }; + leaf_search_requests.push(leaf_search_request); + } + Ok(leaf_search_requests) +} + +/// Apply a leaf list terms on a single split. 
+#[instrument(skip_all, fields(split_id = split.split_id))] +async fn leaf_list_terms_single_split( + searcher_context: &SearcherContext, + search_request: &ListTermsRequest, + storage: Arc, + split: SplitIdAndFooterOffsets, +) -> crate::Result { + let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?; + let split_schema = index.schema(); + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into()?; + let searcher = reader.searcher(); + + let field = split_schema + .get_field(&search_request.field) + .with_context(|| { + format!( + "couldn't get field named {:?} from schema to list terms", + search_request.field + ) + })?; + + let field_type = split_schema.get_field_entry(field).field_type(); + let start_term: Option = search_request + .start_key + .as_ref() + .map(|data| term_from_data(field, field_type, data)); + let end_term: Option = search_request + .end_key + .as_ref() + .map(|data| term_from_data(field, field_type, data)); + + let mut segment_results = Vec::new(); + for segment_reader in searcher.segment_readers() { + let inverted_index = segment_reader.inverted_index(field)?.clone(); + let dict = inverted_index.terms(); + dict.file_slice_for_range( + ( + start_term + .as_ref() + .map(Term::serialized_value_bytes) + .map(Bound::Included) + .unwrap_or(Bound::Unbounded), + end_term + .as_ref() + .map(Term::serialized_value_bytes) + .map(Bound::Excluded) + .unwrap_or(Bound::Unbounded), + ), + search_request.max_hits, + ) + .read_bytes_async() + .await + .with_context(|| "failed to load sstable range")?; + + let mut range = dict.range(); + if let Some(limit) = search_request.max_hits { + range = range.limit(limit); + } + if let Some(start_term) = &start_term { + range = range.ge(start_term.serialized_value_bytes()) + } + if let Some(end_term) = &end_term { + range = range.lt(end_term.serialized_value_bytes()) + } + let mut stream = range + .into_stream() + .with_context(|| "failed to create stream over sstable")?; + let mut segment_result: Vec> = + Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize); + while stream.advance() { + segment_result.push(term_to_data(field, field_type, stream.key())); + } + segment_results.push(segment_result); + } + + let merged_iter = segment_results.into_iter().kmerge().dedup(); + let merged_results: Vec> = if let Some(limit) = search_request.max_hits { + merged_iter.take(limit as usize).collect() + } else { + merged_iter.collect() + }; + + Ok(LeafListTermsResponse { + num_hits: merged_results.len() as u64, + terms: merged_results, + num_attempted_splits: 1, + failed_splits: Vec::new(), + }) +} + +fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term { + let mut term = Term::from_field_bool(field, false); + term.clear_with_type(field_type.value_type()); + term.append_bytes(data); + term +} + +fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec { + let mut term = Term::from_field_bool(field, false); + term.clear_with_type(field_type.value_type()); + term.append_bytes(field_value); + term.serialized_term().to_vec() +} + +/// `leaf` step of list terms. 
+#[instrument(skip_all)] +pub async fn leaf_list_terms( + searcher_context: Arc, + request: &ListTermsRequest, + index_storage: Arc, + splits: &[SplitIdAndFooterOffsets], +) -> Result { + info!(split_offsets = ?PrettySample::new(splits, 5)); + let leaf_search_single_split_futures: Vec<_> = splits + .iter() + .map(|split| { + let index_storage_clone = index_storage.clone(); + let searcher_context_clone = searcher_context.clone(); + async move { + let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone() + .acquire_owned() + .await + .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + // TODO dedicated counter and timer? + crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); + let timer = crate::SEARCH_METRICS + .leaf_search_split_duration_secs + .start_timer(); + let leaf_search_single_split_res = leaf_list_terms_single_split( + &searcher_context_clone, + request, + index_storage_clone, + split.clone(), + ) + .await; + timer.observe_duration(); + leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err)) + } + }) + .collect(); + + let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await; + + let (split_search_responses, errors): (Vec, Vec<(String, SearchError)>) = + split_search_results + .into_iter() + .partition_map(|split_search_res| match split_search_res { + Ok(split_search_resp) => Either::Left(split_search_resp), + Err(err) => Either::Right(err), + }); + + let merged_iter = split_search_responses + .into_iter() + .map(|leaf_search_response| leaf_search_response.terms) + .kmerge() + .dedup(); + let terms: Vec> = if let Some(limit) = request.max_hits { + merged_iter.take(limit as usize).collect() + } else { + merged_iter.collect() + }; + + let failed_splits = errors + .into_iter() + .map(|(split_id, err)| SplitSearchError { + split_id, + error: err.to_string(), + retryable_error: true, + }) + .collect(); + let merged_search_response = LeafListTermsResponse { + num_hits: terms.len() as u64, + terms, + num_attempted_splits: splits.len() as u64, + failed_splits, + }; + + Ok(merged_search_response) +} diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index ccd0bf1832f..bdc7536a3b7 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -26,22 +26,17 @@ use itertools::Itertools; use quickwit_common::shared_consts::{DELETION_GRACE_PERIOD, SCROLL_BATCH_LEN}; use quickwit_common::uri::Uri; use quickwit_common::PrettySample; -use quickwit_config::{build_doc_mapper, IndexConfig}; +use quickwit_config::build_doc_mapper; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME}; -use quickwit_metastore::{ - IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, - MetastoreServiceStreamSplitsExt, SplitMetadata, -}; +use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt, SplitMetadata}; use quickwit_proto::metastore::{ - IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, - MetastoreServiceClient, + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, }; use quickwit_proto::search::{ - FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafListTermsRequest, LeafListTermsResponse, - LeafSearchRequest, LeafSearchResponse, ListTermsRequest, ListTermsResponse, PartialHit, - SearchRequest, 
SearchResponse, SnippetRequest, SortDatetimeFormat, SortField, SortValue, - SplitIdAndFooterOffsets, + FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafSearchRequest, LeafSearchResponse, + PartialHit, SearchRequest, SearchResponse, SnippetRequest, SortDatetimeFormat, SortField, + SortValue, SplitIdAndFooterOffsets, }; use quickwit_proto::types::{IndexUid, SplitId}; use quickwit_query::query_ast::{ @@ -1170,128 +1165,6 @@ impl<'a, 'b> QueryAstVisitor<'b> for ExtractTimestampRange<'a> { } } -/// Performs a distributed list terms. -/// 1. Sends leaf request over gRPC to multiple leaf nodes. -/// 2. Merges the search results. -/// 3. Builds the response and returns. -/// this is much simpler than `root_search` as it doesn't need to get actual docs. -#[instrument(skip(list_terms_request, cluster_client, metastore))] -pub async fn root_list_terms( - list_terms_request: &ListTermsRequest, - mut metastore: MetastoreServiceClient, - cluster_client: &ClusterClient, -) -> crate::Result { - let start_instant = tokio::time::Instant::now(); - let index_metadata_request = - IndexMetadataRequest::for_index_id(list_terms_request.index_id.clone()); - let index_metadata = metastore - .index_metadata(index_metadata_request) - .await? - .deserialize_index_metadata()?; - let index_uid = index_metadata.index_uid.clone(); - let index_config: IndexConfig = index_metadata.into_index_config(); - - let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) - .map_err(|err| { - SearchError::Internal(format!("failed to build doc mapper. cause: {err}")) - })?; - - let schema = doc_mapper.schema(); - let field = schema.get_field(&list_terms_request.field).map_err(|_| { - SearchError::InvalidQuery(format!( - "failed to list terms in `{}`, field doesn't exist", - list_terms_request.field - )) - })?; - - let field_entry = schema.get_field_entry(field); - if !field_entry.is_indexed() { - return Err(SearchError::InvalidQuery( - "trying to list terms on field which isn't indexed".to_string(), - )); - } - - let mut query = quickwit_metastore::ListSplitsQuery::for_index(index_uid) - .with_split_state(quickwit_metastore::SplitState::Published); - - if let Some(start_ts) = list_terms_request.start_timestamp { - query = query.with_time_range_start_gte(start_ts); - } - - if let Some(end_ts) = list_terms_request.end_timestamp { - query = query.with_time_range_end_lt(end_ts); - } - let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let split_metadatas: Vec = metastore - .clone() - .list_splits(list_splits_request) - .await? 
- .collect_splits_metadata() - .await?; - - let index_uri = &index_config.index_uri; - - let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); - let assigned_leaf_search_jobs = cluster_client - .search_job_placer - .assign_jobs(jobs, &HashSet::default()) - .await?; - let leaf_search_responses: Vec = - try_join_all(assigned_leaf_search_jobs.map(|(client, client_jobs)| { - cluster_client.leaf_list_terms( - LeafListTermsRequest { - list_terms_request: Some(list_terms_request.clone()), - split_offsets: client_jobs.into_iter().map(|job| job.offsets).collect(), - index_uri: index_uri.to_string(), - }, - client, - ) - })) - .await?; - - let failed_splits: Vec<_> = leaf_search_responses - .iter() - .flat_map(|leaf_search_response| &leaf_search_response.failed_splits) - .collect(); - - if !failed_splits.is_empty() { - error!(failed_splits = ?failed_splits, "leaf search response contains at least one failed split"); - let errors: String = failed_splits - .iter() - .map(|splits| splits.to_string()) - .collect::>() - .join(", "); - return Err(SearchError::Internal(errors)); - } - - // Merging is a cpu-bound task, but probably fast enough to not require - // spawning it on a blocking thread. - let merged_iter = leaf_search_responses - .into_iter() - .map(|leaf_search_response| leaf_search_response.terms) - .kmerge() - .dedup(); - let leaf_list_terms_response: Vec> = if let Some(limit) = list_terms_request.max_hits { - merged_iter.take(limit as usize).collect() - } else { - merged_iter.collect() - }; - - debug!( - leaf_list_terms_response_count = leaf_list_terms_response.len(), - "Merged leaf search response." - ); - - let elapsed = start_instant.elapsed(); - - Ok(ListTermsResponse { - num_hits: leaf_list_terms_response.len() as u64, - terms: leaf_list_terms_response, - elapsed_time_micros: elapsed.as_micros() as u64, - errors: Vec::new(), - }) -} - async fn assign_client_fetch_docs_jobs( partial_hits: &[PartialHit], split_metadatas: &[SplitMetadata], @@ -1424,9 +1297,9 @@ mod tests { use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_common::ServiceStream; - use quickwit_config::{DocMapping, IndexingSettings, SearchSettings}; + use quickwit_config::{DocMapping, IndexConfig, IndexingSettings, SearchSettings}; use quickwit_indexing::MockSplitBuilder; - use quickwit_metastore::{IndexMetadata, ListSplitsResponseExt}; + use quickwit_metastore::{IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt}; use quickwit_proto::metastore::{ListIndexesMetadataResponse, ListSplitsResponse}; use quickwit_proto::search::{ ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index ae9b494f5a3..4d2776d9ac5 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -45,13 +45,11 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use crate::leaf_cache::LeafSearchCache; use crate::list_fields::{leaf_list_fields, root_list_fields}; use crate::list_fields_cache::ListFieldsCache; +use crate::list_terms::{leaf_list_terms, root_list_terms}; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_stream::{leaf_search_stream, root_search_stream}; -use crate::{ - fetch_docs, leaf_list_terms, leaf_search, root_list_terms, root_search, ClusterClient, - SearchError, -}; +use crate::{fetch_docs, leaf_search, root_search, ClusterClient, SearchError}; #[derive(Clone)] /// 
The search service implementation. diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index ec559d2a4d5..ef22d7fa014 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -39,6 +39,7 @@ use tantivy::Term; use super::*; use crate::find_trace_ids_collector::Span; +use crate::list_terms::leaf_list_terms; use crate::service::SearcherContext; use crate::single_node_search; @@ -1688,7 +1689,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { { let request = ListTermsRequest { - index_id: test_sandbox.index_uid().index_id().to_string(), + index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()], field: "title".to_string(), start_key: None, end_key: None, @@ -1709,7 +1710,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { } { let request = ListTermsRequest { - index_id: test_sandbox.index_uid().index_id().to_string(), + index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()], field: "title".to_string(), start_key: None, end_key: None, @@ -1730,7 +1731,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { } { let request = ListTermsRequest { - index_id: test_sandbox.index_uid().index_id().to_string(), + index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()], field: "title".to_string(), start_key: Some("casper".as_bytes().to_vec()), end_key: None, @@ -1751,7 +1752,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { } { let request = ListTermsRequest { - index_id: test_sandbox.index_uid().index_id().to_string(), + index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()], field: "title".to_string(), start_key: None, end_key: Some("casper".as_bytes().to_vec()), diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs index 123e4638b01..97cb206b932 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs @@ -176,7 +176,7 @@ pub fn build_list_field_request_for_es_api( _search_body: FieldCapabilityRequestBody, ) -> Result { Ok(quickwit_proto::search::ListFieldsRequest { - index_ids: index_id_patterns, + index_id_patterns, fields: search_params.fields.unwrap_or_default(), }) } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 3c130be84ce..67aa00370be 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -333,7 +333,7 @@ mod tests { mock_search_service .expect_root_list_terms() .withf(|req| { - req.index_id == OTEL_TRACES_INDEX_ID + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID] && req.field == "service_name" && req.start_timestamp.is_some() }) @@ -370,7 +370,7 @@ mod tests { mock_search_service .expect_root_list_terms() .withf(|req| { - req.index_id == OTEL_TRACES_INDEX_ID + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID] && req.field == "span_fingerprint" && req.start_timestamp.is_some() }) diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml index 1ed8000f83f..13774dc427b 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml +++ 
b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml @@ -243,10 +243,11 @@ method: [GET] engines: - quickwit - elasticsearch -endpoint: fieldcaps/_field_caps?fields=host +endpoint: fieldcaps*/_field_caps?fields=host expected: indices: - fieldcaps + - fieldcaps-2 fields: host: ip: @@ -276,18 +277,29 @@ expected: method: [GET] engines: - quickwit - - elasticsearch -endpoint: fieldca*/_field_caps?fields=date +endpoint: fieldca*/_field_caps?fields=tags* expected: indices: - fieldcaps + - fieldcaps-2 fields: - date: - date_nanos: - type: date_nanos + tags: + keyword: + type: keyword metadata_field: false searchable: true aggregatable: true + indices: + - fieldcaps + - fieldcaps-2 + tags-2: + keyword: + type: keyword + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps-2 --- # Wildcard on index name + Wildcard without match method: [GET] @@ -298,6 +310,7 @@ endpoint: fieldca*,blub*/_field_caps?fields=date expected: indices: - fieldcaps + - fieldcaps-2 fields: date: date_nanos: diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml index 200e60cb3c0..2e2ee5f346c 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml @@ -3,6 +3,10 @@ method: DELETE endpoint: fieldcaps status_code: null --- +method: DELETE +endpoint: fieldcaps-2 +status_code: null +--- # Create index 1 method: PUT endpoint: fieldcaps @@ -20,6 +24,23 @@ json: { } } --- +# Create index 2 +method: PUT +endpoint: fieldcaps-2 +json: { + "mappings": { + "properties": { + "host": { + "type": "ip", + "store": true + }, + "date": { + "type": "date_nanos" + }, + } + } +} +--- # Ingest documents in fieldcaps method: POST endpoint: _bulk @@ -31,4 +52,5 @@ ndjson: - {"name": "Fritz", "response": 30, "id": 5, "host": "192.168.0.1", "tags": ["nice", "cool"]} - "index": { "_index": "fieldcaps" } - {"nested": {"name": "Fritz", "response": 30}, "date": "2015-01-11T12:10:30Z", "host": "192.168.0.11", "tags": ["nice"]} - + - "index": { "_index": "fieldcaps-2" } + - {"name": "Fritz", "response": 30, "id": 6, "host": "192.168.0.1", "tags": ["nice", "cool"], "tags-2": ["awesome"]} diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml index 0af4612d051..ecdcd2944dd 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml @@ -4,6 +4,11 @@ api_root: http://localhost:7280/api/v1/ endpoint: indexes/fieldcaps status_code: null --- +method: DELETE +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/fieldcaps-2 +status_code: null +--- # Create index method: POST api_root: http://localhost:7280/api/v1/ @@ -27,6 +32,29 @@ json: type: ip fast: true --- +# Create index +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/ +json: + version: "0.6" + index_id: fieldcaps-2 + doc_mapping: + mode: dynamic + dynamic_mapping: + tokenizer: default + fast: true + field_mappings: + - name: date + type: datetime + input_formats: + - rfc3339 + fast_precision: seconds + fast: true + - name: host + type: ip + fast: true +--- # Ingest documents method: POST api_root: http://localhost:7280/api/v1/ @@ -56,4 +84,12 @@ 
params: ndjson: - {"mixed": 5} # inter split mixed type - {"mixed": -5.5} - +--- +# Ingest documents in index 2 +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: fieldcaps-2/ingest +params: + commit: force +ndjson: + - {"name": "Fritz", "response": 30, "id": 6, "host": "192.168.0.1", "tags": ["nice", "cool"], "tags-2": ["awesome"]} diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml index 9d3c1723eed..bf5ea75fe8c 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml @@ -1,3 +1,6 @@ # # Delete index method: DELETE endpoint: fieldcaps +--- +method: DELETE +endpoint: fieldcaps-2 diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml index 66330c067ad..4100c9de88b 100644 --- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml @@ -2,3 +2,7 @@ method: DELETE api_root: http://localhost:7280/api/v1/ endpoint: indexes/fieldcaps +--- +method: DELETE +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/fieldcaps-2
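
Note: the central change above is that `ListTermsRequest.index_id_patterns` replaces the old single `index_id` field. Below is a minimal sketch of how a caller might build the new request shape; the index pattern `otel-traces-v0_*` and the `max_hits` value are illustrative placeholders, not values taken from this patch, and the struct is the prost-generated type modified in `quickwit.search.rs`.

```rust
use quickwit_proto::search::ListTermsRequest;

// Illustrative only: the pattern and limit below are made up for the example.
fn example_list_terms_request() -> ListTermsRequest {
    ListTermsRequest {
        // One or more index IDs or wildcard patterns, resolved against the metastore.
        index_id_patterns: vec!["otel-traces-v0_*".to_string()],
        // Field whose terms are listed; it must be indexed in every matching index.
        field: "service_name".to_string(),
        // Optional cap on the number of merged terms returned to the caller.
        max_hits: Some(100),
        // Remaining fields (timestamps, start/end keys) keep their prost defaults.
        ..Default::default()
    }
}
```

Such a request is then passed to `root_list_terms(&request, metastore, &cluster_client)`, which, per the new `list_terms.rs`, resolves the patterns through `ListIndexesMetadataRequest`, validates the field against each index's doc mapping, and fans out one `LeafListTermsRequest` per matching index before k-merging the leaf term streams.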