Skip to content

Commit

Permalink
Allowing the second query of find_traces jaeger request to run without
Browse files Browse the repository at this point in the history
acquiring any permit
  • Loading branch information
fulmicoton committed Oct 23, 2023
1 parent 5f2f6aa commit 8488327
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 30 deletions.
77 changes: 54 additions & 23 deletions quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::mem;
use std::{fmt, mem};
use std::fmt::Formatter;
use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -79,6 +80,27 @@ pub struct JaegerService {
max_fetch_spans: u64,
}

#[derive(Copy, Clone, Debug)]
enum JaegerOperation {
FindTraces,
GetTrace,
}

impl JaegerOperation {
pub fn name(&self) -> &'static str {
match self {
JaegerOperation::GetTrace => "get_trace",
JaegerOperation::FindTraces => "find_traces",
}
}
}

impl fmt::Display for JaegerOperation {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

impl JaegerService {
pub fn new(config: JaegerConfig, search_service: Arc<dyn SearchService>) -> Self {
Self {
Expand Down Expand Up @@ -187,7 +209,6 @@ impl JaegerService {
async fn find_traces_inner(
&self,
request: FindTracesRequest,
operation_name: &'static str,
request_start: Instant,
) -> JaegerResult<SpanStream> {
debug!(request=?request, "`find_traces` request");
Expand All @@ -200,16 +221,15 @@ impl JaegerService {
let end = span_timestamps_range.end() + self.max_trace_duration_secs;
let search_window = start..=end;
let response = self
.stream_spans(&trace_ids, search_window, operation_name, request_start)
.stream_spans(&trace_ids, search_window, JaegerOperation::FindTraces, request_start)
.await?;
Ok(response)
}

#[instrument("find_traces", skip_all)]
#[instrument("get_trace", skip_all)]
async fn get_trace_inner(
&self,
request: GetTraceRequest,
operation_name: &'static str,
request_start: Instant,
) -> JaegerResult<SpanStream> {
debug!(request=?request, "`get_trace` request");
Expand All @@ -220,12 +240,12 @@ impl JaegerService {
let start = end - self.lookback_period_secs;
let search_window = start..=end;
let response = self
.stream_spans(&[trace_id], search_window, operation_name, request_start)
.stream_spans(&[trace_id], search_window, JaegerOperation::GetTrace, request_start)
.await?;
Ok(response)
}

#[instrument("find_trace_ids", skip_all fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))]
#[instrument(skip_all, fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))]
async fn find_trace_ids(
&self,
trace_query: TraceQueryParameters,
Expand Down Expand Up @@ -274,12 +294,12 @@ impl JaegerService {
Ok(trace_ids)
}

#[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))]
#[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), operation=%operation, num_spans=Empty, num_bytes=Empty))]
async fn stream_spans(
&self,
trace_ids: &[TraceId],
search_window: TimeIntervalSecs,
operation_name: &'static str,
operation: JaegerOperation,
request_start: Instant,
) -> Result<SpanStream, Status> {
if trace_ids.is_empty() {
Expand All @@ -302,19 +322,30 @@ impl JaegerService {
let query_ast =
serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?;

let priority = match operation {
JaegerOperation::FindTraces => {
// This is the second of two requests. Let's run it with a higher priority
// to reduce the average latency for the overall query set.
true
}
JaegerOperation::GetTrace => {
false
}
};
let search_request = SearchRequest {
index_id_patterns: vec![OTEL_TRACES_INDEX_ID.to_string()],
query_ast,
start_timestamp: Some(*search_window.start()),
end_timestamp: Some(*search_window.end()),
max_hits: self.max_fetch_spans,
priority,
..Default::default()
};
let search_response = match self.search_service.root_search(search_request).await {
Ok(search_response) => search_response,
Err(search_error) => {
error!("Failed to fetch spans: {search_error:?}");
record_error(operation_name, request_start);
record_error(operation, request_start);
return Err(Status::internal("Failed to fetch spans."));
}
};
Expand All @@ -326,7 +357,7 @@ impl JaegerService {
spans.push(span);
}
Err(status) => {
record_error(operation_name, request_start);
record_error(operation, request_start);
return Err(status);
}
};
Expand Down Expand Up @@ -366,7 +397,7 @@ impl JaegerService {
debug!("Client disconnected: {send_error:?}");
return;
}
record_send(operation_name, num_spans, chunk_num_bytes);
record_send(operation, num_spans, chunk_num_bytes);
chunk_num_bytes = 0;
}
chunk_num_bytes += span_num_bytes;
Expand All @@ -381,20 +412,20 @@ impl JaegerService {
debug!("Client disconnected: {send_error:?}");
return;
}
record_send(operation_name, num_spans, chunk_num_bytes);
record_send(operation, num_spans, chunk_num_bytes);
}
current_span.record("num_spans", num_spans_total);
current_span.record("num_bytes", num_bytes_total);

JAEGER_SERVICE_METRICS
.fetched_traces_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_traces);

let elapsed = request_start.elapsed().as_secs_f64();
JAEGER_SERVICE_METRICS
.request_duration_seconds
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "false"])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "false"])
.observe(elapsed);
});
Ok(ReceiverStream::new(rx))
Expand Down Expand Up @@ -423,27 +454,27 @@ macro_rules! metrics {
};
}

