Skip to content

Commit

Permalink
Report WAL allocated bytes in Prom metrics (#4707)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Mar 8, 2024
1 parent 77be8e7 commit f2f0639
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 117 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "2c593d3" }
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "187486f" }
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ impl IngestApiService {
index_id: index_id.to_string(),
});
}
let disk_usage = self.queues.disk_usage();
let disk_used = self.queues.resource_usage().disk_used_bytes;

if disk_usage > self.disk_limit {
if disk_used > self.disk_limit {
info!("ingestion rejected due to disk limit");
return Err(IngestServiceError::RateLimited);
}
Expand Down Expand Up @@ -239,8 +239,8 @@ impl IngestApiService {
.suggest_truncate(&request.index_id, request.up_to_position_included, ctx)
.await?;

let memory_usage = self.queues.memory_usage();
let new_capacity = self.memory_limit - memory_usage;
let memory_used = self.queues.resource_usage().memory_used_bytes;
let new_capacity = self.memory_limit - memory_used;
self.memory_capacity.reset_capacity(new_capacity);

Ok(())
Expand Down
72 changes: 27 additions & 45 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ use super::replication::{
};
use super::state::{IngesterState, InnerIngesterState, WeakIngesterState};
use super::IngesterPool;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};
Expand Down Expand Up @@ -318,12 +319,9 @@ impl Ingester {
.reset_shards_operations_total
.with_label_values(["success"])
.inc();
INGEST_V2_METRICS
.wal_disk_usage_bytes
.set(state_guard.mrecordlog.disk_usage() as i64);
INGEST_V2_METRICS
.wal_memory_usage_bytes
.set(state_guard.mrecordlog.memory_usage() as i64);

let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);
}
Ok(Err(error)) => {
warn!("advise reset shards request failed: {error}");
Expand Down Expand Up @@ -516,31 +514,27 @@ impl Ingester {
};
let requested_capacity = estimate_size(&doc_batch);

match check_enough_capacity(
if let Err(error) = check_enough_capacity(
&state_guard.mrecordlog,
self.disk_capacity,
self.memory_capacity,
requested_capacity + sum_of_requested_capacity,
) {
Ok(_usage) => (),
Err(error) => {
rate_limited_warn!(
limit_per_min = 10,
"failed to persist records to ingester `{}`: {error}",
self.self_node_id
);
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
};
persist_failures.push(persist_failure);
continue;
}
rate_limited_warn!(
limit_per_min = 10,
"failed to persist records to ingester `{}`: {error}",
self.self_node_id
);
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
};
persist_failures.push(persist_failure);
continue;
};

let (rate_limiter, rate_meter) = state_guard
.rate_trackers
.get_mut(&queue_id)
Expand Down Expand Up @@ -768,20 +762,15 @@ impl Ingester {
}
info!("deleted {} dangling shard(s)", shards_to_delete.len());
}
let disk_usage = state_guard.mrecordlog.disk_usage() as u64;
let wal_usage = state_guard.mrecordlog.resource_usage();
drop(state_guard);

let disk_used = wal_usage.disk_used_bytes as u64;

if disk_usage >= self.disk_capacity.as_u64() * 90 / 100 {
if disk_used >= self.disk_capacity.as_u64() * 90 / 100 {
self.background_reset_shards();
}

INGEST_V2_METRICS
.wal_disk_usage_bytes
.set(disk_usage as i64);
INGEST_V2_METRICS
.wal_memory_usage_bytes
.set(state_guard.mrecordlog.memory_usage() as i64);

drop(state_guard);
report_wal_usage(wal_usage);

let leader_id = self.self_node_id.to_string();
let persist_response = PersistResponse {
Expand Down Expand Up @@ -935,15 +924,8 @@ impl Ingester {
.await;
}
}
let current_disk_usage = state_guard.mrecordlog.disk_usage();
let current_memory_usage = state_guard.mrecordlog.memory_usage();

INGEST_V2_METRICS
.wal_disk_usage_bytes
.set(current_disk_usage as i64);
INGEST_V2_METRICS
.wal_memory_usage_bytes
.set(current_memory_usage as i64);
let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);

