diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs
index 0aa4b418833..eb67a024a5d 100644
--- a/quickwit/quickwit-jaeger/src/lib.rs
+++ b/quickwit/quickwit-jaeger/src/lib.rs
@@ -103,7 +103,7 @@ impl JaegerService {
Some(OffsetDateTime::now_utc().unix_timestamp() - self.lookback_period_secs);
let search_request = ListTermsRequest {
- index_id,
+ index_id_patterns: vec![index_id],
field: "service_name".to_string(),
max_hits,
start_timestamp,
@@ -140,7 +140,7 @@ impl JaegerService {
let end_key = SpanFingerprint::end_key(&request.service, span_kind_opt);
let search_request = ListTermsRequest {
- index_id,
+ index_id_patterns: vec![index_id],
field: "span_fingerprint".to_string(),
max_hits,
start_timestamp,
@@ -2379,7 +2379,7 @@ mod tests {
service
.expect_root_list_terms()
.withf(|req| {
- req.index_id == OTEL_TRACES_INDEX_ID
+ req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID]
&& req.field == "service_name"
&& req.start_timestamp.is_some()
})
diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto
index 7f5b05231bc..ecf7fd05089 100644
--- a/quickwit/quickwit-proto/protos/quickwit/search.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -118,8 +118,8 @@ message ReportSplitsResponse {}
// -- ListFields -------------------
message ListFieldsRequest {
- // Optional limit query to a set of indexes.
- repeated string index_ids = 1;
+ // Index ID patterns
+ repeated string index_id_patterns = 1;
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 2;
@@ -465,8 +465,8 @@ message FetchDocsResponse {
}
message ListTermsRequest {
- // Index ID
- string index_id = 1;
+ // Index ID patterns
+ repeated string index_id_patterns = 1;
// Field to search on
string field = 3;
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
index 2c55a308fa5..ee4b50643e9 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs
@@ -64,9 +64,9 @@ pub struct ReportSplitsResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListFieldsRequest {
- /// Optional limit query to a set of indexes.
+ /// Index ID patterns
#[prost(string, repeated, tag = "1")]
- pub index_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// Optional limit query to a list of fields
/// Wildcard expressions are supported.
#[prost(string, repeated, tag = "2")]
@@ -448,9 +448,9 @@ pub struct FetchDocsResponse {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListTermsRequest {
- /// Index ID
- #[prost(string, tag = "1")]
- pub index_id: ::prost::alloc::string::String,
+ /// Index ID patterns
+ #[prost(string, repeated, tag = "1")]
+ pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// Field to search on
#[prost(string, tag = "3")]
pub field: ::prost::alloc::string::String,
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 3e371059629..d4b7f68482a 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -18,19 +18,17 @@
// along with this program. If not, see .
use std::collections::{HashMap, HashSet};
-use std::ops::Bound;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use anyhow::Context;
use futures::future::try_join_all;
-use itertools::{Either, Itertools};
use quickwit_common::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
- CountHits, LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit,
- SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
+ CountHits, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue,
+ SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
@@ -39,7 +37,7 @@ use quickwit_storage::{
};
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
-use tantivy::schema::{Field, FieldType};
+use tantivy::schema::Field;
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tracing::*;
@@ -697,187 +695,3 @@ async fn leaf_search_single_split_wrapper(
.record_new_worst_hit(last_hit.as_ref());
}
}
-
-/// Apply a leaf list terms on a single split.
-#[instrument(skip_all, fields(split_id = split.split_id))]
-async fn leaf_list_terms_single_split(
- searcher_context: &SearcherContext,
- search_request: &ListTermsRequest,
- storage: Arc,
- split: SplitIdAndFooterOffsets,
-) -> crate::Result {
- let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
- let split_schema = index.schema();
- let reader = index
- .reader_builder()
- .reload_policy(ReloadPolicy::Manual)
- .try_into()?;
- let searcher = reader.searcher();
-
- let field = split_schema
- .get_field(&search_request.field)
- .with_context(|| {
- format!(
- "couldn't get field named {:?} from schema to list terms",
- search_request.field
- )
- })?;
-
- let field_type = split_schema.get_field_entry(field).field_type();
- let start_term: Option = search_request
- .start_key
- .as_ref()
- .map(|data| term_from_data(field, field_type, data));
- let end_term: Option = search_request
- .end_key
- .as_ref()
- .map(|data| term_from_data(field, field_type, data));
-
- let mut segment_results = Vec::new();
- for segment_reader in searcher.segment_readers() {
- let inverted_index = segment_reader.inverted_index(field)?.clone();
- let dict = inverted_index.terms();
- dict.file_slice_for_range(
- (
- start_term
- .as_ref()
- .map(Term::serialized_value_bytes)
- .map(Bound::Included)
- .unwrap_or(Bound::Unbounded),
- end_term
- .as_ref()
- .map(Term::serialized_value_bytes)
- .map(Bound::Excluded)
- .unwrap_or(Bound::Unbounded),
- ),
- search_request.max_hits,
- )
- .read_bytes_async()
- .await
- .with_context(|| "failed to load sstable range")?;
-
- let mut range = dict.range();
- if let Some(limit) = search_request.max_hits {
- range = range.limit(limit);
- }
- if let Some(start_term) = &start_term {
- range = range.ge(start_term.serialized_value_bytes())
- }
- if let Some(end_term) = &end_term {
- range = range.lt(end_term.serialized_value_bytes())
- }
- let mut stream = range
- .into_stream()
- .with_context(|| "failed to create stream over sstable")?;
- let mut segment_result: Vec> =
- Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize);
- while stream.advance() {
- segment_result.push(term_to_data(field, field_type, stream.key()));
- }
- segment_results.push(segment_result);
- }
-
- let merged_iter = segment_results.into_iter().kmerge().dedup();
- let merged_results: Vec> = if let Some(limit) = search_request.max_hits {
- merged_iter.take(limit as usize).collect()
- } else {
- merged_iter.collect()
- };
-
- Ok(LeafListTermsResponse {
- num_hits: merged_results.len() as u64,
- terms: merged_results,
- num_attempted_splits: 1,
- failed_splits: Vec::new(),
- })
-}
-
-fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term {
- let mut term = Term::from_field_bool(field, false);
- term.clear_with_type(field_type.value_type());
- term.append_bytes(data);
- term
-}
-
-fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec {
- let mut term = Term::from_field_bool(field, false);
- term.clear_with_type(field_type.value_type());
- term.append_bytes(field_value);
- term.serialized_term().to_vec()
-}
-
-/// `leaf` step of list terms.
-#[instrument(skip_all, fields(index = ?request.index_id))]
-pub async fn leaf_list_terms(
- searcher_context: Arc,
- request: &ListTermsRequest,
- index_storage: Arc,
- splits: &[SplitIdAndFooterOffsets],
-) -> Result {
- info!(split_offsets = ?PrettySample::new(splits, 5));
- let leaf_search_single_split_futures: Vec<_> = splits
- .iter()
- .map(|split| {
- let index_storage_clone = index_storage.clone();
- let searcher_context_clone = searcher_context.clone();
- async move {
- let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
- .acquire_owned()
- .await
- .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
- // TODO dedicated counter and timer?
- crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
- let timer = crate::SEARCH_METRICS
- .leaf_search_split_duration_secs
- .start_timer();
- let leaf_search_single_split_res = leaf_list_terms_single_split(
- &searcher_context_clone,
- request,
- index_storage_clone,
- split.clone(),
- )
- .await;
- timer.observe_duration();
- leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err))
- }
- })
- .collect();
-
- let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await;
-
- let (split_search_responses, errors): (Vec, Vec<(String, SearchError)>) =
- split_search_results
- .into_iter()
- .partition_map(|split_search_res| match split_search_res {
- Ok(split_search_resp) => Either::Left(split_search_resp),
- Err(err) => Either::Right(err),
- });
-
- let merged_iter = split_search_responses
- .into_iter()
- .map(|leaf_search_response| leaf_search_response.terms)
- .kmerge()
- .dedup();
- let terms: Vec> = if let Some(limit) = request.max_hits {
- merged_iter.take(limit as usize).collect()
- } else {
- merged_iter.collect()
- };
-
- let failed_splits = errors
- .into_iter()
- .map(|(split_id, err)| SplitSearchError {
- split_id,
- error: err.to_string(),
- retryable_error: true,
- })
- .collect();
- let merged_search_response = LeafListTermsResponse {
- num_hits: terms.len() as u64,
- terms,
- num_attempted_splits: splits.len() as u64,
- failed_splits,
- };
-
- Ok(merged_search_response)
-}
diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs
index 1223dbed578..54d606858bf 100644
--- a/quickwit/quickwit-search/src/lib.rs
+++ b/quickwit/quickwit-search/src/lib.rs
@@ -33,6 +33,7 @@ mod leaf;
mod leaf_cache;
mod list_fields;
mod list_fields_cache;
+mod list_terms;
mod retry;
mod root;
mod scroll_context;
@@ -79,10 +80,8 @@ pub use crate::client::{
pub use crate::cluster_client::ClusterClient;
pub use crate::error::{parse_grpc_error, SearchError};
use crate::fetch_docs::fetch_docs;
-use crate::leaf::{leaf_list_terms, leaf_search};
-pub use crate::root::{
- jobs_to_leaf_requests, root_list_terms, root_search, IndexMetasForLeafSearch, SearchJob,
-};
+use crate::leaf::leaf_search;
+pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
pub use crate::search_stream::root_search_stream;
diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs
index dc3a4a8cb7a..06d4c8784ca 100644
--- a/quickwit/quickwit-search/src/list_fields.rs
+++ b/quickwit/quickwit-search/src/list_fields.rs
@@ -229,7 +229,8 @@ fn matches_pattern(field_pattern: &str, field_name: &str) -> bool {
}
}
}
-///
+
+/// `leaf` step of list fields.
pub async fn leaf_list_fields(
index_id: String,
index_storage: Arc,
@@ -289,11 +290,11 @@ pub async fn root_list_fields(
cluster_client: &ClusterClient,
mut metastore: MetastoreServiceClient,
) -> crate::Result {
- let list_indexes_metadata_request = if list_fields_req.index_ids.is_empty() {
+ let list_indexes_metadata_request = if list_fields_req.index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
- index_id_patterns: list_fields_req.index_ids.clone(),
+ index_id_patterns: list_fields_req.index_id_patterns.clone(),
}
};
@@ -303,7 +304,10 @@ pub async fn root_list_fields(
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
- check_all_index_metadata_found(&indexes_metadata[..], &list_fields_req.index_ids[..])?;
+ check_all_index_metadata_found(
+ &indexes_metadata[..],
+ &list_fields_req.index_id_patterns[..],
+ )?;
// The request contains a wildcard, but couldn't find any index.
if indexes_metadata.is_empty() {
return Ok(ListFieldsResponse { fields: vec![] });
@@ -326,7 +330,6 @@ pub async fn root_list_fields(
.into_iter()
.map(|index_metadata| index_metadata.index_uid)
.collect();
-
let split_metadatas: Vec =
list_relevant_splits(index_uids, None, None, None, &mut metastore).await?;
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
new file mode 100644
index 00000000000..0b610da1563
--- /dev/null
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -0,0 +1,407 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::collections::{HashMap, HashSet};
+use std::ops::Bound;
+use std::sync::Arc;
+
+use anyhow::Context;
+use futures::future::try_join_all;
+use itertools::{Either, Itertools};
+use quickwit_common::PrettySample;
+use quickwit_config::build_doc_mapper;
+use quickwit_metastore::{
+ ListIndexesMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt,
+ SplitMetadata,
+};
+use quickwit_proto::metastore::{
+ ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient,
+};
+use quickwit_proto::search::{
+ LeafListTermsRequest, LeafListTermsResponse, ListTermsRequest, ListTermsResponse,
+ SplitIdAndFooterOffsets, SplitSearchError,
+};
+use quickwit_proto::types::IndexUid;
+use quickwit_storage::Storage;
+use tantivy::schema::{Field, FieldType};
+use tantivy::{ReloadPolicy, Term};
+use tracing::{debug, error, info, instrument};
+
+use crate::leaf::open_index_with_caches;
+use crate::root::check_all_index_metadata_found;
+use crate::{ClusterClient, SearchError, SearchJob, SearcherContext};
+
+/// Performs a distributed list terms.
+/// 1. Sends leaf request over gRPC to multiple leaf nodes.
+/// 2. Merges the search results.
+/// 3. Builds the response and returns.
+/// this is much simpler than `root_search` as it doesn't need to get actual docs.
+#[instrument(skip(list_terms_request, cluster_client, metastore))]
+pub async fn root_list_terms(
+ list_terms_request: &ListTermsRequest,
+ mut metastore: MetastoreServiceClient,
+ cluster_client: &ClusterClient,
+) -> crate::Result {
+ let start_instant = tokio::time::Instant::now();
+ let list_indexes_metadata_request = if list_terms_request.index_id_patterns.is_empty() {
+ ListIndexesMetadataRequest::all()
+ } else {
+ ListIndexesMetadataRequest {
+ index_id_patterns: list_terms_request.index_id_patterns.clone(),
+ }
+ };
+
+ // Get the index ids from the request
+ let indexes_metadata = metastore
+ .list_indexes_metadata(list_indexes_metadata_request)
+ .await?
+ .deserialize_indexes_metadata()?;
+ check_all_index_metadata_found(
+ &indexes_metadata[..],
+ &list_terms_request.index_id_patterns[..],
+ )?;
+ // The request contains a wildcard, but couldn't find any index.
+ if indexes_metadata.is_empty() {
+ return Ok(ListTermsResponse {
+ num_hits: 0,
+ terms: Vec::new(),
+ elapsed_time_micros: 0,
+ errors: Vec::new(),
+ });
+ }
+
+ for index_metadata in indexes_metadata.iter() {
+ let index_config = &index_metadata.index_config;
+ let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
+ .map_err(|err| {
+ SearchError::Internal(format!("failed to build doc mapper. cause: {err}"))
+ })?;
+ let schema = doc_mapper.schema();
+ let field = schema.get_field(&list_terms_request.field).map_err(|_| {
+ SearchError::InvalidQuery(format!(
+ "failed to list terms in `{}`, field doesn't exist",
+ list_terms_request.field
+ ))
+ })?;
+ let field_entry = schema.get_field_entry(field);
+ if !field_entry.is_indexed() {
+ return Err(SearchError::InvalidQuery(
+ "trying to list terms on field which isn't indexed".to_string(),
+ ));
+ }
+ }
+ let index_uids: Vec = indexes_metadata
+ .iter()
+ .map(|index_metadata| index_metadata.index_uid.clone())
+ .collect();
+ let mut query = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids)?
+ .with_split_state(quickwit_metastore::SplitState::Published);
+
+ if let Some(start_ts) = list_terms_request.start_timestamp {
+ query = query.with_time_range_start_gte(start_ts);
+ }
+
+ if let Some(end_ts) = list_terms_request.end_timestamp {
+ query = query.with_time_range_end_lt(end_ts);
+ }
+ let index_uid_to_index_uri: HashMap = indexes_metadata
+ .iter()
+ .map(|index_metadata| {
+ (
+ index_metadata.index_uid.clone(),
+ index_metadata.index_uri().to_string(),
+ )
+ })
+ .collect();
+ let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?;
+ let split_metadatas: Vec = metastore
+ .clone()
+ .list_splits(list_splits_request)
+ .await?
+ .collect_splits_metadata()
+ .await?;
+
+ let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect();
+ let assigned_leaf_search_jobs = cluster_client
+ .search_job_placer
+ .assign_jobs(jobs, &HashSet::default())
+ .await?;
+ let mut leaf_request_tasks = Vec::new();
+ // For each node, forward to a node with an affinity for that index id.
+ for (client, client_jobs) in assigned_leaf_search_jobs {
+ let leaf_requests =
+ jobs_to_leaf_requests(list_terms_request, &index_uid_to_index_uri, client_jobs)?;
+ for leaf_request in leaf_requests {
+ leaf_request_tasks.push(cluster_client.leaf_list_terms(leaf_request, client.clone()));
+ }
+ }
+ let leaf_search_responses: Vec =
+ try_join_all(leaf_request_tasks).await?;
+
+ let failed_splits: Vec<_> = leaf_search_responses
+ .iter()
+ .flat_map(|leaf_search_response| &leaf_search_response.failed_splits)
+ .collect();
+
+ if !failed_splits.is_empty() {
+ error!(failed_splits = ?failed_splits, "leaf search response contains at least one failed split");
+ let errors: String = failed_splits
+ .iter()
+ .map(|splits| splits.to_string())
+ .collect::>()
+ .join(", ");
+ return Err(SearchError::Internal(errors));
+ }
+
+ // Merging is a cpu-bound task, but probably fast enough to not require
+ // spawning it on a blocking thread.
+ let merged_iter = leaf_search_responses
+ .into_iter()
+ .map(|leaf_search_response| leaf_search_response.terms)
+ .kmerge()
+ .dedup();
+ let leaf_list_terms_response: Vec> = if let Some(limit) = list_terms_request.max_hits {
+ merged_iter.take(limit as usize).collect()
+ } else {
+ merged_iter.collect()
+ };
+
+ debug!(
+ leaf_list_terms_response_count = leaf_list_terms_response.len(),
+ "Merged leaf search response."
+ );
+
+ let elapsed = start_instant.elapsed();
+
+ Ok(ListTermsResponse {
+ num_hits: leaf_list_terms_response.len() as u64,
+ terms: leaf_list_terms_response,
+ elapsed_time_micros: elapsed.as_micros() as u64,
+ errors: Vec::new(),
+ })
+}
+
+/// Builds a list of [`LeafListTermsRequest`], one per index, from a list of [`SearchJob`].
+pub fn jobs_to_leaf_requests(
+ request: &ListTermsRequest,
+ index_uid_to_uri: &HashMap,
+ jobs: Vec,
+) -> crate::Result> {
+ let search_request_for_leaf = request.clone();
+ let mut leaf_search_requests = Vec::new();
+ // Group jobs by index uid.
+ for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) {
+ let index_uri = index_uid_to_uri.get(&index_uid).ok_or_else(|| {
+ SearchError::Internal(format!(
+ "received list terms job for an unknown index {index_uid}. it should never happen"
+ ))
+ })?;
+ let leaf_search_request = LeafListTermsRequest {
+ list_terms_request: Some(search_request_for_leaf.clone()),
+ index_uri: index_uri.to_string(),
+ split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
+ };
+ leaf_search_requests.push(leaf_search_request);
+ }
+ Ok(leaf_search_requests)
+}
+
+/// Apply a leaf list terms on a single split.
+#[instrument(skip_all, fields(split_id = split.split_id))]
+async fn leaf_list_terms_single_split(
+ searcher_context: &SearcherContext,
+ search_request: &ListTermsRequest,
+ storage: Arc,
+ split: SplitIdAndFooterOffsets,
+) -> crate::Result {
+ let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
+ let split_schema = index.schema();
+ let reader = index
+ .reader_builder()
+ .reload_policy(ReloadPolicy::Manual)
+ .try_into()?;
+ let searcher = reader.searcher();
+
+ let field = split_schema
+ .get_field(&search_request.field)
+ .with_context(|| {
+ format!(
+ "couldn't get field named {:?} from schema to list terms",
+ search_request.field
+ )
+ })?;
+
+ let field_type = split_schema.get_field_entry(field).field_type();
+ let start_term: Option = search_request
+ .start_key
+ .as_ref()
+ .map(|data| term_from_data(field, field_type, data));
+ let end_term: Option = search_request
+ .end_key
+ .as_ref()
+ .map(|data| term_from_data(field, field_type, data));
+
+ let mut segment_results = Vec::new();
+ for segment_reader in searcher.segment_readers() {
+ let inverted_index = segment_reader.inverted_index(field)?.clone();
+ let dict = inverted_index.terms();
+ dict.file_slice_for_range(
+ (
+ start_term
+ .as_ref()
+ .map(Term::serialized_value_bytes)
+ .map(Bound::Included)
+ .unwrap_or(Bound::Unbounded),
+ end_term
+ .as_ref()
+ .map(Term::serialized_value_bytes)
+ .map(Bound::Excluded)
+ .unwrap_or(Bound::Unbounded),
+ ),
+ search_request.max_hits,
+ )
+ .read_bytes_async()
+ .await
+ .with_context(|| "failed to load sstable range")?;
+
+ let mut range = dict.range();
+ if let Some(limit) = search_request.max_hits {
+ range = range.limit(limit);
+ }
+ if let Some(start_term) = &start_term {
+ range = range.ge(start_term.serialized_value_bytes())
+ }
+ if let Some(end_term) = &end_term {
+ range = range.lt(end_term.serialized_value_bytes())
+ }
+ let mut stream = range
+ .into_stream()
+ .with_context(|| "failed to create stream over sstable")?;
+ let mut segment_result: Vec> =
+ Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize);
+ while stream.advance() {
+ segment_result.push(term_to_data(field, field_type, stream.key()));
+ }
+ segment_results.push(segment_result);
+ }
+
+ let merged_iter = segment_results.into_iter().kmerge().dedup();
+ let merged_results: Vec> = if let Some(limit) = search_request.max_hits {
+ merged_iter.take(limit as usize).collect()
+ } else {
+ merged_iter.collect()
+ };
+
+ Ok(LeafListTermsResponse {
+ num_hits: merged_results.len() as u64,
+ terms: merged_results,
+ num_attempted_splits: 1,
+ failed_splits: Vec::new(),
+ })
+}
+
+fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term {
+ let mut term = Term::from_field_bool(field, false);
+ term.clear_with_type(field_type.value_type());
+ term.append_bytes(data);
+ term
+}
+
+fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec {
+ let mut term = Term::from_field_bool(field, false);
+ term.clear_with_type(field_type.value_type());
+ term.append_bytes(field_value);
+ term.serialized_term().to_vec()
+}
+
+/// `leaf` step of list terms.
+#[instrument(skip_all)]
+pub async fn leaf_list_terms(
+ searcher_context: Arc,
+ request: &ListTermsRequest,
+ index_storage: Arc,
+ splits: &[SplitIdAndFooterOffsets],
+) -> Result {
+ info!(split_offsets = ?PrettySample::new(splits, 5));
+ let leaf_search_single_split_futures: Vec<_> = splits
+ .iter()
+ .map(|split| {
+ let index_storage_clone = index_storage.clone();
+ let searcher_context_clone = searcher_context.clone();
+ async move {
+ let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
+ .acquire_owned()
+ .await
+ .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
+ // TODO dedicated counter and timer?
+ crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
+ let timer = crate::SEARCH_METRICS
+ .leaf_search_split_duration_secs
+ .start_timer();
+ let leaf_search_single_split_res = leaf_list_terms_single_split(
+ &searcher_context_clone,
+ request,
+ index_storage_clone,
+ split.clone(),
+ )
+ .await;
+ timer.observe_duration();
+ leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err))
+ }
+ })
+ .collect();
+
+ let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await;
+
+ let (split_search_responses, errors): (Vec, Vec<(String, SearchError)>) =
+ split_search_results
+ .into_iter()
+ .partition_map(|split_search_res| match split_search_res {
+ Ok(split_search_resp) => Either::Left(split_search_resp),
+ Err(err) => Either::Right(err),
+ });
+
+ let merged_iter = split_search_responses
+ .into_iter()
+ .map(|leaf_search_response| leaf_search_response.terms)
+ .kmerge()
+ .dedup();
+ let terms: Vec> = if let Some(limit) = request.max_hits {
+ merged_iter.take(limit as usize).collect()
+ } else {
+ merged_iter.collect()
+ };
+
+ let failed_splits = errors
+ .into_iter()
+ .map(|(split_id, err)| SplitSearchError {
+ split_id,
+ error: err.to_string(),
+ retryable_error: true,
+ })
+ .collect();
+ let merged_search_response = LeafListTermsResponse {
+ num_hits: terms.len() as u64,
+ terms,
+ num_attempted_splits: splits.len() as u64,
+ failed_splits,
+ };
+
+ Ok(merged_search_response)
+}
diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs
index ccd0bf1832f..bdc7536a3b7 100644
--- a/quickwit/quickwit-search/src/root.rs
+++ b/quickwit/quickwit-search/src/root.rs
@@ -26,22 +26,17 @@ use itertools::Itertools;
use quickwit_common::shared_consts::{DELETION_GRACE_PERIOD, SCROLL_BATCH_LEN};
use quickwit_common::uri::Uri;
use quickwit_common::PrettySample;
-use quickwit_config::{build_doc_mapper, IndexConfig};
+use quickwit_config::build_doc_mapper;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME};
-use quickwit_metastore::{
- IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt,
- MetastoreServiceStreamSplitsExt, SplitMetadata,
-};
+use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt, SplitMetadata};
use quickwit_proto::metastore::{
- IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService,
- MetastoreServiceClient,
+ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::search::{
- FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafListTermsRequest, LeafListTermsResponse,
- LeafSearchRequest, LeafSearchResponse, ListTermsRequest, ListTermsResponse, PartialHit,
- SearchRequest, SearchResponse, SnippetRequest, SortDatetimeFormat, SortField, SortValue,
- SplitIdAndFooterOffsets,
+ FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafSearchRequest, LeafSearchResponse,
+ PartialHit, SearchRequest, SearchResponse, SnippetRequest, SortDatetimeFormat, SortField,
+ SortValue, SplitIdAndFooterOffsets,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_query::query_ast::{
@@ -1170,128 +1165,6 @@ impl<'a, 'b> QueryAstVisitor<'b> for ExtractTimestampRange<'a> {
}
}
-/// Performs a distributed list terms.
-/// 1. Sends leaf request over gRPC to multiple leaf nodes.
-/// 2. Merges the search results.
-/// 3. Builds the response and returns.
-/// this is much simpler than `root_search` as it doesn't need to get actual docs.
-#[instrument(skip(list_terms_request, cluster_client, metastore))]
-pub async fn root_list_terms(
- list_terms_request: &ListTermsRequest,
- mut metastore: MetastoreServiceClient,
- cluster_client: &ClusterClient,
-) -> crate::Result {
- let start_instant = tokio::time::Instant::now();
- let index_metadata_request =
- IndexMetadataRequest::for_index_id(list_terms_request.index_id.clone());
- let index_metadata = metastore
- .index_metadata(index_metadata_request)
- .await?
- .deserialize_index_metadata()?;
- let index_uid = index_metadata.index_uid.clone();
- let index_config: IndexConfig = index_metadata.into_index_config();
-
- let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
- .map_err(|err| {
- SearchError::Internal(format!("failed to build doc mapper. cause: {err}"))
- })?;
-
- let schema = doc_mapper.schema();
- let field = schema.get_field(&list_terms_request.field).map_err(|_| {
- SearchError::InvalidQuery(format!(
- "failed to list terms in `{}`, field doesn't exist",
- list_terms_request.field
- ))
- })?;
-
- let field_entry = schema.get_field_entry(field);
- if !field_entry.is_indexed() {
- return Err(SearchError::InvalidQuery(
- "trying to list terms on field which isn't indexed".to_string(),
- ));
- }
-
- let mut query = quickwit_metastore::ListSplitsQuery::for_index(index_uid)
- .with_split_state(quickwit_metastore::SplitState::Published);
-
- if let Some(start_ts) = list_terms_request.start_timestamp {
- query = query.with_time_range_start_gte(start_ts);
- }
-
- if let Some(end_ts) = list_terms_request.end_timestamp {
- query = query.with_time_range_end_lt(end_ts);
- }
- let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?;
- let split_metadatas: Vec = metastore
- .clone()
- .list_splits(list_splits_request)
- .await?
- .collect_splits_metadata()
- .await?;
-
- let index_uri = &index_config.index_uri;
-
- let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect();
- let assigned_leaf_search_jobs = cluster_client
- .search_job_placer
- .assign_jobs(jobs, &HashSet::default())
- .await?;
- let leaf_search_responses: Vec =
- try_join_all(assigned_leaf_search_jobs.map(|(client, client_jobs)| {
- cluster_client.leaf_list_terms(
- LeafListTermsRequest {
- list_terms_request: Some(list_terms_request.clone()),
- split_offsets: client_jobs.into_iter().map(|job| job.offsets).collect(),
- index_uri: index_uri.to_string(),
- },
- client,
- )
- }))
- .await?;
-
- let failed_splits: Vec<_> = leaf_search_responses
- .iter()
- .flat_map(|leaf_search_response| &leaf_search_response.failed_splits)
- .collect();
-
- if !failed_splits.is_empty() {
- error!(failed_splits = ?failed_splits, "leaf search response contains at least one failed split");
- let errors: String = failed_splits
- .iter()
- .map(|splits| splits.to_string())
- .collect::>()
- .join(", ");
- return Err(SearchError::Internal(errors));
- }
-
- // Merging is a cpu-bound task, but probably fast enough to not require
- // spawning it on a blocking thread.
- let merged_iter = leaf_search_responses
- .into_iter()
- .map(|leaf_search_response| leaf_search_response.terms)
- .kmerge()
- .dedup();
- let leaf_list_terms_response: Vec> = if let Some(limit) = list_terms_request.max_hits {
- merged_iter.take(limit as usize).collect()
- } else {
- merged_iter.collect()
- };
-
- debug!(
- leaf_list_terms_response_count = leaf_list_terms_response.len(),
- "Merged leaf search response."
- );
-
- let elapsed = start_instant.elapsed();
-
- Ok(ListTermsResponse {
- num_hits: leaf_list_terms_response.len() as u64,
- terms: leaf_list_terms_response,
- elapsed_time_micros: elapsed.as_micros() as u64,
- errors: Vec::new(),
- })
-}
-
async fn assign_client_fetch_docs_jobs(
partial_hits: &[PartialHit],
split_metadatas: &[SplitMetadata],
@@ -1424,9 +1297,9 @@ mod tests {
use quickwit_common::shared_consts::SCROLL_BATCH_LEN;
use quickwit_common::ServiceStream;
- use quickwit_config::{DocMapping, IndexingSettings, SearchSettings};
+ use quickwit_config::{DocMapping, IndexConfig, IndexingSettings, SearchSettings};
use quickwit_indexing::MockSplitBuilder;
- use quickwit_metastore::{IndexMetadata, ListSplitsResponseExt};
+ use quickwit_metastore::{IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt};
use quickwit_proto::metastore::{ListIndexesMetadataResponse, ListSplitsResponse};
use quickwit_proto::search::{
ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError,
diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs
index ae9b494f5a3..4d2776d9ac5 100644
--- a/quickwit/quickwit-search/src/service.rs
+++ b/quickwit/quickwit-search/src/service.rs
@@ -45,13 +45,11 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::leaf_cache::LeafSearchCache;
use crate::list_fields::{leaf_list_fields, root_list_fields};
use crate::list_fields_cache::ListFieldsCache;
+use crate::list_terms::{leaf_list_terms, root_list_terms};
use crate::root::fetch_docs_phase;
use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset};
use crate::search_stream::{leaf_search_stream, root_search_stream};
-use crate::{
- fetch_docs, leaf_list_terms, leaf_search, root_list_terms, root_search, ClusterClient,
- SearchError,
-};
+use crate::{fetch_docs, leaf_search, root_search, ClusterClient, SearchError};
#[derive(Clone)]
/// The search service implementation.
diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs
index ec559d2a4d5..ef22d7fa014 100644
--- a/quickwit/quickwit-search/src/tests.rs
+++ b/quickwit/quickwit-search/src/tests.rs
@@ -39,6 +39,7 @@ use tantivy::Term;
use super::*;
use crate::find_trace_ids_collector::Span;
+use crate::list_terms::leaf_list_terms;
use crate::service::SearcherContext;
use crate::single_node_search;
@@ -1688,7 +1689,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
{
let request = ListTermsRequest {
- index_id: test_sandbox.index_uid().index_id().to_string(),
+ index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()],
field: "title".to_string(),
start_key: None,
end_key: None,
@@ -1709,7 +1710,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
}
{
let request = ListTermsRequest {
- index_id: test_sandbox.index_uid().index_id().to_string(),
+ index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()],
field: "title".to_string(),
start_key: None,
end_key: None,
@@ -1730,7 +1731,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
}
{
let request = ListTermsRequest {
- index_id: test_sandbox.index_uid().index_id().to_string(),
+ index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()],
field: "title".to_string(),
start_key: Some("casper".as_bytes().to_vec()),
end_key: None,
@@ -1751,7 +1752,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> {
}
{
let request = ListTermsRequest {
- index_id: test_sandbox.index_uid().index_id().to_string(),
+ index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()],
field: "title".to_string(),
start_key: None,
end_key: Some("casper".as_bytes().to_vec()),
diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs
index 123e4638b01..97cb206b932 100644
--- a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs
+++ b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs
@@ -176,7 +176,7 @@ pub fn build_list_field_request_for_es_api(
_search_body: FieldCapabilityRequestBody,
) -> Result<quickwit_proto::search::ListFieldsRequest, SearchError> {
Ok(quickwit_proto::search::ListFieldsRequest {
- index_ids: index_id_patterns,
+ index_id_patterns,
fields: search_params.fields.unwrap_or_default(),
})
}
diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
index 3c130be84ce..67aa00370be 100644
--- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs
@@ -333,7 +333,7 @@ mod tests {
mock_search_service
.expect_root_list_terms()
.withf(|req| {
- req.index_id == OTEL_TRACES_INDEX_ID
+ req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID]
&& req.field == "service_name"
&& req.start_timestamp.is_some()
})
@@ -370,7 +370,7 @@ mod tests {
mock_search_service
.expect_root_list_terms()
.withf(|req| {
- req.index_id == OTEL_TRACES_INDEX_ID
+ req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID]
&& req.field == "span_fingerprint"
&& req.start_timestamp.is_some()
})
diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml
index 1ed8000f83f..13774dc427b 100644
--- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml
@@ -243,10 +243,11 @@ method: [GET]
engines:
- quickwit
- elasticsearch
-endpoint: fieldcaps/_field_caps?fields=host
+endpoint: fieldcaps*/_field_caps?fields=host
expected:
indices:
- fieldcaps
+ - fieldcaps-2
fields:
host:
ip:
@@ -276,18 +277,29 @@ expected:
method: [GET]
engines:
- quickwit
- - elasticsearch
-endpoint: fieldca*/_field_caps?fields=date
+endpoint: fieldca*/_field_caps?fields=tags*
expected:
indices:
- fieldcaps
+ - fieldcaps-2
fields:
- date:
- date_nanos:
- type: date_nanos
+ tags:
+ keyword:
+ type: keyword
metadata_field: false
searchable: true
aggregatable: true
+ indices:
+ - fieldcaps
+ - fieldcaps-2
+ tags-2:
+ keyword:
+ type: keyword
+ metadata_field: false
+ searchable: true
+ aggregatable: true
+ indices:
+ - fieldcaps-2
---
# Wildcard on index name + Wildcard without match
method: [GET]
@@ -298,6 +310,7 @@ endpoint: fieldca*,blub*/_field_caps?fields=date
expected:
indices:
- fieldcaps
+ - fieldcaps-2
fields:
date:
date_nanos:
diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml
index 200e60cb3c0..2e2ee5f346c 100644
--- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml
@@ -3,6 +3,10 @@ method: DELETE
endpoint: fieldcaps
status_code: null
---
+method: DELETE
+endpoint: fieldcaps-2
+status_code: null
+---
# Create index 1
method: PUT
endpoint: fieldcaps
@@ -20,6 +24,23 @@ json: {
}
}
---
+# Create index 2
+method: PUT
+endpoint: fieldcaps-2
+json: {
+ "mappings": {
+ "properties": {
+ "host": {
+ "type": "ip",
+ "store": true
+ },
+ "date": {
+ "type": "date_nanos"
+      }
+ }
+ }
+}
+---
# Ingest documents in fieldcaps
method: POST
endpoint: _bulk
@@ -31,4 +52,5 @@ ndjson:
- {"name": "Fritz", "response": 30, "id": 5, "host": "192.168.0.1", "tags": ["nice", "cool"]}
- "index": { "_index": "fieldcaps" }
- {"nested": {"name": "Fritz", "response": 30}, "date": "2015-01-11T12:10:30Z", "host": "192.168.0.11", "tags": ["nice"]}
-
+ - "index": { "_index": "fieldcaps-2" }
+ - {"name": "Fritz", "response": 30, "id": 6, "host": "192.168.0.1", "tags": ["nice", "cool"], "tags-2": ["awesome"]}
diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml
index 0af4612d051..ecdcd2944dd 100644
--- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml
@@ -4,6 +4,11 @@ api_root: http://localhost:7280/api/v1/
endpoint: indexes/fieldcaps
status_code: null
---
+method: DELETE
+api_root: http://localhost:7280/api/v1/
+endpoint: indexes/fieldcaps-2
+status_code: null
+---
# Create index
method: POST
api_root: http://localhost:7280/api/v1/
@@ -27,6 +32,29 @@ json:
type: ip
fast: true
---
+# Create index
+method: POST
+api_root: http://localhost:7280/api/v1/
+endpoint: indexes/
+json:
+ version: "0.6"
+ index_id: fieldcaps-2
+ doc_mapping:
+ mode: dynamic
+ dynamic_mapping:
+ tokenizer: default
+ fast: true
+ field_mappings:
+ - name: date
+ type: datetime
+ input_formats:
+ - rfc3339
+ fast_precision: seconds
+ fast: true
+ - name: host
+ type: ip
+ fast: true
+---
# Ingest documents
method: POST
api_root: http://localhost:7280/api/v1/
@@ -56,4 +84,12 @@ params:
ndjson:
- {"mixed": 5} # inter split mixed type
- {"mixed": -5.5}
-
+---
+# Ingest documents in index 2
+method: POST
+api_root: http://localhost:7280/api/v1/
+endpoint: fieldcaps-2/ingest
+params:
+ commit: force
+ndjson:
+ - {"name": "Fritz", "response": 30, "id": 6, "host": "192.168.0.1", "tags": ["nice", "cool"], "tags-2": ["awesome"]}
diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml
index 9d3c1723eed..bf5ea75fe8c 100644
--- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml
@@ -1,3 +1,6 @@
# # Delete index
method: DELETE
endpoint: fieldcaps
+---
+method: DELETE
+endpoint: fieldcaps-2
diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml
index 66330c067ad..4100c9de88b 100644
--- a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml
@@ -2,3 +2,7 @@
method: DELETE
api_root: http://localhost:7280/api/v1/
endpoint: indexes/fieldcaps
+---
+method: DELETE
+api_root: http://localhost:7280/api/v1/
+endpoint: indexes/fieldcaps-2