Skip to content

Commit

Permalink
Fix fmt/clippy.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Dec 25, 2023
1 parent e404ab0 commit 669d178
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 192 deletions.
190 changes: 2 additions & 188 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::Context;
use futures::future::try_join_all;
use itertools::{Either, Itertools};
use quickwit_common::PrettySample;
use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_doc_mapper::{DocMapper, TermRange, WarmupInfo};
use quickwit_proto::search::{
CountHits, LeafListTermsResponse, LeafSearchResponse, ListTermsRequest, PartialHit,
SearchRequest, SortOrder, SortValue, SplitIdAndFooterOffsets, SplitSearchError,
CountHits, LeafSearchResponse, PartialHit, SearchRequest, SortOrder, SortValue,
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::QueryAst;
use quickwit_query::tokenizers::TokenizerManager;
Expand Down Expand Up @@ -697,187 +695,3 @@ async fn leaf_search_single_split_wrapper(
.record_new_worst_hit(last_hit.as_ref());
}
}

/// Runs the `list_terms` operation against one split.
///
/// Opens the split's index (with caching directories), resolves the target
/// field, warms up the term-dictionary byte range covered by the request, and
/// then streams the matching terms segment by segment. Per-segment results are
/// k-merged and deduplicated before being returned.
#[instrument(skip_all, fields(split_id = split.split_id))]
async fn leaf_list_terms_single_split(
    searcher_context: &SearcherContext,
    search_request: &ListTermsRequest,
    storage: Arc<dyn Storage>,
    split: SplitIdAndFooterOffsets,
) -> crate::Result<LeafListTermsResponse> {
    let index = open_index_with_caches(searcher_context, storage, &split, None, true).await?;
    let split_schema = index.schema();
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    let searcher = reader.searcher();

    // Resolve the requested field in this split's schema; a missing field is a
    // hard error for this split.
    let field = split_schema
        .get_field(&search_request.field)
        .with_context(|| {
            format!(
                "couldn't get field named {:?} from schema to list terms",
                search_request.field
            )
        })?;
    let field_type = split_schema.get_field_entry(field).field_type();

    // Optional inclusive lower / exclusive upper bounds, re-encoded as terms of
    // the field's value type.
    let start_term: Option<Term> = search_request
        .start_key
        .as_ref()
        .map(|data| term_from_data(field, field_type, data));
    let end_term: Option<Term> = search_request
        .end_key
        .as_ref()
        .map(|data| term_from_data(field, field_type, data));

    let mut per_segment_terms = Vec::new();
    for segment_reader in searcher.segment_readers() {
        let inverted_index = segment_reader.inverted_index(field)?.clone();
        let term_dict = inverted_index.terms();

        let lower_bound = match &start_term {
            Some(term) => Bound::Included(term.serialized_value_bytes()),
            None => Bound::Unbounded,
        };
        let upper_bound = match &end_term {
            Some(term) => Bound::Excluded(term.serialized_value_bytes()),
            None => Bound::Unbounded,
        };
        // Warmup: fetch the sstable slice covering the requested range into the
        // cache before streaming over it.
        term_dict
            .file_slice_for_range((lower_bound, upper_bound), search_request.max_hits)
            .read_bytes_async()
            .await
            .with_context(|| "failed to load sstable range")?;

        let mut term_range = term_dict.range();
        if let Some(max_hits) = search_request.max_hits {
            term_range = term_range.limit(max_hits);
        }
        if let Some(start_term) = &start_term {
            term_range = term_range.ge(start_term.serialized_value_bytes())
        }
        if let Some(end_term) = &end_term {
            term_range = term_range.lt(end_term.serialized_value_bytes())
        }
        let mut term_stream = term_range
            .into_stream()
            .with_context(|| "failed to create stream over sstable")?;
        let mut segment_terms: Vec<Vec<u8>> =
            Vec::with_capacity(search_request.max_hits.unwrap_or(0) as usize);
        while term_stream.advance() {
            segment_terms.push(term_to_data(field, field_type, term_stream.key()));
        }
        per_segment_terms.push(segment_terms);
    }

    // Segment streams are individually sorted, so a k-way merge + dedup yields
    // the globally sorted, distinct term list.
    let merged = per_segment_terms.into_iter().kmerge().dedup();
    let merged_terms: Vec<Vec<u8>> = match search_request.max_hits {
        Some(limit) => merged.take(limit as usize).collect(),
        None => merged.collect(),
    };

    Ok(LeafListTermsResponse {
        num_hits: merged_terms.len() as u64,
        terms: merged_terms,
        num_attempted_splits: 1,
        failed_splits: Vec::new(),
    })
}

/// Rebuilds a tantivy `Term` for `field` from raw serialized value bytes.
///
/// The bool constructor is only used to obtain a `Term` bound to `field`; its
/// type and payload are immediately overwritten with the field's actual value
/// type and the provided bytes.
fn term_from_data(field: Field, field_type: &FieldType, data: &[u8]) -> Term {
    let mut reconstructed = Term::from_field_bool(field, false);
    reconstructed.clear_with_type(field_type.value_type());
    reconstructed.append_bytes(data);
    reconstructed
}

