Skip to content

Commit

Permalink
Add default otlp endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jan 2, 2024
1 parent fcde258 commit d2f7de9
Showing 1 changed file with 115 additions and 34 deletions.
149 changes: 115 additions & 34 deletions quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::Bytes;
use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService};
use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID};
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
Expand All @@ -44,14 +44,36 @@ pub(crate) fn otlp_ingest_api_handlers(
otlp_logs_service: Option<OtlpGrpcLogsService>,
otlp_traces_service: Option<OtlpGrpcTracesService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
oltp_ingest_logs_handler(otlp_logs_service).or(oltp_ingest_traces_handler(otlp_traces_service))
otlp_default_logs_handler(otlp_logs_service.clone())
.or(otlp_default_traces_handler(otlp_traces_service.clone()))
.or(otlp_logs_handler(otlp_logs_service))
.or(otlp_ingest_traces_handler(otlp_traces_service))
}

pub(crate) fn oltp_ingest_logs_handler(
pub(crate) fn otlp_default_logs_handler(
otlp_logs_service: Option<OtlpGrpcLogsService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
require(otlp_logs_service)
.and(warp::path!("otlp" / "v1" / "logs"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.then(|otlp_logs_service, body| async move {
otlp_ingest_logs(otlp_logs_service, OTEL_LOGS_INDEX_ID.to_string(), body)
.await
})
.and(with_arg(BodyFormat::default()))
.map(make_json_api_response)
}

pub(crate) fn otlp_logs_handler(
otlp_log_service: Option<OtlpGrpcLogsService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
require(otlp_log_service)
.and(warp::path!(String / "ingest" / "otlp" / "v1" / "logs"))
.and(warp::path!(String / "otlp" / "v1" / "logs"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
Expand All @@ -63,11 +85,30 @@ pub(crate) fn oltp_ingest_logs_handler(
.map(make_json_api_response)
}

pub(crate) fn oltp_ingest_traces_handler(
pub(crate) fn otlp_default_traces_handler(
otlp_traces_service: Option<OtlpGrpcTracesService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
require(otlp_traces_service)
.and(warp::path!(String / "ingest" / "otlp" / "v1" / "traces"))
.and(warp::path!("otlp" / "v1" / "traces"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
))
.and(warp::post())
.and(warp::body::bytes())
.then(|otlp_traces_service, body| async move {
otlp_ingest_traces(otlp_traces_service, OTEL_TRACES_INDEX_ID.to_string(), body)
.await
})
.and(with_arg(BodyFormat::default()))
.map(make_json_api_response)
}

pub(crate) fn otlp_ingest_traces_handler(
otlp_traces_service: Option<OtlpGrpcTracesService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
require(otlp_traces_service)
.and(warp::path!(String / "otlp" / "v1" / "traces"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
Expand Down Expand Up @@ -192,25 +233,48 @@ mod tests {
let body = export_logs_request.encode_to_vec();
let otlp_traces_api_handler =
otlp_ingest_api_handlers(Some(logs_service), Some(traces_service)).recover(recover_fn);
let resp = warp::test::request()
.path("/otel-traces-v0_6/ingest/otlp/v1/logs")
{
// Test default otlp endpoint
let resp = warp::test::request()
.path("/otlp/v1/logs")
.method("POST")
.header("content-type", "application/x-protobuf")
.body(body)
.body(body.clone())
.reply(&otlp_traces_api_handler)
.await;

assert_eq!(resp.status(), 200);
let actual_response: ExportLogsServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(
actual_response
.partial_success
.unwrap()
.rejected_log_records,
0
);
assert_eq!(resp.status(), 200);
let actual_response: ExportLogsServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(
actual_response
.partial_success
.unwrap()
.rejected_log_records,
0
);
}
{
// Test endpoint with given index ID.
let resp = warp::test::request()
.path("/otel-traces-v0_6/otlp/v1/logs")
.method("POST")
.header("content-type", "application/x-protobuf")
.body(body.clone())
.reply(&otlp_traces_api_handler)
.await;
assert_eq!(resp.status(), 200);
let actual_response: ExportLogsServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(
actual_response
.partial_success
.unwrap()
.rejected_log_records,
0
);
}
}

#[tokio::test]
Expand Down Expand Up @@ -238,18 +302,35 @@ mod tests {
let body = export_trace_request.encode_to_vec();
let otlp_traces_api_handler =
otlp_ingest_api_handlers(Some(logs_service), Some(traces_service)).recover(recover_fn);
let resp = warp::test::request()
.path("/otel-traces-v0_6/ingest/otlp/v1/traces")
.method("POST")
.header("content-type", "application/x-protobuf")
.body(body)
.reply(&otlp_traces_api_handler)
.await;

assert_eq!(resp.status(), 200);
let actual_response: ExportTraceServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
{
// Test default otlp endpoint
let resp = warp::test::request()
.path("/otlp/v1/traces")
.method("POST")
.header("content-type", "application/x-protobuf")
.body(body.clone())
.reply(&otlp_traces_api_handler)
.await;
assert_eq!(resp.status(), 200);
let actual_response: ExportTraceServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
}
{
// Test endpoint with given index ID.
let resp = warp::test::request()
.path("/otel-traces-v0_6/otlp/v1/traces")
.method("POST")
.header("content-type", "application/x-protobuf")
.body(body)
.reply(&otlp_traces_api_handler)
.await;
assert_eq!(resp.status(), 200);
let actual_response: ExportTraceServiceResponse =
serde_json::from_slice(resp.body()).unwrap();
assert!(actual_response.partial_success.is_some());
assert_eq!(actual_response.partial_success.unwrap().rejected_spans, 0);
}
}
}

0 comments on commit d2f7de9

Please sign in to comment.