Allow index patterns in list terms. (#4322)
* Allow index patterns in list terms.

* Fix fmt/clippy.

* Fix clippy.

* Add multi-index field capabilities REST API tests.

* Fix REST API tests.
fmassot authored Dec 27, 2023
1 parent 4e89425 commit bfc64e1
Showing 17 changed files with 537 additions and 364 deletions.
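At the heart of the change, `ListTermsRequest` (and `ListFieldsRequest`) now carry a list of index ID patterns instead of a single index ID. A minimal sketch of the new request shape, assuming the prost-generated struct provides a `Default` impl; the pattern and limit below are illustrative, not taken from the diff:

use quickwit_proto::search::ListTermsRequest;

// Illustrative sketch: a wildcard pattern can now target several indexes at once.
let request = ListTermsRequest {
    index_id_patterns: vec!["otel-traces-v0*".to_string()],
    field: "service_name".to_string(),
    max_hits: Some(1_000),
    // Remaining fields (timestamps, start/end keys) keep their defaults.
    ..Default::default()
};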
6 changes: 3 additions & 3 deletions quickwit/quickwit-jaeger/src/lib.rs
@@ -103,7 +103,7 @@ impl JaegerService {
Some(OffsetDateTime::now_utc().unix_timestamp() - self.lookback_period_secs);

let search_request = ListTermsRequest {
-index_id,
+index_id_patterns: vec![index_id],
field: "service_name".to_string(),
max_hits,
start_timestamp,
@@ -140,7 +140,7 @@ impl JaegerService {
let end_key = SpanFingerprint::end_key(&request.service, span_kind_opt);

let search_request = ListTermsRequest {
-index_id,
+index_id_patterns: vec![index_id],
field: "span_fingerprint".to_string(),
max_hits,
start_timestamp,
@@ -2379,7 +2379,7 @@ mod tests {
service
.expect_root_list_terms()
.withf(|req| {
-req.index_id == OTEL_TRACES_INDEX_ID
+req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID]
&& req.field == "service_name"
&& req.start_timestamp.is_some()
})
8 changes: 4 additions & 4 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -118,8 +118,8 @@ message ReportSplitsResponse {}
// -- ListFields -------------------

message ListFieldsRequest {
-// Optional limit query to a set of indexes.
-repeated string index_ids = 1;
+// Index ID patterns
+repeated string index_id_patterns = 1;
// Optional limit query to a list of fields
// Wildcard expressions are supported.
repeated string fields = 2;
@@ -465,8 +465,8 @@ message FetchDocsResponse {
}

message ListTermsRequest {
-// Index ID
-string index_id = 1;
+// Index ID patterns
+repeated string index_id_patterns = 1;

// Field to search on
string field = 3;
10 changes: 5 additions & 5 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Generated file; diff not rendered by default.
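Roughly, and assuming standard prost output for proto3 (a simplified sketch, not the actual generated file), the new repeated string fields map to `Vec<String>` on the Rust side:

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListFieldsRequest {
    /// Index ID patterns
    #[prost(string, repeated, tag = "1")]
    pub index_id_patterns: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
    /// Optional limit query to a list of fields; wildcard expressions are supported.
    #[prost(string, repeated, tag = "2")]
    pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
    // ...any remaining fields are unchanged by this commit.
}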

192 changes: 3 additions & 189 deletions quickwit/quickwit-search/src/leaf.rs
@@ -18,19 +18,17 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::Context;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
-CountHits, LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit,
-SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
+CountHits, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue,
+SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
@@ -39,7 +37,7 @@ use quickwit_storage::{
};
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
-use tantivy::schema::{Field, FieldType};
+use tantivy::schema::Field;
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tracing::*;

@@ -697,187 +695,3 @@ async fn leaf_search_single_split_wrapper(
.record_new_worst_hit(last_hit.as_ref());
}
}

/// Apply a leaf list terms on a single split.
#[instrument(skip_all, fields(split_id = split.split_id))]
async fn leaf_list_terms_single_split(
searcher_context: &SearcherContext,
search_request: &ListTermsRequest,
storage: Arc<dyn Storage>,
split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
let split_schema = index.schema();
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let field = split_schema
.get_field(&search_request.field)
.with_context(|| {
format!(
"couldn't get field named {:?} from schema to list terms",
search_request.field
)
})?;

let field_type = split_schema.get_field_entry(field).field_type();
let start_term: Option<Term> = search_request
.start_key
.as_ref()
.map(|data| term_from_data(field, field_type, data));
let end_term: Option<Term> = search_request
.end_key
.as_ref()
.map(|data| term_from_data(field, field_type, data));

let mut segment_results = Vec::new();
for segment_reader in searcher.segment_readers() {
let inverted_index = segment_reader.inverted_index(field)?.clone();
let dict = inverted_index.terms();
dict.file_slice_for_range(
(
start_term
.as_ref()
.map(Term::serialized_value_bytes)
.map(Bound::Included)
.unwrap_or(Bound::Unbounded),
end_term
.as_ref()
.map(Term::serialized_value_bytes)
.map(Bound::Excluded)
.unwrap_or(Bound::Unbounded),
),
search_request.max_hits,
)
.read_bytes_async()
.await
.with_context(|| "failed to load sstable range")?;

let mut range = dict.range();
if let Some(limit) = search_request.max_hits {
range = range.limit(limit);
}
if let Some(start_term) = &start_term {
range = range.ge(start_term.serialized_value_bytes())
}
if let Some(end_term) = &end_term {
range = range.lt(end_term.serialized_value_bytes())
}
let mut stream = range
.into_stream()
.with_context(|| "failed to create stream over sstable")?;
let mut segment_result: Vec<Vec<u8>> =
Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize);
while stream.advance() {
segment_result.push(term_to_data(field, field_type, stream.key()));
}
segment_results.push(segment_result);
}

let merged_iter = segment_results.into_iter().kmerge().dedup();
let merged_results: Vec<Vec<u8>> = if let Some(limit) = search_request.max_hits {
merged_iter.take(limit as usize).collect()
} else {
merged_iter.collect()
};

Ok(LeafListTermsResponse {
num_hits: merged_results.len() as u64,
terms: merged_results,
num_attempted_splits: 1,
failed_splits: Vec::new(),
})
}

fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term {
let mut term = Term::from_field_bool(field, false);
term.clear_with_type(field_type.value_type());
term.append_bytes(data);
term
}

fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec<u8> {
let mut term = Term::from_field_bool(field, false);
term.clear_with_type(field_type.value_type());
term.append_bytes(field_value);
term.serialized_term().to_vec()
}

/// `leaf` step of list terms.
#[instrument(skip_all, fields(index = ?request.index_id))]
pub async fn leaf_list_terms(
searcher_context: Arc<SearcherContext>,
request: &ListTermsRequest,
index_storage: Arc<dyn Storage>,
splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
info!(split_offsets = ?PrettySample::new(splits, 5));
let leaf_search_single_split_futures: Vec<_> = splits
.iter()
.map(|split| {
let index_storage_clone = index_storage.clone();
let searcher_context_clone = searcher_context.clone();
async move {
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
// TODO dedicated counter and timer?
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
.leaf_search_split_duration_secs
.start_timer();
let leaf_search_single_split_res = leaf_list_terms_single_split(
&searcher_context_clone,
request,
index_storage_clone,
split.clone(),
)
.await;
timer.observe_duration();
leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err))
}
})
.collect();

let split_search_results = futures::future::join_all(leaf_search_single_split_futures).await;

let (split_search_responses, errors): (Vec<LeafListTermsResponse>, Vec<(String, SearchError)>) =
split_search_results
.into_iter()
.partition_map(|split_search_res| match split_search_res {
Ok(split_search_resp) => Either::Left(split_search_resp),
Err(err) => Either::Right(err),
});

let merged_iter = split_search_responses
.into_iter()
.map(|leaf_search_response| leaf_search_response.terms)
.kmerge()
.dedup();
let terms: Vec<Vec<u8>> = if let Some(limit) = request.max_hits {
merged_iter.take(limit as usize).collect()
} else {
merged_iter.collect()
};

let failed_splits = errors
.into_iter()
.map(|(split_id, err)| SplitSearchError {
split_id,
error: err.to_string(),
retryable_error: true,
})
.collect();
let merged_search_response = LeafListTermsResponse {
num_hits: terms.len() as u64,
terms,
num_attempted_splits: splits.len() as u64,
failed_splits,
};

Ok(merged_search_response)
}
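The merging in the two functions above, removed here from leaf.rs and re-homed in the new list_terms module added below, relies on itertools' k-way merge plus dedup: each segment (or split) yields a sorted term list, and `kmerge().dedup()` produces one globally sorted, duplicate-free list. A tiny standalone illustration of that strategy (not Quickwit code):

use itertools::Itertools;

fn main() {
    // Sorted term lists, one per segment or split.
    let per_segment = vec![
        vec!["alpha", "delta", "zeta"],
        vec!["beta", "delta", "omega"],
    ];
    // kmerge preserves global sort order across lists; dedup drops the duplicate "delta".
    let merged: Vec<&str> = per_segment.into_iter().kmerge().dedup().collect();
    assert_eq!(merged, vec!["alpha", "beta", "delta", "omega", "zeta"]);
}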
7 changes: 3 additions & 4 deletions quickwit/quickwit-search/src/lib.rs
@@ -33,6 +33,7 @@ mod leaf;
mod leaf_cache;
mod list_fields;
mod list_fields_cache;
mod list_terms;
mod retry;
mod root;
mod scroll_context;
@@ -79,10 +80,8 @@ pub use crate::client::{
pub use crate::cluster_client::ClusterClient;
pub use crate::error::{parse_grpc_error, SearchError};
use crate::fetch_docs::fetch_docs;
-use crate::leaf::{leaf_list_terms, leaf_search};
-pub use crate::root::{
-jobs_to_leaf_requests, root_list_terms, root_search, IndexMetasForLeafSearch, SearchJob,
-};
+use crate::leaf::leaf_search;
+pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
pub use crate::search_stream::root_search_stream;
13 changes: 8 additions & 5 deletions quickwit/quickwit-search/src/list_fields.rs
@@ -229,7 +229,8 @@ fn matches_pattern(field_pattern: &str, field_name: &str) -> bool {
}
}
}
-///
+
+/// `leaf` step of list fields.
pub async fn leaf_list_fields(
index_id: String,
index_storage: Arc<dyn Storage>,
@@ -289,11 +290,11 @@ pub async fn root_list_fields(
cluster_client: &ClusterClient,
mut metastore: MetastoreServiceClient,
) -> crate::Result<ListFieldsResponse> {
-let list_indexes_metadata_request = if list_fields_req.index_ids.is_empty() {
+let list_indexes_metadata_request = if list_fields_req.index_id_patterns.is_empty() {
ListIndexesMetadataRequest::all()
} else {
ListIndexesMetadataRequest {
-index_id_patterns: list_fields_req.index_ids.clone(),
+index_id_patterns: list_fields_req.index_id_patterns.clone(),
}
};

@@ -303,7 +304,10 @@
.list_indexes_metadata(list_indexes_metadata_request)
.await?
.deserialize_indexes_metadata()?;
-check_all_index_metadata_found(&indexes_metadata[..], &list_fields_req.index_ids[..])?;
+check_all_index_metadata_found(
+&indexes_metadata[..],
+&list_fields_req.index_id_patterns[..],
+)?;
// The request contains a wildcard, but couldn't find any index.
if indexes_metadata.is_empty() {
return Ok(ListFieldsResponse { fields: vec![] });
@@ -326,7 +330,6 @@
.into_iter()
.map(|index_metadata| index_metadata.index_uid)
.collect();

let split_metadatas: Vec<SplitMetadata> =
list_relevant_splits(index_uids, None, None, None, &mut metastore).await?;

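On the root side, an empty `index_id_patterns` list on `ListFieldsRequest` now means "all indexes" (it maps to `ListIndexesMetadataRequest::all()`), while non-empty patterns are forwarded to the metastore and validated with `check_all_index_metadata_found`. A hedged usage sketch; the patterns below are illustrative and assume the prost-generated `Default` impl:

use quickwit_proto::search::ListFieldsRequest;

// Empty patterns: list fields across every index.
let all_indexes = ListFieldsRequest::default();

// Non-empty patterns restrict the listing; `fields` may itself contain wildcards.
let some_indexes = ListFieldsRequest {
    index_id_patterns: vec!["logs-*".to_string()],
    fields: vec!["resource.*".to_string()],
    ..Default::default()
};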