Skip to content

Commit

Permalink
Measure amount of data in-flight in various buffers (#4701)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Mar 8, 2024
1 parent dd10ecf commit db520bf
Show file tree
Hide file tree
Showing 36 changed files with 602 additions and 335 deletions.
39 changes: 20 additions & 19 deletions quickwit/quickwit-cli/src/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,48 @@

use std::time::Duration;

use quickwit_common::metrics::new_gauge;
use quickwit_common::metrics::MEMORY_METRICS;
use tikv_jemallocator::Jemalloc;
use tracing::error;

#[global_allocator]
pub static GLOBAL: Jemalloc = Jemalloc;

pub const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);
const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1);

/// Polls jemalloc statistics once per `JEMALLOC_METRICS_POLLING_INTERVAL` and
/// exports them via the process-wide `MEMORY_METRICS` gauges.
///
/// Returns an error only if resolving or reading a jemalloc MIB key fails;
/// otherwise this future runs forever.
pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> {
    let memory_metrics = MEMORY_METRICS.clone();

    // Obtain a MIB for the `epoch`, `stats.active`, `stats.allocated`, and `stats.resident` keys:
    let epoch_mib = tikv_jemalloc_ctl::epoch::mib()?;
    let active_mib = tikv_jemalloc_ctl::stats::active::mib()?;
    let allocated_mib = tikv_jemalloc_ctl::stats::allocated::mib()?;
    let resident_mib = tikv_jemalloc_ctl::stats::resident::mib()?;

    let mut poll_interval = tokio::time::interval(JEMALLOC_METRICS_POLLING_INTERVAL);

    loop {
        poll_interval.tick().await;

        // Many statistics are cached and only updated when the epoch is advanced:
        epoch_mib.advance()?;

        // Read statistics using MIB keys:
        let active = active_mib.read()?;
        memory_metrics.active_bytes.set(active as i64);

        let allocated = allocated_mib.read()?;
        memory_metrics.allocated_bytes.set(allocated as i64);

        let resident = resident_mib.read()?;
        memory_metrics.resident_bytes.set(resident as i64);
    }
}

