From 84883276fe1ef2d516f4e579993218e6a3f4732d Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 23 Oct 2023 17:48:25 +0900 Subject: [PATCH] Allowing the second query of `find_traces` jaeger request to run without acquiring any permit --- quickwit/quickwit-jaeger/src/lib.rs | 77 +++++++++++++------ .../protos/quickwit/search.proto | 6 ++ .../src/codegen/quickwit/quickwit.search.rs | 6 ++ quickwit/quickwit-search/src/leaf.rs | 21 +++-- quickwit/quickwit-search/src/root.rs | 1 + .../src/elastic_search_api/rest_handler.rs | 1 + .../src/search_api/rest_handler.rs | 1 + 7 files changed, 83 insertions(+), 30 deletions(-) diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 40cd99afe99..732ca080858 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -18,7 +18,8 @@ // along with this program. If not, see . use std::collections::HashMap; -use std::mem; +use std::{fmt, mem}; +use std::fmt::Formatter; use std::ops::{Bound, RangeInclusive}; use std::sync::Arc; use std::time::Instant; @@ -79,6 +80,27 @@ pub struct JaegerService { max_fetch_spans: u64, } +#[derive(Copy, Clone, Debug)] +enum JaegerOperation { + FindTraces, + GetTrace, +} + +impl JaegerOperation { + pub fn name(&self) -> &'static str { + match self { + JaegerOperation::GetTrace => "get_trace", + JaegerOperation::FindTraces => "find_traces", + } + } +} + +impl fmt::Display for JaegerOperation { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + impl JaegerService { pub fn new(config: JaegerConfig, search_service: Arc) -> Self { Self { @@ -187,7 +209,6 @@ impl JaegerService { async fn find_traces_inner( &self, request: FindTracesRequest, - operation_name: &'static str, request_start: Instant, ) -> JaegerResult { debug!(request=?request, "`find_traces` request"); @@ -200,16 +221,15 @@ impl JaegerService { let end = span_timestamps_range.end() + self.max_trace_duration_secs; let search_window = start..=end; let response = self - .stream_spans(&trace_ids, search_window, operation_name, request_start) + .stream_spans(&trace_ids, search_window, JaegerOperation::FindTraces, request_start) .await?; Ok(response) } - #[instrument("find_traces", skip_all)] + #[instrument("get_trace", skip_all)] async fn get_trace_inner( &self, request: GetTraceRequest, - operation_name: &'static str, request_start: Instant, ) -> JaegerResult { debug!(request=?request, "`get_trace` request"); @@ -220,12 +240,12 @@ impl JaegerService { let start = end - self.lookback_period_secs; let search_window = start..=end; let response = self - .stream_spans(&[trace_id], search_window, operation_name, request_start) + .stream_spans(&[trace_id], search_window, JaegerOperation::GetTrace, request_start) .await?; Ok(response) } - #[instrument("find_trace_ids", skip_all fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))] + #[instrument(skip_all, fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))] async fn find_trace_ids( &self, trace_query: TraceQueryParameters, @@ -274,12 +294,12 @@ impl JaegerService { Ok(trace_ids) } - #[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))] + #[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), operation=%operation, num_spans=Empty, num_bytes=Empty))] async fn stream_spans( &self, trace_ids: &[TraceId], search_window: TimeIntervalSecs, - operation_name: &'static str, + operation: JaegerOperation, request_start: Instant, ) -> Result { if trace_ids.is_empty() { @@ -302,19 +322,30 @@ impl JaegerService { let query_ast = serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?; + let priority = match operation { + JaegerOperation::FindTraces => { + // This is the second of two requests. Let's run it with a higher priority + // to reduce the average latency for the overall query set. + true + } + JaegerOperation::GetTrace => { + false + } + }; let search_request = SearchRequest { index_id_patterns: vec![OTEL_TRACES_INDEX_ID.to_string()], query_ast, start_timestamp: Some(*search_window.start()), end_timestamp: Some(*search_window.end()), max_hits: self.max_fetch_spans, + priority, ..Default::default() }; let search_response = match self.search_service.root_search(search_request).await { Ok(search_response) => search_response, Err(search_error) => { error!("Failed to fetch spans: {search_error:?}"); - record_error(operation_name, request_start); + record_error(operation, request_start); return Err(Status::internal("Failed to fetch spans.")); } }; @@ -326,7 +357,7 @@ impl JaegerService { spans.push(span); } Err(status) => { - record_error(operation_name, request_start); + record_error(operation, request_start); return Err(status); } }; @@ -366,7 +397,7 @@ impl JaegerService { debug!("Client disconnected: {send_error:?}"); return; } - record_send(operation_name, num_spans, chunk_num_bytes); + record_send(operation, num_spans, chunk_num_bytes); chunk_num_bytes = 0; } chunk_num_bytes += span_num_bytes; @@ -381,20 +412,20 @@ impl JaegerService { debug!("Client disconnected: {send_error:?}"); return; } - record_send(operation_name, num_spans, chunk_num_bytes); + record_send(operation, num_spans, chunk_num_bytes); } current_span.record("num_spans", num_spans_total); current_span.record("num_bytes", num_bytes_total); JAEGER_SERVICE_METRICS .fetched_traces_total - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID]) .inc_by(num_traces); let elapsed = request_start.elapsed().as_secs_f64(); JAEGER_SERVICE_METRICS .request_duration_seconds - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "false"]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "false"]) .observe(elapsed); }); Ok(ReceiverStream::new(rx)) @@ -423,27 +454,27 @@ macro_rules! metrics { }; } -fn record_error(operation_name: &'static str, request_start: Instant) { +fn record_error(operation: JaegerOperation, request_start: Instant) { JAEGER_SERVICE_METRICS .request_errors_total - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID]) .inc(); let elapsed = request_start.elapsed().as_secs_f64(); JAEGER_SERVICE_METRICS .request_duration_seconds - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "true"]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "true"]) .observe(elapsed); } -fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) { +fn record_send(operation: JaegerOperation, num_spans: usize, num_bytes: usize) { JAEGER_SERVICE_METRICS .fetched_spans_total - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID]) .inc_by(num_spans as u64); JAEGER_SERVICE_METRICS .transferred_bytes_total - .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) + .with_label_values([operation.name(), OTEL_TRACES_INDEX_ID]) .inc_by(num_bytes as u64); } @@ -487,7 +518,7 @@ impl SpanReaderPlugin for JaegerService { &self, request: Request, ) -> Result, Status> { - self.find_traces_inner(request.into_inner(), "find_traces", Instant::now()) + self.find_traces_inner(request.into_inner(), Instant::now()) .await .map(Response::new) } @@ -496,7 +527,7 @@ impl SpanReaderPlugin for JaegerService { &self, request: Request, ) -> Result, Status> { - self.get_trace_inner(request.into_inner(), "get_trace", Instant::now()) + self.get_trace_inner(request.into_inner(), Instant::now()) .await .map(Response::new) } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 3e66acb32ec..36597f796a1 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -170,6 +170,12 @@ message SearchRequest { // enable pagination. // If split_id is empty, no comparison with _shard_doc should be done optional PartialHit search_after = 16; + + // Pick the leaf search semaphore in priority. + // This flag should be used when running a multi-step query. + // In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps + // apart from the first one. This is especially useful when working with traces, as the a single + bool priority = 17; } message SortField { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index bd1c46cf895..c550b89db6d 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -107,6 +107,12 @@ pub struct SearchRequest { /// If split_id is empty, no comparison with _shard_doc should be done #[prost(message, optional, tag = "16")] pub search_after: ::core::option::Option, + /// Pick the leaf search semaphore in priority. + /// This flag should be used when running a multi-step query. + /// In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps + /// apart from the first one. This is especially useful when working with traces, as the a single + #[prost(bool, tag = "17")] + pub priority: bool, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index eef9c134eeb..3f38af89e97 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -41,6 +41,7 @@ use tantivy::fastfield::FastFieldReaders; use tantivy::schema::{Field, FieldType}; use tantivy::tokenizer::TokenizerManager; use tantivy::{Index, ReloadPolicy, Searcher, Term}; +use tokio::sync::OwnedSemaphorePermit; use tracing::*; use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector}; @@ -565,11 +566,17 @@ pub async fn leaf_search( let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); for split in splits { - let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore - .clone() - .acquire_owned() - .await - .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."); + let leaf_split_search_permit_opt = + if !request.priority { + Some(searcher_context.leaf_search_split_semaphore + .clone() + .acquire_owned() + .await + .expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.")) + } else { + None + }; + let mut request = (*request).clone(); @@ -591,7 +598,7 @@ pub async fn leaf_search( split, split_filter.clone(), incremental_merge_collector.clone(), - leaf_split_search_permit, + leaf_split_search_permit_opt, ) .in_current_span(), )); @@ -634,7 +641,7 @@ async fn leaf_search_single_split_wrapper( split: SplitIdAndFooterOffsets, split_filter: Arc>, incremental_merge_collector: Arc>, - leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit, + leaf_split_search_permit: Option, ) { crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 13af3c21303..0bfffaba177 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -306,6 +306,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // We remove the scroll ttl parameter. It is irrelevant to process later request scroll_ttl_secs: None, search_after: None, + priority: req.priority, }) } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 5be86f6b2c8..2b5fec5da65 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -193,6 +193,7 @@ fn build_request_for_es_api( snippet_fields: Vec::new(), scroll_ttl_secs, search_after, + priority: false, }, has_doc_id_field, )) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index c2453632244..71e60c9d75e 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -242,6 +242,7 @@ pub fn search_request_from_api_request( sort_fields: search_request.sort_by.sort_fields, scroll_ttl_secs: None, search_after: None, + priority: false, }; Ok(search_request) }