Skip to content

Commit

Permalink
Track bytes in-flight in ingester persist and replicate methods (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Mar 23, 2024
1 parent 150d2cf commit 2e870ac
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 93 deletions.
178 changes: 93 additions & 85 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ impl Default for MemoryMetrics {
pub struct InFlightDataGauges {
pub rest_server: IntGauge,
pub ingest_router: IntGauge,
pub ingester_persist: IntGauge,
pub ingester_replicate: IntGauge,
pub wal: IntGauge,
pub fetch_stream: IntGauge,
pub multi_fetch_stream: IntGauge,
Expand All @@ -275,6 +277,8 @@ impl Default for InFlightDataGauges {
Self {
rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]),
ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]),
ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]),
ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]),
wal: in_flight_gauge_vec.with_label_values(["wal"]),
fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]),
multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]),
Expand Down
14 changes: 14 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mrecordlog::error::CreateQueueError;
use quickwit_cluster::Cluster;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
Expand Down Expand Up @@ -968,6 +969,19 @@ impl IngesterService for Ingester {
&mut self,
persist_request: PersistRequest,
) -> IngestV2Result<PersistResponse> {
// If the request is local, the amount of memory it occupies is already
// accounted for in the router.
let request_size_bytes = persist_request
.subrequests
.iter()
.flat_map(|subrequest| match &subrequest.doc_batch {
Some(doc_batch) if doc_batch.doc_buffer.is_unique() => Some(doc_batch.num_bytes()),
_ => None,
})
.sum::<usize>();
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist);
gauge_guard.add(request_size_bytes as i64);

self.persist_inner(persist_request).await
}

Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {
let doc_batch = doc_batch_builder.build().unwrap();

assert_eq!(doc_batch.num_docs(), 2);
assert_eq!(doc_batch.num_bytes(), 13);
assert_eq!(doc_batch.num_bytes(), 21);
assert_eq!(doc_batch.doc_lengths, [7, 6]);
assert_eq!(doc_batch.doc_buffer, Bytes::from(&b"Hello, World!"[..]));
}
Expand Down Expand Up @@ -278,7 +278,7 @@ mod tests {
.as_ref()
.unwrap()
.num_bytes(),
13
21
);
assert_eq!(
ingest_request.subrequests[0]
Expand Down Expand Up @@ -313,7 +313,7 @@ mod tests {
.as_ref()
.unwrap()
.num_bytes(),
12
20
);
assert_eq!(
ingest_request.subrequests[1]
Expand Down Expand Up @@ -345,6 +345,6 @@ mod tests {
doc_buffer: vec![0u8; 100].into(),
doc_lengths: vec![10, 20, 30],
};
assert_eq!(estimate_size(&doc_batch), ByteSize(106));
assert_eq!(estimate_size(&doc_batch), ByteSize(118));
}
}
5 changes: 5 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use bytesize::ByteSize;
use futures::{Future, StreamExt};
use mrecordlog::error::CreateQueueError;
use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_proto::ingest::ingester::{
ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus,
Expand Down Expand Up @@ -507,6 +508,10 @@ impl ReplicationTask {
self.current_replication_seqno, replicate_request.replication_seqno
)));
}
let request_size_bytes = replicate_request.num_bytes();
let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate);
gauge_guard.add(request_size_bytes as i64);

self.current_replication_seqno += 1;

let commit_type = replicate_request.commit_type();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ impl IngestRouterService for IngestRouter {

let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router);
gauge_guard.add(request_size_bytes as i64);

let _permit = self
.ingest_semaphore
.clone()
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-proto/src/ingest/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ impl AckReplicationMessage {
}
}

impl ReplicateRequest {
pub fn num_bytes(&self) -> usize {
self.subrequests
.iter()
.flat_map(|subrequest| &subrequest.doc_batch)
.map(|doc_batch| doc_batch.num_bytes())
.sum()
}
}

impl ReplicateSubrequest {
pub fn shard_id(&self) -> &ShardId {
self.shard_id
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl DocBatchV2 {
}

pub fn num_bytes(&self) -> usize {
self.doc_buffer.len()
self.doc_buffer.len() + self.doc_lengths.len() * 4
}

pub fn num_docs(&self) -> usize {
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ mod tests {
assert_eq!(subrequests[0].index_id, "my-index-1");
assert_eq!(subrequests[0].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_docs(), 2);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 96);
assert_eq!(subrequests[0].doc_batch.as_ref().unwrap().num_bytes(), 104);

assert_eq!(subrequests[1].index_id, "my-index-2");
assert_eq!(subrequests[1].source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 48);
assert_eq!(subrequests[1].doc_batch.as_ref().unwrap().num_bytes(), 52);

Ok(IngestResponseV2 {
successes: vec![
Expand Down Expand Up @@ -238,7 +238,7 @@ mod tests {
assert_eq!(subrequest_0.index_id, "my-index-1");
assert_eq!(subrequest_0.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_docs(), 1);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 48);
assert_eq!(subrequest_0.doc_batch.as_ref().unwrap().num_bytes(), 52);

Ok(IngestResponseV2 {
successes: vec![IngestSuccess {
Expand Down

0 comments on commit 2e870ac

Please sign in to comment.