Skip to content

Commit

Permalink
Avoiding unnecessary hashmap lookup on every single doc
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jun 14, 2024
1 parent 8a69c73 commit c3f9eed
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 28 deletions.
15 changes: 15 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;

use once_cell::sync::Lazy;
Expand Down Expand Up @@ -450,3 +451,17 @@ impl InFlightDataGauges {
}

pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);

static PER_INDEX_METRICS_ENABLED: AtomicBool = AtomicBool::new(true);

pub fn disable_per_index_metrics() {
PER_INDEX_METRICS_ENABLED.store(false, Ordering::Relaxed);
}

pub fn index_label<'a>(index_name: &'a str) -> &'a str {
if PER_INDEX_METRICS_ENABLED.load(Ordering::Relaxed) {
index_name
} else {
"<any>"
}
}
66 changes: 39 additions & 27 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tokio::runtime::Handle;
#[cfg(feature = "vrl")]
use super::vrl_processing::*;
use crate::actors::Indexer;
use crate::metrics::DocProcessorMetrics;
use crate::models::{
NewPublishLock, NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock, RawDocBatch,
};
Expand Down Expand Up @@ -291,10 +292,23 @@ pub struct DocProcessorCounters {
///
/// Includes both valid and invalid documents.
pub num_bytes_total: AtomicU64,

#[serde(skip_serializing)]
pub doc_processor_metrics_bytes: DocProcessorMetrics,
#[serde(skip_serializing)]
pub doc_processor_metrics_docs: DocProcessorMetrics,
}

impl DocProcessorCounters {
pub fn new(index_id: IndexId, source_id: SourceId) -> Self {
let doc_processor_metrics_bytes = DocProcessorMetrics::from_counters(
index_id.as_str(),
&crate::metrics::INDEXER_METRICS.processed_bytes,
);
let doc_processor_metrics_docs = DocProcessorMetrics::from_counters(
index_id.as_str(),
&crate::metrics::INDEXER_METRICS.processed_docs_total,
);
Self {
index_id,
source_id,
Expand All @@ -303,6 +317,9 @@ impl DocProcessorCounters {
num_oltp_parse_errors: Default::default(),
num_valid_docs: Default::default(),
num_bytes_total: Default::default(),

doc_processor_metrics_bytes,
doc_processor_metrics_docs,
}
}

Expand All @@ -327,47 +344,43 @@ impl DocProcessorCounters {
self.num_valid_docs.fetch_add(1, Ordering::Relaxed);
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);

crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([&self.index_id, "valid"])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([&self.index_id, "valid"])
.inc_by(num_bytes);
self.doc_processor_metrics_docs.valid.inc();
self.doc_processor_metrics_bytes.valid.inc_by(num_bytes);
}

pub fn record_error(&self, error: DocProcessorError, num_bytes: u64) {
let label = match error {
self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);
match error {
DocProcessorError::DocMapperParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
"doc_mapper_error"
self.doc_processor_metrics_bytes
.doc_mapper_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.doc_mapper_error.inc();
}
DocProcessorError::JsonParsing(_) => {
self.num_doc_parse_errors.fetch_add(1, Ordering::Relaxed);
"json_parse_error"
self.doc_processor_metrics_bytes
.json_parse_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.json_parse_error.inc();
}
DocProcessorError::OltpLogsParsing(_) | DocProcessorError::OltpTracesParsing(_) => {
self.num_oltp_parse_errors.fetch_add(1, Ordering::Relaxed);
"otlp_parse_error"
self.doc_processor_metrics_bytes
.otlp_parse_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.otlp_parse_error.inc();
}
#[cfg(feature = "vrl")]
DocProcessorError::Transform(_) => {
self.num_transform_errors.fetch_add(1, Ordering::Relaxed);
"transform_error"
self.doc_processor_metrics_bytes
.transform_error
.inc_by(num_bytes);
self.doc_processor_metrics_docs.transform_error.inc();
}
};
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([&self.index_id, label])
.inc();

self.num_bytes_total.fetch_add(num_bytes, Ordering::Relaxed);

crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([&self.index_id, label])
.inc_by(num_bytes);
}
}

Expand Down Expand Up @@ -395,7 +408,7 @@ impl DocProcessor {
if cfg!(not(feature = "vrl")) && transform_config_opt.is_some() {
bail!("VRL is not enabled: please recompile with the `vrl` feature")
}
let doc_processor = Self {
Ok(DocProcessor {
doc_mapper,
indexer_mailbox,
timestamp_field_opt,
Expand All @@ -406,8 +419,7 @@ impl DocProcessor {
.map(VrlProgram::try_from_transform_config)
.transpose()?,
input_format,
};
Ok(doc_processor)
})
}

// Extract a timestamp from a tantivy document.
Expand Down
24 changes: 23 additions & 1 deletion quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,31 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter_vec, new_gauge, new_gauge_vec, IntCounterVec, IntGauge, IntGaugeVec,
new_counter_vec, new_gauge, new_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};

#[derive(Debug)]
pub struct DocProcessorMetrics {
pub valid: IntCounter,
pub doc_mapper_error: IntCounter,
pub json_parse_error: IntCounter,
pub otlp_parse_error: IntCounter,
pub transform_error: IntCounter,
}

impl DocProcessorMetrics {
pub fn from_counters(index_id: &str, counter_vec: &IntCounterVec<2>) -> DocProcessorMetrics {
let index_label = quickwit_common::metrics::index_label(index_id);
DocProcessorMetrics {
valid: counter_vec.with_label_values([index_label, "valid"]),
doc_mapper_error: counter_vec.with_label_values([index_label, "doc_mapper_error"]),
json_parse_error: counter_vec.with_label_values([index_label, "json_parse_error"]),
otlp_parse_error: counter_vec.with_label_values([index_label, "otlp_parse_error"]),
transform_error: counter_vec.with_label_values([index_label, "transform_error"]),
}
}
}

pub struct IndexerMetrics {
pub processed_docs_total: IntCounterVec<2>,
pub processed_bytes: IntCounterVec<2>,
Expand Down

0 comments on commit c3f9eed

Please sign in to comment.