From 1e0e2fad66927fdb819d19cfd43c49981ff2a5a6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 14 Jun 2024 16:12:41 +0900 Subject: [PATCH] Make it possible to disable per-index metrics with an environment variable. By default they are enabled. If the `PER_INDEX_METRICS_ENABLED` environment variable is set to false, then all of the index metrics are grouped under the index_id `__any__`. In addition, this PR refactors a little bit the way we handled the docprocessor metrics. We now cache the docprocessor counters, hence preventing 1 hash lookup per document. --- quickwit/quickwit-common/src/metrics.rs | 21 +-- .../src/model/shard_table.rs | 4 +- .../src/actors/doc_processor.rs | 174 ++++++++++-------- quickwit/quickwit-indexing/src/metrics.rs | 24 +-- .../src/actors/delete_task_planner.rs | 4 +- 5 files changed, 116 insertions(+), 111 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 058bdab0cc5..b8346e96664 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . use std::collections::{BTreeMap, HashMap}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::OnceLock; use once_cell::sync::Lazy; @@ -450,18 +449,16 @@ impl InFlightDataGauges { } } -pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); - -static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true); - -pub fn disable_per_index_metrics() { - PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed); -} - -pub fn index_label<'a>(index_name: &'a str) -> &'a str { - if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) { +/// This function return `index_name` or projects it to `` if per-index metrics are disabled. +pub fn index_label(index_name: &str) -> &str { + static PER_INDEX_METRICS_ENABLED: OnceLock = OnceLock::new(); + let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED + .get_or_init(|| crate::get_from_env("QW_PER_INDEX_METRICS_ENABLED", true)); + if per_index_metrics_enabled { index_name } else { - "" + "__any__" } } + +pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 65395f63cac..8eb75e2ccf2 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -435,9 +435,11 @@ impl ShardTable { } else { 0 }; + let index_label = + quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str()); crate::metrics::CONTROL_PLANE_METRICS .open_shards_total - .with_label_values([source_uid.index_uid.index_id.as_str()]) + .with_label_values([index_label]) .set(num_open_shards as i64); } diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 31d61482641..7eacb91869e 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -25,6 +25,7 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use bytes::Bytes; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::metrics::IntCounter; use quickwit_common::rate_limited_tracing::rate_limited_warn; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceInputFormat, TransformConfig}; @@ -44,7 +45,6 @@ use tokio::runtime::Handle; #[cfg(feature = "vrl")] use super::vrl_processing::*; use crate::actors::Indexer; -use crate::metrics::DocProcessorMetrics; use crate::models::{ NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch, }; @@ -272,113 +272,137 @@ impl From> for JsonDocIterator { } } +#[derive(Debug)] +pub struct DocProcessorCounter { + pub num_docs: AtomicU64, + pub num_docs_metric: IntCounter, + pub num_bytes_metric: IntCounter, +} + +impl Serialize for DocProcessorCounter { + fn serialize(&self, serializer: S) -> Result + where S: serde::Serializer { + serializer.serialize_u64(self.get_num_docs()) + } +} + +impl DocProcessorCounter { + fn for_index_and_doc_processor_outcome(index: &str, outcome: &str) -> DocProcessorCounter { + let index_label = quickwit_common::metrics::index_label(index); + let labels = [index_label, outcome]; + DocProcessorCounter { + num_docs: Default::default(), + num_docs_metric: crate::metrics::INDEXER_METRICS + .processed_docs_total + .with_label_values(labels), + num_bytes_metric: crate::metrics::INDEXER_METRICS + .processed_bytes + .with_label_values(labels), + } + } + + #[inline(always)] + fn get_num_docs(&self) -> u64 { + self.num_docs.load(Ordering::Relaxed) + } + + fn record_doc(&self, num_bytes: u64) { + self.num_docs.fetch_add(1, Ordering::Relaxed); + self.num_docs_metric.inc(); + self.num_bytes_metric.inc_by(num_bytes); + } +} + #[derive(Debug, Serialize)] pub struct DocProcessorCounters { index_id: IndexId, source_id: SourceId, + /// Overall number of documents received, partitioned - /// into 4 categories: + /// into 5 categories: + /// - valid documents /// - number of docs that could not be parsed. + /// - number of docs that were not valid json. /// - number of docs that could not be transformed. /// - number of docs for which the doc mapper returnd an error. /// - number of valid docs. - pub num_doc_parse_errors: AtomicU64, - pub num_transform_errors: AtomicU64, - pub num_oltp_parse_errors: AtomicU64, - pub num_valid_docs: AtomicU64, + pub valid: DocProcessorCounter, + pub doc_mapper_errors: DocProcessorCounter, + pub transform_errors: DocProcessorCounter, + pub json_parse_errors: DocProcessorCounter, + pub otlp_parse_errors: DocProcessorCounter, /// Number of bytes that went through the indexer /// during its entire lifetime. /// /// Includes both valid and invalid documents. pub num_bytes_total: AtomicU64, - - #[serde(skip_serializing)] - pub doc_processor_metrics_bytes: DocProcessorMetrics, - #[serde(skip_serializing)] - pub doc_processor_metrics_docs: DocProcessorMetrics, } impl DocProcessorCounters { pub fn new(index_id: IndexId, source_id: SourceId) -> Self { - let doc_processor_metrics_bytes = DocProcessorMetrics::from_counters( - index_id.as_str(), - &crate::metrics::INDEXER_METRICS.processed_bytes, - ); - let doc_processor_metrics_docs = DocProcessorMetrics::from_counters( - index_id.as_str(), - &crate::metrics::INDEXER_METRICS.processed_docs_total, - ); - Self { + let valid_docs = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "valid"); + let doc_mapper_errors = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "doc_mapper_error"); + let transform_errors = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_error"); + let json_parse_errors = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "json_parse_error"); + let otlp_parse_errors = + DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "otlp_parse_error"); + DocProcessorCounters { index_id, source_id, - num_doc_parse_errors: Default::default(), - num_transform_errors: Default::default(), - num_oltp_parse_errors: Default::default(), - num_valid_docs: Default::default(), - num_bytes_total: Default::default(), - doc_processor_metrics_bytes, - doc_processor_metrics_docs, + valid: valid_docs, + doc_mapper_errors, + transform_errors, + json_parse_errors, + otlp_parse_errors, + num_bytes_total: Default::default(), } } /// Returns the overall number of docs that went through the indexer (valid or not). pub fn num_processed_docs(&self) -> u64 { - self.num_valid_docs.load(Ordering::Relaxed) - + self.num_doc_parse_errors.load(Ordering::Relaxed) - + self.num_oltp_parse_errors.load(Ordering::Relaxed) - + self.num_transform_errors.load(Ordering::Relaxed) + self.valid.get_num_docs() + + self.doc_mapper_errors.get_num_docs() + + self.json_parse_errors.get_num_docs() + + self.otlp_parse_errors.get_num_docs() + + self.transform_errors.get_num_docs() } /// Returns the overall number of docs that were sent to the indexer but were invalid. /// (For instance, because they were missing a required field or because their because /// their format was invalid) pub fn num_invalid_docs(&self) -> u64 { - self.num_doc_parse_errors.load(Ordering::Relaxed) - + self.num_oltp_parse_errors.load(Ordering::Relaxed) - + self.num_transform_errors.load(Ordering::Relaxed) + self.doc_mapper_errors.get_num_docs() + + self.json_parse_errors.get_num_docs() + + self.otlp_parse_errors.get_num_docs() + + self.transform_errors.get_num_docs() } pub fn record_valid(&self, num_bytes: u64) { - self.num_valid_docs.fetch_add(1, Ordering::Relaxed); self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); - - self.doc_processor_metrics_docs.valid.inc(); - self.doc_processor_metrics_bytes.valid.inc_by(num_bytes); + self.valid.record_doc(num_bytes); } pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) { self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); match error { DocProcessorError::DocMapperParsing(_) => { - self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); - self.doc_processor_metrics_bytes - .doc_mapper_error - .inc_by(num_bytes); - self.doc_processor_metrics_docs.doc_mapper_error.inc(); + self.doc_mapper_errors.record_doc(num_bytes); } DocProcessorError::JsonParsing(_) => { - self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); - self.doc_processor_metrics_bytes - .json_parse_error - .inc_by(num_bytes); - self.doc_processor_metrics_docs.json_parse_error.inc(); + self.json_parse_errors.record_doc(num_bytes); } DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => { - self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed); - self.doc_processor_metrics_bytes - .otlp_parse_error - .inc_by(num_bytes); - self.doc_processor_metrics_docs.otlp_parse_error.inc(); + self.otlp_parse_errors.record_doc(num_bytes); } #[cfg(feature = "vrl")] DocProcessorError::Transform(_) => { - self.num_transform_errors.fetch_add(1, Ordering::Relaxed); - self.doc_processor_metrics_bytes - .transform_error - .inc_by(num_bytes); - self.doc_processor_metrics_docs.transform_error.inc(); + self.transform_errors.record_doc(num_bytes); } }; } @@ -662,10 +686,11 @@ mod tests { .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2); - assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1); + assert_eq!(counters.json_parse_errors.get_num_docs(), 1); + assert_eq!(counters.transform_errors.get_num_docs(), 0); + assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0); + assert_eq!(counters.valid.get_num_docs(), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387); let output_messages = indexer_inbox.drain_for_test(); @@ -902,7 +927,7 @@ mod tests { .process_pending_and_observe() .await .state; - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.valid.get_num_docs(), 2); let batch = indexer_inbox.drain_for_test_typed::(); assert_eq!(batch.len(), 1); @@ -981,7 +1006,7 @@ mod tests { .process_pending_and_observe() .await .state; - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.valid.get_num_docs(), 2); let batch = indexer_inbox.drain_for_test_typed::(); assert_eq!(batch.len(), 1); @@ -1054,7 +1079,7 @@ mod tests { .process_pending_and_observe() .await .state; - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.valid.get_num_docs(), 2); let batch = indexer_inbox.drain_for_test_typed::(); assert_eq!(batch.len(), 1); @@ -1129,7 +1154,7 @@ mod tests { .process_pending_and_observe() .await .state; - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.valid.get_num_docs(), 2); let batch = indexer_inbox.drain_for_test_typed::(); assert_eq!(batch.len(), 1); @@ -1188,10 +1213,11 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id.to_string()); assert_eq!(counters.source_id, source_id.to_string()); - assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2); - assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0); - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2); + assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1); + assert_eq!(counters.json_parse_errors.get_num_docs(), 1); + assert_eq!(counters.transform_errors.get_num_docs(), 0); + assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0); + assert_eq!(counters.valid.get_num_docs(), 2); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397); let output_messages = indexer_inbox.drain_for_test(); @@ -1278,10 +1304,10 @@ mod tests_vrl { .state; assert_eq!(counters.index_id, index_id); assert_eq!(counters.source_id, source_id); - assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 0,); - assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,); - assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0,); - assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,); + assert_eq!(counters.doc_mapper_errors.get_num_docs(), 0,); + assert_eq!(counters.transform_errors.get_num_docs(), 1,); + assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0,); + assert_eq!(counters.valid.get_num_docs(), 2,); assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,); let output_messages = indexer_inbox.drain_for_test(); diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index d8cb0ff7f78..583a103f966 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -19,31 +19,9 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, + new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec, }; -#[derive(Debug)] -pub struct DocProcessorMetrics { - pub valid: IntCounter, - pub doc_mapper_error: IntCounter, - pub json_parse_error: IntCounter, - pub otlp_parse_error: IntCounter, - pub transform_error: IntCounter, -} - -impl DocProcessorMetrics { - pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics { - let index_label = quickwit_common::metrics::index_label(index_id); - DocProcessorMetrics { - valid: counter_vec.with_label_values([index_label, "valid"]), - doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]), - json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]), - otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]), - transform_error: counter_vec.with_label_values([index_label, "transform_error"]), - } - } -} - pub struct IndexerMetrics { pub processed_docs_total: IntCounterVec<2>, pub processed_bytes: IntCounterVec<2>, diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 659438c2b7d..a8adce51b0b 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -209,9 +209,11 @@ impl DeleteTaskPlanner { self.merge_split_downloader_mailbox.clone(), ) .await?; + let index_label = + quickwit_common::metrics::index_label(self.index_uid.index_id.as_str()); JANITOR_METRICS .ongoing_num_delete_operations_total - .with_label_values([&self.index_uid.index_id]) + .with_label_values([index_label]) .set(self.ongoing_delete_operations_inventory.list().len() as i64); } }