Skip to content

Commit

Permalink
Refactor some common code
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 23, 2024
1 parent 2993423 commit e548422
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 52 deletions.
36 changes: 12 additions & 24 deletions quickwit/quickwit-opentelemetry/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ use prost::Message;
use quickwit_common::rate_limited_error;
use quickwit_common::thread_pool::run_cpu_intensive;
use quickwit_common::uri::Uri;
use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, INGEST_V2_SOURCE_ID,
};
use quickwit_ingest::{CommitType, IngestServiceError, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest,
};
use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
use quickwit_ingest::{CommitType, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsService;
use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
Expand All @@ -45,8 +41,8 @@ use tracing::field::Empty;
use tracing::{error, instrument, warn, Span as RuntimeSpan};

use super::{
extract_otel_index_id_from_metadata, ingest_failures_to_error, is_zero, parse_log_record_body,
OtelSignal, SpanId, TraceId, TryFromSpanIdError, TryFromTraceIdError,
extract_otel_index_id_from_metadata, is_zero, parse_log_record_body, store_helper, OtelSignal,
SpanId, TraceId, TryFromSpanIdError, TryFromTraceIdError,
};
use crate::otlp::extract_attributes;
use crate::otlp::metrics::OTLP_SERVICE_METRICS;
Expand Down Expand Up @@ -341,22 +337,14 @@ impl OtlpGrpcLogsService {
index_id: String,
doc_batch: DocBatchV2,
) -> Result<(), tonic::Status> {
let subrequest = IngestSubrequest {
subrequest_id: 0,
store_helper(
self.ingest_router.clone(),
index_id,
source_id: INGEST_V2_SOURCE_ID.to_string(),
doc_batch: Some(doc_batch),
};
let request = IngestRequestV2 {
commit_type: CommitType::Auto.into(),
subrequests: vec![subrequest],
};
let response = self
.ingest_router
.ingest(request)
.await
.map_err(IngestServiceError::from)?;
ingest_failures_to_error(response).map_err(|e| e.into())
doc_batch,
CommitType::Auto,
)
.await?;
Ok(())
}

async fn export_instrumented(
Expand Down
30 changes: 26 additions & 4 deletions quickwit/quickwit-opentelemetry/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

use std::collections::HashMap;

use quickwit_config::{validate_identifier, validate_index_id_pattern};
use quickwit_ingest::IngestServiceError;
use quickwit_proto::ingest::router::IngestResponseV2;
use quickwit_config::{validate_identifier, validate_index_id_pattern, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{CommitType, IngestServiceError};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest,
};
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpValue;
use quickwit_proto::opentelemetry::proto::common::v1::{
AnyValue as OtlpAnyValue, ArrayValue as OtlpArrayValue, KeyValue as OtlpKeyValue,
Expand Down Expand Up @@ -222,7 +225,26 @@ pub(crate) fn extract_otel_index_id_from_metadata(
Ok(index_id.to_string())
}

fn ingest_failures_to_error(mut response: IngestResponseV2) -> Result<(), IngestServiceError> {
async fn store_helper(
ingest_router: IngestRouterServiceClient,
index_id: String,
doc_batch: DocBatchV2,
commit_type: CommitType,
) -> Result<(), IngestServiceError> {
let subrequest = IngestSubrequest {
subrequest_id: 0,
index_id,
source_id: INGEST_V2_SOURCE_ID.to_string(),
doc_batch: Some(doc_batch),
};
let request = IngestRequestV2 {
commit_type: commit_type.into(),
subrequests: vec![subrequest],
};
let mut response = ingest_router
.ingest(request)
.await
.map_err(IngestServiceError::from)?;
let num_responses = response.successes.len() + response.failures.len();
if num_responses != 1 {
return Err(IngestServiceError::Internal(format!(
Expand Down
36 changes: 12 additions & 24 deletions quickwit/quickwit-opentelemetry/src/otlp/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ use async_trait::async_trait;
use prost::Message;
use quickwit_common::thread_pool::run_cpu_intensive;
use quickwit_common::uri::Uri;
use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, INGEST_V2_SOURCE_ID,
};
use quickwit_ingest::{CommitType, IngestServiceError, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest,
};
use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
use quickwit_ingest::{CommitType, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceService;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::{
Expand All @@ -50,8 +46,8 @@ use tracing::field::Empty;
use tracing::{error, instrument, warn, Span as RuntimeSpan};

use super::{
extract_otel_index_id_from_metadata, ingest_failures_to_error, is_zero, OtelSignal,
TryFromSpanIdError, TryFromTraceIdError,
extract_otel_index_id_from_metadata, is_zero, store_helper, OtelSignal, TryFromSpanIdError,
TryFromTraceIdError,
};
use crate::otlp::metrics::OTLP_SERVICE_METRICS;
use crate::otlp::{extract_attributes, SpanId, TraceId};
Expand Down Expand Up @@ -797,22 +793,14 @@ impl OtlpGrpcTracesService {
index_id: String,
doc_batch: DocBatchV2,
) -> Result<(), tonic::Status> {
let subrequest = IngestSubrequest {
subrequest_id: 0,
store_helper(
self.ingest_router.clone(),
index_id,
source_id: INGEST_V2_SOURCE_ID.to_string(),
doc_batch: Some(doc_batch),
};
let request = IngestRequestV2 {
commit_type: self.commit_type.into(),
subrequests: vec![subrequest],
};
let response = self
.ingest_router
.ingest(request)
.await
.map_err(IngestServiceError::from)?;
ingest_failures_to_error(response).map_err(|e| e.into())
doc_batch,
self.commit_type,
)
.await?;
Ok(())
}

async fn export_instrumented(
Expand Down

0 comments on commit e548422

Please sign in to comment.