From c3f9eedb8429ab141c1d0bf1ef499c626364bc1a Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 14 Jun 2024 15:28:47 +0900 Subject: [PATCH] Avoiding unnecessary hashmap lookup on every single doc --- quickwit/quickwit-common/src/metrics.rs | 15 +++++ .../src/actors/doc_processor.rs | 66 +++++++++++-------- quickwit/quickwit-indexing/src/metrics.rs | 24 ++++++- 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index a757eb9e2ba..058bdab0cc5 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -18,6 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::collections::{BTreeMap, HashMap}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::OnceLock; use once_cell::sync::Lazy; @@ -450,3 +451,17 @@ impl InFlightDataGauges { } pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default); + +static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true); + +pub fn disable_per_index_metrics() { + PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed); +} + +pub fn index_label<'a>(index_name: &'a str) -> &'a str { + if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) { + index_name + } else { + "" + } +} diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 3fe17d7106a..31d61482641 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -44,6 +44,7 @@ use tokio::runtime::Handle; #[cfg(feature = "vrl")] use super::vrl_processing::*; use crate::actors::Indexer; +use crate::metrics::DocProcessorMetrics; use crate::models::{ NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch, }; @@ -291,10 +292,23 @@ pub struct DocProcessorCounters { /// /// Includes both valid and invalid documents.
pub num_bytes_total: AtomicU64, + + #[serde(skip_serializing)] + pub doc_processor_metrics_bytes: DocProcessorMetrics, + #[serde(skip_serializing)] + pub doc_processor_metrics_docs: DocProcessorMetrics, } impl DocProcessorCounters { pub fn new(index_id: IndexId, source_id: SourceId) -> Self { + let doc_processor_metrics_bytes = DocProcessorMetrics::from_counters( + index_id.as_str(), + &crate::metrics::INDEXER_METRICS.processed_bytes, + ); + let doc_processor_metrics_docs = DocProcessorMetrics::from_counters( + index_id.as_str(), + &crate::metrics::INDEXER_METRICS.processed_docs_total, + ); Self { index_id, source_id, @@ -303,6 +317,9 @@ impl DocProcessorCounters { num_oltp_parse_errors: Default::default(), num_valid_docs: Default::default(), num_bytes_total: Default::default(), + + doc_processor_metrics_bytes, + doc_processor_metrics_docs, } } @@ -327,47 +344,43 @@ impl DocProcessorCounters { self.num_valid_docs.fetch_add(1, Ordering::Relaxed); self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); - crate::metrics::INDEXER_METRICS - .processed_docs_total - .with_label_values([&self.index_id, "valid"]) - .inc(); - crate::metrics::INDEXER_METRICS - .processed_bytes - .with_label_values([&self.index_id, "valid"]) - .inc_by(num_bytes); + self.doc_processor_metrics_docs.valid.inc(); + self.doc_processor_metrics_bytes.valid.inc_by(num_bytes); } pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) { - let label = match error { + self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); + match error { DocProcessorError::DocMapperParsing(_) => { self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); - "doc_mapper_error" + self.doc_processor_metrics_bytes + .doc_mapper_error + .inc_by(num_bytes); + self.doc_processor_metrics_docs.doc_mapper_error.inc(); } DocProcessorError::JsonParsing(_) => { self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed); - "json_parse_error" + self.doc_processor_metrics_bytes + .json_parse_error + 
.inc_by(num_bytes); + self.doc_processor_metrics_docs.json_parse_error.inc(); } DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => { self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed); - "otlp_parse_error" + self.doc_processor_metrics_bytes + .otlp_parse_error + .inc_by(num_bytes); + self.doc_processor_metrics_docs.otlp_parse_error.inc(); } #[cfg(feature = "vrl")] DocProcessorError::Transform(_) => { self.num_transform_errors.fetch_add(1, Ordering::Relaxed); - "transform_error" + self.doc_processor_metrics_bytes + .transform_error + .inc_by(num_bytes); + self.doc_processor_metrics_docs.transform_error.inc(); } }; - crate::metrics::INDEXER_METRICS - .processed_docs_total - .with_label_values([&self.index_id, label]) - .inc(); - - self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed); - - crate::metrics::INDEXER_METRICS - .processed_bytes - .with_label_values([&self.index_id, label]) - .inc_by(num_bytes); } } @@ -395,7 +408,7 @@ impl DocProcessor { if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() { bail!("VRL is not enabled: please recompile with the `vrl` feature") } - let doc_processor = Self { + Ok(DocProcessor { doc_mapper, indexer_mailbox, timestamp_field_opt, @@ -406,8 +419,7 @@ impl DocProcessor { .map(VrlProgram::try_from_transform_config) .transpose()?, input_format, - }; - Ok(doc_processor) + }) } // Extract a timestamp from a tantivy document. 
diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index 583a103f966..d8cb0ff7f78 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -19,9 +19,31 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec, + new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, }; +#[derive(Debug)] +pub struct DocProcessorMetrics { + pub valid: IntCounter, + pub doc_mapper_error: IntCounter, + pub json_parse_error: IntCounter, + pub otlp_parse_error: IntCounter, + pub transform_error: IntCounter, +} + +impl DocProcessorMetrics { + pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics { + let index_label = quickwit_common::metrics::index_label(index_id); + DocProcessorMetrics { + valid: counter_vec.with_label_values([index_label, "valid"]), + doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]), + json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]), + otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]), + transform_error: counter_vec.with_label_values([index_label, "transform_error"]), + } + } +} + pub struct IndexerMetrics { pub processed_docs_total: IntCounterVec<2>, pub processed_bytes: IntCounterVec<2>,