5117 disable per index metrics #5125

Merged
merged 4 commits on Jun 17, 2024
12 changes: 12 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
@@ -449,4 +449,16 @@ impl InFlightDataGauges {
}
}

/// This function returns `index_name`, or projects it to `__any__` if per-index metrics are disabled.
pub fn index_label(index_name: &str) -> &str {
static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
let per_index_metrics_enabled: bool = *PER_INDEX_METRICS_ENABLED
.get_or_init(|| !crate::get_bool_from_env("QW_DISABLE_PER_INDEX_METRICS", false));
if per_index_metrics_enabled {
index_name
} else {
"__any__"
}
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
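A minimal, self-contained sketch of the caching pattern `index_label` relies on (std only). The env-var name comes from the hunk above; the simplified truthiness check and the `main` assertion are illustrative only, since the real code delegates to quickwit_common::get_bool_from_env:

use std::sync::OnceLock;

// Returns `index_name`, or the constant "__any__" once per-index metrics are disabled.
fn index_label(index_name: &str) -> &str {
    static PER_INDEX_METRICS_ENABLED: OnceLock<bool> = OnceLock::new();
    let enabled = *PER_INDEX_METRICS_ENABLED.get_or_init(|| {
        // Simplified parsing: only "true"/"1" disable per-index metrics here.
        !matches!(
            std::env::var("QW_DISABLE_PER_INDEX_METRICS").as_deref(),
            Ok("true") | Ok("1")
        )
    });
    if enabled {
        index_name
    } else {
        "__any__"
    }
}

fn main() {
    // With the variable unset, index labels pass through unchanged.
    assert_eq!(index_label("my-index"), "my-index");
}

Because of the OnceLock, the environment is read once per process, so flipping the variable at runtime has no effect on an already-running node.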
4 changes: 3 additions & 1 deletion quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -435,9 +435,11 @@ impl ShardTable {
} else {
0
};
let index_label =
quickwit_common::metrics::index_label(source_uid.index_uid.index_id.as_str());
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.with_label_values([index_label])
.set(num_open_shards as i64);
}

170 changes: 104 additions & 66 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -25,6 +25,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::metrics::IntCounter;
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
@@ -271,20 +272,65 @@ impl From<Result<JsonSpanIterator, OtlpTracesError>> for JsonDocIterator {
}
}

#[derive(Debug)]
pub struct DocProcessorCounter {
pub num_docs: AtomicU64,
pub num_docs_metric: IntCounter,
pub num_bytes_metric: IntCounter,
}

impl Serialize for DocProcessorCounter {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_u64(self.get_num_docs())
}
}
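A short sketch of what the custom Serialize impl above preserves, assuming serde (with the derive feature) and serde_json; the struct names are stand-ins, not Quickwit's. Each counter serializes as a bare integer, so the actor's observable state keeps a flat JSON shape (one integer per category) rather than nested objects:

use serde::{Serialize, Serializer};

// Hypothetical stand-in for DocProcessorCounter: serializes as a bare u64.
struct Counter {
    num_docs: u64,
}

impl Serialize for Counter {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.serialize_u64(self.num_docs)
    }
}

#[derive(Serialize)]
struct Counters {
    valid: Counter,
    doc_mapper_errors: Counter,
}

fn main() {
    let counters = Counters {
        valid: Counter { num_docs: 2 },
        doc_mapper_errors: Counter { num_docs: 1 },
    };
    // Prints {"valid":2,"doc_mapper_errors":1}.
    println!("{}", serde_json::to_string(&counters).unwrap());
}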

impl DocProcessorCounter {
fn for_index_and_doc_processor_outcome(index: &str, outcome: &str) -> DocProcessorCounter {
let index_label = quickwit_common::metrics::index_label(index);
let labels = [index_label, outcome];
DocProcessorCounter {
num_docs: Default::default(),
num_docs_metric: crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values(labels),
num_bytes_metric: crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values(labels),
}
}

#[inline(always)]
fn get_num_docs(&self) -> u64 {
self.num_docs.load(Ordering::Relaxed)
}

fn record_doc(&self, num_bytes: u64) {
self.num_docs.fetch_add(1, Ordering::Relaxed);
self.num_docs_metric.inc();
self.num_bytes_metric.inc_by(num_bytes);
}
}
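For context, a minimal sketch of the pattern DocProcessorCounter introduces, written against the prometheus crate directly; the metric and label names are illustrative (Quickwit's real counters live in crate::metrics::INDEXER_METRICS). The point is to resolve with_label_values once, at construction, and increment the cached handle on the per-document hot path:

use prometheus::{IntCounterVec, Opts};

fn main() {
    // Hypothetical counter vec with an index label and an outcome label.
    let processed_docs = IntCounterVec::new(
        Opts::new("processed_docs_total", "Number of processed docs"),
        &["index", "docs_processed_status"],
    )
    .unwrap();

    // Resolve the label set once...
    let valid_docs = processed_docs.with_label_values(&["my-index", "valid"]);

    // ...then increment the cached handle for every document.
    valid_docs.inc();
    assert_eq!(valid_docs.get(), 1);
}

This avoids a label-map lookup per document and pairs naturally with index_label(): when QW_DISABLE_PER_INDEX_METRICS is set, every index shares the "__any__" series, which bounds metric cardinality.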

#[derive(Debug, Serialize)]
pub struct DocProcessorCounters {
index_id: IndexId,
source_id: SourceId,

/// Overall number of documents received, partitioned
/// into 4 categories:
/// into 5 categories:
/// - valid documents
/// - number of docs that could not be parsed.
/// - number of docs that were not valid json.
/// - number of docs that could not be transformed.
/// - number of docs for which the doc mapper returned an error.
/// - number of valid docs.
pub num_doc_parse_errors: AtomicU64,
pub num_transform_errors: AtomicU64,
pub num_oltp_parse_errors: AtomicU64,
pub num_valid_docs: AtomicU64,
pub valid: DocProcessorCounter,
pub doc_mapper_errors: DocProcessorCounter,
pub transform_errors: DocProcessorCounter,
pub json_parse_errors: DocProcessorCounter,
pub otlp_parse_errors: DocProcessorCounter,

/// Number of bytes that went through the indexer
/// during its entire lifetime.
@@ -295,79 +341,70 @@ pub struct DocProcessorCounters {

impl DocProcessorCounters {
pub fn new(index_id: IndexId, source_id: SourceId) -> Self {
Self {
let valid_docs =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "valid");
let doc_mapper_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "doc_mapper_error");
let transform_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "transform_error");
let json_parse_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "json_parse_error");
let otlp_parse_errors =
DocProcessorCounter::for_index_and_doc_processor_outcome(&index_id, "otlp_parse_error");
DocProcessorCounters {
index_id,
source_id,
num_doc_parse_errors: Default::default(),
num_transform_errors: Default::default(),
num_oltp_parse_errors: Default::default(),
num_valid_docs: Default::default(),

valid: valid_docs,
doc_mapper_errors,
transform_errors,
json_parse_errors,
otlp_parse_errors,
num_bytes_total: Default::default(),
}
}

/// Returns the overall number of docs that went through the indexer (valid or not).
pub fn num_processed_docs(&self) -> u64 {
self.num_valid_docs.load(Ordering::Relaxed)
+ self.num_doc_parse_errors.load(Ordering::Relaxed)
+ self.num_oltp_parse_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
self.valid.get_num_docs()
+ self.doc_mapper_errors.get_num_docs()
+ self.json_parse_errors.get_num_docs()
+ self.otlp_parse_errors.get_num_docs()
+ self.transform_errors.get_num_docs()
}

/// Returns the overall number of docs that were sent to the indexer but were invalid.
/// (For instance, because they were missing a required field or because their
/// format was invalid.)
pub fn num_invalid_docs(&self) -> u64 {
self.num_doc_parse_errors.load(Ordering::Relaxed)
+ self.num_oltp_parse_errors.load(Ordering::Relaxed)
+ self.num_transform_errors.load(Ordering::Relaxed)
self.doc_mapper_errors.get_num_docs()
+ self.json_parse_errors.get_num_docs()
+ self.otlp_parse_errors.get_num_docs()
+ self.transform_errors.get_num_docs()
}

pub fn record_valid(&self, num_bytes: u64) {
self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);

crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([&self.index_id, "valid"])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([&self.index_id, "valid"])
.inc_by(num_bytes);
self.valid.record_doc(num_bytes);
}

pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
let label = match error {
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
match error {
DocProcessorError::DocMapperParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
"doc_mapper_error"
self.doc_mapper_errors.record_doc(num_bytes);
}
DocProcessorError::JsonParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
"json_parse_error"
self.json_parse_errors.record_doc(num_bytes);
}
DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => {
self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed);
"otlp_parse_error"
self.otlp_parse_errors.record_doc(num_bytes);
}
#[cfg(feature = "vrl")]
DocProcessorError::Transform(_) => {
self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
"transform_error"
self.transform_errors.record_doc(num_bytes);
}
};
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([&self.index_id, label])
.inc();

self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);

crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([&self.index_id, label])
.inc_by(num_bytes);
}
}

@@ -395,7 +432,7 @@ impl DocProcessor {
if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() {
bail!("VRL is not enabled: please recompile with the `vrl` feature")
}
let doc_processor = Self {
Ok(DocProcessor {
Review comment (Member):

Do you want me to edit the contribution guidelines and prohibit Self? :D

doc_mapper,
indexer_mailbox,
timestamp_field_opt,
@@ -406,8 +443,7 @@ impl DocProcessor {
.map(VrlProgram::try_from_transform_config)
.transpose()?,
input_format,
};
Ok(doc_processor)
})
}

// Extract a timestamp from a tantivy document.
@@ -650,10 +686,11 @@ mod tests {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
assert_eq!(counters.transform_errors.get_num_docs(), 0);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
assert_eq!(counters.valid.get_num_docs(), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 387);

let output_messages = indexer_inbox.drain_for_test();
@@ -890,7 +927,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -969,7 +1006,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1042,7 +1079,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1117,7 +1154,7 @@ mod tests {
.process_pending_and_observe()
.await
.state;
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.valid.get_num_docs(), 2);

let batch = indexer_inbox.drain_for_test_typed::<ProcessedDocBatch>();
assert_eq!(batch.len(), 1);
@@ -1176,10 +1213,11 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id.to_string());
assert_eq!(counters.source_id, source_id.to_string());
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 2);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 1);
assert_eq!(counters.json_parse_errors.get_num_docs(), 1);
assert_eq!(counters.transform_errors.get_num_docs(), 0);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0);
assert_eq!(counters.valid.get_num_docs(), 2);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 397);

let output_messages = indexer_inbox.drain_for_test();
@@ -1266,10 +1304,10 @@ mod tests_vrl {
.state;
assert_eq!(counters.index_id, index_id);
assert_eq!(counters.source_id, source_id);
assert_eq!(counters.num_doc_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_transform_errors.load(Ordering::Relaxed), 1,);
assert_eq!(counters.num_oltp_parse_errors.load(Ordering::Relaxed), 0,);
assert_eq!(counters.num_valid_docs.load(Ordering::Relaxed), 2,);
assert_eq!(counters.doc_mapper_errors.get_num_docs(), 0,);
assert_eq!(counters.transform_errors.get_num_docs(), 1,);
assert_eq!(counters.otlp_parse_errors.get_num_docs(), 0,);
assert_eq!(counters.valid.get_num_docs(), 2,);
assert_eq!(counters.num_bytes_total.load(Ordering::Relaxed), 200,);

let output_messages = indexer_inbox.drain_for_test();
4 changes: 3 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -209,9 +209,11 @@ impl DeleteTaskPlanner {
self.merge_split_downloader_mailbox.clone(),
)
.await?;
let index_label =
quickwit_common::metrics::index_label(self.index_uid.index_id.as_str());
JANITOR_METRICS
.ongoing_num_delete_operations_total
.with_label_values([&self.index_uid.index_id])
.with_label_values([index_label])
.set(self.ongoing_delete_operations_inventory.list().len() as i64);
}
}