From 670e7cd5eaa2c32b90fd755ec2074a0fa9647958 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 14 Jun 2024 16:12:41 +0900 Subject: [PATCH] Make it possible to disable per-index metrics with an environment variable. By default they are enabled. If the `PER_INDEX_METRICS_ENABLED` environment variable is set to false, then all of the index metrics are grouped under the index_id `__any__`. In addition, this PR refactors a little bit the way we handled the docprocessor metrics. We now cache the docprocessor counters, hence preventing 1 hash lookup per document. --- quickwit/quickwit-common/src/metrics.rs | 21 ++++++------- .../src/model/shard_table.rs | 4 ++- .../src/actors/doc_processor.rs | 30 +++++++++++++++++-- quickwit/quickwit-indexing/src/metrics.rs | 24 +-------------- .../src/actors/delete_task_planner.rs | 4 ++- 5 files changed, 43 insertions(+), 40 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 058bdab0cc5..b8346e96664 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . use std::collections::{BTreeMap, HashMap}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::OnceLock; use once_cell::sync::Lazy; @@ -450,18 +449,16 @@ impl InFlightDataGauges { } } -pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); - -static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true); - -pub fn disable_per_index_metrics() { - PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed); -} - -pub fn index_label<'a>(index_name: &'a str) -> &'a str { - if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) { +/// This function return `index_name` or projects it to `` if per-index metrics are disabled. +pub fn index_label(index_name: &str) -> &str { + static PER_INDEX_METRICS_ENABLED: OnceLock = OnceLock::new(); + let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED + .get_or_init(|| crate::get_from_env("QW_PER_INDEX_METRICS_ENABLED", true)); + if per_index_metrics_enabled { index_name } else { - "" + "__any__" } } + +pub static MEMORY_METRICS: Lazy = Lazy::new(MemoryMetrics::default); diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 65395f63cac..8eb75e2ccf2 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -435,9 +435,11 @@ impl ShardTable { } else { 0 }; + let index_label = + quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str()); crate::metrics::CONTROL_PLANE_METRICS .open_shards_total - .with_label_values([source_uid.index_uid.index_id.as_str()]) + .with_label_values([index_label]) .set(num_open_shards as i64); } diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 31d61482641..e2131003456 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -25,6 +25,7 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use bytes::Bytes; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::metrics::{IntCounter, IntCounterVec}; use quickwit_common::rate_limited_tracing::rate_limited_warn; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceInputFormat, TransformConfig}; @@ -44,7 +45,6 @@ use tokio::runtime::Handle; #[cfg(feature = "vrl")] use super::vrl_processing::*; use crate::actors::Indexer; -use crate::metrics::DocProcessorMetrics; use crate::models::{ NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch, }; @@ -272,6 +272,30 @@ impl From> for JsonDocIterator { } } +#[derive(Debug)] +struct DocProcessorMetrics { + pub valid: IntCounter, + pub doc_mapper_error: IntCounter, + pub json_parse_error: IntCounter, + pub otlp_parse_error: IntCounter, + #[cfg(feature = "vrl")] + pub transform_error: IntCounter, +} + +impl DocProcessorMetrics { + fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics { + let index_label = quickwit_common::metrics::index_label(index_id); + DocProcessorMetrics { + valid: counter_vec.with_label_values([index_label, "valid"]), + doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]), + json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]), + otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]), + #[cfg(feature = "vrl")] + transform_error: counter_vec.with_label_values([index_label, "transform_error"]), + } + } +} + #[derive(Debug, Serialize)] pub struct DocProcessorCounters { index_id: IndexId, @@ -294,9 +318,9 @@ pub struct DocProcessorCounters { pub num_bytes_total: AtomicU64, #[serde(skip_serializing)] - pub doc_processor_metrics_bytes: DocProcessorMetrics, + doc_processor_metrics_bytes: DocProcessorMetrics, #[serde(skip_serializing)] - pub doc_processor_metrics_docs: DocProcessorMetrics, + doc_processor_metrics_docs: DocProcessorMetrics, } impl DocProcessorCounters { diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index d8cb0ff7f78..583a103f966 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -19,31 +19,9 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, + new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec, }; -#[derive(Debug)] -pub struct DocProcessorMetrics { - pub valid: IntCounter, - pub doc_mapper_error: IntCounter, - pub json_parse_error: IntCounter, - pub otlp_parse_error: IntCounter, - pub transform_error: IntCounter, -} - -impl DocProcessorMetrics { - pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics { - let index_label = quickwit_common::metrics::index_label(index_id); - DocProcessorMetrics { - valid: counter_vec.with_label_values([index_label, "valid"]), - doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]), - json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]), - otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]), - transform_error: counter_vec.with_label_values([index_label, "transform_error"]), - } - } -} - pub struct IndexerMetrics { pub processed_docs_total: IntCounterVec<2>, pub processed_bytes: IntCounterVec<2>, diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 659438c2b7d..a8adce51b0b 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -209,9 +209,11 @@ impl DeleteTaskPlanner { self.merge_split_downloader_mailbox.clone(), ) .await?; + let index_label = + quickwit_common::metrics::index_label(self.index_uid.index_id.as_str()); JANITOR_METRICS .ongoing_num_delete_operations_total - .with_label_values([&self.index_uid.index_id]) + .with_label_values([index_label]) .set(self.ongoing_delete_operations_inventory.list().len() as i64); } }