Skip to content

Commit

Permalink
Change default timestamps in OTEL logs
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 29, 2024
1 parent 7e6d4fc commit df84179
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
50 changes: 29 additions & 21 deletions quickwit/quickwit-opentelemetry/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::collections::{btree_set, BTreeSet, HashMap};

use async_trait::async_trait;
use prost::Message;
use quickwit_common::rate_limited_error;
use quickwit_common::thread_pool::run_cpu_intensive;
use quickwit_common::uri::Uri;
use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
Expand All @@ -36,6 +35,7 @@ use quickwit_proto::opentelemetry::proto::collector::logs::v1::{
use quickwit_proto::types::{DocUidGenerator, IndexId};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use time::OffsetDateTime;
use tonic::{Request, Response, Status};
use tracing::field::Empty;
use tracing::{error, instrument, warn, Span as RuntimeSpan};
Expand All @@ -47,7 +47,7 @@ use super::{
use crate::otlp::extract_attributes;
use crate::otlp::metrics::OTLP_SERVICE_METRICS;

pub const OTEL_LOGS_INDEX_ID: &str = "otel-logs-v0_7";
pub const OTEL_LOGS_INDEX_ID: &str = "otel-logs-v0_9";

const OTEL_LOGS_INDEX_CONFIG: &str = r#"
version: 0.8
Expand Down Expand Up @@ -147,9 +147,7 @@ pub enum OtlpLogsError {
#[derive(Debug, Serialize, Deserialize)]
pub struct LogRecord {
pub timestamp_nanos: u64,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub observed_timestamp_nanos: Option<u64>,
pub observed_timestamp_nanos: u64,
#[serde(default)]
#[serde(skip_serializing_if = "String::is_empty")]
pub service_name: String,
Expand Down Expand Up @@ -302,7 +300,8 @@ impl OtlpGrpcLogsService {
request: ExportLogsServiceRequest,
parent_span: RuntimeSpan,
) -> Result<ParsedLogRecords, Status> {
let (log_records, mut num_parse_errors) = parse_otlp_logs(request)?;
let log_records = parse_otlp_logs(request)?;
let mut num_parse_errors = 0;
let num_log_records = log_records.len() as u64;
let mut error_message = String::new();

Expand Down Expand Up @@ -400,10 +399,8 @@ impl LogsService for OtlpGrpcLogsService {

fn parse_otlp_logs(
request: ExportLogsServiceRequest,
) -> Result<(BTreeSet<OrdLogRecord>, u64), OtlpLogsError> {
) -> Result<BTreeSet<OrdLogRecord>, OtlpLogsError> {
let mut log_records = BTreeSet::new();
let mut num_parse_errors = 0;

for resource_log in request.resource_logs {
let mut resource_attributes = extract_attributes(
resource_log
Expand Down Expand Up @@ -446,16 +443,27 @@ fn parse_otlp_logs(
.unwrap_or(0);

for log_record in scope_log.log_records {
if log_record.time_unix_nano == 0 {
rate_limited_error!(limit_per_min = 10, "skipping record");
num_parse_errors += 1;
continue;
}
let observed_timestamp_nanos = if log_record.observed_time_unix_nano != 0 {
Some(log_record.observed_time_unix_nano)
let observed_timestamp_nanos = if log_record.observed_time_unix_nano == 0 {
// As per OTEL model spec, this field SHOULD be set once the
// event is observed by OpenTelemetry. If it's not set, we
// consider ourselves as the first OTEL observers.
OffsetDateTime::now_utc().unix_timestamp_nanos() as u64
} else {
None
log_record.observed_time_unix_nano
};

let timestamp_nanos = if log_record.time_unix_nano == 0 {
observed_timestamp_nanos
} else {
// When only one timestamp is supported by a recipients, the
// OTEL spec recommends using the `Timestamp` field if
// present, otherwise `ObservedTimestamp`. Even though our
// model supports multiple timestamps, we have only one
// field that that can be our `timestamp_field` and it
// should be the one that is commonly used for queries.
log_record.time_unix_nano
};

let trace_id = if log_record.trace_id.iter().any(|&byte| byte != 0) {
let trace_id = TraceId::try_from(log_record.trace_id)?;
Some(trace_id)
Expand All @@ -481,7 +489,7 @@ fn parse_otlp_logs(
let dropped_attributes_count = log_record.dropped_attributes_count;

let log_record = LogRecord {
timestamp_nanos: log_record.time_unix_nano,
timestamp_nanos,
observed_timestamp_nanos,
service_name: service_name.clone(),
severity_text,
Expand All @@ -503,7 +511,7 @@ fn parse_otlp_logs(
}
}
}
Ok((log_records, num_parse_errors))
Ok(log_records)
}

/// An iterator of JSON OTLP log records for use in the doc processor.
Expand Down Expand Up @@ -551,13 +559,13 @@ impl Iterator for JsonLogIterator {

pub fn parse_otlp_logs_json(payload_json: &[u8]) -> Result<JsonLogIterator, OtlpLogsError> {
let request: ExportLogsServiceRequest = serde_json::from_slice(payload_json)?;
let (log_records, _num_parse_errors) = parse_otlp_logs(request)?;
let log_records = parse_otlp_logs(request)?;
Ok(JsonLogIterator::new(log_records, payload_json.len()))
}

pub fn parse_otlp_logs_protobuf(payload_proto: &[u8]) -> Result<JsonLogIterator, OtlpLogsError> {
let request = ExportLogsServiceRequest::decode(payload_proto)?;
let (log_records, _num_parse_errors) = parse_otlp_logs(request)?;
let log_records = parse_otlp_logs(request)?;
Ok(JsonLogIterator::new(log_records, payload_proto.len()))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ expected:
$expect: 270 < float(val[:-2]) < 280
rep: '1'
#uuid: gharchive:01HN2SDANHDN6WFAFNH7BBMQ8C
- index: otel-logs-v0_7
- index: otel-logs-v0_9
docs.count: '0'
- index: otel-traces-v0_7
docs.count: '0'
Expand Down

0 comments on commit df84179

Please sign in to comment.