diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 97664f70d0a..4e256dd1e6f 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -252,12 +252,13 @@ impl Default for MemoryMetrics { #[derive(Clone)] pub struct InFlightDataGauges { - pub doc_processor_mailbox: IntGauge, - pub index_writer: IntGauge, - pub indexer_mailbox: IntGauge, - pub ingest_router: IntGauge, pub rest_server: IntGauge, + pub ingest_router: IntGauge, + pub wal: IntGauge, pub sources: InFlightDataSourceGauges, + pub doc_processor_mailbox: IntGauge, + pub indexer_mailbox: IntGauge, + pub index_writer: IntGauge, } impl Default for InFlightDataGauges { @@ -270,12 +271,13 @@ impl Default for InFlightDataGauges { ["component"], ); Self { - doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), - index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), - indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), - ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]), + ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), + wal: in_flight_gauge_vec.with_label_values(["wal"]), sources: InFlightDataSourceGauges::new(&in_flight_gauge_vec), + doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), + indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), + index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), } } } diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index e827e4d0380..f4f4418d773 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -225,6 +225,10 @@ impl IndexerState { let publish_lock = self.publish_lock.clone(); let publish_token_opt = self.publish_token_opt.clone(); + let mut split_builders_guard = + GaugeGuard::from_gauge(&crate::metrics::INDEXER_METRICS.split_builders); + split_builders_guard.add(1); + let workbench = IndexingWorkbench { workbench_id, create_instant: Instant::now(), @@ -242,6 +246,7 @@ impl IndexerState { .in_flight_data .index_writer, ), + split_builders_guard, }; Ok(workbench) } @@ -364,6 +369,7 @@ struct IndexingWorkbench { last_delete_opstamp: u64, // Number of bytes declared as used by tantivy. memory_usage: GaugeGuard, + split_builders_guard: GaugeGuard, } pub struct Indexer { @@ -635,6 +641,7 @@ impl Indexer { batch_parent_span, indexing_permit, memory_usage, + split_builders_guard, .. }) = self.indexing_workbench_opt.take() else { @@ -687,6 +694,7 @@ impl Indexer { commit_trigger, batch_parent_span, memory_usage, + _split_builders_guard: split_builders_guard, }, ) .await?; diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index 56eb300f66d..583a103f966 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -27,6 +27,7 @@ pub struct IndexerMetrics { pub processed_bytes: IntCounterVec<2>, pub backpressure_micros: IntCounterVec<1>, pub available_concurrent_upload_permits: IntGaugeVec<1>, + pub split_builders: IntGauge, pub ongoing_merge_operations: IntGauge, pub pending_merge_operations: IntGauge, pub pending_merge_bytes: IntGauge, @@ -66,6 +67,12 @@ impl Default for IndexerMetrics { &[], ["component"], ), + split_builders: new_gauge( + "split_builders", + "Number of existing index writer instances.", + "indexing", + &[], + ), ongoing_merge_operations: new_gauge( "ongoing_merge_operations", "Number of ongoing merge operations", diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index 0fd90aa4855..0dff364c6f3 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -185,6 +185,7 @@ pub struct IndexedSplitBatchBuilder { pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, pub memory_usage: GaugeGuard, + pub _split_builders_guard: GaugeGuard, } /// Sends notifications to the Publisher that the last batch of splits was emtpy. diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 8f84c1acc03..4dfcaa72fd5 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -31,7 +31,6 @@ pub(super) struct IngestV2Metrics { pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>, pub wal_acquire_lock_request_duration_secs: HistogramVec<2>, pub wal_disk_used_bytes: IntGauge, - pub wal_memory_allocated_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, } @@ -77,12 +76,6 @@ impl Default for IngestV2Metrics { "ingest", &[], ), - wal_memory_allocated_bytes: new_gauge( - "wal_memory_allocated_bytes", - "WAL memory allocated in bytes.", - "ingest", - &[], - ), wal_memory_used_bytes: new_gauge( "wal_memory_used_bytes", "WAL memory used in bytes.", @@ -97,8 +90,9 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) { INGEST_V2_METRICS .wal_disk_used_bytes .set(wal_usage.disk_used_bytes as i64); - INGEST_V2_METRICS - .wal_memory_allocated_bytes + quickwit_common::metrics::MEMORY_METRICS + .in_flight_data + .wal .set(wal_usage.memory_allocated_bytes as i64); INGEST_V2_METRICS .wal_memory_used_bytes