diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b79a0838f81..ed1b00fa202 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5951,11 +5951,13 @@ dependencies = [ "anyhow", "assert-json-diff 2.0.2", "async-trait", + "base64 0.21.5", "bytes", "bytesize", "elasticsearch-dsl", "futures", "futures-util", + "hex", "http-serde", "humantime", "hyper", @@ -5966,6 +5968,7 @@ dependencies = [ "once_cell", "opentelemetry", "percent-encoding", + "prost-types", "quickwit-actors", "quickwit-cluster", "quickwit-common", diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 4cd82260018..0aa4b418833 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -72,6 +72,7 @@ type JaegerResult = Result; type SpanStream = ReceiverStream>; +#[derive(Clone)] pub struct JaegerService { search_service: Arc, lookback_period_secs: i64, diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs index 8be4544aaf1..3d709405b68 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs @@ -724,7 +724,7 @@ impl OtlpGrpcTracesService { .inc_by(num_bytes); let response = ExportTraceServiceResponse { - // `rejected_spans=0` and `error_message=""` is consided a "full" success. + // `rejected_spans=0` and `error_message=""` is considered a "full" success. partial_success: Some(ExportTracePartialSuccess { rejected_spans: num_parse_errors as i64, error_message, diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 4fe411e96de..6f34d2620a0 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -12,11 +12,13 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } elasticsearch-dsl = "0.4.15" futures = { workspace = true } futures-util = { workspace = true } +hex = { workspace = true } humantime = { workspace = true } http-serde = { workspace = true } hyper = { workspace = true } @@ -25,6 +27,7 @@ mime_guess = { workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } percent-encoding = { workspace = true } +prost-types = { workspace = true } regex = { workspace = true } rust-embed = { workspace = true } serde = { workspace = true } diff --git a/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json b/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json new file mode 100644 index 00000000000..487fd7e8a38 --- /dev/null +++ b/quickwit/quickwit-serve/resources/tests/jaeger_ui_trace.json @@ -0,0 +1,168 @@ +{ + "traceID": "0000000000000001", + "spans": [ + { + "traceID": "0000000000000001", + "spanID": "0000000000000001", + "operationName": "test-general-conversion", + "references": [], + "startTime": 1485467191639875, + "duration": 5, + "flags": 0, + "tags": [], + "logs": [ + { + "timestamp": 1485467191639875, + "fields": [ + { + "key": "event", + "type": "string", + "value": "some-event" + } + ] + }, + { + "timestamp": 1485467191639875, + "fields": [ + { + "key": "x", + "type": "string", + "value": "y" + } + ] + } + ], + "processID": "p1", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000002", + "operationName": "some-operation", + "references": [], + "flags": 0, + "startTime": 1485467191639875, + "duration": 5, + "tags": [ + { + "key": "peer.service", + "type": "string", + "value": "service-y" + }, + { + "key": "peer.ipv4", + "type": "int64", + "value": 23456 + }, + { + "key": "error", + "type": "bool", + "value": true + }, + { + "key": "temperature", + "type": "float64", + "value": 72.5 + }, + { + "key": "javascript_limit", + "type": "int64", + "value": "9223372036854775222" + }, + { + "key": "blob", + "type": "binary", + "value": "AAAwOQ==" + } + ], + "logs": [], + "processID": "p1", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000003", + "operationName": "some-operation", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + } + ], + "startTime": 1485467191639875, + "duration": 5, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000004", + "operationName": "reference-test", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "00000000000000ff", + "spanID": "00000000000000ff" + }, + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + }, + { + "refType": "FOLLOWS_FROM", + "traceID": "0000000000000001", + "spanID": "0000000000000002" + } + ], + "startTime": 1485467191639875, + "duration": 5, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [ + "some span warning" + ] + }, + { + "traceID": "0000000000000001", + "spanID": "0000000000000005", + "operationName": "preserveParentID-test", + "flags": 0, + "references": [ + { + "refType": "CHILD_OF", + "traceID": "0000000000000001", + "spanID": "0000000000000004" + } + ], + "startTime": 1485467191639875, + "duration": 4, + "tags": [], + "logs": [], + "processID": "p2", + "warnings": [ + "some span warning" + ] + } + ], + "processes": { + "p1": { + "serviceName": "service-x", + "key": "p1", + "tags": [] + }, + "p2": { + "serviceName": "service-y", + "key": "p2", + "tags": [] + } + }, + "warnings": [ + ] + } diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 3820c114144..e6a8f8715e9 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_config::service::QuickwitService; -use quickwit_jaeger::JaegerService; use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; use quickwit_proto::indexing::IndexingServiceClient; use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPluginServer; @@ -153,18 +152,11 @@ pub(crate) async fn start_grpc_server( } else { None }; - let enable_jaeger_endpoint = services.node_config.jaeger_config.enable_endpoint; - let jaeger_grpc_service = if enable_jaeger_endpoint - && services - .node_config - .is_service_enabled(QuickwitService::Searcher) - { + + // Mount gRPC jaeger service if present. + let jaeger_grpc_service = if let Some(jaeger_service) = services.jaeger_service_opt.clone() { enabled_grpc_services.insert("jaeger"); - let search_service = services.search_service.clone(); - Some(SpanReaderPluginServer::new(JaegerService::new( - services.node_config.jaeger_config.clone(), - search_service, - ))) + Some(SpanReaderPluginServer::new(jaeger_service)) } else { None }; diff --git a/quickwit/quickwit-serve/src/jaeger_api/mod.rs b/quickwit/quickwit-serve/src/jaeger_api/mod.rs new file mode 100644 index 00000000000..f02754f7c1d --- /dev/null +++ b/quickwit/quickwit-serve/src/jaeger_api/mod.rs @@ -0,0 +1,23 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod model; +mod parse_duration; +mod rest_handler; +pub(crate) use rest_handler::{jaeger_api_handlers, JaegerApi}; diff --git a/quickwit/quickwit-serve/src/jaeger_api/model.rs b/quickwit/quickwit-serve/src/jaeger_api/model.rs new file mode 100644 index 00000000000..ef12ee6826e --- /dev/null +++ b/quickwit/quickwit-serve/src/jaeger_api/model.rs @@ -0,0 +1,560 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; + +use base64::prelude::{Engine, BASE64_STANDARD}; +use hyper::StatusCode; +use itertools::Itertools; +use prost_types::{Duration, Timestamp}; +use quickwit_proto::jaeger::api_v2::{KeyValue, Log, Process, Span, SpanRef, ValueType}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use serde_with::serde_as; + +pub(super) const DEFAULT_NUMBER_OF_TRACES: i32 = 20; + +pub(super) fn build_jaeger_traces(spans: Vec) -> anyhow::Result> { + let jaeger_traces: Vec = spans + .into_iter() + .group_by(|span| span.trace_id.clone()) + .into_iter() + .map(|(span_id, group)| JaegerTrace::new(span_id, group.collect())) + .collect(); + Ok(jaeger_traces) +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct JaegerResponseBody { + pub data: T, +} + +#[serde_with::skip_serializing_none] +#[derive(Clone, Default, Debug, Serialize, Deserialize, utoipa::IntoParams)] +#[serde(rename_all = "camelCase")] +#[serde(deny_unknown_fields)] +pub struct TracesSearchQueryParams { + #[serde(default)] + pub service: Option, + #[serde(default)] + pub operation: Option, + pub start: Option, + pub end: Option, + pub tags: Option, + pub min_duration: Option, + pub max_duration: Option, + pub lookback: Option, + pub limit: Option, +} + +// Jaeger Model for UI +// Source: https://github.com/jaegertracing/jaeger/blob/main/model/json/model.go#L82 + +#[derive(Clone, Default, Debug, PartialEq, Serialize, utoipa::IntoParams)] +#[serde(rename_all = "camelCase")] +pub struct JaegerTrace { + #[serde(rename = "traceID")] + #[serde(serialize_with = "serialize_bytes_to_hex")] + trace_id: Vec, + spans: Vec, + processes: HashMap, + warnings: Vec, +} + +impl JaegerTrace { + pub fn new(trace_id: Vec, mut spans: Vec) -> Self { + let processes = Self::build_process_map(&mut spans); + JaegerTrace { + trace_id, + spans, + processes, + warnings: Vec::new(), + } + } + + /// Processes a collection of spans, updating the `process_id` field based on the unique + /// `service_name` values. The function uses an accumulator (`acc`) to keep track of + /// processed `JaegerProcess` objects and assigns a new key to each unique `service_name` value. + /// The logic has been replicated from + /// https://github.com/jaegertracing/jaeger/blob/995231c42cadd70bce2bbbf02579e33f6e6329c8/model/converter/json/process_hashtable.go#L37 + /// TODO: use also tags to identify processes. + fn build_process_map(spans: &mut [JaegerSpan]) -> HashMap { + let mut service_name_to_process_id: HashMap = HashMap::new(); + let mut process_map: HashMap = HashMap::new(); + let mut process_counter: i32 = 0; + for span in spans.iter_mut() { + let Some(current_process) = span.process.as_mut() else { + continue; + }; + if let Some(process_id) = service_name_to_process_id.get(¤t_process.service_name) + { + span.process_id = Some(process_id.clone()); + } else { + process_counter += 1; + current_process.key = format!("p{}", process_counter); + span.process_id = Some(current_process.key.clone()); + process_map.insert(current_process.key.clone(), current_process.clone()); + service_name_to_process_id.insert( + current_process.service_name.clone(), + current_process.key.clone(), + ); + } + } + process_map + } +} + +#[serde_as] +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JaegerSpan { + #[serde(rename = "traceID")] + #[serde(serialize_with = "serialize_bytes_to_hex")] + pub trace_id: Vec, + #[serde(rename = "spanID")] + #[serde(serialize_with = "serialize_bytes_to_hex")] + span_id: Vec, + operation_name: String, + references: Vec, + #[serde(default)] + flags: u32, + start_time: i64, // start_time since Unix epoch + duration: i64, // microseconds + tags: Vec, + logs: Vec, + #[serde(default)] + #[serde(skip_serializing)] + process: Option, + #[serde(rename = "processID")] + pub process_id: Option, + pub warnings: Vec, +} + +impl TryFrom for JaegerSpan { + type Error = anyhow::Error; + fn try_from(span: Span) -> Result { + let references: Vec = + span.references.iter().map(JaegerSpanRef::from).collect(); + let tags: Vec = span.tags.iter().map(JaegerKeyValue::from).collect(); + let logs: Vec = span.logs.iter().map(JaegerLog::from).collect(); + Ok(Self { + trace_id: span.trace_id, + span_id: span.span_id, + operation_name: span.operation_name.clone(), + references, + flags: span.flags, + start_time: span + .start_time + .as_ref() + .map(convert_timestamp_to_microsecs) + .unwrap_or(0), + duration: span + .duration + .map(convert_duration_to_microsecs) + .unwrap_or(0), + tags, + logs, + process: span.process.map(JaegerProcess::from), + process_id: None, + warnings: span.warnings.iter().map(|s| s.to_string()).collect(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct JaegerSpanRef { + #[serde(rename = "traceID")] + #[serde(serialize_with = "serialize_bytes_to_hex")] + trace_id: Vec, + #[serde(rename = "spanID")] + #[serde(serialize_with = "serialize_bytes_to_hex")] + span_id: Vec, + ref_type: String, +} + +impl From<&SpanRef> for JaegerSpanRef { + fn from(sr: &SpanRef) -> Self { + Self { + trace_id: sr.trace_id.clone(), + span_id: sr.span_id.clone(), + ref_type: if sr.ref_type == 0 { + "CHILD_OF".to_string() + } else { + "FOLLOWS_FROM".to_string() + }, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct JaegerKeyValue { + key: String, + #[serde(rename = "type")] + value_type: String, + value: Value, +} + +impl From<&KeyValue> for JaegerKeyValue { + fn from(kv: &KeyValue) -> Self { + match kv.v_type { + // String = 0, + 0 => Self { + key: kv.key.to_string(), + value_type: ValueType::String.as_str_name().to_lowercase(), + value: json!(kv.v_str.to_string()), + }, + // Bool = 1, + 1 => Self { + key: kv.key.to_string(), + value_type: ValueType::Bool.as_str_name().to_lowercase(), + value: json!(kv.v_bool), + }, + // Int64 = 2, + 2 => { + if kv.v_int64 > 9007199254740991 { + Self { + key: kv.key.to_string(), + value_type: ValueType::Int64.as_str_name().to_lowercase(), + value: json!(kv.v_int64.to_string()), + } + } else { + Self { + key: kv.key.to_string(), + value_type: ValueType::Int64.as_str_name().to_lowercase(), + value: json!(kv.v_int64), + } + } + } + // Float64 = 3, + 3 => Self { + key: kv.key.to_string(), + value_type: ValueType::Float64.as_str_name().to_lowercase(), + value: json!(kv.v_float64), + }, + // Binary = 4, + 4 => Self { + key: kv.key.to_string(), + value_type: ValueType::Binary.as_str_name().to_lowercase(), + value: serde_json::Value::String(BASE64_STANDARD.encode(kv.v_binary.as_slice())), + }, + _ => Self { + key: "no_value".to_string(), + value_type: "unsupported_type".to_string(), + value: Default::default(), + }, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct JaegerLog { + timestamp: i64, // microseconds since Unix epoch + fields: Vec, +} + +impl From<&Log> for JaegerLog { + fn from(log: &Log) -> Self { + Self { + timestamp: log + .timestamp + .as_ref() + .map(convert_timestamp_to_microsecs) + .unwrap_or(0), + fields: log.fields.iter().map(JaegerKeyValue::from).collect(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct JaegerProcess { + service_name: String, + key: String, + tags: Vec, +} + +impl Default for JaegerProcess { + fn default() -> Self { + Self { + service_name: "none".to_string(), + key: "".to_string(), + tags: vec![], + } + } +} + +impl From for JaegerProcess { + fn from(process: Process) -> Self { + Self { + service_name: process.service_name.to_string(), + key: "".to_string(), + tags: process.tags.iter().map(JaegerKeyValue::from).collect(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JaegerError { + #[serde(with = "http_serde::status_code")] + pub status: StatusCode, + pub message: String, +} + +impl From for JaegerError { + fn from(error: anyhow::Error) -> Self { + Self { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: error.to_string(), + } + } +} + +fn serialize_bytes_to_hex(bytes: &Vec, s: S) -> Result +where S: serde::Serializer { + s.serialize_str(&format!("{:0>16}", hex::encode(bytes))) +} + +fn convert_timestamp_to_microsecs(timestamp: &Timestamp) -> i64 { + timestamp.seconds * 1_000_000 + i64::from(timestamp.nanos / 1000) +} + +fn convert_duration_to_microsecs(duration: Duration) -> i64 { + duration.seconds * 1_000_000 + i64::from(duration.nanos / 1000) +} + +#[cfg(test)] +mod tests { + use quickwit_proto::jaeger::api_v2::Log; + + use crate::jaeger_api::model::{build_jaeger_traces, JaegerSpan}; + + #[test] + fn test_convert_grpc_jaeger_spans_into_jaeger_ui_model() { + let file_content = std::fs::read_to_string(get_jaeger_ui_trace_filepath()).unwrap(); + let expected_jaeger_trace: serde_json::Value = serde_json::from_str(&file_content).unwrap(); + let grpc_spans = create_grpc_spans(); + let jaeger_spans: Vec = grpc_spans + .iter() + .map(|span| super::JaegerSpan::try_from(span.clone()).unwrap()) + .collect(); + let traces = build_jaeger_traces(jaeger_spans).unwrap(); + let trace_json: serde_json::Value = serde_json::to_value(traces[0].clone()).unwrap(); + assert_json_diff::assert_json_eq!(expected_jaeger_trace, trace_json); + } + + fn get_jaeger_ui_trace_filepath() -> String { + format!( + "{}/resources/tests/jaeger_ui_trace.json", + env!("CARGO_MANIFEST_DIR"), + ) + } + + fn create_grpc_spans() -> Vec { + let span_0 = quickwit_proto::jaeger::api_v2::Span { + trace_id: vec![1], + span_id: vec![1], + operation_name: "test-general-conversion".to_string(), + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-x".to_string(), + tags: vec![], + }), + logs: vec![ + Log { + timestamp: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + fields: vec![quickwit_proto::jaeger::api_v2::KeyValue { + key: "event".to_string(), + v_type: 0, + v_str: "some-event".to_string(), + ..Default::default() + }], + }, + Log { + timestamp: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + fields: vec![quickwit_proto::jaeger::api_v2::KeyValue { + key: "x".to_string(), + v_type: 0, + v_str: "y".to_string(), + ..Default::default() + }], + }, + ], + ..Default::default() + }; + let span_1 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "some-operation".to_string(), + trace_id: vec![1], + span_id: vec![2], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-x".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + tags: vec![ + quickwit_proto::jaeger::api_v2::KeyValue { + key: "peer.service".to_string(), + v_type: 0, + v_str: "service-y".to_string(), + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "peer.ipv4".to_string(), + v_type: 2, + v_int64: 23456, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "error".to_string(), + v_type: 1, + v_bool: true, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "temperature".to_string(), + v_type: 3, + v_float64: 72.5, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "javascript_limit".to_string(), + v_type: 2, + v_int64: 9223372036854775222, + ..Default::default() + }, + quickwit_proto::jaeger::api_v2::KeyValue { + key: "blob".to_string(), + v_type: 4, + v_binary: vec![0b0, 0b0, 0b00110000, 0b00111001], + ..Default::default() + }, + ], + ..Default::default() + }; + let span_2 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "some-operation".to_string(), + trace_id: vec![1], + span_id: vec![3], + references: vec![quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 0, + }], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + ..Default::default() + }; + let span_3 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "reference-test".to_string(), + trace_id: vec![1], + span_id: vec![4], + references: vec![ + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![255], + span_id: vec![255], + ref_type: 0, + }, + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 0, + }, + quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![2], + ref_type: 1, + }, + ], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 5000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + warnings: vec!["some span warning".to_string()], + ..Default::default() + }; + let span_4 = quickwit_proto::jaeger::api_v2::Span { + operation_name: "preserveParentID-test".to_string(), + trace_id: vec![1], + span_id: vec![5], + references: vec![quickwit_proto::jaeger::api_v2::SpanRef { + trace_id: vec![1], + span_id: vec![4], + ref_type: 0, + }], + start_time: Some(prost_types::Timestamp { + seconds: 1485467191, + nanos: 639875000, + }), + duration: Some(prost_types::Duration { + seconds: 0, + nanos: 4000, + }), + process: Some(quickwit_proto::jaeger::api_v2::Process { + service_name: "service-y".to_string(), + tags: vec![], + }), + process_id: "".to_string(), + warnings: vec!["some span warning".to_string()], + ..Default::default() + }; + vec![span_0, span_1, span_2, span_3, span_4] + } +} diff --git a/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs b/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs new file mode 100644 index 00000000000..4a07c3ef2b0 --- /dev/null +++ b/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs @@ -0,0 +1,98 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use prost_types::{Duration as ProstDuration, Timestamp as ProstTimestamp}; + +pub(crate) fn parse_duration_with_units(duration_string: String) -> anyhow::Result { + parse_duration_nanos(&duration_string) + .map(to_well_known_timestamp) + .map(|timestamp| ProstDuration { + seconds: timestamp.seconds, + nanos: timestamp.nanos, + }) + .map_err(|error| anyhow::anyhow!("Failed to parse duration: {:?}", error)) +} + +pub(crate) fn to_well_known_timestamp(timestamp_nanos: i64) -> ProstTimestamp { + let seconds = timestamp_nanos / 1_000_000; + let nanos = (timestamp_nanos % 1_000_000) as i32; + ProstTimestamp { seconds, nanos } +} + +/// Parses a duration string and return duration in nanoseconds. +/// A duration string is a possibly signed sequence of decimal numbers, each +/// with optional fraction and a unit suffix, such as "300ms", "-1.5h". +/// +/// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". +fn parse_duration_nanos(input: &str) -> anyhow::Result { + let mut num_str = String::new(); + for ch in input.trim().chars() { + if ch.is_ascii_digit() || ch == '.' || ch == '-' { + num_str.push(ch); + continue; + } + if ch.is_alphabetic() { + let unit = &input[num_str.len()..]; + let num: f64 = num_str.parse()?; + let duration: f64 = match unit { + "ns" => num, + "us" | "µs" => num * 1000.0, + "ms" => num * 1_000_000.0, + "s" => num * 1_000_000_000.0, + "m" => num * 60.0 * 1_000_000_000.0, + "h" => num * 3600.0 * 1_000_000_000.0, + _ => anyhow::bail!("Invalid time unit: {}", unit), + }; + if num < std::i64::MIN as f64 || num > std::i64::MAX as f64 { + anyhow::bail!("Invalid duration: {}", num_str) + } + return Ok(duration.round() as i64); + } else { + anyhow::bail!("Invalid duration string") + } + } + anyhow::bail!("Invalid duration string") +} + +#[cfg(test)] +mod tests { + use crate::jaeger_api::parse_duration::parse_duration_nanos; + + #[test] + fn test_parse_duration_nanos() { + // Test valid duration strings + assert_eq!(parse_duration_nanos("300ns").unwrap(), 300); + assert_eq!(parse_duration_nanos("1us").unwrap(), 1000); + assert_eq!(parse_duration_nanos("2.5ms").unwrap(), 2500000); + assert_eq!(parse_duration_nanos("3s").unwrap(), 3000000000); + assert_eq!(parse_duration_nanos("4m").unwrap(), 240000000000); + assert_eq!(parse_duration_nanos("5h").unwrap(), 18000000000000); + assert_eq!(parse_duration_nanos("-100ns").unwrap(), -100); + assert_eq!(parse_duration_nanos("-2us").unwrap(), -2000); + assert_eq!(parse_duration_nanos("-3.5ms").unwrap(), -3500000); + assert_eq!(parse_duration_nanos("-4s").unwrap(), -4000000000); + assert_eq!(parse_duration_nanos("-5m").unwrap(), -300000000000); + assert_eq!(parse_duration_nanos("-6h").unwrap(), -21600000000000); + + // Test invalid duration strings + assert!(parse_duration_nanos("abc").is_err()); + assert!(parse_duration_nanos("1.2.3s").is_err()); + assert!(parse_duration_nanos("1-.23s").is_err()); + } +} diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs new file mode 100644 index 00000000000..3c130be84ce --- /dev/null +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -0,0 +1,481 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; + +use hyper::StatusCode; +use itertools::Itertools; +use quickwit_jaeger::JaegerService; +use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; +use quickwit_proto::jaeger::storage::v1::{ + FindTracesRequest, GetOperationsRequest, GetServicesRequest, GetTraceRequest, + SpansResponseChunk, TraceQueryParameters, +}; +use quickwit_proto::tonic; +use tokio_stream::wrappers::ReceiverStream; +use tokio_stream::StreamExt; +use tracing::error; +use warp::{Filter, Rejection}; + +use super::model::build_jaeger_traces; +use super::parse_duration::{parse_duration_with_units, to_well_known_timestamp}; +use crate::jaeger_api::model::{ + JaegerError, JaegerResponseBody, JaegerSpan, JaegerTrace, TracesSearchQueryParams, + DEFAULT_NUMBER_OF_TRACES, +}; +use crate::json_api_response::JsonApiResponse; +use crate::{require, BodyFormat}; + +#[derive(utoipa::OpenApi)] +#[openapi(paths( + jaeger_services_handler, + jaeger_service_operations_handler, + jaeger_traces_search_handler, + jaeger_traces_handler +))] +pub(crate) struct JaegerApi; + +/// Setup Jaeger API handlers +/// +/// This is where all Jaeger handlers +/// should be registered. +/// Request are executed on the `otel traces v0_6` index. +pub(crate) fn jaeger_api_handlers( + jaeger_service_opt: Option, +) -> impl Filter + Clone { + let jaeger_api_root_url = warp::path!("otel-traces-v0_6" / "jaeger" / "api" / ..); + jaeger_api_root_url.and( + jaeger_services_handler(jaeger_service_opt.clone()) + .or(jaeger_service_operations_handler( + jaeger_service_opt.clone(), + )) + .or(jaeger_traces_search_handler(jaeger_service_opt.clone())) + .or(jaeger_traces_handler(jaeger_service_opt.clone())), + ) +} + +#[utoipa::path( + get, + tag = "Jaeger", + path = "/otel-traces-v0_6/jaeger/api/services", + responses( + (status = 200, description = "Successfully fetched services names.", body = JaegerResponseBody ) + ) +)] +pub fn jaeger_services_handler( + jaeger_service_opt: Option, +) -> impl Filter + Clone { + warp::path!("services") + .and(warp::get()) + .and(require(jaeger_service_opt)) + .then(jaeger_services) + .map(|result| make_jaeger_api_response(result, BodyFormat::default())) +} + +#[utoipa::path( + get, + tag = "Jaeger", + path = "/otel-traces-v0_6/jaeger/api/services/{service}/operations", + responses( + (status = 200, description = "Successfully fetched operations names the given service.", body = JaegerResponseBody ) + ) +)] +pub fn jaeger_service_operations_handler( + jaeger_service_opt: Option, +) -> impl Filter + Clone { + warp::path!("services" / String / "operations") + .and(warp::get()) + .and(require(jaeger_service_opt)) + .then(jaeger_service_operations) + .map(|result| make_jaeger_api_response(result, BodyFormat::default())) +} + +#[utoipa::path( + get, + tag = "Jaeger", + path = "/otel-traces-v0_6/jaeger/api/traces?service={service}&start={start_in_ns}&end={end_in_ns}&lookback=custom", + responses( + (status = 200, description = "Successfully fetched traces information.", body = JaegerResponseBody ) + ), + params( + TracesSearchQueryParams, + ("service" = Option, Query, description = "The service name."), + ("operation" = Option, Query, description = "The operation name."), + ("start" = Option, Query, description = "The start time in nanoseconds."), + ("end" = Option, Query, description = "The end time in nanoseconds."), + ("tags" = Option, Query, description = "Sets tags with values in the logfmt format, such as error=true status=200."), + ("min_duration" = Option, Query, description = "Filters all traces with a duration higher than the set value. Possible values are 1.2s, 100ms, 500us."), + ("max_duration" = Option, Query, description = "Filters all traces with a duration lower than the set value. Possible values are 1.2s, 100ms, 500us."), + ("limit" = Option, Query, description = "Limits the number of traces returned."), + ) +)] +pub fn jaeger_traces_search_handler( + jaeger_service_opt: Option, +) -> impl Filter + Clone { + warp::path!("traces") + .and(warp::get()) + .and(serde_qs::warp::query(serde_qs::Config::default())) + .and(require(jaeger_service_opt)) + .then(jaeger_traces_search) + .map(|result| make_jaeger_api_response(result, BodyFormat::default())) +} + +#[utoipa::path( + get, + tag = "Jaeger", + path = "/otel-traces-v0_6/jaeger/api/traces/{id}", + responses( + (status = 200, description = "Successfully fetched traces spans for the provided trace ID.", body = JaegerResponseBody ) + ) +)] +pub fn jaeger_traces_handler( + jaeger_service_opt: Option, +) -> impl Filter + Clone { + warp::path!("traces" / String) + .and(warp::get()) + .and(require(jaeger_service_opt)) + .then(jaeger_get_trace_by_id) + .map(|result| make_jaeger_api_response(result, BodyFormat::default())) +} + +async fn jaeger_services( + jaeger_service: JaegerService, +) -> Result>, JaegerError> { + let get_services_response = jaeger_service + .get_services(tonic::Request::new(GetServicesRequest {})) + .await + .unwrap() + .into_inner(); + Ok(JaegerResponseBody::> { + data: get_services_response.services, + }) +} + +async fn jaeger_service_operations( + service_name: String, + jaeger_service: JaegerService, +) -> Result>, JaegerError> { + let get_operations_request = GetOperationsRequest { + service: service_name, + span_kind: "".to_string(), + }; + let get_operations_response = jaeger_service + .get_operations(tonic::Request::new(get_operations_request)) + .await + .unwrap() + .into_inner(); + + let operations = get_operations_response + .operations + .into_iter() + .map(|op| op.name) + .collect_vec(); + Ok(JaegerResponseBody::> { data: operations }) +} + +async fn jaeger_traces_search( + search_params: TracesSearchQueryParams, + jaeger_service: JaegerService, +) -> Result>, JaegerError> { + let duration_min = search_params + .min_duration + .map(parse_duration_with_units) + .transpose()?; + let duration_max = search_params + .max_duration + .map(parse_duration_with_units) + .transpose()?; + let tags = search_params + .tags + .clone() + .map(|s| { + serde_json::from_str::>(&s).map_err(|error| { + let error_msg = format!( + "failed to deserialize tags `{:?}`: {:?}", + search_params.tags, error + ); + error!(error_msg); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: error_msg, + } + }) + }) + .transpose()? + .unwrap_or(Default::default()); + let query = TraceQueryParameters { + service_name: search_params.service.unwrap_or_default(), + operation_name: search_params.operation.unwrap_or_default(), + tags, + start_time_min: search_params.start.map(to_well_known_timestamp), + start_time_max: search_params.end.map(to_well_known_timestamp), + duration_min, + duration_max, + num_traces: search_params.limit.unwrap_or(DEFAULT_NUMBER_OF_TRACES), + }; + let find_traces_request = FindTracesRequest { query: Some(query) }; + let spans_chunk_stream = jaeger_service + .find_traces(tonic::Request::new(find_traces_request)) + .await + .map_err(|error| { + error!(error = ?error, "failed to fetch traces"); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to fetch traces".to_string(), + } + })? + .into_inner(); + let jaeger_spans = collect_and_build_jaeger_spans(spans_chunk_stream).await?; + let jaeger_traces: Vec = build_jaeger_traces(jaeger_spans)?; + Ok(JaegerResponseBody { + data: jaeger_traces, + }) +} + +async fn collect_and_build_jaeger_spans( + mut spans_chunk_stream: ReceiverStream>, +) -> anyhow::Result> { + let mut all_spans = Vec::::new(); + while let Some(Ok(SpansResponseChunk { spans })) = spans_chunk_stream.next().await { + let jaeger_spans: Vec = + spans.into_iter().map(JaegerSpan::try_from).try_collect()?; + all_spans.extend(jaeger_spans); + } + Ok(all_spans) +} + +async fn jaeger_get_trace_by_id( + trace_id_string: String, + jaeger_service: JaegerService, +) -> Result>, JaegerError> { + let trace_id = hex::decode(trace_id_string.clone()).map_err(|error| { + error!(error = ?error, "failed to decode trace `{}`", trace_id_string.clone()); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to decode trace id".to_string(), + } + })?; + let get_trace_request = GetTraceRequest { trace_id }; + let spans_chunk_stream = jaeger_service + .get_trace(tonic::Request::new(get_trace_request)) + .await + .map_err(|error| { + error!(error = ?error, "failed to fetch trace `{}`", trace_id_string.clone()); + JaegerError { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "failed to fetch trace".to_string(), + } + })? + .into_inner(); + let jaeger_spans = collect_and_build_jaeger_spans(spans_chunk_stream).await?; + let jaeger_traces: Vec = build_jaeger_traces(jaeger_spans)?; + Ok(JaegerResponseBody { + data: jaeger_traces, + }) +} + +fn make_jaeger_api_response( + jaeger_result: Result, + format: BodyFormat, +) -> JsonApiResponse { + let status_code = match &jaeger_result { + Ok(_) => StatusCode::OK, + Err(err) => err.status, + }; + JsonApiResponse::new(&jaeger_result, status_code, &format) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use quickwit_config::JaegerConfig; + use quickwit_opentelemetry::otlp::OTEL_TRACES_INDEX_ID; + use quickwit_search::MockSearchService; + use serde_json::Value as JsonValue; + + use super::*; + use crate::recover_fn; + + #[tokio::test] + async fn test_when_jaeger_not_found() { + let jaeger_api_handler = jaeger_api_handlers(None).recover(recover_fn); + let resp = warp::test::request() + .path("/otel-traces-v0_6/jaeger/api/services") + .reply(&jaeger_api_handler) + .await; + let error_body = serde_json::from_slice::>(resp.body()).unwrap(); + assert_eq!(resp.status(), 404); + assert!(error_body.contains_key("message")); + assert_eq!(error_body.get("message").unwrap(), "Route not found"); + } + + #[tokio::test] + async fn test_jaeger_services() -> anyhow::Result<()> { + let mut mock_search_service = MockSearchService::new(); + mock_search_service + .expect_root_list_terms() + .withf(|req| { + req.index_id == OTEL_TRACES_INDEX_ID + && req.field == "service_name" + && req.start_timestamp.is_some() + }) + .return_once(|_| { + Ok(quickwit_proto::search::ListTermsResponse { + num_hits: 0, + terms: Vec::new(), + elapsed_time_micros: 0, + errors: Vec::new(), + }) + }); + let mock_search_service = Arc::new(mock_search_service); + let jaeger = JaegerService::new(JaegerConfig::default(), mock_search_service); + + let jaeger_api_handler = jaeger_api_handlers(Some(jaeger)).recover(recover_fn); + let resp = warp::test::request() + .path("/otel-traces-v0_6/jaeger/api/services") + .reply(&jaeger_api_handler) + .await; + assert_eq!(resp.status(), 200); + let actual_response_json: JsonValue = serde_json::from_slice(resp.body())?; + assert!(actual_response_json + .get("data") + .unwrap() + .as_array() + .unwrap() + .is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_jaeger_service_operations() { + let mut mock_search_service = MockSearchService::new(); + mock_search_service + .expect_root_list_terms() + .withf(|req| { + req.index_id == OTEL_TRACES_INDEX_ID + && req.field == "span_fingerprint" + && req.start_timestamp.is_some() + }) + .return_once(|_| { + Ok(quickwit_proto::search::ListTermsResponse { + num_hits: 1, + terms: Vec::new(), + elapsed_time_micros: 0, + errors: Vec::new(), + }) + }); + let mock_search_service = Arc::new(mock_search_service); + let jaeger = JaegerService::new(JaegerConfig::default(), mock_search_service); + let jaeger_api_handler = jaeger_api_handlers(Some(jaeger)).recover(recover_fn); + let resp = warp::test::request() + .path("/otel-traces-v0_6/jaeger/api/services/service1/operations") + .reply(&jaeger_api_handler) + .await; + assert_eq!(resp.status(), 200); + let actual_response_json: JsonValue = serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response_json + .get("data") + .unwrap() + .as_array() + .unwrap() + .is_empty()); + } + + #[tokio::test] + async fn test_jaeger_traces_search() { + let mut mock_search_service = MockSearchService::new(); + mock_search_service + .expect_root_search() + .withf(|req| { + assert!(req.query_ast.contains( + "{\"type\":\"term\",\"field\":\"resource_attributes.tag.first\",\"value\":\"\ + common\"}" + )); + assert!(req.query_ast.contains( + "{\"type\":\"term\",\"field\":\"resource_attributes.tag.second\",\"value\":\"\ + true\"}" + )); + assert_eq!( + req.index_id_patterns, + vec![OTEL_TRACES_INDEX_ID.to_string()] + ); + true + }) + .return_once(|_| { + Ok(quickwit_proto::search::SearchResponse { + num_hits: 0, + hits: vec![], + elapsed_time_micros: 0, + errors: vec![], + aggregation: None, + scroll_id: None, + }) + }); + let mock_search_service = Arc::new(mock_search_service); + let jaeger = JaegerService::new(JaegerConfig::default(), mock_search_service); + let jaeger_api_handler = jaeger_api_handlers(Some(jaeger)).recover(recover_fn); + let resp = warp::test::request() + .path( + "/otel-traces-v0_6/jaeger/api/traces?service=quickwit&\ + operation=delete_splits_marked_for_deletion&minDuration=500us&maxDuration=1.2s&\ + tags=%7B%22tag.first%22%3A%22common%22%2C%22tag.second%22%3A%22true%22%7D&\ + limit=1&start=1702352106016000&end=1702373706016000&lookback=custom", + ) + .reply(&jaeger_api_handler) + .await; + assert_eq!(resp.status(), 200); + } + + #[tokio::test] + async fn test_jaeger_trace_by_id() { + let mut mock_search_service = MockSearchService::new(); + mock_search_service + .expect_root_search() + .withf(|req| req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID.to_string()]) + .return_once(|_| { + Ok(quickwit_proto::search::SearchResponse { + num_hits: 0, + hits: vec![], + elapsed_time_micros: 0, + errors: vec![], + aggregation: None, + scroll_id: None, + }) + }); + let mock_search_service = Arc::new(mock_search_service); + let jaeger = JaegerService::new(JaegerConfig::default(), mock_search_service); + + let jaeger_api_handler = jaeger_api_handlers(Some(jaeger)).recover(recover_fn); + let resp = warp::test::request() + .path("/otel-traces-v0_6/jaeger/api/traces/1506026ddd216249555653218dc88a6c") + .reply(&jaeger_api_handler) + .await; + + assert_eq!(resp.status(), 200); + let actual_response_json: JsonValue = serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response_json + .get("data") + .unwrap() + .as_array() + .unwrap() + .is_empty()); + } +} diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 6728aac4fdb..c753b65de17 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -27,6 +27,7 @@ mod health_check_api; mod index_api; mod indexing_api; mod ingest_api; +mod jaeger_api; mod json_api_response; mod metrics; mod metrics_api; @@ -76,6 +77,7 @@ use quickwit_ingest::{ GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, LocalShardsUpdate, }; +use quickwit_jaeger::JaegerService; use quickwit_janitor::{start_janitor_service, JanitorService}; use quickwit_metastore::{ ControlPlaneMetastore, ListIndexesMetadataResponseExt, MetastoreResolver, @@ -130,6 +132,7 @@ struct QuickwitServices { pub ingest_router_service: IngestRouterServiceClient, pub ingester_service_opt: Option, pub janitor_service_opt: Option>, + pub jaeger_service_opt: Option, /// We do have a search service even on nodes that are not running `search`. /// It is only used to serve the rest API calls and will only execute /// the root requests. @@ -483,6 +486,18 @@ pub async fn serve_quickwit( None }; + let jaeger_service_opt = if node_config.jaeger_config.enable_endpoint + && node_config.is_service_enabled(QuickwitService::Searcher) + { + let search_service = search_service.clone(); + Some(JaegerService::new( + node_config.jaeger_config.clone(), + search_service, + )) + } else { + None + }; + let grpc_listen_addr = node_config.grpc_listen_addr; let rest_listen_addr = node_config.rest_config.listen_addr; let quickwit_services: Arc = Arc::new(QuickwitServices { @@ -499,6 +514,7 @@ pub async fn serve_quickwit( ingest_service, ingester_service_opt: ingester_service_opt.clone(), janitor_service_opt, + jaeger_service_opt, search_service, }); // Setup and start gRPC server. diff --git a/quickwit/quickwit-serve/src/openapi.rs b/quickwit/quickwit-serve/src/openapi.rs index 944cb3dd60e..442bce1583a 100644 --- a/quickwit/quickwit-serve/src/openapi.rs +++ b/quickwit/quickwit-serve/src/openapi.rs @@ -34,6 +34,7 @@ use crate::health_check_api::HealthCheckApi; use crate::index_api::IndexApi; use crate::indexing_api::IndexingApi; use crate::ingest_api::{IngestApi, IngestApiSchemas}; +use crate::jaeger_api::JaegerApi; use crate::metrics_api::MetricsApi; use crate::node_info_handler::NodeInfoApi; use crate::search_api::SearchApi; @@ -74,6 +75,7 @@ pub fn build_docs() -> utoipa::openapi::OpenApi { Tag::new("Node Info"), Tag::new("Indexing"), Tag::new("Splits"), + Tag::new("Jaeger"), ]; docs_base.tags = Some(tags); @@ -89,6 +91,7 @@ pub fn build_docs() -> utoipa::openapi::OpenApi { docs_base .merge_components_and_paths(ElasticCompatibleApi::openapi().with_path_prefix("/api/v1")); docs_base.merge_components_and_paths(NodeInfoApi::openapi().with_path_prefix("/api/v1")); + docs_base.merge_components_and_paths(JaegerApi::openapi().with_path_prefix("/api/v1")); // Schemas docs_base.merge_components_and_paths(MetastoreApiSchemas::openapi()); diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 22bec50b943..2a6f5ffbc6f 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -39,6 +39,7 @@ use crate::health_check_api::health_check_handlers; use crate::index_api::index_management_handlers; use crate::indexing_api::indexing_get_handler; use crate::ingest_api::ingest_api_handlers; +use crate::jaeger_api::jaeger_api_handlers; use crate::json_api_response::{ApiError, JsonApiResponse}; use crate::metrics_api::metrics_handler; use crate::node_info_handler::node_info_handler; @@ -193,6 +194,9 @@ fn api_v1_routes( .or(delete_task_api_handlers( quickwit_services.metastore_client.clone(), )) + .or(jaeger_api_handlers( + quickwit_services.jaeger_service_opt.clone(), + )) .or(elastic_api_handlers( quickwit_services.node_config.clone(), quickwit_services.search_service.clone(), @@ -604,6 +608,7 @@ mod tests { metastore_server_opt: None, node_config: Arc::new(node_config.clone()), search_service: Arc::new(MockSearchService::new()), + jaeger_service_opt: None, }; let handler = api_v1_routes(Arc::new(quickwit_services))