From be1c795d59c50d400ad86d6a13e865454da8d3a0 Mon Sep 17 00:00:00 2001
From: Adrien Guillo
Date: Wed, 18 Oct 2023 14:44:29 -0400
Subject: [PATCH] Add support for OTLP trace data in any source

---
 .../index-config.yaml                         |   0
 .../tutorials/otel-traces/kafka-source.yaml   |   8 +
 docker-compose.yml                            |  12 +-
 monitoring/otel-collector-config.yaml         |   6 +-
 quickwit/Cargo.lock                           |   2 +
 .../quickwit-config/src/source_config/mod.rs  |   5 +-
 .../src/source_config/serialize.rs            |   6 +
 quickwit/quickwit-indexing/Cargo.toml         |   4 +-
 .../src/actors/doc_processor.rs               | 629 ++++++++++++------
 .../src/actors/vrl_processing.rs              |  42 +-
 .../src/models/indexing_statistics.rs         |  10 +-
 .../quickwit-opentelemetry/src/otlp/mod.rs    |   9 +-
 .../quickwit-opentelemetry/src/otlp/traces.rs | 131 +++-
 13 files changed, 615 insertions(+), 249 deletions(-)
 rename config/tutorials/{otel-trace => otel-traces}/index-config.yaml (100%)
 create mode 100644 config/tutorials/otel-traces/kafka-source.yaml

diff --git a/config/tutorials/otel-trace/index-config.yaml b/config/tutorials/otel-traces/index-config.yaml
similarity index 100%
rename from config/tutorials/otel-trace/index-config.yaml
rename to config/tutorials/otel-traces/index-config.yaml
diff --git a/config/tutorials/otel-traces/kafka-source.yaml b/config/tutorials/otel-traces/kafka-source.yaml
new file mode 100644
index 00000000000..808f3d3b7c7
--- /dev/null
+++ b/config/tutorials/otel-traces/kafka-source.yaml
@@ -0,0 +1,8 @@
+version: 0.6
+source_id: kafka-source
+source_type: kafka
+input_format: otlp_trace_proto
+params:
+  topic: otlp_spans
+  client_params:
+    bootstrap.servers: localhost:9092
diff --git a/docker-compose.yml b/docker-compose.yml
index a33fc4ca166..23b42b48078 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -93,6 +93,7 @@ services:
     ports:
       - "${MAP_HOST_KAFKA:-127.0.0.1}:9092:9092"
       - "${MAP_HOST_KAFKA:-127.0.0.1}:9101:9101"
+      - "${MAP_HOST_KAFKA:-127.0.0.1}:29092:29092"
     profiles:
       - all
      - kafka
@@ -100,7 +101,7 @@
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka-broker:29092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
@@ -167,8 +168,6 @@ services:
     image: jaegertracing/all-in-one:${JAEGER_VERSION:-1.48.0}
     container_name: jaeger
     ports:
-      - "${MAP_HOST_JAEGER:-127.0.0.1}:4317:4317" # OTLP over gRPC
-      - "${MAP_HOST_JAEGER:-127.0.0.1}:4318:4318" # OTLP over HTTP
       - "${MAP_HOST_JAEGER:-127.0.0.1}:16686:16686" # Frontend
     profiles:
       - jaeger
@@ -206,18 +205,19 @@ services:
       - "host.docker.internal:host-gateway"
 
   gcp-pubsub-emulator:
-    profiles:
-      - gcp-pubsub
-      - all
     # It is not an official docker image
     # if we prefer we can build a docker from the official docker image (gcloud cli)
     # and install the pubsub emulator https://cloud.google.com/pubsub/docs/emulator
     image: thekevjames/gcloud-pubsub-emulator:${GCLOUD_EMULATOR:-7555256f2c}
+    container_name: gcp-pubsub-emulator
     ports:
       - "${MAP_HOST_GCLOUD_EMULATOR:-127.0.0.1}:8681:8681"
     environment:
       # create a fake gcp project and a topic / subscription
       - PUBSUB_PROJECT1=quickwit-emulator,emulator_topic:emulator_subscription
+    profiles:
+      - all
+      - gcp-pubsub
 
 volumes:
   localstack_data:
diff --git a/monitoring/otel-collector-config.yaml b/monitoring/otel-collector-config.yaml
index 1be656fb4f4..4a1d514b83b 100644
--- a/monitoring/otel-collector-config.yaml
+++ b/monitoring/otel-collector-config.yaml
@@ -20,6 +20,10 @@ exporters:
     tls:
       insecure: true
 
+  kafka:
+    brokers:
+      - kafka-broker:29092
+
   otlp/qw:
     endpoint: host.docker.internal:7281
     tls:
@@ -36,7 +40,7 @@ service:
     traces:
       receivers: [jaeger, otlp]
       processors: [batch]
-      exporters: [jaeger, otlp/qw]
+      exporters: [jaeger, kafka, otlp/qw]
 #    metrics:
 #      receivers: [otlp]
 #      processors: [batch]
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 2fb3a4b762f..2f052c4b08d 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5302,6 +5302,7 @@ dependencies = [
  "oneshot",
  "openssl",
  "proptest",
+ "prost",
  "pulsar",
  "quickwit-actors",
  "quickwit-aws",
@@ -5312,6 +5313,7 @@ dependencies = [
  "quickwit-doc-mapper",
  "quickwit-ingest",
  "quickwit-metastore",
+ "quickwit-opentelemetry",
  "quickwit-proto",
  "quickwit-query",
  "quickwit-storage",
diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index 0c632371c31..c7c044006ce 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -201,11 +201,14 @@ impl TestableForRegression for SourceConfig {
     }
 }
 
-#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
 #[serde(rename_all = "snake_case")]
 pub enum SourceInputFormat {
     #[default]
     Json,
+    OtlpTraceJson,
+    #[serde(alias = "otlp_trace_proto")]
+    OtlpTraceProtobuf,
     #[serde(alias = "plain")]
     PlainText,
 }
diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs
index 75a4b23c2ea..ed68e53707a 100644
--- a/quickwit/quickwit-config/src/source_config/serialize.rs
+++ b/quickwit/quickwit-config/src/source_config/serialize.rs
@@ -108,6 +108,12 @@ impl SourceConfigForSerialization {
         }
 
         if let Some(transform_config) = &self.transform {
+            if matches!(
+                self.input_format,
+                SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf
+            ) {
+                bail!("VRL transforms are not supported for OTLP input formats");
+            }
             transform_config.validate_vrl_script()?;
         }
 
diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml
index 26dda1c92fb..c4280731836 100644
--- a/quickwit/quickwit-indexing/Cargo.toml
+++ b/quickwit/quickwit-indexing/Cargo.toml
@@ -61,6 +61,7 @@ quickwit-directories = { workspace = true }
 quickwit-doc-mapper = { workspace = true }
 quickwit-ingest = { workspace = true }
 quickwit-metastore = { workspace = true }
+quickwit-opentelemetry = { workspace = true }
 quickwit-proto = { workspace = true }
 quickwit-storage = { workspace = true }
 
@@ -87,9 +88,10 @@ bytes = { workspace = true }
 criterion = { workspace = true, features = ["async_tokio"] }
 mockall = { workspace = true }
 proptest = { workspace = true }
+prost = { workspace = true }
 rand = { workspace = true }
-tempfile = { workspace = true }
 reqwest = { workspace = true }
+tempfile = { workspace = true }
 
 quickwit-actors = { workspace = true, features = ["testsuite"] }
 quickwit-cluster = { workspace = true, features = ["testsuite"] }
diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
index 279c0624b0a..0ce7b39d361 100644
--- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs
+++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -18,15 +18,19 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::string::FromUtf8Error;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use async_trait::async_trait;
 use bytes::Bytes;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_config::{SourceInputFormat, TransformConfig};
 use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject};
+use quickwit_opentelemetry::otlp::{
+    parse_otlp_spans_json, parse_otlp_spans_protobuf, JsonSpanIterator, OtlpTraceError,
+};
 use serde::Serialize;
 use serde_json::Value as JsonValue;
 use tantivy::schema::{Field, Value};
@@ -43,54 +47,40 @@ use crate::models::{
 
 const PLAIN_TEXT: &str = "plain_text";
 
-enum InputDoc {
-    Json(Bytes),
-    PlainText(Bytes),
+pub(super) struct JsonDoc {
+    json_obj: JsonObject,
+    num_bytes: usize,
 }
 
-impl InputDoc {
-    fn from_bytes(input_format: &SourceInputFormat, bytes: Bytes) -> Self {
-        match input_format {
-            SourceInputFormat::Json => InputDoc::Json(bytes),
-            SourceInputFormat::PlainText => InputDoc::PlainText(bytes),
+impl JsonDoc {
+    pub fn new(json_obj: JsonObject, num_bytes: usize) -> Self {
+        Self {
+            json_obj,
+            num_bytes,
         }
     }
 
-    #[cfg(feature = "vrl")]
-    fn try_into_vrl_doc(self) -> Result<VrlValue, DocProcessorError> {
-        let vrl_doc = match self {
-            InputDoc::Json(bytes) => serde_json::from_slice::<VrlValue>(&bytes)?,
-            InputDoc::PlainText(bytes) => {
-                use std::collections::BTreeMap;
-                let mut map = BTreeMap::new();
-                let key = PLAIN_TEXT.to_string();
-                let value = VrlValue::Bytes(bytes);
-                map.insert(key, value);
-                VrlValue::Object(map)
-            }
-        };
-        Ok(vrl_doc)
+    pub fn try_from_json_value(
+        json_value: JsonValue,
+        num_bytes: usize,
+    ) -> Result<Self, DocProcessorError> {
+        match json_value {
+            JsonValue::Object(json_obj) => Ok(Self::new(json_obj, num_bytes)),
+            _ => Err(DocProcessorError::ParsingError),
+        }
     }
 
-    fn try_into_json_doc(self) -> Result<JsonObject, DocProcessorError> {
-        let json_doc = match self {
-            InputDoc::Json(doc_bytes) => serde_json::from_slice::<JsonObject>(&doc_bytes)?,
-            InputDoc::PlainText(doc_bytes) => {
-                let mut map = serde_json::Map::with_capacity(1);
-                let key = PLAIN_TEXT.to_string();
-                let value = String::from_utf8(doc_bytes.to_vec())?;
-                map.insert(key, JsonValue::String(value));
-                map
-            }
-        };
-        Ok(json_doc)
+    #[cfg(feature = "vrl")]
+    pub fn try_from_vrl_doc(vrl_doc: VrlDoc) -> Result<Self, DocProcessorError> {
+        let json_value = serde_json::to_value(vrl_doc.vrl_value)?;
+        Self::try_from_json_value(json_value, vrl_doc.num_bytes)
     }
 }
 
 #[derive(Debug)]
 pub enum DocProcessorError {
     ParsingError,
-    MissingField,
+    SchemaError,
     #[cfg(feature = "vrl")]
     TransformError(VrlTerminate),
 }
@@ -107,7 +97,126 @@ impl From<serde_json::Error> for DocProcessorError {
     }
 }
 
-#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
+#[cfg(feature = "vrl")]
+fn try_into_vrl_doc(
+    input_format: SourceInputFormat,
+    raw_doc: Bytes,
+    num_bytes: usize,
+) -> Result<VrlDoc, DocProcessorError> {
+    let vrl_value = match input_format {
+        SourceInputFormat::Json => serde_json::from_slice::<VrlValue>(&raw_doc)?,
+        SourceInputFormat::PlainText => {
+            let mut map = std::collections::BTreeMap::new();
+            let key = PLAIN_TEXT.to_string();
+            let value = VrlValue::Bytes(raw_doc);
+            map.insert(key, value);
+            VrlValue::Object(map)
+        }
+        SourceInputFormat::OtlpTraceJson | SourceInputFormat::OtlpTraceProtobuf => {
+            panic!("OTLP log or trace data does not support VRL transforms")
+        }
+    };
+    let vrl_doc = VrlDoc::new(vrl_value, num_bytes);
+    Ok(vrl_doc)
+}
+
+fn try_into_json_docs(
+    input_format: SourceInputFormat,
+    raw_doc: Bytes,
+    num_bytes: usize,
+) -> JsonDocIterator {
+    match input_format {
+        SourceInputFormat::Json => {
+            let json_doc_result = serde_json::from_slice::<JsonObject>(&raw_doc)
+                .map(|json_obj| JsonDoc::new(json_obj, num_bytes));
+            JsonDocIterator::from(json_doc_result)
+        }
+        SourceInputFormat::OtlpTraceJson => {
+            let spans = parse_otlp_spans_json(&raw_doc);
+            JsonDocIterator::from(spans)
+        }
+        SourceInputFormat::OtlpTraceProtobuf => {
+            let spans = parse_otlp_spans_protobuf(&raw_doc);
+            JsonDocIterator::from(spans)
+        }
+        SourceInputFormat::PlainText => {
+            let json_doc_result = String::from_utf8(raw_doc.to_vec()).map(|value| {
+                let mut json_obj = serde_json::Map::with_capacity(1);
+                let key = PLAIN_TEXT.to_string();
+                json_obj.insert(key, JsonValue::String(value));
+                JsonDoc::new(json_obj, num_bytes)
+            });
+            JsonDocIterator::from(json_doc_result)
+        }
+    }
+}
+
+#[cfg(feature = "vrl")]
+fn parse_raw_doc(
+    input_format: SourceInputFormat,
+    raw_doc: Bytes,
+    num_bytes: usize,
+    vrl_program_opt: Option<&mut VrlProgram>,
+) -> JsonDocIterator {
+    let Some(vrl_program) = vrl_program_opt else {
+        return try_into_json_docs(input_format, raw_doc, num_bytes);
+    };
+    let json_doc_result = try_into_vrl_doc(input_format, raw_doc, num_bytes)
+        .and_then(|vrl_doc| vrl_program.transform_doc(vrl_doc))
+        .and_then(|transformed_vrl_doc| JsonDoc::try_from_vrl_doc(transformed_vrl_doc));
+
+    JsonDocIterator::from(json_doc_result)
+}
+
+#[cfg(not(feature = "vrl"))]
+fn parse_raw_doc(
+    input_format: SourceInputFormat,
+    raw_doc: Bytes,
+    num_bytes: usize,
+    _vrl_program_opt: Option<&mut VrlProgram>,
+) -> JsonDocIterator {
+    try_into_json_docs(input_format, raw_doc, num_bytes)
+}
+
+enum JsonDocIterator {
+    One(Option<Result<JsonDoc, DocProcessorError>>),
+    Spans(JsonSpanIterator),
+}
+
+impl Iterator for JsonDocIterator {
+    type Item = Result<JsonDoc, DocProcessorError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            Self::One(opt) => opt.take(),
+            Self::Spans(spans) => spans
+                .next()
+                .map(|(json_value, num_bytes)| JsonDoc::try_from_json_value(json_value, num_bytes)),
+        }
+    }
+}
+
+impl<E> From<Result<JsonDoc, E>> for JsonDocIterator
+where E: Into<DocProcessorError>
+{
+    fn from(result: Result<JsonDoc, E>) -> Self {
+        match result {
+            Ok(json_doc) => Self::One(Some(Ok(json_doc))),
+            Err(error) => Self::One(Some(Err(error.into()))),
+        }
+    }
+}
+
+impl From<Result<JsonSpanIterator, OtlpTraceError>> for JsonDocIterator {
+    fn from(result: Result<JsonSpanIterator, OtlpTraceError>) -> Self {
+        match result {
+            Ok(json_doc) => Self::Spans(json_doc),
+            Err(_) => Self::One(Some(Err(DocProcessorError::ParsingError))),
+        }
+    }
+}
+
+#[derive(Debug, Serialize)]
 pub struct DocProcessorCounters {
     index_id: String,
     source_id: String,
@@ -115,19 +224,18 @@ pub struct DocProcessorCounters {
     /// into 4 categories:
     /// - number of docs that could not be parsed.
     /// - number of docs that could not be transformed.
-    /// - number of docs without a timestamp (if the index has no timestamp field,
-    ///   then this counter is equal to zero)
+    /// - number of docs for which the doc mapper returned an error.
     /// - number of valid docs.
-    pub num_parse_errors: u64,
-    pub num_transform_errors: u64,
-    pub num_docs_with_missing_fields: u64,
-    pub num_valid_docs: u64,
+    pub num_parse_errors: AtomicU64,
+    pub num_transform_errors: AtomicU64,
+    pub num_schema_errors: AtomicU64,
+    pub num_valid_docs: AtomicU64,
 
     /// Number of bytes that went through the indexer
     /// during its entire lifetime.
     ///
     /// Includes both valid and invalid documents.
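+    /// (Renamed from `overall_num_bytes`; kept as an atomic so the counter can
+    /// be updated through the shared `Arc<DocProcessorCounters>` handle.)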
-    pub overall_num_bytes: u64,
+    pub num_bytes_total: AtomicU64,
 }
 
@@ -135,102 +243,71 @@ impl DocProcessorCounters {
         Self {
             index_id,
             source_id,
-            num_parse_errors: 0,
-            num_transform_errors: 0,
-            num_docs_with_missing_fields: 0,
-            num_valid_docs: 0,
-            overall_num_bytes: 0,
+            num_parse_errors: Default::default(),
+            num_transform_errors: Default::default(),
+            num_schema_errors: Default::default(),
+            num_valid_docs: Default::default(),
+            num_bytes_total: Default::default(),
         }
     }
 
     /// Returns the overall number of docs that went through the indexer (valid or not).
     pub fn num_processed_docs(&self) -> u64 {
-        self.num_valid_docs
-            + self.num_parse_errors
-            + self.num_docs_with_missing_fields
-            + self.num_transform_errors
+        self.num_valid_docs.load(Ordering::Relaxed)
+            + self.num_parse_errors.load(Ordering::Relaxed)
+            + self.num_schema_errors.load(Ordering::Relaxed)
+            + self.num_transform_errors.load(Ordering::Relaxed)
     }
 
     /// Returns the overall number of docs that were sent to the indexer but were invalid.
     /// (For instance, because they were missing a required field or because
     /// their format was invalid.)
     pub fn num_invalid_docs(&self) -> u64 {
-        self.num_parse_errors + self.num_docs_with_missing_fields + self.num_transform_errors
+        self.num_parse_errors.load(Ordering::Relaxed)
+            + self.num_schema_errors.load(Ordering::Relaxed)
+            + self.num_transform_errors.load(Ordering::Relaxed)
     }
 
-    pub fn record_parsing_error(&mut self, num_bytes: u64) {
-        self.num_parse_errors += 1;
-        self.overall_num_bytes += num_bytes;
-        crate::metrics::INDEXER_METRICS
-            .processed_docs_total
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "parsing_error",
-            ])
-            .inc();
-        crate::metrics::INDEXER_METRICS
-            .processed_bytes
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "parsing_error",
-            ])
-            .inc_by(num_bytes);
-    }
+    pub fn record_valid(&self, num_bytes: u64) {
+        self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
+        self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
 
-    pub fn record_transform_error(&mut self, num_bytes: u64) {
-        self.num_transform_errors += 1;
-        self.overall_num_bytes += num_bytes;
         crate::metrics::INDEXER_METRICS
             .processed_docs_total
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "transform_error",
-            ])
+            .with_label_values([&self.index_id, &self.source_id, "valid"])
             .inc();
         crate::metrics::INDEXER_METRICS
             .processed_bytes
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "transform_error",
-            ])
+            .with_label_values([&self.index_id, &self.source_id, "valid"])
             .inc_by(num_bytes);
     }
 
-    pub fn record_missing_field(&mut self, num_bytes: u64) {
-        self.num_docs_with_missing_fields += 1;
-        self.overall_num_bytes += num_bytes;
+    pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
+        let label = match error {
+            DocProcessorError::ParsingError => {
+                self.num_parse_errors.fetch_add(1, Ordering::Relaxed);
+                "parsing_error"
+            }
+            DocProcessorError::SchemaError => {
+                self.num_schema_errors.fetch_add(1, Ordering::Relaxed);
+                "schema_error"
+            }
+            #[cfg(feature = "vrl")]
+            DocProcessorError::TransformError(_) => {
+                self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
+                "transform_error"
+            }
+        };
         crate::metrics::INDEXER_METRICS
             .processed_docs_total
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "missing_field",
-            ])
+            .with_label_values([&self.index_id, &self.source_id, label])
             .inc();
-        crate::metrics::INDEXER_METRICS
-            .processed_bytes
-            .with_label_values([
-                self.index_id.as_str(),
-                self.source_id.as_str(),
-                "missing_field",
-            ])
-            .inc_by(num_bytes);
-    }
 
-    pub fn record_valid(&mut self, num_bytes: u64) {
-        self.num_valid_docs += 1;
-        self.overall_num_bytes += num_bytes;
-        crate::metrics::INDEXER_METRICS
-            .processed_docs_total
-            .with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
-            .inc();
+        self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
+
         crate::metrics::INDEXER_METRICS
             .processed_bytes
-            .with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
+            .with_label_values([&self.index_id, &self.source_id, label])
             .inc_by(num_bytes);
     }
 }
 
@@ -239,7 +316,7 @@ pub struct DocProcessor {
     doc_mapper: Arc<dyn DocMapper>,
     indexer_mailbox: Mailbox<Indexer>,
     timestamp_field_opt: Option<Field>,
-    counters: DocProcessorCounters,
+    counters: Arc<DocProcessorCounters>,
     publish_lock: PublishLock,
     #[cfg(feature = "vrl")]
     transform_opt: Option<VrlProgram>,
@@ -255,15 +332,15 @@ impl DocProcessor {
         transform_config_opt: Option<TransformConfig>,
         input_format: SourceInputFormat,
     ) -> anyhow::Result<Self> {
-        let timestamp_field_opt = extract_timestamp_field(doc_mapper.as_ref())?;
+        let timestamp_field_opt = extract_timestamp_field(&*doc_mapper)?;
         if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() {
-            anyhow::bail!("VRL is not enabled. please recompile with the `vrl` feature")
+            bail!("VRL is not enabled. please recompile with the `vrl` feature")
         }
         let doc_processor = Self {
             doc_mapper,
             indexer_mailbox,
             timestamp_field_opt,
-            counters: DocProcessorCounters::new(index_id, source_id),
+            counters: Arc::new(DocProcessorCounters::new(index_id, source_id)),
             publish_lock: PublishLock::default(),
             #[cfg(feature = "vrl")]
             transform_opt: transform_config_opt
@@ -288,47 +365,44 @@ impl DocProcessor {
         let timestamp = doc
             .get_first(timestamp_field)
             .and_then(|val| val.as_datetime())
-            .ok_or(DocProcessorError::MissingField)?;
+            .ok_or(DocProcessorError::SchemaError)?;
         Ok(Some(timestamp))
     }
 
-    #[cfg(feature = "vrl")]
-    fn get_json_doc(&mut self, input_doc: InputDoc) -> Result<JsonObject, DocProcessorError> {
-        if let Some(vrl_program) = self.transform_opt.as_mut() {
-            let vrl_doc = input_doc.try_into_vrl_doc()?;
-            let transformed_vrl_doc = vrl_program.transform_doc(vrl_doc)?;
-            if let Ok(JsonValue::Object(json_doc)) = serde_json::to_value(transformed_vrl_doc) {
-                Ok(json_doc)
-            } else {
-                Err(DocProcessorError::ParsingError)
+    fn process_raw_doc(&mut self, raw_doc: Bytes, processed_docs: &mut Vec<ProcessedDoc>) {
+        let num_bytes = raw_doc.len();
+
+        #[cfg(feature = "vrl")]
+        let transform_opt = self.transform_opt.as_mut();
+        #[cfg(not(feature = "vrl"))]
+        let transform_opt: Option<&mut VrlProgram> = None;
+
+        for json_doc_result in parse_raw_doc(self.input_format, raw_doc, num_bytes, transform_opt) {
+            let processed_doc_result =
+                json_doc_result.and_then(|json_doc| self.process_json_doc(json_doc));
+
+            match processed_doc_result {
+                Ok(processed_doc) => {
+                    self.counters.record_valid(processed_doc.num_bytes as u64);
+                    processed_docs.push(processed_doc);
+                }
+                Err(error) => {
+                    self.counters.record_error(error, num_bytes as u64);
+                }
             }
-        } else {
-            input_doc.try_into_json_doc()
         }
     }
 
-    #[cfg(not(feature = "vrl"))]
-    fn get_json_doc(&mut self, input_doc: InputDoc) -> Result<JsonObject, DocProcessorError> {
-        input_doc.try_into_json_doc()
-    }
-
-    fn process_document(
-        &mut self,
-        doc_bytes: Bytes,
-        ctx: &ActorContext<Self>,
-    ) -> Result<ProcessedDoc, DocProcessorError> {
-        let _protect_guard = ctx.protect_zone();
+    fn process_json_doc(&self, json_doc: JsonDoc) -> Result<ProcessedDoc, DocProcessorError> {
+        let num_bytes = json_doc.num_bytes;
 
-        let num_bytes = doc_bytes.len();
-        let input_doc = InputDoc::from_bytes(&self.input_format, doc_bytes);
-        let json_doc: JsonObject = self.get_json_doc(input_doc)?;
         let (partition, doc) = self
             .doc_mapper
-            .doc_from_json_obj(json_doc)
+            .doc_from_json_obj(json_doc.json_obj)
             .map_err(|error| {
                 warn!(error=?error);
                 match error {
-                    DocParsingError::RequiredField(_) => DocProcessorError::MissingField,
+                    DocParsingError::RequiredField(_) => DocProcessorError::SchemaError,
                     _ => DocProcessorError::ParsingError,
                 }
             })?;
@@ -353,9 +427,12 @@ fn extract_timestamp_field(doc_mapper: &dyn DocMapper) -> anyhow::Result<Option<Field>> {
 
 #[async_trait]
 impl Actor for DocProcessor {
-    type ObservableState = DocProcessorCounters;
+    type ObservableState = Arc<DocProcessorCounters>;
 
     fn observable_state(&self) -> Self::ObservableState {
         self.counters.clone()
@@ -405,25 +482,9 @@ impl Handler<RawDocBatch> for DocProcessor {
             return Ok(());
         }
         let mut processed_docs: Vec<ProcessedDoc> = Vec::with_capacity(raw_doc_batch.docs.len());
-        for doc in raw_doc_batch.docs {
-            let doc_num_bytes = doc.len() as u64;
-
-            match self.process_document(doc, ctx) {
-                Ok(document) => {
-                    self.counters.record_valid(doc_num_bytes);
-                    processed_docs.push(document);
-                }
-                Err(DocProcessorError::ParsingError) => {
-                    self.counters.record_parsing_error(doc_num_bytes);
-                }
-                #[cfg(feature = "vrl")]
-                Err(DocProcessorError::TransformError(_)) => {
-                    self.counters.record_transform_error(doc_num_bytes);
-                }
-                Err(DocProcessorError::MissingField) => {
-                    self.counters.record_missing_field(doc_num_bytes);
-                }
-            }
+        for raw_doc in raw_doc_batch.docs {
+            let _protected_zone_guard = ctx.protect_zone();
+            self.process_raw_doc(raw_doc, &mut processed_docs);
             ctx.record_progress();
         }
         let processed_doc_batch = ProcessedDocBatch {
@@ -472,9 +533,15 @@ mod tests {
     use std::sync::Arc;
 
     use bytes::Bytes;
+    use prost::Message;
     use quickwit_actors::Universe;
+    use quickwit_common::uri::Uri;
+    use quickwit_config::{build_doc_mapper, SearchSettings};
     use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper};
     use quickwit_metastore::checkpoint::SourceCheckpointDelta;
+    use quickwit_opentelemetry::otlp::OtlpGrpcTracesService;
+    use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
+    use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span};
     use serde_json::Value as JsonValue;
     use tantivy::schema::NamedFieldDocument;
     use tantivy::Document;
@@ -512,22 +579,18 @@ mod tests {
             0..4,
         ))
         .await?;
-        let doc_processor_counters = doc_processor_handle
+        let counters = doc_processor_handle
             .process_pending_and_observe()
             .await
             .state;
-        assert_eq!(
-            doc_processor_counters,
-            DocProcessorCounters {
-                index_id: index_id.to_string(),
-                source_id: source_id.to_string(),
-                num_parse_errors: 1,
-                num_transform_errors: 0,
-                num_docs_with_missing_fields: 1,
-                num_valid_docs: 2,
-                overall_num_bytes: 387,
-            }
-        );
+        assert_eq!(counters.index_id, index_id);
+        assert_eq!(counters.source_id, source_id);
+        assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
+        assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);
+
         let output_messages = indexer_inbox.drain_for_test();
         assert_eq!(output_messages.len(), 1);
         let batch = *(output_messages
@@ -704,6 +767,174 @@ mod tests {
         assert!(indexer_messages.is_empty());
         universe.assert_quit().await;
     }
+
+    #[tokio::test]
+    async fn test_doc_processor_otlp_trace_json() {
+        let root_uri = Uri::for_test("ram:///indexes");
+        let index_config = OtlpGrpcTracesService::index_config(&root_uri).unwrap();
+        let doc_mapper =
+            build_doc_mapper(&index_config.doc_mapping, &SearchSettings::default()).unwrap();
+
+        let universe = Universe::with_accelerated_time();
+        let (indexer_mailbox, indexer_inbox) = universe.create_test_mailbox();
+        let doc_processor = DocProcessor::try_new(
+            "my-index".to_string(),
+            "my-source".to_string(),
+            doc_mapper,
+            indexer_mailbox,
+            None,
+            SourceInputFormat::OtlpTraceJson,
+        )
+        .unwrap();
+
+        let (doc_processor_mailbox, doc_processor_handle) =
+            universe.spawn_builder().spawn(doc_processor);
+
+        let scope_spans = vec![ScopeSpans {
+            spans: vec![
+                Span {
+                    trace_id: vec![1; 16],
+                    span_id: vec![2; 8],
+                    start_time_unix_nano: 1_000_000_001,
+                    end_time_unix_nano: 1_000_000_002,
+                    ..Default::default()
+                },
+                Span {
+                    trace_id: vec![3; 16],
+                    span_id: vec![4; 8],
+                    start_time_unix_nano: 2_000_000_001,
+                    end_time_unix_nano: 2_000_000_002,
+                    ..Default::default()
+                },
+            ],
+            ..Default::default()
+        }];
+        let resource_spans = vec![ResourceSpans {
+            scope_spans,
+            ..Default::default()
+        }];
+        let request = ExportTraceServiceRequest {
+            resource_spans,
+            ..Default::default()
+        };
+        let raw_doc_json = serde_json::to_vec(&request).unwrap();
+        let raw_doc = Bytes::from(raw_doc_json);
+
+        let raw_doc_batch = RawDocBatch {
+            docs: vec![raw_doc],
+            checkpoint_delta: SourceCheckpointDelta::from_range(0..2),
+            force_commit: false,
+        };
+        doc_processor_mailbox
+            .send_message(raw_doc_batch)
+            .await
+            .unwrap();
+
+        universe
+            .send_exit_with_success(&doc_processor_mailbox)
+            .await
+            .unwrap();
+
+        let counters = doc_processor_handle
+            .process_pending_and_observe()
+            .await
+            .state;
+        // assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+
+        let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
+        assert_eq!(batch.len(), 1);
+        assert_eq!(batch[0].docs.len(), 2);
+
+        let (exit_status, _) = doc_processor_handle.join().await;
+        assert!(matches!(exit_status, ActorExitStatus::Success));
+        universe.assert_quit().await;
+    }
+
+    #[tokio::test]
+    async fn test_doc_processor_otlp_trace_proto() {
+        let root_uri = Uri::for_test("ram:///indexes");
+        let index_config = OtlpGrpcTracesService::index_config(&root_uri).unwrap();
+        let doc_mapper =
+            build_doc_mapper(&index_config.doc_mapping, &SearchSettings::default()).unwrap();
+
+        let universe = Universe::with_accelerated_time();
+        let (indexer_mailbox, indexer_inbox) = universe.create_test_mailbox();
+        let doc_processor = DocProcessor::try_new(
+            "my-index".to_string(),
+            "my-source".to_string(),
+            doc_mapper,
+            indexer_mailbox,
+            None,
+            SourceInputFormat::OtlpTraceProtobuf,
+        )
+        .unwrap();
+
+        let (doc_processor_mailbox, doc_processor_handle) =
+            universe.spawn_builder().spawn(doc_processor);
+
+        let scope_spans = vec![ScopeSpans {
+            spans: vec![
+                Span {
+                    trace_id: vec![1; 16],
+                    span_id: vec![2; 8],
+                    start_time_unix_nano: 1_000_000_001,
+                    end_time_unix_nano: 1_000_000_002,
+                    ..Default::default()
+                },
+                Span {
+                    trace_id: vec![3; 16],
+                    span_id: vec![4; 8],
+                    start_time_unix_nano: 2_000_000_001,
+                    end_time_unix_nano: 2_000_000_002,
+                    ..Default::default()
+                },
+            ],
+            ..Default::default()
+        }];
+        let resource_spans = vec![ResourceSpans {
+            scope_spans,
+            ..Default::default()
+        }];
+        let request = ExportTraceServiceRequest {
+            resource_spans,
+            ..Default::default()
+        };
+        let mut raw_doc_buffer = Vec::new();
+        request.encode(&mut raw_doc_buffer).unwrap();
+
+        let raw_doc = Bytes::from(raw_doc_buffer);
+
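+        // A single raw document may expand into several processed documents:
+        // the two spans in this request come out as two JSON docs downstream.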
+        let raw_doc_batch = RawDocBatch {
+            docs: vec![raw_doc],
+            checkpoint_delta: SourceCheckpointDelta::from_range(0..2),
+            force_commit: false,
+        };
+        doc_processor_mailbox
+            .send_message(raw_doc_batch)
+            .await
+            .unwrap();
+
+        universe
+            .send_exit_with_success(&doc_processor_mailbox)
+            .await
+            .unwrap();
+
+        let counters = doc_processor_handle
+            .process_pending_and_observe()
+            .await
+            .state;
+        // assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+
+        let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
+        assert_eq!(batch.len(), 1);
+        assert_eq!(batch[0].docs.len(), 2);
+
+        let (exit_status, _) = doc_processor_handle.join().await;
+        assert!(matches!(exit_status, ActorExitStatus::Success));
+        universe.assert_quit().await;
+    }
 }
 
 #[cfg(feature = "vrl")]
@@ -747,22 +978,18 @@ mod tests_vrl {
             0..4,
         ))
         .await?;
-        let doc_processor_counters = doc_processor_handle
+        let counters = doc_processor_handle
            .process_pending_and_observe()
            .await
            .state;
-        assert_eq!(
-            doc_processor_counters,
-            DocProcessorCounters {
-                index_id: index_id.to_string(),
-                source_id: source_id.to_string(),
-                num_parse_errors: 1,
-                num_transform_errors: 0,
-                num_docs_with_missing_fields: 1,
-                num_valid_docs: 2,
-                overall_num_bytes: 397,
-            }
-        );
+        assert_eq!(counters.index_id, index_id.to_string());
+        assert_eq!(counters.source_id, source_id.to_string());
+        assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
+        assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);
+
         let output_messages = indexer_inbox.drain_for_test();
         assert_eq!(output_messages.len(), 1);
         let batch = *(output_messages
@@ -841,22 +1068,18 @@ mod tests_vrl {
             0..4,
         ))
         .await.unwrap();
-        let doc_processor_counters = doc_processor_handle
+        let counters = doc_processor_handle
            .process_pending_and_observe()
            .await
            .state;
-        assert_eq!(
-            doc_processor_counters,
-            DocProcessorCounters {
-                index_id: index_id.to_string(),
-                source_id: source_id.to_string(),
-                num_parse_errors: 0,
-                num_transform_errors: 1,
-                num_docs_with_missing_fields: 0,
-                num_valid_docs: 2,
-                overall_num_bytes: 200,
-            }
-        );
+        assert_eq!(counters.index_id, index_id);
+        assert_eq!(counters.source_id, source_id);
+        assert_eq!(counters.num_parse_errors.load(Ordering::Relaxed), 0);
+        assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1);
+        assert_eq!(counters.num_schema_errors.load(Ordering::Relaxed), 0);
+        assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
+        assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200);
+
         let output_messages = indexer_inbox.drain_for_test();
         assert_eq!(output_messages.len(), 1);
         let batch = *(output_messages
diff --git a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs
index 4e76dfde6ac..974f5c31e3e 100644
--- a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs
+++ b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs
@@ -29,20 +29,39 @@ pub use vrl::value::{Secrets as VrlSecrets, Value as VrlValue};
 
 use super::doc_processor::DocProcessorError;
 
-pub struct VrlProgram {
-    runtime: Runtime,
+pub(super) struct VrlDoc {
+    pub vrl_value: VrlValue,
+    pub num_bytes: usize,
+}
+
+impl VrlDoc {
+    pub fn new(vrl_value: VrlValue, num_bytes: usize) -> Self {
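+        // Carry the original payload size alongside the VRL value so the doc
+        // processor's byte counters stay accurate after the transform.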
+        Self {
+            vrl_value,
+            num_bytes,
+        }
+    }
+}
+
+pub(super) struct VrlProgram {
     program: Program,
     timezone: TimeZone,
+    runtime: Runtime,
+    metadata: VrlValue,
+    secrets: VrlSecrets,
 }
 
 impl VrlProgram {
-    pub fn transform_doc(&mut self, mut vrl_doc: VrlValue) -> Result<VrlValue, DocProcessorError> {
-        let mut metadata = VrlValue::Object(BTreeMap::new());
-        let mut secrets = VrlSecrets::new();
+    pub fn transform_doc(&mut self, vrl_doc: VrlDoc) -> Result<VrlDoc, DocProcessorError> {
+        let VrlDoc {
+            mut vrl_value,
+            num_bytes,
+        } = vrl_doc;
+
         let mut target = TargetValueRef {
-            value: &mut vrl_doc,
-            metadata: &mut metadata,
-            secrets: &mut secrets,
+            value: &mut vrl_value,
+            metadata: &mut self.metadata,
+            secrets: &mut self.secrets,
         };
         let runtime_res = self
             .runtime
@@ -52,9 +71,12 @@ impl VrlProgram {
                 DocProcessorError::TransformError(transform_error)
             });
 
+        if let VrlValue::Object(metadata) = target.metadata {
+            metadata.clear();
+        }
         self.runtime.clear();
 
-        runtime_res
+        runtime_res.map(|vrl_value| VrlDoc::new(vrl_value, num_bytes))
     }
 
     pub fn try_from_transform_config(transform_config: TransformConfig) -> anyhow::Result<Self> {
@@ -66,6 +88,8 @@ impl VrlProgram {
             program,
             runtime,
             timezone,
+            metadata: VrlValue::Object(BTreeMap::new()),
+            secrets: VrlSecrets::default(),
         })
     }
 }
diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs
index 503dd138656..8d9c67acda7 100644
--- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs
+++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs
@@ -61,9 +61,13 @@ impl IndexingStatistics {
         self.num_docs += doc_processor_counters.num_processed_docs();
         self.num_invalid_docs += doc_processor_counters.num_invalid_docs();
         self.num_local_splits += indexer_counters.num_splits_emitted;
-        self.total_bytes_processed += doc_processor_counters.overall_num_bytes;
-        self.num_staged_splits += uploader_counters.num_staged_splits.load(Ordering::SeqCst);
-        self.num_uploaded_splits += uploader_counters.num_uploaded_splits.load(Ordering::SeqCst);
+        self.total_bytes_processed += doc_processor_counters
+            .num_bytes_total
+            .load(Ordering::Relaxed);
+        self.num_staged_splits += uploader_counters.num_staged_splits.load(Ordering::Relaxed);
+        self.num_uploaded_splits += uploader_counters
+            .num_uploaded_splits
+            .load(Ordering::Relaxed);
         self.num_published_splits += publisher_counters.num_published_splits;
         self.num_empty_splits += publisher_counters.num_empty_splits;
         self
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
index f1d3baf5ff2..97c2bf63ef1 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/mod.rs
@@ -35,10 +35,17 @@ pub use logs::{OtlpGrpcLogsService, OTEL_LOGS_INDEX_ID};
 pub use span_id::{SpanId, TryFromSpanIdError};
 pub use trace_id::{TraceId, TryFromTraceIdError};
 pub use traces::{
-    Event, Link, OtlpGrpcTracesService, Span, SpanFingerprint, SpanKind, SpanStatus,
+    parse_otlp_spans_json, parse_otlp_spans_protobuf, Event, JsonSpanIterator, Link,
+    OtlpGrpcTracesService, OtlpTraceError, Span, SpanFingerprint, SpanKind, SpanStatus,
     OTEL_TRACES_INDEX_ID,
 };
 
+impl From<OtlpTraceError> for tonic::Status {
+    fn from(error: OtlpTraceError) -> Self {
+        tonic::Status::invalid_argument(error.to_string())
+    }
+}
+
 impl From<TryFromSpanIdError> for tonic::Status {
     fn from(error: TryFromSpanIdError) -> Self {
         tonic::Status::invalid_argument(error.to_string())
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
index 7d959b1c981..270dfa2da15 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
@@ -18,10 +18,11 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
-use std::collections::{BTreeSet, HashMap};
+use std::collections::{btree_set, BTreeSet, HashMap};
 use std::str::FromStr;
 
 use async_trait::async_trait;
+use prost::Message;
 use quickwit_common::uri::Uri;
 use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
 use quickwit_ingest::{
@@ -42,7 +43,7 @@ use tonic::{Request, Response, Status};
 use tracing::field::Empty;
 use tracing::{error, instrument, warn, Span as RuntimeSpan};
 
-use super::is_zero;
+use super::{is_zero, TryFromSpanIdError, TryFromTraceIdError};
 use crate::otlp::metrics::OTLP_SERVICE_METRICS;
 use crate::otlp::{extract_attributes, SpanId, TraceId};
 
@@ -153,6 +154,18 @@ search_settings:
   default_search_fields: []
 "#;
 
+#[derive(Debug, thiserror::Error)]
+pub enum OtlpTraceError {
+    #[error("failed to deserialize JSON span: `{0}`")]
+    Json(#[from] serde_json::Error),
+    #[error("failed to deserialize Protobuf span: `{0}`")]
+    Protobuf(#[from] prost::DecodeError),
+    #[error("failed to parse span: `{0}`")]
+    SpanId(#[from] TryFromSpanIdError),
+    #[error("failed to parse span: `{0}`")]
+    TraceId(#[from] TryFromTraceIdError),
+}
+
 #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
 pub struct Span {
     pub trace_id: TraceId,
@@ -215,7 +228,11 @@ pub struct Span {
 }
 
 impl Span {
-    fn from_otlp(span: OtlpSpan, resource: &Resource, scope: &Scope) -> Result<Span, Status> {
+    fn from_otlp(
+        span: OtlpSpan,
+        resource: &Resource,
+        scope: &Scope,
+    ) -> Result<Span, OtlpTraceError> {
         let trace_id = TraceId::try_from(span.trace_id)?;
         let span_id = SpanId::try_from(span.span_id)?;
         let parent_span_id = if !span.parent_span_id.is_empty() {
@@ -294,6 +311,10 @@ impl Span {
 struct OrdSpan(Span);
 
 impl Ord for OrdSpan {
+    /// Sort spans by trace ID, span name, start timestamp, and span ID in an attempt to group the
+    /// spans by trace ID and span name in the same docstore blocks. At some point, the
+    /// cost–benefit of this approach should be evaluated via a benchmark and revisited if
+    /// necessary.
     fn cmp(&self, other: &Self) -> Ordering {
         self.0
             .trace_id
@@ -594,7 +615,7 @@ pub struct Link {
 }
 
 impl Link {
-    fn try_from_otlp(link: OtlpLink) -> tonic::Result<Link> {
+    fn try_from_otlp(link: OtlpLink) -> Result<Link, OtlpTraceError> {
         let link_trace_id = TraceId::try_from(link.trace_id)?;
         let link_span_id = SpanId::try_from(link.span_id)?;
         let link = Link {
@@ -612,6 +633,27 @@ impl Link {
     }
 }
 
+fn parse_otlp_spans(
+    request: ExportTraceServiceRequest,
+) -> Result<BTreeSet<OrdSpan>, OtlpTraceError> {
+    let mut spans = BTreeSet::new();
+
+    for resource_spans in request.resource_spans {
+        let resource = resource_spans
+            .resource
+            .map(Resource::from_otlp)
+            .unwrap_or_default();
+        for scope_spans in resource_spans.scope_spans {
+            let scope = scope_spans.scope.map(Scope::from_otlp).unwrap_or_default();
+            for span in scope_spans.spans {
+                let span = Span::from_otlp(span, &resource, &scope)?;
+                spans.insert(OrdSpan(span));
+            }
+        }
+    }
+    Ok(spans)
+}
+
 struct ParsedSpans {
     doc_batch: DocBatch,
     num_spans: u64,
@@ -695,32 +737,18 @@ impl OtlpGrpcTracesService {
     fn parse_spans(
         request: ExportTraceServiceRequest,
         parent_span: RuntimeSpan,
-    ) -> Result<ParsedSpans, Status> {
-        let mut ordered_spans = BTreeSet::new();
-        let mut num_spans = 0;
+    ) -> tonic::Result<ParsedSpans> {
+        let spans = parse_otlp_spans(request)?;
+        let num_spans = spans.len() as u64;
         let mut num_parse_errors = 0;
         let mut error_message = String::new();
-        for resource_spans in request.resource_spans {
-            let resource = resource_spans
-                .resource
-                .map(Resource::from_otlp)
-                .unwrap_or_default();
-            for scope_spans in resource_spans.scope_spans {
-                let scope = scope_spans.scope.map(Scope::from_otlp).unwrap_or_default();
-                for span in scope_spans.spans {
-                    num_spans += 1;
-                    let span = Span::from_otlp(span, &resource, &scope)?;
-                    ordered_spans.insert(OrdSpan(span));
-                }
-            }
-        }
+
         let mut doc_batch_builder =
             DocBatchBuilder::new(OTEL_TRACES_INDEX_ID.to_string()).json_writer();
-        for span in ordered_spans {
+        for span in spans {
             if let Err(error) = doc_batch_builder.ingest_doc(&span.0) {
-                error!(error=?error, "Failed to JSON serialize span.");
-                error_message = format!("Failed to JSON serialize span: {error:?}");
+                error!(error=?error, "failed to JSON serialize span.");
+                error_message = format!("failed to JSON serialize span: {error:?}");
                 num_parse_errors += 1;
             }
         }
@@ -797,6 +825,61 @@ impl TraceService for OtlpGrpcTracesService {
     }
 }
 
+/// A JSON span iterator for use in the doc processor.
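+///
+/// Each span is yielded as a JSON value paired with its approximate size in
+/// bytes: the request payload size spread evenly across its spans, with the
+/// remainder attributed to the last span.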
+pub struct JsonSpanIterator {
+    spans: btree_set::IntoIter<OrdSpan>,
+    current_span_idx: usize,
+    num_spans: usize,
+    avg_span_size: usize,
+    avg_span_size_rem: usize,
+}
+
+impl JsonSpanIterator {
+    fn new(spans: BTreeSet<OrdSpan>, num_bytes: usize) -> Self {
+        let num_spans = spans.len();
+        // Use checked division so an empty request does not panic on division by zero.
+        let avg_span_size = num_bytes.checked_div(num_spans).unwrap_or(0);
+        let avg_span_size_rem = avg_span_size + num_bytes.checked_rem(num_spans).unwrap_or(0);
+
+        Self {
+            spans: spans.into_iter(),
+            current_span_idx: 0,
+            num_spans,
+            avg_span_size,
+            avg_span_size_rem,
+        }
+    }
+}
+
+impl Iterator for JsonSpanIterator {
+    type Item = (JsonValue, usize);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let span_opt = self.spans.next().map(|OrdSpan(span)| {
+            serde_json::to_value(span).expect("`Span` should be JSON serializable")
+        });
+        if span_opt.is_some() {
+            self.current_span_idx += 1;
+        }
+        if self.current_span_idx < self.num_spans {
+            span_opt.map(|span| (span, self.avg_span_size))
+        } else {
+            span_opt.map(|span| (span, self.avg_span_size_rem))
+        }
+    }
+}
+
+pub fn parse_otlp_spans_json(payload_json: &[u8]) -> Result<JsonSpanIterator, OtlpTraceError> {
+    let request: ExportTraceServiceRequest = serde_json::from_slice(payload_json)?;
+    let spans = parse_otlp_spans(request)?;
+    Ok(JsonSpanIterator::new(spans, payload_json.len()))
+}
+
+pub fn parse_otlp_spans_protobuf(payload_proto: &[u8]) -> Result<JsonSpanIterator, OtlpTraceError> {
+    let request = ExportTraceServiceRequest::decode(payload_proto)?;
+    let spans = parse_otlp_spans(request)?;
+    Ok(JsonSpanIterator::new(spans, payload_proto.len()))
+}
+
 #[cfg(test)]
 mod tests {
     use quickwit_metastore::{metastore_for_test, CreateIndexRequestExt};
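
Taken together, these changes let any source carry OTLP trace data: a source config that sets `input_format: otlp_trace_json` or `input_format: otlp_trace_proto` (as in the tutorial kafka-source.yaml above) routes each raw document through `parse_otlp_spans_json` or `parse_otlp_spans_protobuf`, and every span becomes its own JSON document in the OTel traces index. As a minimal sketch of the new public API — illustrative only, assuming a caller that depends on `prost`, `quickwit-opentelemetry`, and `quickwit-proto` exactly as the tests above do — a Protobuf payload can be decoded and inspected outside the indexing pipeline:

    use prost::Message;
    use quickwit_opentelemetry::otlp::{parse_otlp_spans_protobuf, OtlpTraceError};
    use quickwit_proto::opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
    use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span};

    fn main() -> Result<(), OtlpTraceError> {
        // Build a tiny export request with a single span, mirroring the tests above.
        let request = ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                scope_spans: vec![ScopeSpans {
                    spans: vec![Span {
                        trace_id: vec![1; 16],
                        span_id: vec![2; 8],
                        start_time_unix_nano: 1_000_000_001,
                        end_time_unix_nano: 1_000_000_002,
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            }],
            ..Default::default()
        };
        let payload = request.encode_to_vec();

        // The iterator yields (JSON span, approximate size in bytes) pairs; the
        // sizes sum back to the payload length, keeping byte counters consistent.
        for (json_span, num_bytes) in parse_otlp_spans_protobuf(&payload)? {
            println!("{num_bytes} bytes: {json_span}");
        }
        Ok(())
    }

The same payload, encoded with `serde_json::to_vec` instead of `prost`, would go through `parse_otlp_spans_json`.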