Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing the second query of find_traces jaeger request to run without #4012

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 54 additions & 23 deletions quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::mem;
use std::{fmt, mem};
use std::fmt::Formatter;
use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -79,6 +80,27 @@ pub struct JaegerService {
max_fetch_spans: u64,
}

#[derive(Copy, Clone, Debug)]
enum JaegerOperation {
FindTraces,
GetTrace,
}

impl JaegerOperation {
pub fn name(&self) -> &'static str {
match self {
JaegerOperation::GetTrace => "get_trace",
JaegerOperation::FindTraces => "find_traces",
}
}
}

impl fmt::Display for JaegerOperation {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

impl JaegerService {
pub fn new(config: JaegerConfig, search_service: Arc<dyn SearchService>) -> Self {
Self {
Expand Down Expand Up @@ -187,7 +209,6 @@ impl JaegerService {
async fn find_traces_inner(
&self,
request: FindTracesRequest,
operation_name: &'static str,
request_start: Instant,
) -> JaegerResult<SpanStream> {
debug!(request=?request, "`find_traces` request");
Expand All @@ -200,16 +221,15 @@ impl JaegerService {
let end = span_timestamps_range.end() + self.max_trace_duration_secs;
let search_window = start..=end;
let response = self
.stream_spans(&trace_ids, search_window, operation_name, request_start)
.stream_spans(&trace_ids, search_window, JaegerOperation::FindTraces, request_start)
.await?;
Ok(response)
}

#[instrument("find_traces", skip_all)]
#[instrument("get_trace", skip_all)]
async fn get_trace_inner(
&self,
request: GetTraceRequest,
operation_name: &'static str,
request_start: Instant,
) -> JaegerResult<SpanStream> {
debug!(request=?request, "`get_trace` request");
Expand All @@ -220,12 +240,12 @@ impl JaegerService {
let start = end - self.lookback_period_secs;
let search_window = start..=end;
let response = self
.stream_spans(&[trace_id], search_window, operation_name, request_start)
.stream_spans(&[trace_id], search_window, JaegerOperation::GetTrace, request_start)
.await?;
Ok(response)
}

#[instrument("find_trace_ids", skip_all fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))]
#[instrument(skip_all, fields(service_name=%trace_query.service_name, operation_name=%trace_query.operation_name))]
async fn find_trace_ids(
&self,
trace_query: TraceQueryParameters,
Expand Down Expand Up @@ -274,12 +294,12 @@ impl JaegerService {
Ok(trace_ids)
}

#[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))]
#[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), operation=%operation, num_spans=Empty, num_bytes=Empty))]
async fn stream_spans(
&self,
trace_ids: &[TraceId],
search_window: TimeIntervalSecs,
operation_name: &'static str,
operation: JaegerOperation,
request_start: Instant,
) -> Result<SpanStream, Status> {
if trace_ids.is_empty() {
Expand All @@ -302,19 +322,30 @@ impl JaegerService {
let query_ast =
serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?;

let priority = match operation {
JaegerOperation::FindTraces => {
// This is the second of two requests. Let's run it with a higher priority
// to reduce the average latency for the overall query set.
true
}
JaegerOperation::GetTrace => {
false
}
};
let search_request = SearchRequest {
index_id_patterns: vec![OTEL_TRACES_INDEX_ID.to_string()],
query_ast,
start_timestamp: Some(*search_window.start()),
end_timestamp: Some(*search_window.end()),
max_hits: self.max_fetch_spans,
priority,
..Default::default()
};
let search_response = match self.search_service.root_search(search_request).await {
Ok(search_response) => search_response,
Err(search_error) => {
error!("Failed to fetch spans: {search_error:?}");
record_error(operation_name, request_start);
record_error(operation, request_start);
return Err(Status::internal("Failed to fetch spans."));
}
};
Expand All @@ -326,7 +357,7 @@ impl JaegerService {
spans.push(span);
}
Err(status) => {
record_error(operation_name, request_start);
record_error(operation, request_start);
return Err(status);
}
};
Expand Down Expand Up @@ -366,7 +397,7 @@ impl JaegerService {
debug!("Client disconnected: {send_error:?}");
return;
}
record_send(operation_name, num_spans, chunk_num_bytes);
record_send(operation, num_spans, chunk_num_bytes);
chunk_num_bytes = 0;
}
chunk_num_bytes += span_num_bytes;
Expand All @@ -381,20 +412,20 @@ impl JaegerService {
debug!("Client disconnected: {send_error:?}");
return;
}
record_send(operation_name, num_spans, chunk_num_bytes);
record_send(operation, num_spans, chunk_num_bytes);
}
current_span.record("num_spans", num_spans_total);
current_span.record("num_bytes", num_bytes_total);

JAEGER_SERVICE_METRICS
.fetched_traces_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_traces);

let elapsed = request_start.elapsed().as_secs_f64();
JAEGER_SERVICE_METRICS
.request_duration_seconds
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "false"])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "false"])
.observe(elapsed);
});
Ok(ReceiverStream::new(rx))
Expand Down Expand Up @@ -423,27 +454,27 @@ macro_rules! metrics {
};
}