fn record_error(operation_name: &'static str, request_start: Instant) {
fn record_error(operation: JaegerOperation, request_start: Instant) {
JAEGER_SERVICE_METRICS
.request_errors_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc();

let elapsed = request_start.elapsed().as_secs_f64();
JAEGER_SERVICE_METRICS
.request_duration_seconds
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "true"])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "true"])
.observe(elapsed);
}

fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) {
fn record_send(operation: JaegerOperation, num_spans: usize, num_bytes: usize) {
JAEGER_SERVICE_METRICS
.fetched_spans_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_spans as u64);
JAEGER_SERVICE_METRICS
.transferred_bytes_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_bytes as u64);
}

Expand Down Expand Up @@ -487,7 +518,7 @@ impl SpanReaderPlugin for JaegerService {
&self,
request: Request<FindTracesRequest>,
) -> Result<Response<Self::FindTracesStream>, Status> {
self.find_traces_inner(request.into_inner(), "find_traces", Instant::now())
self.find_traces_inner(request.into_inner(), Instant::now())
.await
.map(Response::new)
}
Expand All @@ -496,7 +527,7 @@ impl SpanReaderPlugin for JaegerService {
&self,
request: Request<GetTraceRequest>,
) -> Result<Response<Self::GetTraceStream>, Status> {
self.get_trace_inner(request.into_inner(), "get_trace", Instant::now())
self.get_trace_inner(request.into_inner(), Instant::now())
.await
.map(Response::new)
}
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ message SearchRequest {
// enable pagination.
// If split_id is empty, no comparison with _shard_doc should be done
optional PartialHit search_after = 16;

// Pick the leaf search semaphore in priority.
// This flag should be used when running a multi-step query.
// In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps
// apart from the first one. This is especially useful when working with traces, as the a single
bool priority = 17;
}

message SortField {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ pub struct SearchRequest {
/// If split_id is empty, no comparison with _shard_doc should be done
#[prost(message, optional, tag = "16")]
pub search_after: ::core::option::Option<PartialHit>,
/// Pick the leaf search semaphore in priority.
/// This flag should be used when running a multi-step query.
/// In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps
/// apart from the first one. This is especially useful when working with traces, as the a single
#[prost(bool, tag = "17")]
pub priority: bool,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
Expand Down
21 changes: 14 additions & 7 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::{Field, FieldType};
use tantivy::tokenizer::TokenizerManager;
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tokio::sync::OwnedSemaphorePermit;
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
Expand Down Expand Up @@ -565,11 +566,17 @@ pub async fn leaf_search(
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());

for split in splits {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
let leaf_split_search_permit_opt =
if !request.priority {
Some(searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."))
} else {
None
};


let mut request = (*request).clone();

Expand All @@ -591,7 +598,7 @@ pub async fn leaf_search(
split,
split_filter.clone(),
incremental_merge_collector.clone(),
leaf_split_search_permit,
leaf_split_search_permit_opt,
)
.in_current_span(),
));
Expand Down Expand Up @@ -634,7 +641,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<Mutex<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
leaf_split_search_permit: Option<OwnedSemaphorePermit>,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// We remove the scroll ttl parameter. It is irrelevant to process later request
scroll_ttl_secs: None,
search_after: None,
priority: req.priority,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ fn build_request_for_es_api(
snippet_fields: Vec::new(),
scroll_ttl_secs,
search_after,
priority: false,
},
has_doc_id_field,
))
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn search_request_from_api_request(
sort_fields: search_request.sort_by.sort_fields,
scroll_ttl_secs: None,
search_after: None,
priority: false,
};
Ok(search_request)
}
Expand Down

0 comments on commit 8488327

Please sign in to comment.