Skip to content

Commit

Permalink
Make it possible to disable per-index metrics with an environment
Browse files Browse the repository at this point in the history
variable.

By default they are enabled.
If the `PER_INDEX_METRICS_ENABLED` environment variable is set to false,
then all of the index metrics are grouped under the index_id `__any__`.

In addition, this PR refactors a little bit the way we handled the
docprocessor metrics. We now cache the docprocessor counters, hence
preventing 1 hash lookup per document.
  • Loading branch information
fulmicoton committed Jun 14, 2024
1 parent e57933c commit 670e7cd
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 40 deletions.
21 changes: 9 additions & 12 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;

use once_cell::sync::Lazy;
Expand Down Expand Up @@ -450,18 +449,16 @@ impl InFlightDataGauges {
}
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);

static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true);

pub fn disable_per_index_metrics() {
PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed);
}

pub fn index_label<'a>(index_name: &'a str) -> &'a str {
if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) {
/// This function return `index_name` or projects it to `<any>` if per-index metrics are disabled.
pub fn index_label(index_name: &str) -> &str {
static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
.get_or_init(|| crate::get_from_env("QW_PER_INDEX_METRICS_ENABLED", true));
if per_index_metrics_enabled {
index_name
} else {
"<any>"
"__any__"
}
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
4 changes: 3 additions & 1 deletion quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,11 @@ impl ShardTable {
} else {
0
};
let index_label =
quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str());
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.with_label_values([index_label])
.set(num_open_shards as i64);
}

Expand Down
30 changes: 27 additions & 3 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::metrics::{IntCounter, IntCounterVec};
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
Expand All @@ -44,7 +45,6 @@ use tokio::runtime::Handle;
#[cfg(feature = "vrl")]
use super::vrl_processing::*;
use crate::actors::Indexer;
use crate::metrics::DocProcessorMetrics;
use crate::models::{
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
};
Expand Down Expand Up @@ -272,6 +272,30 @@ impl From<Result<JsonSpanIterator, OtlpTracesError>> for JsonDocIterator {
}
}

#[derive(Debug)]
struct DocProcessorMetrics {
pub valid: IntCounter,
pub doc_mapper_error: IntCounter,
pub json_parse_error: IntCounter,
pub otlp_parse_error: IntCounter,
#[cfg(feature = "vrl")]
pub transform_error: IntCounter,
}

impl DocProcessorMetrics {
fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics {
let index_label = quickwit_common::metrics::index_label(index_id);
DocProcessorMetrics {
valid: counter_vec.with_label_values([index_label, "valid"]),
doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]),
json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]),
otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]),
#[cfg(feature = "vrl")]
transform_error: counter_vec.with_label_values([index_label, "transform_error"]),
}
}
}

#[derive(Debug, Serialize)]
pub struct DocProcessorCounters {
index_id: IndexId,
Expand All @@ -294,9 +318,9 @@ pub struct DocProcessorCounters {
pub num_bytes_total: AtomicU64,

#[serde(skip_serializing)]
pub doc_processor_metrics_bytes: DocProcessorMetrics,
doc_processor_metrics_bytes: DocProcessorMetrics,
#[serde(skip_serializing)]
pub doc_processor_metrics_docs: DocProcessorMetrics,
doc_processor_metrics_docs: DocProcessorMetrics,
}

impl DocProcessorCounters {
Expand Down
24 changes: 1 addition & 23 deletions quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,9 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec,
};

#[derive(Debug)]
pub struct DocProcessorMetrics {
pub valid: IntCounter,
pub doc_mapper_error: IntCounter,
pub json_parse_error: IntCounter,
pub otlp_parse_error: IntCounter,
pub transform_error: IntCounter,
}

impl DocProcessorMetrics {
pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics {
let index_label = quickwit_common::metrics::index_label(index_id);
DocProcessorMetrics {
valid: counter_vec.with_label_values([index_label, "valid"]),
doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]),
json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]),
otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]),
transform_error: counter_vec.with_label_values([index_label, "transform_error"]),
}
}
}

pub struct IndexerMetrics {
pub processed_docs_total: IntCounterVec<2>,
pub processed_bytes: IntCounterVec<2>,
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ impl DeleteTaskPlanner {
self.merge_split_downloader_mailbox.clone(),
)
.await?;
let index_label =
quickwit_common::metrics::index_label(self.index_uid.index_id.as_str());
JANITOR_METRICS
.ongoing_num_delete_operations_total
.with_label_values([&self.index_uid.index_id])
.with_label_values([index_label])
.set(self.ongoing_delete_operations_inventory.list().len() as i64);
}
}
Expand Down

0 comments on commit 670e7cd

Please sign in to comment.