Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow specifying index in otel http protobuf api #5421

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 111 additions & 15 deletions quickwit/quickwit-serve/src/otlp_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

use quickwit_common::rate_limited_error;
use quickwit_opentelemetry::otlp::{
OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID, OTEL_TRACES_INDEX_ID,
OtelSignal, OtlpGrpcLogsService, OtlpGrpcTracesService, OTEL_LOGS_INDEX_ID,
OTEL_TRACES_INDEX_ID,
};
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
Expand All @@ -41,7 +42,12 @@ use crate::rest_api_response::into_rest_api_response;
use crate::{require, with_arg, Body, BodyFormat};

#[derive(utoipa::OpenApi)]
#[openapi(paths(otlp_default_logs_handler, otlp_default_traces_handler))]
#[openapi(paths(
otlp_default_logs_handler,
otlp_logs_handler,
otlp_default_traces_handler,
otlp_ingest_traces_handler
))]
pub struct OtlpApi;

/// Setup OpenTelemetry API handlers.
Expand Down Expand Up @@ -82,7 +88,16 @@ pub(crate) fn otlp_default_logs_handler(
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
}

/// Open Telemetry REST/Protobuf logs ingest endpoint.
#[utoipa::path(
post,
tag = "Open Telemetry",
path = "/{index}/otlp/v1/logs",
request_body(content = String, description = "`ExportLogsServiceRequest` protobuf message", content_type = "application/x-protobuf"),
responses(
(status = 200, description = "Successfully exported logs.", body = ExportLogsServiceResponse)
),
)]
pub(crate) fn otlp_logs_handler(
otlp_log_service: Option<OtlpGrpcLogsService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down Expand Up @@ -126,7 +141,16 @@ pub(crate) fn otlp_default_traces_handler(
.and(with_arg(BodyFormat::default()))
.map(into_rest_api_response)
}

/// Open Telemetry REST/Protobuf traces ingest endpoint.
#[utoipa::path(
post,
tag = "Open Telemetry",
path = "/{index}/otlp/v1/traces",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit surprised that we use a path param here and not a header (also what was suggested in #5413). It would be more in line with the way it works in GRPC. But of course in GRPC you don't have a path, so the comparison doesn't really hold. I guess both are fine!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That api endpoint was already there (but didn't work properly and was absent from openapi definiton). @fmassot, looks like that code was yours, what do you think (index in path vs in header)?

request_body(content = String, description = "`ExportTraceServiceRequest` protobuf message", content_type = "application/x-protobuf"),
responses(
(status = 200, description = "Successfully exported traces.", body = ExportTracesServiceResponse)
),
)]
pub(crate) fn otlp_ingest_traces_handler(
otlp_traces_service: Option<OtlpGrpcTracesService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down Expand Up @@ -165,30 +189,44 @@ impl ServiceError for OtlpApiError {

async fn otlp_ingest_logs(
otlp_logs_service: OtlpGrpcLogsService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
index_id: IndexId,
body: Body,
) -> Result<ExportLogsServiceResponse, OtlpApiError> {
// TODO: use index ID.
let export_logs_request: ExportLogsServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let mut request = tonic::Request::new(export_logs_request);
let index = index_id
.try_into()
.map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?;
request
.metadata_mut()
.insert(OtelSignal::Logs.header_name(), index);
let result = otlp_logs_service
.export(tonic::Request::new(export_logs_request))
.export(request)
.await
.map_err(|err| OtlpApiError::Ingest(err.to_string()))?;
Ok(result.into_inner())
}

async fn otlp_ingest_traces(
otlp_traces_service: OtlpGrpcTracesService,
_index_id: IndexId, // <- TODO: use index ID when gRPC service supports it.
index_id: IndexId,
body: Body,
) -> Result<ExportTraceServiceResponse, OtlpApiError> {
let export_traces_request: ExportTraceServiceRequest =
prost::Message::decode(&body.content[..])
.map_err(|err| OtlpApiError::InvalidPayload(err.to_string()))?;
let mut request = tonic::Request::new(export_traces_request);
let index = index_id
.try_into()
.map_err(|_| OtlpApiError::InvalidPayload("invalid index id".to_string()))?;
request
.metadata_mut()
.insert(OtelSignal::Traces.header_name(), index);
let response = otlp_traces_service
.export(tonic::Request::new(export_traces_request))
.export(request)
.await
.map_err(|err| OtlpApiError::Ingest(err.to_string()))?;
Ok(response.into_inner())
Expand Down Expand Up @@ -232,11 +270,40 @@ mod tests {
let mut mock_ingest_router = MockIngestRouterService::new();
mock_ingest_router
.expect_ingest()
.times(2)
.withf(|request| {
request.subrequests.len() == 1
&& request.subrequests[0].doc_batch.is_some()
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_LOGS_INDEX_ID
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
num_ingested_docs: 1,
..Default::default()
}],
failures: Vec::new(),
})
});
mock_ingest_router
.expect_ingest()
.times(1)
.withf(|request| {
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 1
&& subrequest.index_id == "otel-traces-v0_6"
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
Expand Down Expand Up @@ -349,11 +416,40 @@ mod tests {
let mut mock_ingest_router = MockIngestRouterService::new();
mock_ingest_router
.expect_ingest()
.times(2)
.withf(|request| {
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5
&& subrequest.index_id == quickwit_opentelemetry::otlp::OTEL_TRACES_INDEX_ID
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
num_ingested_docs: 1,
..Default::default()
}],
failures: Vec::new(),
})
});
mock_ingest_router
.expect_ingest()
.times(1)
.withf(|request| {
request.subrequests.len() == 1
&& request.subrequests[0].doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& request.subrequests[0].doc_batch.as_ref().unwrap().doc_lengths.len() == 5
if request.subrequests.len() == 1 {
let subrequest = &request.subrequests[0];
subrequest.doc_batch.is_some()
// && request.commit == CommitType::Auto as i32
&& subrequest.doc_batch.as_ref().unwrap().doc_lengths.len() == 5
&& subrequest.index_id == "otel-traces-v0_6"
} else {
false
}
})
.returning(|_| {
Ok(IngestResponseV2 {
Expand Down
Loading