Make it possible to disable per-index metrics with an environment variable.

Per-index metrics are enabled by default. If the `QW_PER_INDEX_METRICS_ENABLED`
environment variable is set to `false`, all index metrics are grouped under the
index_id `__any__`.

In addition, this PR refactors the way the doc processor metrics are handled:
the doc processor counters are now cached, which avoids one hash lookup per
document.
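
For reference, a minimal sketch of the flag lookup, assuming `get_from_env` parses the variable as a boolean and falls back to the default when it is unset or unparsable (the standalone helper below is hypothetical; the real one lives in `quickwit-common`):

fn per_index_metrics_enabled() -> bool {
    // Parse QW_PER_INDEX_METRICS_ENABLED as a bool, defaulting to `true`
    // (per-index metrics enabled) when the variable is absent or invalid.
    std::env::var("QW_PER_INDEX_METRICS_ENABLED")
        .ok()
        .and_then(|value| value.parse::<bool>().ok())
        .unwrap_or(true)
}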
fulmicoton committed Jun 14, 2024
1 parent c3f9eed commit 1e0e2fa
Showing 5 changed files with 116 additions and 111 deletions.
21 changes: 9 additions & 12 deletions quickwit/quickwit-common/src/metrics.rs
@@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;

use once_cell::sync::Lazy;
@@ -450,18 +449,16 @@ impl InFlightDataGauges {
}
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);

static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true);

pub fn disable_per_index_metrics() {
PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed);
}

pub fn index_label<'a>(index_name: &'a str) -> &'a str {
if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) {
/// This function returns `index_name`, or projects it to `__any__` if per-index metrics are disabled.
pub fn index_label(index_name: &str) -> &str {
static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
.get_or_init(|| crate::get_from_env("QW_PER_INDEX_METRICS_ENABLED", true));
if per_index_metrics_enabled {
index_name
} else {
"<any>"
"__any__"
}
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
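
// Illustrative only: because of the `OnceLock`, the environment variable is
// read once, on the first call to `index_label`, so it must be set before any
// metric is touched. A hypothetical check of the collapsing behavior:
#[test]
fn index_label_collapses_when_per_index_metrics_are_disabled() {
    std::env::set_var("QW_PER_INDEX_METRICS_ENABLED", "false");
    assert_eq!(index_label("my-index"), "__any__");
    assert_eq!(index_label("another-index"), "__any__");
}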
4 changes: 3 additions & 1 deletion quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
@@ -435,9 +435,11 @@ impl ShardTable {
} else {
0
};
let index_label =
quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str());
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.with_label_values([index_label])
.set(num_open_shards as i64);
}

174 changes: 100 additions & 74 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -25,6 +25,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::metrics::IntCounter;
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
@@ -44,7 +45,6 @@ use tokio::runtime::Handle;
#[cfg(feature = "vrl")]
use super::vrl_processing::*;
use crate::actors::Indexer;
use crate::metrics::DocProcessorMetrics;
use crate::models::{
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
};
@@ -272,113 +272,137 @@ impl From<Result<JsonSpanIterator, OtlpTracesError>> for JsonDocIterator {
}
}

#[derive(Debug)]
pub struct DocProcessorCounter {
pub num_docs: AtomicU64,
pub num_docs_metric: IntCounter,
pub num_bytes_metric: IntCounter,
}

impl Serialize for DocProcessorCounter {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_u64(self.get_num_docs())
}
}

impl DocProcessorCounter {
fn for_index_and_doc_processor_outcome(index: &str, outcome: &str) -> DocProcessorCounter {
let index_label = quickwit_common::metrics::index_label(index);
let labels = [index_label, outcome];
DocProcessorCounter {
num_docs: Default::default(),
num_docs_metric: crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values(labels),
num_bytes_metric: crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values(labels),
}
}

#[inline(always)]
fn get_num_docs(&self) -> u64 {
self.num_docs.load(Ordering::Relaxed)
}

fn record_doc(&self, num_bytes: u64) {
self.num_docs.fetch_add(1, Ordering::Relaxed);
self.num_docs_metric.inc();
self.num_bytes_metric.inc_by(num_bytes);
}
}
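
// Illustrative only: a hypothetical sketch of how the cached counter above is
// meant to be used. The Prometheus child counters are resolved once, at
// construction, so the per-document hot path is a few atomic increments with
// no label hash lookup ("valid" is one of the outcome labels used below).
#[test]
fn doc_processor_counter_sketch() {
    let valid = DocProcessorCounter::for_index_and_doc_processor_outcome("my-index", "valid");
    valid.record_doc(512); // one document of 512 bytes
    assert_eq!(valid.get_num_docs(), 1);
}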

#[derive(Debug, Serialize)]
pub struct DocProcessorCounters {
index_id: IndexId,
source_id: SourceId,

/// Overall number of documents received, partitioned
/// into 4 categories:
/// into 5 categories:
/// - valid documents
/// - number of docs that could not be parsed.
/// - number of docs that were not valid json.
/// - number of docs that could not be transformed.
/// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
pub num_doc_parse_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
pub num_oltp_parse_errors: AtomicU64,
pub num_valid_docs: AtomicU64,
pub valid: DocProcessorCounter,
pub doc_mapper_errors: DocProcessorCounter,
pub transform_errors: DocProcessorCounter,
pub json_parse_errors: DocProcessorCounter,
pub otlp_parse_errors: DocProcessorCounter,

/// Number of bytes that went through the indexer
/// during its entire lifetime.
///
/// Includes both valid and invalid documents.
pub num_bytes_total: AtomicU64,

#[serde(skip_serializing)]
pub doc_processor_metrics_bytes: DocProcessorMetrics,
#[serde(skip_serializing)]
pub doc_processor_metrics_docs: DocProcessorMetrics,
}

impl DocProcessorCounters {
pub fn new(index_id: IndexId, source_id: SourceId) -> Self {
let doc_processor_metrics_bytes = DocProcessorMetrics::from_counters(
index_id.as_str(),
&crate::metrics::INDEXER_METRICS.processed_bytes,
);
let doc_processor_metrics_docs = DocProcessorMetrics::from_counters(
index_id.as_str(),
&crate::metrics::INDEXER_METRICS.processed_docs_total,
);
Self {
let valid_docs =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "valid");
let doc_mapper_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "doc_mapper_error");
let transform_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_error");
let json_parse_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "json_parse_error");
let otlp_parse_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "otlp_parse_error");
DocProcessorCounters {
index_id,
source_id,
num_doc_parse_errors: Default::default(),
num_transform_errors: Default::default(),
num_oltp_parse_errors: Default::default(),
num_valid_docs: Default::default(),
num_bytes_total: Default::default(),

doc_processor_metrics_bytes,
doc_processor_metrics_docs,
valid: valid_docs,
doc_mapper_errors,
transform_errors,
json_parse_errors,
otlp_parse_errors,
num_bytes_total: Default::default(),
}
}

/// Returns the overall number of docs that went through the indexer (valid or not).
pub fn num_processed_docs(&self) -> u64 {
self.num_valid_docs.load(Ordering::Relaxed)
+ self.num_doc_parse_errors.load(Ordering::Relaxed)
+ self.num_oltp_parse_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
self.valid.get_num_docs()
+ self.doc_mapper_errors.get_num_docs()
+ self.json_parse_errors.get_num_docs()
+ self.otlp_parse_errors.get_num_docs()
+ self.transform_errors.get_num_docs()
}

/// Returns the overall number of docs that were sent to the indexer but were invalid.
/// (For instance, because they were missing a required field or because
/// their format was invalid.)
pub fn num_invalid_docs(&self) -> u64 {
self.num_doc_parse_errors.load(Ordering::Relaxed)
+ self.num_oltp_parse_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
self.doc_mapper_errors.get_num_docs()
+ self.json_parse_errors.get_num_docs()
+ self.otlp_parse_errors.get_num_docs()
+ self.transform_errors.get_num_docs()
}

pub fn record_valid(&self, num_bytes: u64) {
self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);

self.doc_processor_metrics_docs.valid.inc();
self.doc_processor_metrics_bytes.valid.inc_by(num_bytes);
self.valid.record_doc(num_bytes);
}

pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
match error {
DocProcessorError::DocMapperParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
self.doc_processor_metrics_bytes
.doc_mapper_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.doc_mapper_error.inc();
self.doc_mapper_errors.record_doc(num_bytes);
}
DocProcessorError::JsonParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
self.doc_processor_metrics_bytes
.json_parse_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.json_parse_error.inc();
self.json_parse_errors.record_doc(num_bytes);
}
DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => {
self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed);
self.doc_processor_metrics_bytes
.otlp_parse_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.otlp_parse_error.inc();
self.otlp_parse_errors.record_doc(num_bytes);
}
#[cfg(feature = "vrl")]
DocProcessorError::Transform(_) => {
self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
self.doc_processor_metrics_bytes
.transform_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.transform_error.inc();
self.transform_errors.record_doc(num_bytes);
}
};
}
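
// Illustrative only, assuming `IndexId` and `SourceId` are `String` aliases as
// elsewhere in Quickwit: every recorded document, valid or not, flows into
// `num_bytes_total`, while the per-outcome counts stay separate.
#[test]
fn doc_processor_counters_sketch() {
    let counters = DocProcessorCounters::new("my-index".to_string(), "my-source".to_string());
    counters.record_valid(128);
    counters.record_valid(256);
    assert_eq!(counters.valid.get_num_docs(), 2);
    assert_eq!(counters.num_invalid_docs(), 0);
    assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 384);
}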
@@ -662,10 +686,11 @@ mod tests {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
assert_eq!(counters.transform_errors.get_num_docs(), 0);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
assert_eq!(counters.valid.get_num_docs(), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);

let output_messages = indexer_inbox.drain_for_test();
@@ -902,7 +927,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -981,7 +1006,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1054,7 +1079,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1129,7 +1154,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1188,10 +1213,11 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id.to_string());
assert_eq!(counters.source_id, source_id.to_string());
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
assert_eq!(counters.transform_errors.get_num_docs(), 0);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
assert_eq!(counters.valid.get_num_docs(), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);

let output_messages = indexer_inbox.drain_for_test();
@@ -1278,10 +1304,10 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 0,);
assert_eq!(counters.transform_errors.get_num_docs(), 1,);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0,);
assert_eq!(counters.valid.get_num_docs(), 2,);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,);

let output_messages = indexer_inbox.drain_for_test();
24 changes: 1 addition & 23 deletions quickwit/quickwit-indexing/src/metrics.rs
@@ -19,31 +19,9 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec,
};

#[derive(Debug)]
pub struct DocProcessorMetrics {
pub valid: IntCounter,
pub doc_mapper_error: IntCounter,
pub json_parse_error: IntCounter,
pub otlp_parse_error: IntCounter,
pub transform_error: IntCounter,
}

impl DocProcessorMetrics {
pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics {
let index_label = quickwit_common::metrics::index_label(index_id);
DocProcessorMetrics {
valid: counter_vec.with_label_values([index_label, "valid"]),
doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]),
json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]),
otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]),
transform_error: counter_vec.with_label_values([index_label, "transform_error"]),
}
}
}

pub struct IndexerMetrics {
pub processed_docs_total: IntCounterVec<2>,
pub processed_bytes: IntCounterVec<2>,
4 changes: 3 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -209,9 +209,11 @@ impl DeleteTaskPlanner {
self.merge_split_downloader_mailbox.clone(),
)
.await?;
let index_label =
quickwit_common::metrics::index_label(self.index_uid.index_id.as_str());
JANITOR_METRICS
.ongoing_num_delete_operations_total
.with_label_values([&self.index_uid.index_id])
.with_label_values([index_label])
.set(self.ongoing_delete_operations_inventory.list().len() as i64);
}
}
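
Both this call site and the one in `shard_table.rs` follow the same pattern: resolve the (possibly collapsed) label once, then set the gauge. A hypothetical helper sketching that shared shape, assuming a gauge vec keyed by a single index label:

use quickwit_common::metrics::IntGaugeVec;

fn set_per_index_gauge(gauge_vec: &IntGaugeVec<1>, index_id: &str, value: i64) {
    // `index_label` collapses the id to "__any__" when per-index metrics
    // are disabled, keeping the metric cardinality bounded.
    let index_label = quickwit_common::metrics::index_label(index_id);
    gauge_vec.with_label_values([index_label]).set(value);
}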