pub fn start_jemalloc_metrics_loop() {
tokio::task::spawn(async {
if let Err(jemalloc_metrics_err) = jemalloc_metrics_loop().await {
error!(err=?jemalloc_metrics_err, "failed to gather metrics from jemalloc");
if let Err(error) = jemalloc_metrics_loop().await {
error!(%error, "failed to collect metrics from jemalloc");
}
});
}
7 changes: 7 additions & 0 deletions quickwit/quickwit-cluster/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,43 @@ impl Default for ClusterMetrics {
"live_nodes",
"The number of live nodes observed locally.",
"cluster",
&[],
),
ready_nodes: new_gauge(
"ready_nodes",
"The number of ready nodes observed locally.",
"cluster",
&[],
),
zombie_nodes: new_gauge(
"zombie_nodes",
"The number of zombie nodes observed locally.",
"cluster",
&[],
),
dead_nodes: new_gauge(
"dead_nodes",
"The number of dead nodes observed locally.",
"cluster",
&[],
),
cluster_state_size_bytes: new_gauge(
"cluster_state_size_bytes",
"The size of the cluster state in bytes.",
"cluster",
&[],
),
node_state_keys: new_gauge(
"node_state_keys",
"The number of keys in the node state.",
"cluster",
&[],
),
node_state_size_bytes: new_gauge(
"node_state_size_bytes",
"The size of the node state in bytes.",
"cluster",
&[],
),
gossip_recv_messages_total: new_counter(
"gossip_recv_messages_total",
Expand Down
181 changes: 174 additions & 7 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::collections::HashMap;

use once_cell::sync::Lazy;
use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
pub use prometheus::{
Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter,
Expand Down Expand Up @@ -91,10 +92,20 @@ pub fn new_counter_vec<const N: usize>(
IntCounterVec { underlying }
}

pub fn new_gauge(name: &str, help: &str, subsystem: &str) -> IntGauge {
pub fn new_gauge(
name: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
) -> IntGauge {
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let gauge_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
.subsystem(subsystem)
.const_labels(owned_const_labels);
let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge");
prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge");
gauge
Expand Down Expand Up @@ -157,18 +168,32 @@ pub fn new_histogram_vec<const N: usize>(
HistogramVec { underlying }
}

pub struct GaugeGuard(&'static IntGauge);
pub struct GaugeGuard {
gauge: &'static IntGauge,
delta: i64,
}

impl GaugeGuard {
pub fn from_gauge(gauge: &'static IntGauge) -> Self {
gauge.inc();
Self(gauge)
pub fn from_gauge(gauge: &'static IntGauge, delta: i64) -> Self {
gauge.add(delta);

Self { gauge, delta }
}

pub fn add(&mut self, delta: i64) {
self.delta += delta;
self.gauge.add(self.delta);
}

pub fn sub(&mut self, delta: i64) {
self.delta -= delta;
self.gauge.sub(delta);
}
}

impl Drop for GaugeGuard {
fn drop(&mut self) {
self.0.dec();
self.gauge.sub(self.delta)
}
}

Expand All @@ -179,3 +204,145 @@ pub fn metrics_text_payload() -> String {
let _ = encoder.encode(&metric_families, &mut buffer); // TODO avoid ignoring the error.
String::from_utf8_lossy(&buffer).to_string()
}

/// Process-wide memory metrics: jemalloc allocator statistics plus gauges
/// tracking the amount of data in-flight in various internal buffers.
#[derive(Clone)]
pub struct MemoryMetrics {
    /// Bytes in active pages, as reported by jemalloc `stats.active`.
    pub active_bytes: IntGauge,
    /// Bytes allocated by the application, as reported by jemalloc `stats.allocated`.
    pub allocated_bytes: IntGauge,
    /// Bytes in physically resident pages, as reported by jemalloc `stats.resident`.
    pub resident_bytes: IntGauge,
    /// Per-component gauges for data currently buffered in-flight.
    pub in_flight_data: InFlightDataGauges,
}

impl Default for MemoryMetrics {
    /// Registers the jemalloc memory gauges under the `quickwit_memory`
    /// namespace and builds the in-flight data gauges.
    fn default() -> Self {
        Self {
            active_bytes: new_gauge(
                "active_bytes",
                "Total number of bytes in active pages allocated by the application, as reported \
                 by jemalloc `stats.active`.",
                "memory",
                &[],
            ),
            allocated_bytes: new_gauge(
                "allocated_bytes",
                "Total number of bytes allocated by the application, as reported by jemalloc \
                 `stats.allocated`.",
                "memory",
                &[],
            ),
            resident_bytes: new_gauge(
                "resident_bytes",
                // Fixed: help string had a stray leading space, inconsistent
                // with the sibling gauge descriptions.
                "Total number of bytes in physically resident data pages mapped by the \
                 allocator, as reported by jemalloc `stats.resident`.",
                "memory",
                &[],
            ),
            in_flight_data: InFlightDataGauges::default(),
        }
    }
}

/// Gauges measuring bytes of data currently in-flight, one per buffering
/// component, all sharing the metric name `in_flight_data_bytes` and
/// distinguished by a `component` const label.
#[derive(Clone)]
pub struct InFlightDataGauges {
    /// Data queued in the doc processor actor mailbox.
    pub doc_processor_mailbox: IntGauge,
    /// Data queued in the indexer actor mailbox.
    pub indexer_mailbox: IntGauge,
    /// Data buffered in the ingest router.
    pub ingest_router: IntGauge,
    /// Data buffered in the REST server.
    pub rest_server: IntGauge,
    /// Per-source-type in-flight gauges.
    pub sources: InFlightDataSourceGauges,
}

/// Shared help text for every `in_flight_data_bytes` gauge variant.
const IN_FLIGHT_DATA_GAUGES_HELP: &str = "Amount of data in-flight in various buffers in bytes.";

impl Default for InFlightDataGauges {
    /// Registers one `in_flight_data_bytes` gauge per buffering component,
    /// each tagged with a distinguishing `component` const label.
    fn default() -> Self {
        // All gauges share name, help, and subsystem; only the component
        // label differs, so build them through a single local helper.
        let component_gauge = |component: &'static str| {
            new_gauge(
                "in_flight_data_bytes",
                IN_FLIGHT_DATA_GAUGES_HELP,
                "memory",
                &[("component", component)],
            )
        };
        Self {
            doc_processor_mailbox: component_gauge("doc_processor_mailbox"),
            indexer_mailbox: component_gauge("indexer_mailbox"),
            ingest_router: component_gauge("ingest_router"),
            rest_server: component_gauge("rest_server"),
            sources: InFlightDataSourceGauges::default(),
        }
    }
}

/// Per-source-type gauges for bytes of data in-flight, one per supported
/// source kind plus a catch-all `other`.
#[derive(Clone)]
pub struct InFlightDataSourceGauges {
    /// File source.
    pub file: IntGauge,
    /// Ingest (v2) source.
    pub ingest: IntGauge,
    /// Kafka source.
    pub kafka: IntGauge,
    /// Kinesis source.
    pub kinesis: IntGauge,
    /// Pub/Sub source.
    pub pubsub: IntGauge,
    /// Pulsar source.
    pub pulsar: IntGauge,
    /// Any source type without a dedicated gauge.
    pub other: IntGauge,
}

impl Default for InFlightDataSourceGauges {
    /// Registers one `in_flight_data_bytes` gauge per source type, tagged
    /// with a `component` const label of the form `<source>_source`.
    fn default() -> Self {
        // Single helper: every gauge differs only by its component label.
        let source_gauge = |component: &'static str| {
            new_gauge(
                "in_flight_data_bytes",
                IN_FLIGHT_DATA_GAUGES_HELP,
                "memory",
                &[("component", component)],
            )
        };
        Self {
            file: source_gauge("file_source"),
            ingest: source_gauge("ingest_source"),
            kafka: source_gauge("kafka_source"),
            kinesis: source_gauge("kinesis_source"),
            pubsub: source_gauge("pubsub_source"),
            // Fixed label: was bare "pulsar", inconsistent with the
            // `<name>_source` convention used by every sibling gauge.
            pulsar: source_gauge("pulsar_source"),
            other: source_gauge("other"),
        }
    }
}

/// Lazily-initialized process-wide memory metrics; the gauges register with
/// the default Prometheus registry on first access.
pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
10 changes: 5 additions & 5 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
pub use source_config::{
load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams,
KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint,
SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams,
VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig,
SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand Down Expand Up @@ -95,7 +95,7 @@ pub use crate::storage_config::{
SourceInputFormat,
SourceParams,
FileSourceParams,
GcpPubSubSourceParams,
PubSubSourceParams,
KafkaSourceParams,
KinesisSourceParams,
PulsarSourceParams,
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ impl SourceConfig {
pub fn source_type(&self) -> SourceType {
match self.source_params {
SourceParams::File(_) => SourceType::File,
SourceParams::GcpPubSub(_) => SourceType::GcpPubsub,
SourceParams::Ingest => SourceType::IngestV2,
SourceParams::IngestApi => SourceType::IngestV1,
SourceParams::IngestCli => SourceType::Cli,
SourceParams::Kafka(_) => SourceType::Kafka,
SourceParams::Kinesis(_) => SourceType::Kinesis,
SourceParams::PubSub(_) => SourceType::PubSub,
SourceParams::Pulsar(_) => SourceType::Pulsar,
SourceParams::Vec(_) => SourceType::Vec,
SourceParams::Void(_) => SourceType::Void,
Expand All @@ -109,7 +109,7 @@ impl SourceConfig {
pub fn params(&self) -> JsonValue {
match &self.source_params {
SourceParams::File(params) => serde_json::to_value(params),
SourceParams::GcpPubSub(params) => serde_json::to_value(params),
SourceParams::PubSub(params) => serde_json::to_value(params),
SourceParams::Ingest => serde_json::to_value(()),
SourceParams::IngestApi => serde_json::to_value(()),
SourceParams::IngestCli => serde_json::to_value(()),
Expand Down Expand Up @@ -229,14 +229,15 @@ impl FromStr for SourceInputFormat {
#[serde(tag = "source_type", content = "params", rename_all = "snake_case")]
pub enum SourceParams {
File(FileSourceParams),
GcpPubSub(GcpPubSubSourceParams),
Ingest,
#[serde(rename = "ingest-api")]
IngestApi,
#[serde(rename = "ingest-cli")]
IngestCli,
Kafka(KafkaSourceParams),
Kinesis(KinesisSourceParams),
#[serde(rename = "pubsub")]
PubSub(PubSubSourceParams),
Pulsar(PulsarSourceParams),
Vec(VecSourceParams),
Void(VoidSourceParams),
Expand Down Expand Up @@ -316,7 +317,7 @@ pub struct KafkaSourceParams {

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct GcpPubSubSourceParams {
pub struct PubSubSourceParams {
/// Name of the subscription that the source consumes.
pub subscription: String,
/// When backfill mode is enabled, the source exits after reaching the end of the topic.
Expand Down
Loading

0 comments on commit db520bf

Please sign in to comment.