From d2f7de9fdfdfe124845b199d556c9ec6aa476cb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Tue, 2 Jan 2024 18:40:19 +0100 Subject: [PATCH] Add default otlp endpoints. --- .../src/otlp_api/rest_handler.rs | 149 ++++++++++++++---- 1 file changed, 115 insertions(+), 34 deletions(-) diff --git a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs index 6efe619346f..0e6d29747bd 100644 --- a/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/otlp_api/rest_handler.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . use bytes::Bytes; -use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; +use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID}; use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService; use quickwit_proto::opentelemetry::proto::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, @@ -44,14 +44,36 @@ pub(crate) fn otlp_ingest_api_handlers( otlp_logs_service: Option, otlp_traces_service: Option, ) -> impl Filter + Clone { - oltp_ingest_logs_handler(otlp_logs_service).or(oltp_ingest_traces_handler(otlp_traces_service)) + otlp_default_logs_handler(otlp_logs_service.clone()) + .or(otlp_default_traces_handler(otlp_traces_service.clone())) + .or(otlp_logs_handler(otlp_logs_service)) + .or(otlp_ingest_traces_handler(otlp_traces_service)) } -pub(crate) fn oltp_ingest_logs_handler( +pub(crate) fn otlp_default_logs_handler( + otlp_logs_service: Option, +) -> impl Filter + Clone { + require(otlp_logs_service) + .and(warp::path!("otlp" / "v1" / "logs")) + .and(warp::header::exact_ignore_case( + "content-type", + "application/x-protobuf", + )) + .and(warp::post()) + .and(warp::body::bytes()) + .then(|otlp_logs_service, body| async move { + otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body) + .await + }) + .and(with_arg(BodyFormat::default())) + .map(make_json_api_response) +} + +pub(crate) fn otlp_logs_handler( otlp_log_service: Option, ) -> impl Filter + Clone { require(otlp_log_service) - .and(warp::path!(String / "ingest" / "otlp" / "v1" / "logs")) + .and(warp::path!(String / "otlp" / "v1" / "logs")) .and(warp::header::exact_ignore_case( "content-type", "application/x-protobuf", @@ -63,11 +85,30 @@ pub(crate) fn oltp_ingest_logs_handler( .map(make_json_api_response) } -pub(crate) fn oltp_ingest_traces_handler( +pub(crate) fn otlp_default_traces_handler( otlp_traces_service: Option, ) -> impl Filter + Clone { require(otlp_traces_service) - .and(warp::path!(String / "ingest" / "otlp" / "v1" / "traces")) + .and(warp::path!("otlp" / "v1" / "traces")) + .and(warp::header::exact_ignore_case( + "content-type", + "application/x-protobuf", + )) + .and(warp::post()) + .and(warp::body::bytes()) + .then(|otlp_traces_service, body| async move { + otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body) + .await + }) + .and(with_arg(BodyFormat::default())) + .map(make_json_api_response) +} + +pub(crate) fn otlp_ingest_traces_handler( + otlp_traces_service: Option, +) -> impl Filter + Clone { + require(otlp_traces_service) + .and(warp::path!(String / "otlp" / "v1" / "traces")) .and(warp::header::exact_ignore_case( "content-type", "application/x-protobuf", @@ -192,25 +233,48 @@ mod tests { let body = export_logs_request.encode_to_vec(); let otlp_traces_api_handler = otlp_ingest_api_handlers(Some(logs_service), Some(traces_service)).recover(recover_fn); - let resp = warp::test::request() - .path("/otel-traces-v0_6/ingest/otlp/v1/logs") + { + // Test default otlp endpoint + let resp = warp::test::request() + .path("/otlp/v1/logs") .method("POST") .header("content-type", "application/x-protobuf") - .body(body) + .body(body.clone()) .reply(&otlp_traces_api_handler) .await; - - assert_eq!(resp.status(), 200); - let actual_response: ExportLogsServiceResponse = - serde_json::from_slice(resp.body()).unwrap(); - assert!(actual_response.partial_success.is_some()); - assert_eq!( - actual_response - .partial_success - .unwrap() - .rejected_log_records, - 0 - ); + assert_eq!(resp.status(), 200); + let actual_response: ExportLogsServiceResponse = + serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response.partial_success.is_some()); + assert_eq!( + actual_response + .partial_success + .unwrap() + .rejected_log_records, + 0 + ); + } + { + // Test endpoint with given index ID. + let resp = warp::test::request() + .path("/otel-traces-v0_6/otlp/v1/logs") + .method("POST") + .header("content-type", "application/x-protobuf") + .body(body.clone()) + .reply(&otlp_traces_api_handler) + .await; + assert_eq!(resp.status(), 200); + let actual_response: ExportLogsServiceResponse = + serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response.partial_success.is_some()); + assert_eq!( + actual_response + .partial_success + .unwrap() + .rejected_log_records, + 0 + ); + } } #[tokio::test] @@ -238,18 +302,35 @@ mod tests { let body = export_trace_request.encode_to_vec(); let otlp_traces_api_handler = otlp_ingest_api_handlers(Some(logs_service), Some(traces_service)).recover(recover_fn); - let resp = warp::test::request() - .path("/otel-traces-v0_6/ingest/otlp/v1/traces") - .method("POST") - .header("content-type", "application/x-protobuf") - .body(body) - .reply(&otlp_traces_api_handler) - .await; - - assert_eq!(resp.status(), 200); - let actual_response: ExportTraceServiceResponse = - serde_json::from_slice(resp.body()).unwrap(); - assert!(actual_response.partial_success.is_some()); - assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0); + { + // Test default otlp endpoint + let resp = warp::test::request() + .path("/otlp/v1/traces") + .method("POST") + .header("content-type", "application/x-protobuf") + .body(body.clone()) + .reply(&otlp_traces_api_handler) + .await; + assert_eq!(resp.status(), 200); + let actual_response: ExportTraceServiceResponse = + serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response.partial_success.is_some()); + assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0); + } + { + // Test endpoint with given index ID. + let resp = warp::test::request() + .path("/otel-traces-v0_6/otlp/v1/traces") + .method("POST") + .header("content-type", "application/x-protobuf") + .body(body) + .reply(&otlp_traces_api_handler) + .await; + assert_eq!(resp.status(), 200); + let actual_response: ExportTraceServiceResponse = + serde_json::from_slice(resp.body()).unwrap(); + assert!(actual_response.partial_success.is_some()); + assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0); + } } }