Skip to content

Commit

Permalink
Fix instrumentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Aug 29, 2023
1 parent f7df5ef commit 1d67ad6
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 16 deletions.
6 changes: 6 additions & 0 deletions quickwit/quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl SearchServiceClient {
}

/// Perform leaf search.
#[instrument(skip_all)]
pub async fn leaf_search(
&mut self,
request: quickwit_proto::search::LeafSearchRequest,
Expand All @@ -139,6 +140,7 @@ impl SearchServiceClient {
}

/// Perform leaf stream.
#[instrument(skip(self, request))]
pub async fn leaf_search_stream(
&mut self,
request: quickwit_proto::search::LeafSearchStreamRequest,
Expand Down Expand Up @@ -193,6 +195,7 @@ impl SearchServiceClient {
}

/// Perform fetch docs.
#[instrument(skip(self, request))]
pub async fn fetch_docs(
&mut self,
request: quickwit_proto::search::FetchDocsRequest,
Expand All @@ -211,6 +214,7 @@ impl SearchServiceClient {
}

/// Perform leaf list terms.
#[instrument(skip(self, request))]
pub async fn leaf_list_terms(
&mut self,
request: quickwit_proto::search::LeafListTermsRequest,
Expand All @@ -231,6 +235,7 @@ impl SearchServiceClient {
/// Gets the value associated to a key stored locally in the targetted node.
/// This call is not "distributed".
/// If the key is not present on the targetted search `None` is simply returned.
#[instrument(skip(self, get_kv_req))]
pub async fn get_kv(&mut self, get_kv_req: GetKvRequest) -> crate::Result<Option<Vec<u8>>> {
match &mut self.client_impl {
SearchServiceClientImpl::Local(service) => {
Expand All @@ -251,6 +256,7 @@ impl SearchServiceClient {
/// Gets the value associated to a key stored locally in the targetted node.
/// This call is not "distributed". It is up to the client to put the K,V pair
/// on several nodes.
#[instrument(skip(self, put_kv_req))]
pub async fn put_kv(&mut self, put_kv_req: PutKvRequest) -> crate::Result<()> {
match &mut self.client_impl {
SearchServiceClientImpl::Local(service) => {
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ pub(crate) async fn open_index_with_caches(
/// to be hit.
#[instrument(skip(searcher))]
pub(crate) async fn warmup(searcher: &Searcher, warmup_info: &WarmupInfo) -> anyhow::Result<()> {
debug!(warmup_info=?warmup_info, "warmup");
let warm_up_terms_future = warm_up_terms(searcher, &warmup_info.terms_grouped_by_field)
.instrument(debug_span!("warm_up_terms"));
let warm_up_term_ranges_future =
Expand Down Expand Up @@ -326,7 +325,7 @@ async fn warm_up_fieldnorms(searcher: &Searcher, requires_scoring: bool) -> anyh
}

/// Apply a leaf search on a single split.
#[instrument(skip(searcher_context, search_request, storage, split, doc_mapper,))]
#[instrument(skip_all)]
async fn leaf_search_single_split(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
Expand Down
22 changes: 12 additions & 10 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ fn get_scroll_ttl_duration(search_request: &SearchRequest) -> crate::Result<Opti
Ok(Some(scroll_ttl))
}

#[instrument(skip(search_request, indexes_metas_for_leaf_search, cluster_client))]
#[instrument(skip_all)]
async fn search_partial_hits_phase_with_scroll(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down Expand Up @@ -431,7 +431,7 @@ async fn search_partial_hits_phase_with_scroll(
}
}

#[instrument(skip(search_request, indexes_metas_for_leaf_search, cluster_client))]
#[instrument(skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down Expand Up @@ -472,7 +472,7 @@ pub(crate) async fn search_partial_hits_phase(
.await
.context("failed to merge leaf search responses")?
.map_err(|error: TantivyError| crate::SearchError::Internal(error.to_string()))?;
debug!(leaf_search_response = ?leaf_search_response, "Merged leaf search response.");
// debug!(leaf_search_response = ?leaf_search_response, "Merged leaf search response.");

if !leaf_search_response.failed_splits.is_empty() {
error!(failed_splits = ?leaf_search_response.failed_splits, "Leaf search response contains at least one failed split.");
Expand All @@ -492,6 +492,13 @@ pub(crate) fn get_snippet_request(search_request: &SearchRequest) -> Option<Snip
})
}

#[instrument(skip(
indexes_metas_for_leaf_search,
partial_hits,
split_metadatas,
snippet_request_opt,
cluster_client
))]
pub(crate) async fn fetch_docs_phase(
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
partial_hits: &[PartialHit],
Expand Down Expand Up @@ -571,12 +578,7 @@ pub(crate) async fn fetch_docs_phase(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip(
searcher_context,
indexes_metas_for_leaf_search,
search_request,
cluster_client
))]
#[instrument(skip_all)]
async fn root_search_aux(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down Expand Up @@ -671,7 +673,7 @@ fn finalize_aggregation_if_any(
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip(search_request, cluster_client, metastore))]
#[instrument(skip_all)]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use quickwit_storage::{Cache, MemorySizedCache, QuickwitCache, StorageResolver};
use tantivy::aggregation::AggregationLimits;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::info;
use tracing::{info, debug};

use crate::leaf_cache::LeafSearchCache;
use crate::root::{fetch_docs_phase, get_snippet_request};
Expand Down Expand Up @@ -178,7 +178,7 @@ impl SearchService for SearchServiceImpl {
let search_request = leaf_search_request
.search_request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?search_request.index_id_patterns, splits=?leaf_search_request.split_offsets, "leaf_search");
debug!(index=?search_request.index_id_patterns, splits=?leaf_search_request.split_offsets, "leaf_search");
let storage = self
.storage_resolver
.resolve(&Uri::from_well_formed(leaf_search_request.index_uri))
Expand Down Expand Up @@ -242,7 +242,7 @@ impl SearchService for SearchServiceImpl {
let stream_request = leaf_stream_request
.request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?stream_request.index_id, splits=?leaf_stream_request.split_offsets, "leaf_search");
debug!(index=?stream_request.index_id, splits=?leaf_stream_request.split_offsets, "leaf_search");
let storage = self
.storage_resolver
.resolve(&Uri::from_well_formed(leaf_stream_request.index_uri))
Expand Down Expand Up @@ -280,7 +280,7 @@ impl SearchService for SearchServiceImpl {
let search_request = leaf_search_request
.list_terms_request
.ok_or_else(|| SearchError::Internal("No search request.".to_string()))?;
info!(index=?search_request.index_id, splits=?leaf_search_request.split_offsets,
debug!(index=?search_request.index_id, splits=?leaf_search_request.split_offsets,
"leaf_search");
let storage = self
.storage_resolver
Expand Down

0 comments on commit 1d67ad6

Please sign in to comment.