fn record_error(operation_name: &'static str, request_start: Instant) {
fn record_error(operation: JaegerOperation, request_start: Instant) {
JAEGER_SERVICE_METRICS
.request_errors_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc();

let elapsed = request_start.elapsed().as_secs_f64();
JAEGER_SERVICE_METRICS
.request_duration_seconds
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "true"])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID, "true"])
.observe(elapsed);
}

fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) {
fn record_send(operation: JaegerOperation, num_spans: usize, num_bytes: usize) {
JAEGER_SERVICE_METRICS
.fetched_spans_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_spans as u64);
JAEGER_SERVICE_METRICS
.transferred_bytes_total
.with_label_values([operation_name, OTEL_TRACES_INDEX_ID])
.with_label_values([operation.name(), OTEL_TRACES_INDEX_ID])
.inc_by(num_bytes as u64);
}

Expand Down Expand Up @@ -487,7 +518,7 @@ impl SpanReaderPlugin for JaegerService {
&self,
request: Request<FindTracesRequest>,
) -> Result<Response<Self::FindTracesStream>, Status> {
self.find_traces_inner(request.into_inner(), "find_traces", Instant::now())
self.find_traces_inner(request.into_inner(), Instant::now())
.await
.map(Response::new)
}
Expand All @@ -496,7 +527,7 @@ impl SpanReaderPlugin for JaegerService {
&self,
request: Request<GetTraceRequest>,
) -> Result<Response<Self::GetTraceStream>, Status> {
self.get_trace_inner(request.into_inner(), "get_trace", Instant::now())
self.get_trace_inner(request.into_inner(), Instant::now())
.await
.map(Response::new)
}
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ message SearchRequest {
// enable pagination.
// If split_id is empty, no comparison with _shard_doc should be done
optional PartialHit search_after = 16;

// Pick the leaf search semaphore in priority.
// This flag should be used when running a multi-step query.
// In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps
// apart from the first one. This is especially useful when working with traces, as the a single
bool priority = 17;
}

message SortField {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ pub struct SearchRequest {
/// If split_id is empty, no comparison with _shard_doc should be done
#[prost(message, optional, tag = "16")]
pub search_after: ::core::option::Option<PartialHit>,
/// Pick the leaf search semaphore in priority.
/// This flag should be used when running a multi-step query.
/// In this case, it makes sense to reduce the average latency by acquiring the permit in priority for all steps
/// apart from the first one. This is especially useful when working with traces, as the a single
#[prost(bool, tag = "17")]
pub priority: bool,
}
#[derive(Serialize, Deserialize, utoipa::ToSchema)]
#[derive(Eq, Hash)]
Expand Down
21 changes: 14 additions & 7 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::{Field, FieldType};
use tantivy::tokenizer::TokenizerManager;
use tantivy::{Index, ReloadPolicy, Searcher, Term};
use tokio::sync::OwnedSemaphorePermit;
use tracing::*;

use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
Expand Down Expand Up @@ -565,11 +566,17 @@ pub async fn leaf_search(
let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len());

for split in splits {
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
let leaf_split_search_permit_opt =
if !request.priority {
Some(searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
.await
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."))
} else {
None
};


let mut request = (*request).clone();

Expand All @@ -591,7 +598,7 @@ pub async fn leaf_search(
split,
split_filter.clone(),
incremental_merge_collector.clone(),
leaf_split_search_permit,
leaf_split_search_permit_opt,
)
.in_current_span(),
));
Expand Down Expand Up @@ -634,7 +641,7 @@ async fn leaf_search_single_split_wrapper(
split: SplitIdAndFooterOffsets,
split_filter: Arc<Mutex<CanSplitDoBetter>>,
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
leaf_split_search_permit: Option<OwnedSemaphorePermit>,
) {
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
let timer = crate::SEARCH_METRICS
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
// We remove the scroll ttl parameter. It is irrelevant to process later request
scroll_ttl_secs: None,
search_after: None,
priority: req.priority,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ fn build_request_for_es_api(
snippet_fields: Vec::new(),
scroll_ttl_secs,
search_after,
priority: false,
},
has_doc_id_field,
))
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn search_request_from_api_request(
sort_fields: search_request.sort_by.sort_fields,
scroll_ttl_secs: None,
search_after: None,
priority: false,
};
Ok(search_request)
}
Expand Down
Loading