self.check_decommissioning_status(&mut state_guard);
let truncate_response = TruncateShardsResponse {};
Expand Down
35 changes: 27 additions & 8 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use mrecordlog::ResourceUsage;
use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter_vec, new_gauge, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec,
Expand All @@ -28,8 +29,9 @@ pub(super) struct IngestV2Metrics {
pub shards: IntGaugeVec<2>,
pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>,
pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
pub wal_disk_usage_bytes: IntGauge,
pub wal_memory_usage_bytes: IntGauge,
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_allocated_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
}

impl Default for IngestV2Metrics {
Expand Down Expand Up @@ -63,20 +65,37 @@ impl Default for IngestV2Metrics {
&[],
["operation", "type"],
),
wal_disk_usage_bytes: new_gauge(
"wal_disk_usage_bytes",
"WAL disk usage in bytes.",
wal_disk_used_bytes: new_gauge(
"wal_disk_used_bytes",
"WAL disk space used in bytes.",
"ingest",
&[],
),
wal_memory_usage_bytes: new_gauge(
"wal_memory_usage_bytes",
"WAL memory usage in bytes.",
wal_memory_allocated_bytes: new_gauge(
"wal_memory_allocated_bytes",
"WAL memory allocated in bytes.",
"ingest",
),
wal_memory_used_bytes: new_gauge(
"wal_memory_used_bytes",
"WAL memory used in bytes.",
"ingest",
&[],
),
}
}
}

pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
INGEST_V2_METRICS
.wal_disk_used_bytes
.set(wal_usage.disk_used_bytes as i64);
INGEST_V2_METRICS
.wal_memory_allocated_bytes
.set(wal_usage.memory_allocated_bytes as i64);
INGEST_V2_METRICS
.wal_memory_used_bytes
.set(wal_usage.memory_used_bytes as i64);
}

pub(super) static INGEST_V2_METRICS: Lazy<IngestV2Metrics> = Lazy::new(IngestV2Metrics::default);
27 changes: 9 additions & 18 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ pub(super) async fn append_non_empty_doc_batch(
}
}

#[derive(Debug, Clone, Copy)]
pub(super) struct MRecordLogUsage {
pub disk: ByteSize,
pub memory: ByteSize,
}

