diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs index a78f49fceea..8937c3a54f5 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs @@ -22,7 +22,6 @@ use std::collections::{btree_set, BTreeSet, HashMap}; use async_trait::async_trait; use prost::Message; -use quickwit_common::rate_limited_error; use quickwit_common::thread_pool::run_cpu_intensive; use quickwit_common::uri::Uri; use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig}; @@ -36,6 +35,7 @@ use quickwit_proto::opentelemetry::proto::collector::logs::v1::{ use quickwit_proto::types::{DocUidGenerator, IndexId}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use time::OffsetDateTime; use tonic::{Request, Response, Status}; use tracing::field::Empty; use tracing::{error, instrument, warn, Span as RuntimeSpan}; @@ -47,7 +47,7 @@ use super::{ use crate::otlp::extract_attributes; use crate::otlp::metrics::OTLP_SERVICE_METRICS; -pub const OTEL_LOGS_INDEX_ID: &str = "otel-logs-v0_7"; +pub const OTEL_LOGS_INDEX_ID: &str = "otel-logs-v0_9"; const OTEL_LOGS_INDEX_CONFIG: &str = r#" version: 0.8 @@ -147,9 +147,7 @@ pub enum OtlpLogsError { #[derive(Debug, Serialize, Deserialize)] pub struct LogRecord { pub timestamp_nanos: u64, - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - pub observed_timestamp_nanos: Option, + pub observed_timestamp_nanos: u64, #[serde(default)] #[serde(skip_serializing_if = "String::is_empty")] pub service_name: String, @@ -302,7 +300,8 @@ impl OtlpGrpcLogsService { request: ExportLogsServiceRequest, parent_span: RuntimeSpan, ) -> Result { - let (log_records, mut num_parse_errors) = parse_otlp_logs(request)?; + let log_records = parse_otlp_logs(request)?; + let mut num_parse_errors = 0; let num_log_records = log_records.len() as u64; let mut error_message = String::new(); @@ -400,10 +399,8 @@ impl LogsService for OtlpGrpcLogsService { fn parse_otlp_logs( request: ExportLogsServiceRequest, -) -> Result<(BTreeSet, u64), OtlpLogsError> { +) -> Result, OtlpLogsError> { let mut log_records = BTreeSet::new(); - let mut num_parse_errors = 0; - for resource_log in request.resource_logs { let mut resource_attributes = extract_attributes( resource_log @@ -446,16 +443,27 @@ fn parse_otlp_logs( .unwrap_or(0); for log_record in scope_log.log_records { - if log_record.time_unix_nano == 0 { - rate_limited_error!(limit_per_min = 10, "skipping record"); - num_parse_errors += 1; - continue; - } - let observed_timestamp_nanos = if log_record.observed_time_unix_nano != 0 { - Some(log_record.observed_time_unix_nano) + let observed_timestamp_nanos = if log_record.observed_time_unix_nano == 0 { + // As per OTEL model spec, this field SHOULD be set once the + // event is observed by OpenTelemetry. If it's not set, we + // consider ourselves as the first OTEL observers. + OffsetDateTime::now_utc().unix_timestamp_nanos() as u64 } else { - None + log_record.observed_time_unix_nano }; + + let timestamp_nanos = if log_record.time_unix_nano == 0 { + observed_timestamp_nanos + } else { + // When only one timestamp is supported by a recipients, the + // OTEL spec recommends using the `Timestamp` field if + // present, otherwise `ObservedTimestamp`. Even though our + // model supports multiple timestamps, we have only one + // field that that can be our `timestamp_field` and it + // should be the one that is commonly used for queries. + log_record.time_unix_nano + }; + let trace_id = if log_record.trace_id.iter().any(|&byte| byte != 0) { let trace_id = TraceId::try_from(log_record.trace_id)?; Some(trace_id) @@ -481,7 +489,7 @@ fn parse_otlp_logs( let dropped_attributes_count = log_record.dropped_attributes_count; let log_record = LogRecord { - timestamp_nanos: log_record.time_unix_nano, + timestamp_nanos, observed_timestamp_nanos, service_name: service_name.clone(), severity_text, @@ -503,7 +511,7 @@ fn parse_otlp_logs( } } } - Ok((log_records, num_parse_errors)) + Ok(log_records) } /// An iterator of JSON OTLP log records for use in the doc processor. @@ -551,13 +559,13 @@ impl Iterator for JsonLogIterator { pub fn parse_otlp_logs_json(payload_json: &[u8]) -> Result { let request: ExportLogsServiceRequest = serde_json::from_slice(payload_json)?; - let (log_records, _num_parse_errors) = parse_otlp_logs(request)?; + let log_records = parse_otlp_logs(request)?; Ok(JsonLogIterator::new(log_records, payload_json.len())) } pub fn parse_otlp_logs_protobuf(payload_proto: &[u8]) -> Result { let request = ExportLogsServiceRequest::decode(payload_proto)?; - let (log_records, _num_parse_errors) = parse_otlp_logs(request)?; + let log_records = parse_otlp_logs(request)?; Ok(JsonLogIterator::new(log_records, payload_proto.len())) } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml index c5c0e9ae809..f13ef92293c 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0021-cat-indices.yaml @@ -20,7 +20,7 @@ expected: $expect: 270 < float(val[:-2]) < 280 rep: '1' #uuid: gharchive:01HN2SDANHDN6WFAFNH7BBMQ8C -- index: otel-logs-v0_7 +- index: otel-logs-v0_9 docs.count: '0' - index: otel-traces-v0_7 docs.count: '0'