Skip to content

Commit

Permalink
Wal allocated into inflight data (#4728)
Browse files Browse the repository at this point in the history
* Moving WAL memory usage into in flight data so we can get a break down
per label.

* Added counter for split builders

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

---------

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Mar 18, 2024
1 parent 83d8d9e commit 07e2f47
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 17 deletions.
18 changes: 10 additions & 8 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,13 @@ impl Default for MemoryMetrics {

#[derive(Clone)]
pub struct InFlightDataGauges {
pub doc_processor_mailbox: IntGauge,
pub index_writer: IntGauge,
pub indexer_mailbox: IntGauge,
pub ingest_router: IntGauge,
pub rest_server: IntGauge,
pub ingest_router: IntGauge,
pub wal: IntGauge,
pub sources: InFlightDataSourceGauges,
pub doc_processor_mailbox: IntGauge,
pub indexer_mailbox: IntGauge,
pub index_writer: IntGauge,
}

impl Default for InFlightDataGauges {
Expand All @@ -270,12 +271,13 @@ impl Default for InFlightDataGauges {
["component"],
);
Self {
doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]),
index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]),
indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]),
ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]),
rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]),
ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]),
wal: in_flight_gauge_vec.with_label_values(["wal"]),
sources: InFlightDataSourceGauges::new(&in_flight_gauge_vec),
doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]),
indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]),
index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]),
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ impl IndexerState {
let publish_lock = self.publish_lock.clone();
let publish_token_opt = self.publish_token_opt.clone();

let mut split_builders_guard =
GaugeGuard::from_gauge(&crate::metrics::INDEXER_METRICS.split_builders);
split_builders_guard.add(1);

let workbench = IndexingWorkbench {
workbench_id,
create_instant: Instant::now(),
Expand All @@ -242,6 +246,7 @@ impl IndexerState {
.in_flight_data
.index_writer,
),
split_builders_guard,
};
Ok(workbench)
}
Expand Down Expand Up @@ -364,6 +369,7 @@ struct IndexingWorkbench {
last_delete_opstamp: u64,
// Number of bytes declared as used by tantivy.
memory_usage: GaugeGuard,
split_builders_guard: GaugeGuard,
}

pub struct Indexer {
Expand Down Expand Up @@ -635,6 +641,7 @@ impl Indexer {
batch_parent_span,
indexing_permit,
memory_usage,
split_builders_guard,
..
}) = self.indexing_workbench_opt.take()
else {
Expand Down Expand Up @@ -687,6 +694,7 @@ impl Indexer {
commit_trigger,
batch_parent_span,
memory_usage,
_split_builders_guard: split_builders_guard,
},
)
.await?;
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct IndexerMetrics {
pub processed_bytes: IntCounterVec<2>,
pub backpressure_micros: IntCounterVec<1>,
pub available_concurrent_upload_permits: IntGaugeVec<1>,
pub split_builders: IntGauge,
pub ongoing_merge_operations: IntGauge,
pub pending_merge_operations: IntGauge,
pub pending_merge_bytes: IntGauge,
Expand Down Expand Up @@ -66,6 +67,12 @@ impl Default for IndexerMetrics {
&[],
["component"],
),
split_builders: new_gauge(
"split_builders",
"Number of existing index writer instances.",
"indexing",
&[],
),
ongoing_merge_operations: new_gauge(
"ongoing_merge_operations",
"Number of ongoing merge operations",
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/models/indexed_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub struct IndexedSplitBatchBuilder {
pub commit_trigger: CommitTrigger,
pub batch_parent_span: Span,
pub memory_usage: GaugeGuard,
pub _split_builders_guard: GaugeGuard,
}

/// Sends notifications to the Publisher that the last batch of splits was emtpy.
Expand Down
12 changes: 3 additions & 9 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub(super) struct IngestV2Metrics {
pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>,
pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_allocated_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
}

Expand Down Expand Up @@ -77,12 +76,6 @@ impl Default for IngestV2Metrics {
"ingest",
&[],
),
wal_memory_allocated_bytes: new_gauge(
"wal_memory_allocated_bytes",
"WAL memory allocated in bytes.",
"ingest",
&[],
),
wal_memory_used_bytes: new_gauge(
"wal_memory_used_bytes",
"WAL memory used in bytes.",
Expand All @@ -97,8 +90,9 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
INGEST_V2_METRICS
.wal_disk_used_bytes
.set(wal_usage.disk_used_bytes as i64);
INGEST_V2_METRICS
.wal_memory_allocated_bytes
quickwit_common::metrics::MEMORY_METRICS
.in_flight_data
.wal
.set(wal_usage.memory_allocated_bytes as i64);
INGEST_V2_METRICS
.wal_memory_used_bytes
Expand Down

0 comments on commit 07e2f47

Please sign in to comment.