/// Error returned when the mrecordlog does not have enough capacity to store some records.
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub(super) enum NotEnoughCapacityError {
Expand Down Expand Up @@ -118,30 +112,27 @@ pub(super) fn check_enough_capacity(
disk_capacity: ByteSize,
memory_capacity: ByteSize,
requested_capacity: ByteSize,
) -> Result<MRecordLogUsage, NotEnoughCapacityError> {
let disk_usage = ByteSize(mrecordlog.disk_usage() as u64);
) -> Result<(), NotEnoughCapacityError> {
let wal_usage = mrecordlog.resource_usage();
let disk_used = ByteSize(wal_usage.disk_used_bytes as u64);

if disk_usage + requested_capacity > disk_capacity {
if disk_used + requested_capacity > disk_capacity {
return Err(NotEnoughCapacityError::Disk {
usage: disk_usage,
usage: disk_used,
capacity: disk_capacity,
requested: requested_capacity,
});
}
let memory_usage = ByteSize(mrecordlog.memory_usage() as u64);
let memory_used = ByteSize(wal_usage.memory_used_bytes as u64);

if memory_usage + requested_capacity > memory_capacity {
if memory_used + requested_capacity > memory_capacity {
return Err(NotEnoughCapacityError::Memory {
usage: memory_usage,
usage: memory_used,
capacity: memory_capacity,
requested: requested_capacity,
});
}
let usage = MRecordLogUsage {
disk: disk_usage,
memory: memory_usage,
};
Ok(usage)
Ok(())
}

/// Deletes a queue from the WAL. Returns without error if the queue does not exist.
Expand Down
48 changes: 22 additions & 26 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::{Duration, Instant};

use bytesize::ByteSize;
use futures::{Future, StreamExt};
use quickwit_common::ServiceStream;
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_proto::ingest::ingester::{
ack_replication_message, syn_replication_message, AckReplicationMessage, IngesterStatus,
InitReplicaRequest, InitReplicaResponse, ReplicateFailure, ReplicateFailureReason,
Expand All @@ -36,11 +36,11 @@ use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{error, warn};

use super::metrics::report_wal_usage;
use super::models::IngesterShard;
use super::mrecord::MRecord;
use super::mrecordlog_utils::check_enough_capacity;
use super::state::IngesterState;
use crate::ingest_v2::metrics::INGEST_V2_METRICS;
use crate::metrics::INGEST_METRICS;
use crate::{estimate_size, with_lock_metrics};

Expand Down Expand Up @@ -588,26 +588,27 @@ impl ReplicationTask {

let requested_capacity = estimate_size(&doc_batch);

let current_usage = match check_enough_capacity(
if let Err(error) = check_enough_capacity(
&state_guard.mrecordlog,
self.disk_capacity,
self.memory_capacity,
requested_capacity,
) {
Ok(usage) => usage,
Err(error) => {
warn!("failed to replicate records: {error}");
rate_limited_warn!(
limit_per_min = 10,
"failed to replicate records to ingester `{}`: {error}",
self.follower_id,
);

let replicate_failure = ReplicateFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: ReplicateFailureReason::ResourceExhausted as i32,
};
replicate_failures.push(replicate_failure);
continue;
}
let replicate_failure = ReplicateFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: ReplicateFailureReason::ResourceExhausted as i32,
};
replicate_failures.push(replicate_failure);
continue;
};
let current_position_inclusive: Position = if force_commit {
let encoded_mrecords = doc_batch
Expand All @@ -630,16 +631,6 @@ impl ReplicationTask {
.map(Position::offset)
.expect("records should not be empty");

let new_disk_usage = current_usage.disk + requested_capacity;
let new_memory_usage = current_usage.memory + requested_capacity;

INGEST_V2_METRICS
.wal_disk_usage_bytes
.set(new_disk_usage.as_u64() as i64);
INGEST_V2_METRICS
.wal_memory_usage_bytes
.set(new_memory_usage.as_u64() as i64);

INGEST_METRICS
.replicated_num_bytes_total
.inc_by(batch_num_bytes);
Expand All @@ -663,6 +654,11 @@ impl ReplicationTask {
};
replicate_successes.push(replicate_success);
}
let wal_usage = state_guard.mrecordlog.resource_usage();
drop(state_guard);

report_wal_usage(wal_usage);

let follower_id = self.follower_id.clone().into();

let replicate_response = ReplicateResponse {
Expand Down
10 changes: 3 additions & 7 deletions quickwit/quickwit-ingest/src/mrecordlog_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::path::Path;

use bytes::Buf;
use mrecordlog::error::*;
use mrecordlog::{MultiRecordLog, Record, SyncPolicy};
use mrecordlog::{MultiRecordLog, Record, ResourceUsage, SyncPolicy};
use tokio::task::JoinError;
use tracing::error;

Expand Down Expand Up @@ -185,11 +185,7 @@ impl MultiRecordLogAsync {
self.mrecordlog_ref().last_record(queue)
}

pub fn memory_usage(&self) -> usize {
self.mrecordlog_ref().memory_usage()
}

pub fn disk_usage(&self) -> usize {
self.mrecordlog_ref().disk_usage()
pub fn resource_usage(&self) -> ResourceUsage {
self.mrecordlog_ref().resource_usage()
}
}
Loading

0 comments on commit f2f0639

Please sign in to comment.