/// Converts raw term-dictionary key bytes into a fully serialized tantivy term
/// (including field id and type header) for `field`.
fn term_to_data(field: Field, field_type: &FieldType, field_value: &[u8]) -> Vec<u8> {
    // Build a throwaway bool term bound to `field`, then rewrite its type and
    // payload so the final serialization carries the correct header.
    let mut scratch = Term::from_field_bool(field, false);
    scratch.clear_with_type(field_type.value_type());
    scratch.append_bytes(field_value);
    scratch.serialized_term().to_vec()
}

/// `leaf` step of the list-terms operation.
///
/// Fans out one single-split task per split (each gated by the leaf-search
/// semaphore), waits for all of them, then merges the per-split sorted term
/// lists and folds per-split failures into `failed_splits`.
#[instrument(skip_all)]
pub async fn leaf_list_terms(
    searcher_context: Arc<SearcherContext>,
    request: &ListTermsRequest,
    index_storage: Arc<dyn Storage>,
    splits: &[SplitIdAndFooterOffsets],
) -> Result<LeafListTermsResponse, SearchError> {
    info!(split_offsets = ?PrettySample::new(splits, 5));
    let single_split_futures: Vec<_> = splits
        .iter()
        .map(|split| {
            let split_storage = index_storage.clone();
            let split_searcher_context = searcher_context.clone();
            async move {
                // Throttle concurrent split searches with the shared semaphore.
                let _leaf_split_search_permit = split_searcher_context.leaf_search_split_semaphore.clone()
                    .acquire_owned()
                    .await
                    .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
                // TODO dedicated counter and timer?
                crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
                let timer = crate::SEARCH_METRICS
                    .leaf_search_split_duration_secs
                    .start_timer();
                let split_result = leaf_list_terms_single_split(
                    &split_searcher_context,
                    request,
                    split_storage,
                    split.clone(),
                )
                .await;
                timer.observe_duration();
                split_result.map_err(|err| (split.split_id.clone(), err))
            }
        })
        .collect();

    let split_search_results = futures::future::join_all(single_split_futures).await;

    // Split successes from failures; failures keep their split id for reporting.
    let (split_responses, errors): (Vec<LeafListTermsResponse>, Vec<(String, SearchError)>) =
        split_search_results
            .into_iter()
            .partition_map(|split_search_res| match split_search_res {
                Ok(split_search_resp) => Either::Left(split_search_resp),
                Err(err) => Either::Right(err),
            });

    // Each split's term list is sorted, so k-merge + dedup produces the global
    // sorted distinct list; honor the optional cap.
    let merged = split_responses
        .into_iter()
        .map(|leaf_search_response| leaf_search_response.terms)
        .kmerge()
        .dedup();
    let terms: Vec<Vec<u8>> = match request.max_hits {
        Some(limit) => merged.take(limit as usize).collect(),
        None => merged.collect(),
    };

    let failed_splits = errors
        .into_iter()
        .map(|(split_id, err)| SplitSearchError {
            split_id,
            error: err.to_string(),
            retryable_error: true,
        })
        .collect();
    Ok(LeafListTermsResponse {
        num_hits: terms.len() as u64,
        terms,
        num_attempted_splits: splits.len() as u64,
        failed_splits,
    })
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub use crate::client::{
pub use crate::cluster_client::ClusterClient;
pub use crate::error::{parse_grpc_error, SearchError};
use crate::fetch_docs::fetch_docs;
use crate::leaf::{leaf_list_terms, leaf_search};
use crate::leaf::leaf_search;
pub use crate::root::{jobs_to_leaf_requests, root_search, IndexMetasForLeafSearch, SearchJob};
pub use crate::search_job_placer::{Job, SearchJobPlacer};
pub use crate::search_response_rest::SearchResponseRest;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/list_terms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub async fn root_list_terms(
// For each node, forward to a node with an affinity for that index id.
for (client, client_jobs) in assigned_leaf_search_jobs {
let leaf_requests =
jobs_to_leaf_requests(&list_terms_request, &index_uid_to_index_uri, client_jobs)?;
jobs_to_leaf_requests(list_terms_request, &index_uid_to_index_uri, client_jobs)?;
for leaf_request in leaf_requests {
leaf_request_tasks.push(cluster_client.leaf_list_terms(leaf_request, client.clone()));
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use itertools::Itertools;
use quickwit_common::shared_consts::{DELETION_GRACE_PERIOD, SCROLL_BATCH_LEN};
use quickwit_common::uri::Uri;
use quickwit_common::PrettySample;
use quickwit_config::{build_doc_mapper, IndexConfig};
use quickwit_config::build_doc_mapper;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME};
use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt, SplitMetadata};
Expand Down Expand Up @@ -1297,7 +1297,7 @@ mod tests {

use quickwit_common::shared_consts::SCROLL_BATCH_LEN;
use quickwit_common::ServiceStream;
use quickwit_config::{DocMapping, IndexingSettings, SearchSettings};
use quickwit_config::{DocMapping, IndexConfig, IndexingSettings, SearchSettings};
use quickwit_indexing::MockSplitBuilder;
use quickwit_metastore::{IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt};
use quickwit_proto::metastore::{ListIndexesMetadataResponse, ListSplitsResponse};
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tantivy::Term;

use super::*;
use crate::find_trace_ids_collector::Span;
use crate::list_terms::leaf_list_terms;
use crate::service::SearcherContext;
use crate::single_node_search;

Expand Down

0 comments on commit 669d178

Please sign in to comment.