From db520bf86ee1d02af93ee28fbddbb9db5d862d88 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 8 Mar 2024 12:46:37 -0500 Subject: [PATCH] Measure amount of data in-flight in various buffers (#4701) --- quickwit/quickwit-cli/src/jemalloc.rs | 39 ++-- quickwit/quickwit-cluster/src/metrics.rs | 7 + quickwit/quickwit-common/src/metrics.rs | 181 +++++++++++++++++- quickwit/quickwit-config/src/lib.rs | 10 +- .../quickwit-config/src/source_config/mod.rs | 9 +- .../src/source_config/serialize.rs | 4 +- .../src/indexing_scheduler/mod.rs | 2 +- .../src/actors/doc_processor.rs | 77 +++----- .../quickwit-indexing/src/actors/indexer.rs | 140 +++++++------- quickwit/quickwit-indexing/src/metrics.rs | 3 + .../src/models/processed_doc.rs | 22 +++ .../src/models/raw_doc_batch.rs | 54 +++--- .../src/source/file_source.rs | 16 +- .../src/source/gcp_pubsub_source.rs | 29 +-- .../src/source/ingest/mod.rs | 6 +- .../src/source/ingest_api_source.rs | 18 +- .../src/source/kafka_source.rs | 57 +++--- .../src/source/kinesis/kinesis_source.rs | 40 ++-- quickwit/quickwit-indexing/src/source/mod.rs | 55 ++++-- .../src/source/pulsar_source.rs | 27 +-- .../src/source/vec_source.rs | 29 +-- .../quickwit-ingest/src/ingest_v2/metrics.rs | 2 + .../quickwit-ingest/src/ingest_v2/router.rs | 9 +- quickwit/quickwit-ingest/src/metrics.rs | 7 +- .../protos/quickwit/metastore.proto | 6 +- .../codegen/quickwit/quickwit.metastore.rs | 10 +- quickwit/quickwit-proto/src/metastore/mod.rs | 2 +- quickwit/quickwit-search/src/metrics.rs | 7 +- quickwit/quickwit-search/src/thread_pool.rs | 2 +- quickwit/quickwit-serve/src/decompression.rs | 23 ++- .../src/elasticsearch_api/bulk.rs | 7 +- .../src/elasticsearch_api/bulk_v2.rs | 6 +- .../src/elasticsearch_api/filter.rs | 5 +- .../src/ingest_api/rest_handler.rs | 16 +- quickwit/quickwit-serve/src/lib.rs | 1 + quickwit/quickwit-storage/src/metrics.rs | 9 +- 36 files changed, 602 insertions(+), 335 deletions(-) diff --git a/quickwit/quickwit-cli/src/jemalloc.rs b/quickwit/quickwit-cli/src/jemalloc.rs index b2cc7bf134d..f9ab5bfe869 100644 --- a/quickwit/quickwit-cli/src/jemalloc.rs +++ b/quickwit/quickwit-cli/src/jemalloc.rs @@ -19,47 +19,48 @@ use std::time::Duration; -use quickwit_common::metrics::new_gauge; +use quickwit_common::metrics::MEMORY_METRICS; use tikv_jemallocator::Jemalloc; use tracing::error; #[global_allocator] pub static GLOBAL: Jemalloc = Jemalloc; -pub const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1); +const JEMALLOC_METRICS_POLLING_INTERVAL: Duration = Duration::from_secs(1); pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> { - let allocated_gauge = new_gauge( - "allocated_num_bytes", - "Number of bytes allocated memory, as reported by jemallocated.", - "quickwit", - ); + let memory_metrics = MEMORY_METRICS.clone(); - // Obtain a MIB for the `epoch`, `stats.allocated`, and - // `atats.resident` keys: - let epoch_management_information_base = tikv_jemalloc_ctl::epoch::mib()?; - let allocated = tikv_jemalloc_ctl::stats::allocated::mib()?; + // Obtain a MIB for the `epoch`, `stats.active`, `stats.allocated`, and `stats.resident` keys: + let epoch_mib = tikv_jemalloc_ctl::epoch::mib()?; + let active_mib = tikv_jemalloc_ctl::stats::active::mib()?; + let allocated_mib = tikv_jemalloc_ctl::stats::allocated::mib()?; + let resident_mib = tikv_jemalloc_ctl::stats::resident::mib()?; let mut poll_interval = tokio::time::interval(JEMALLOC_METRICS_POLLING_INTERVAL); loop { poll_interval.tick().await; - // Many statistics are cached and 
only updated - // when the epoch is advanced: - epoch_management_information_base.advance()?; + // Many statistics are cached and only updated when the epoch is advanced: + epoch_mib.advance()?; - // Read statistics using MIB key: - let allocated = allocated.read()?; + // Read statistics using MIB keys: + let active = active_mib.read()?; + memory_metrics.active_bytes.set(active as i64); - allocated_gauge.set(allocated as i64); + let allocated = allocated_mib.read()?; + memory_metrics.allocated_bytes.set(allocated as i64); + + let resident = resident_mib.read()?; + memory_metrics.resident_bytes.set(resident as i64); } } pub fn start_jemalloc_metrics_loop() { tokio::task::spawn(async { - if let Err(jemalloc_metrics_err) = jemalloc_metrics_loop().await { - error!(err=?jemalloc_metrics_err, "failed to gather metrics from jemalloc"); + if let Err(error) = jemalloc_metrics_loop().await { + error!(%error, "failed to collect metrics from jemalloc"); } }); } diff --git a/quickwit/quickwit-cluster/src/metrics.rs b/quickwit/quickwit-cluster/src/metrics.rs index 85f996bc9a0..8e5534f503a 100644 --- a/quickwit/quickwit-cluster/src/metrics.rs +++ b/quickwit/quickwit-cluster/src/metrics.rs @@ -51,36 +51,43 @@ impl Default for ClusterMetrics { "live_nodes", "The number of live nodes observed locally.", "cluster", + &[], ), ready_nodes: new_gauge( "ready_nodes", "The number of ready nodes observed locally.", "cluster", + &[], ), zombie_nodes: new_gauge( "zombie_nodes", "The number of zombie nodes observed locally.", "cluster", + &[], ), dead_nodes: new_gauge( "dead_nodes", "The number of dead nodes observed locally.", "cluster", + &[], ), cluster_state_size_bytes: new_gauge( "cluster_state_size_bytes", "The size of the cluster state in bytes.", "cluster", + &[], ), node_state_keys: new_gauge( "node_state_keys", "The number of keys in the node state.", "cluster", + &[], ), node_state_size_bytes: new_gauge( "node_state_size_bytes", "The size of the node state in bytes.", "cluster", + &[], ), gossip_recv_messages_total: new_counter( "gossip_recv_messages_total", diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 247b01746c1..dc536974dbe 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; +use once_cell::sync::Lazy; use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; pub use prometheus::{ Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter, @@ -91,10 +92,20 @@ pub fn new_counter_vec( IntCounterVec { underlying } } -pub fn new_gauge(name: &str, help: &str, subsystem: &str) -> IntGauge { +pub fn new_gauge( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> IntGauge { + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); let gauge_opts = Opts::new(name, help) .namespace("quickwit") - .subsystem(subsystem); + .subsystem(subsystem) + .const_labels(owned_const_labels); let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge"); gauge @@ -157,18 +168,32 @@ pub fn new_histogram_vec( HistogramVec { underlying } } -pub struct GaugeGuard(&'static IntGauge); +pub struct GaugeGuard { + gauge: &'static IntGauge, + delta: i64, +} impl GaugeGuard { - pub fn from_gauge(gauge: &'static IntGauge) -> Self { - 
gauge.inc();
-        Self(gauge)
+    pub fn from_gauge(gauge: &'static IntGauge, delta: i64) -> Self {
+        gauge.add(delta);
+
+        Self { gauge, delta }
+    }
+
+    pub fn add(&mut self, delta: i64) {
+        self.delta += delta;
+        self.gauge.add(delta);
+    }
+
+    pub fn sub(&mut self, delta: i64) {
+        self.delta -= delta;
+        self.gauge.sub(delta);
+    }
 }
 
 impl Drop for GaugeGuard {
     fn drop(&mut self) {
-        self.0.dec();
+        self.gauge.sub(self.delta)
     }
 }
 
@@ -179,3 +204,145 @@ pub fn metrics_text_payload() -> String {
     let _ = encoder.encode(&metric_families, &mut buffer); // TODO avoid ignoring the error.
     String::from_utf8_lossy(&buffer).to_string()
 }
+
+#[derive(Clone)]
+pub struct MemoryMetrics {
+    pub active_bytes: IntGauge,
+    pub allocated_bytes: IntGauge,
+    pub resident_bytes: IntGauge,
+    pub in_flight_data: InFlightDataGauges,
+}
+
+impl Default for MemoryMetrics {
+    fn default() -> Self {
+        Self {
+            active_bytes: new_gauge(
+                "active_bytes",
+                "Total number of bytes in active pages allocated by the application, as reported \
+                 by jemalloc `stats.active`.",
+                "memory",
+                &[],
+            ),
+            allocated_bytes: new_gauge(
+                "allocated_bytes",
+                "Total number of bytes allocated by the application, as reported by jemalloc \
+                 `stats.allocated`.",
+                "memory",
+                &[],
+            ),
+            resident_bytes: new_gauge(
+                "resident_bytes",
+                "Total number of bytes in physically resident data pages mapped by the \
+                 allocator, as reported by jemalloc `stats.resident`.",
+                "memory",
+                &[],
+            ),
+            in_flight_data: InFlightDataGauges::default(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct InFlightDataGauges {
+    pub doc_processor_mailbox: IntGauge,
+    pub indexer_mailbox: IntGauge,
+    pub ingest_router: IntGauge,
+    pub rest_server: IntGauge,
+    pub sources: InFlightDataSourceGauges,
+}
+
+const IN_FLIGHT_DATA_GAUGES_HELP: &str = "Amount of data in-flight in various buffers in bytes.";
+
+impl Default for InFlightDataGauges {
+    fn default() -> Self {
+        Self {
+            doc_processor_mailbox: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "doc_processor_mailbox")],
+            ),
+            indexer_mailbox: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "indexer_mailbox")],
+            ),
+            ingest_router: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "ingest_router")],
+            ),
+            rest_server: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "rest_server")],
+            ),
+            sources: InFlightDataSourceGauges::default(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct InFlightDataSourceGauges {
+    pub file: IntGauge,
+    pub ingest: IntGauge,
+    pub kafka: IntGauge,
+    pub kinesis: IntGauge,
+    pub pubsub: IntGauge,
+    pub pulsar: IntGauge,
+    pub other: IntGauge,
+}
+
+impl Default for InFlightDataSourceGauges {
+    fn default() -> Self {
+        Self {
+            file: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "file_source")],
+            ),
+            ingest: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "ingest_source")],
+            ),
+            kafka: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "kafka_source")],
+            ),
+            kinesis: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "kinesis_source")],
+            ),
+            pubsub: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "pubsub_source")],
+            ),
+            pulsar: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "pulsar_source")],
+            ),
+            other: new_gauge(
+                "in_flight_data_bytes",
+                IN_FLIGHT_DATA_GAUGES_HELP,
+                "memory",
+                &[("component", "other")],
+            ),
+        }
+    }
+}
+
+pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs
index e3a5a587add..ca003263686 100644
--- a/quickwit/quickwit-config/src/lib.rs
+++ b/quickwit/quickwit-config/src/lib.rs
@@ -53,10 +53,10 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 use serde_json::Value as JsonValue;
 pub use source_config::{
-    load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams,
-    KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint,
-    SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams,
-    VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
+    load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
+    PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig,
+    SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
+    CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
 };
 use tracing::warn;
 
@@ -95,7 +95,7 @@ pub use crate::storage_config::{
         SourceInputFormat,
         SourceParams,
         FileSourceParams,
-        GcpPubSubSourceParams,
+        PubSubSourceParams,
         KafkaSourceParams,
         KinesisSourceParams,
         PulsarSourceParams,
diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index 5b83dbd66e6..e077f112b36 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -93,12 +93,12 @@ impl SourceConfig {
     pub fn source_type(&self) -> SourceType {
         match self.source_params {
             SourceParams::File(_) => SourceType::File,
-            SourceParams::GcpPubSub(_) => SourceType::GcpPubsub,
             SourceParams::Ingest => SourceType::IngestV2,
             SourceParams::IngestApi => SourceType::IngestV1,
             SourceParams::IngestCli => SourceType::Cli,
             SourceParams::Kafka(_) => SourceType::Kafka,
             SourceParams::Kinesis(_) => SourceType::Kinesis,
+            SourceParams::PubSub(_) => SourceType::PubSub,
             SourceParams::Pulsar(_) => SourceType::Pulsar,
             SourceParams::Vec(_) => SourceType::Vec,
             SourceParams::Void(_) => SourceType::Void,
@@ -109,7 +109,7 @@ impl SourceConfig {
     pub fn params(&self) -> JsonValue {
         match &self.source_params {
             SourceParams::File(params) => serde_json::to_value(params),
-            SourceParams::GcpPubSub(params) => serde_json::to_value(params),
+            SourceParams::PubSub(params) => serde_json::to_value(params),
             SourceParams::Ingest => serde_json::to_value(()),
             SourceParams::IngestApi => serde_json::to_value(()),
             SourceParams::IngestCli => serde_json::to_value(()),
@@ -229,7 +229,6 @@ impl FromStr for SourceInputFormat {
 #[serde(tag = "source_type", content = "params", rename_all = "snake_case")]
 pub enum SourceParams {
     File(FileSourceParams),
-    GcpPubSub(GcpPubSubSourceParams),
     Ingest,
     #[serde(rename = "ingest-api")]
     IngestApi,
@@ -237,6 +236,8 @@ pub enum SourceParams {
     IngestCli,
     Kafka(KafkaSourceParams),
     Kinesis(KinesisSourceParams),
+    #[serde(rename = "pubsub")]
+    PubSub(PubSubSourceParams),
     Pulsar(PulsarSourceParams),
     Vec(VecSourceParams),
     Void(VoidSourceParams),
@@ -316,7 +317,7 @@ pub struct KafkaSourceParams {
 
 #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)] -pub struct GcpPubSubSourceParams { +pub struct PubSubSourceParams { /// Name of the subscription that the source consumes. pub subscription: String, /// When backfill mode is enabled, the source exits after reaching the end of the topic. diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index f3902f6e636..3df558eb59d 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -92,7 +92,7 @@ impl SourceConfigForSerialization { SourceParams::Kafka(_) | SourceParams::Kinesis(_) | SourceParams::Pulsar(_) => { // TODO consider any validation opportunity } - SourceParams::GcpPubSub(_) + SourceParams::PubSub(_) | SourceParams::Ingest | SourceParams::IngestApi | SourceParams::IngestCli @@ -100,7 +100,7 @@ impl SourceConfigForSerialization { | SourceParams::Void(_) => {} } match &self.source_params { - SourceParams::GcpPubSub(_) | SourceParams::Kafka(_) => {} + SourceParams::PubSub(_) | SourceParams::Kafka(_) => {} _ => { if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 { bail!("Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types"); diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index ca9bef08160..15dd7a282a3 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -162,7 +162,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { } SourceType::Kafka | SourceType::Kinesis - | SourceType::GcpPubsub + | SourceType::PubSub | SourceType::Nats | SourceType::Pulsar => { sources.push(SourceToSchedule { diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 9df96a55a08..5e4c424a8cf 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -506,16 +506,17 @@ impl Handler for DocProcessor { return Ok(()); } let mut processed_docs: Vec = Vec::with_capacity(raw_doc_batch.docs.len()); + for raw_doc in raw_doc_batch.docs { let _protected_zone_guard = ctx.protect_zone(); self.process_raw_doc(raw_doc, &mut processed_docs); ctx.record_progress(); } - let processed_doc_batch = ProcessedDocBatch { - docs: processed_docs, - checkpoint_delta: raw_doc_batch.checkpoint_delta, - force_commit: raw_doc_batch.force_commit, - }; + let processed_doc_batch = ProcessedDocBatch::new( + processed_docs, + raw_doc_batch.checkpoint_delta, + raw_doc_batch.force_commit, + ); ctx.send_message(&self.indexer_mailbox, processed_doc_batch) .await?; Ok(()) @@ -556,7 +557,6 @@ impl Handler for DocProcessor { mod tests { use std::sync::Arc; - use bytes::Bytes; use prost::Message; use quickwit_actors::Universe; use quickwit_common::uri::Uri; @@ -595,10 +595,10 @@ mod tests { doc_processor_mailbox .send_message(RawDocBatch::for_test( &[ - r#"{"body": "happy", "response_date": "2021-12-19T16:39:57+00:00", "response_time": 12, "response_payload": "YWJj"}"#, // missing timestamp - r#"{"body": "happy", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, // ok - r#"{"body": "happy2", "timestamp": 
1628837062, "response_date": "2021-12-19T16:40:57+00:00", "response_time": 13, "response_payload": "YWJj"}"#, // ok - "{", // invalid json + br#"{"body": "happy", "response_date": "2021-12-19T16:39:57+00:00", "response_time": 12, "response_payload": "YWJj"}"#, // missing timestamp + br#"{"body": "happy", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, // ok + br#"{"body": "happy2", "timestamp": 1628837062, "response_date": "2021-12-19T16:40:57+00:00", "response_time": 13, "response_payload": "YWJj"}"#, // ok + b"{", // invalid json ], 0..4, )) @@ -679,24 +679,15 @@ mod tests { let (doc_processor_mailbox, doc_processor_handle) = universe.spawn_builder().spawn(doc_processor); doc_processor_mailbox - .send_message(RawDocBatch { - docs: vec![ - Bytes::from_static( - br#"{"tenant": "tenant_1", "body": "first doc for tenant 1"}"#, - ), - Bytes::from_static( - br#"{"tenant": "tenant_2", "body": "first doc for tenant 2"}"#, - ), - Bytes::from_static( - br#"{"tenant": "tenant_1", "body": "second doc for tenant 1"}"#, - ), - Bytes::from_static( - br#"{"tenant": "tenant_2", "body": "second doc for tenant 2"}"#, - ), + .send_message(RawDocBatch::for_test( + &[ + br#"{"tenant": "tenant_1", "body": "first doc for tenant 1"}"#, + br#"{"tenant": "tenant_2", "body": "first doc for tenant 2"}"#, + br#"{"tenant": "tenant_1", "body": "second doc for tenant 1"}"#, + br#"{"tenant": "tenant_2", "body": "second doc for tenant 2"}"#, ], - checkpoint_delta: SourceCheckpointDelta::from_range(0..2), - force_commit: false, - }) + 0..2, + )) .await?; universe .send_exit_with_success(&doc_processor_mailbox) @@ -776,7 +767,7 @@ mod tests { doc_processor_mailbox .send_message(RawDocBatch::for_test( &[ - r#"{"body": "happy", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, + br#"{"body": "happy", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, ], 0..1, )) @@ -839,13 +830,7 @@ mod tests { }]; let request = ExportTraceServiceRequest { resource_spans }; let raw_doc_json = serde_json::to_vec(&request).unwrap(); - let raw_doc = Bytes::from(raw_doc_json); - - let raw_doc_batch = RawDocBatch { - docs: vec![raw_doc], - checkpoint_delta: SourceCheckpointDelta::from_range(0..2), - force_commit: false, - }; + let raw_doc_batch = RawDocBatch::for_test(&[&raw_doc_json], 0..2); doc_processor_mailbox .send_message(raw_doc_batch) .await @@ -921,13 +906,7 @@ mod tests { let mut raw_doc_buffer = Vec::new(); request.encode(&mut raw_doc_buffer).unwrap(); - let raw_doc = Bytes::from(raw_doc_buffer); - - let raw_doc_batch = RawDocBatch { - docs: vec![raw_doc], - checkpoint_delta: SourceCheckpointDelta::from_range(0..2), - force_commit: false, - }; + let raw_doc_batch = RawDocBatch::for_test(&[&raw_doc_buffer], 0..2); doc_processor_mailbox .send_message(raw_doc_batch) .await @@ -988,10 +967,10 @@ mod tests_vrl { doc_processor_mailbox .send_message(RawDocBatch::for_test( &[ - r#"{"body": "happy", "response_date": "2021-12-19T16:39:57+00:00", "response_time": 12, "response_payload": "YWJj"}"#, // missing timestamp - r#"{"body": "happy using VRL", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, // ok - r#"{"body": "happy2", "timestamp": 1628837062, "response_date": "2021-12-19T16:40:57+00:00", "response_time": 13, "response_payload": "YWJj"}"#, // ok - "{", 
// invalid json + br#"{"body": "happy", "response_date": "2021-12-19T16:39:57+00:00", "response_time": 12, "response_payload": "YWJj"}"#, // missing timestamp + br#"{"body": "happy using VRL", "timestamp": 1628837062, "response_date": "2021-12-19T16:39:59+00:00", "response_time": 2, "response_payload": "YWJj"}"#, // ok + br#"{"body": "happy2", "timestamp": 1628837062, "response_date": "2021-12-19T16:40:57+00:00", "response_time": 13, "response_payload": "YWJj"}"#, // ok + b"{", // invalid json ], 0..4, )) @@ -1079,9 +1058,9 @@ mod tests_vrl { .send_message(RawDocBatch::for_test( &[ // body,timestamp,response_date,response_time,response_payload - r#""happy using VRL",1628837062,"2021-12-19T16:39:59+00:00",2,"YWJj""#, - r#""happy2",1628837062,"2021-12-19T16:40:57+00:00",13,"YWJj""#, - r#""happy2",1628837062,"2021-12-19T16:40:57+00:00","invalid-response_time","YWJj""#, + br#""happy using VRL",1628837062,"2021-12-19T16:39:59+00:00",2,"YWJj""#, + br#""happy2",1628837062,"2021-12-19T16:40:57+00:00",13,"YWJj""#, + br#""happy2",1628837062,"2021-12-19T16:40:57+00:00","invalid-response_time","YWJj""#, ], 0..4, )) diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 352424e5317..c0abaf842f7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -770,8 +770,8 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ + .send_message(ProcessedDocBatch::new( + vec![ ProcessedDoc { doc: doc!( body_field=>"this is a test document", @@ -791,13 +791,13 @@ mod tests { num_bytes: 30, }, ], - checkpoint_delta: SourceCheckpointDelta::from_range(4..6), - force_commit: false, - }) + SourceCheckpointDelta::from_range(4..6), + false, + )) .await?; indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ + .send_message(ProcessedDocBatch::new( + vec![ ProcessedDoc { doc: doc!( body_field=>"this is a test document 3", @@ -817,13 +817,13 @@ mod tests { num_bytes: 30, }, ], - checkpoint_delta: SourceCheckpointDelta::from_range(6..8), - force_commit: false, - }) + SourceCheckpointDelta::from_range(6..8), + false, + )) .await?; indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!( body_field=>"this is a test document 5", timestamp_field=>DateTime::from_timestamp_secs(1_662_529_435) @@ -832,9 +832,9 @@ mod tests { partition: 1, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(8..9), - force_commit: false, - }) + SourceCheckpointDelta::from_range(8..9), + false, + )) .await?; let indexer_counters = indexer_handle.process_pending_and_observe().await.state; assert_eq!( @@ -921,11 +921,11 @@ mod tests { }; for i in 0..10_000 { indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![make_doc(i)], - checkpoint_delta: SourceCheckpointDelta::from_range(i..i + 1), - force_commit: false, - }) + .send_message(ProcessedDocBatch::new( + vec![make_doc(i)], + SourceCheckpointDelta::from_range(i..i + 1), + false, + )) .await?; let output_messages: Vec = index_serializer_inbox.drain_for_test_typed(); @@ -986,8 +986,8 @@ mod tests { async move { let mut position = 0; while indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!( body_field=>"this is a test document", 
timestamp_field=>DateTime::from_timestamp_secs(1_662_529_435) @@ -996,9 +996,9 @@ mod tests { partition: 1, num_bytes: 30, }], - force_commit: false, - checkpoint_delta: SourceCheckpointDelta::from_range(position..position + 1), - }) + SourceCheckpointDelta::from_range(position..position + 1), + false, + )) .await .is_ok() { @@ -1064,8 +1064,8 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!( body_field=>"this is a test document 5", timestamp_field=>DateTime::from_timestamp_secs(1_662_529_435) @@ -1074,9 +1074,9 @@ mod tests { partition: 1, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(8..9), - force_commit: false, - }) + SourceCheckpointDelta::from_range(8..9), + false, + )) .await .unwrap(); let mut indexer_counters: IndexerCounters = Default::default(); @@ -1151,8 +1151,8 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!( body_field=>"this is a test document 5", timestamp_field=> DateTime::from_timestamp_secs(1_662_529_435) @@ -1161,9 +1161,9 @@ mod tests { partition: 1, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(8..9), - force_commit: false, - }) + SourceCheckpointDelta::from_range(8..9), + false, + )) .await .unwrap(); universe.send_exit_with_success(&indexer_mailbox).await?; @@ -1234,8 +1234,8 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ + .send_message(ProcessedDocBatch::new( + vec![ ProcessedDoc { doc: doc!( body_field=>"doc 2", @@ -1255,9 +1255,9 @@ mod tests { num_bytes: 30, }, ], - checkpoint_delta: SourceCheckpointDelta::from_range(8..9), - force_commit: false, - }) + SourceCheckpointDelta::from_range(8..9), + false, + )) .await?; let indexer_counters = indexer_handle.process_pending_and_observe().await.state; @@ -1331,16 +1331,16 @@ mod tests { for partition in 0..100 { indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!(body_field=>"doc {i}"), timestamp_opt: None, partition, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(partition..partition + 1), - force_commit: false, - }) + SourceCheckpointDelta::from_range(partition..partition + 1), + false, + )) .await .unwrap(); } @@ -1409,16 +1409,16 @@ mod tests { .await .unwrap(); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!(body_field=>"doc 1"), timestamp_opt: None, partition: 0, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(0..1), - force_commit: false, - }) + SourceCheckpointDelta::from_range(0..1), + false, + )) .await .unwrap(); } @@ -1480,16 +1480,16 @@ mod tests { indexer_handle.process_pending_and_observe().await; publish_lock.kill().await; indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!(body_field=>"doc 1"), timestamp_opt: None, partition: 0, num_bytes: 30, }], - checkpoint_delta: 
SourceCheckpointDelta::from_range(0..1), - force_commit: false, - }) + SourceCheckpointDelta::from_range(0..1), + false, + )) .await .unwrap(); universe @@ -1536,16 +1536,16 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: vec![ProcessedDoc { + .send_message(ProcessedDocBatch::new( + vec![ProcessedDoc { doc: doc!(body_field=>"doc 1"), timestamp_opt: None, partition: 0, num_bytes: 30, }], - checkpoint_delta: SourceCheckpointDelta::from_range(0..1), - force_commit: true, - }) + SourceCheckpointDelta::from_range(0..1), + true, + )) .await .unwrap(); universe @@ -1604,18 +1604,18 @@ mod tests { ); let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer); indexer_mailbox - .send_message(ProcessedDocBatch { - docs: Vec::new(), - checkpoint_delta: SourceCheckpointDelta::from_range(4..6), - force_commit: false, - }) + .send_message(ProcessedDocBatch::new( + Vec::new(), + SourceCheckpointDelta::from_range(4..6), + false, + )) .await?; indexer_mailbox - .send_message(ProcessedDocBatch { - docs: Vec::new(), - checkpoint_delta: SourceCheckpointDelta::from_range(6..8), - force_commit: false, - }) + .send_message(ProcessedDocBatch::new( + Vec::new(), + SourceCheckpointDelta::from_range(6..8), + false, + )) .await?; universe .sleep(commit_timeout + Duration::from_secs(2)) diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index 9dc6393609e..56eb300f66d 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -70,16 +70,19 @@ impl Default for IndexerMetrics { "ongoing_merge_operations", "Number of ongoing merge operations", "indexing", + &[], ), pending_merge_operations: new_gauge( "pending_merge_operations", "Number of pending merge operations", "indexing", + &[], ), pending_merge_bytes: new_gauge( "pending_merge_bytes", "Number of pending merge bytes", "indexing", + &[], ), } } diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index c73259837b8..1ef771e6bed 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -19,6 +19,7 @@ use std::fmt; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; use tantivy::{DateTime, TantivyDocument}; @@ -40,9 +41,30 @@ impl fmt::Debug for ProcessedDoc { } pub struct ProcessedDocBatch { + // Do not directly append documents to this vector; otherwise, in-flight metrics will be + // incorrect. 
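+    // `new` (below) sums `num_bytes` across the docs and registers the total
+    // on `MEMORY_METRICS.in_flight_data.indexer_mailbox` through a
+    // `GaugeGuard`, which gives the delta back when the batch is dropped.
+    // A minimal sketch of the guard pattern, assuming some static gauge
+    // `GAUGE` (illustrative, not part of this patch):
+    //
+    //     let mut guard = GaugeGuard::from_gauge(&GAUGE, 0);
+    //     guard.add(batch_num_bytes as i64); // gauge rises while in flight
+    //     drop(guard);                       // gauge falls back automatically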
pub docs: Vec<ProcessedDoc>,
     pub checkpoint_delta: SourceCheckpointDelta,
     pub force_commit: bool,
+    _gauge_guard: GaugeGuard,
+}
+
+impl ProcessedDocBatch {
+    pub fn new(
+        docs: Vec<ProcessedDoc>,
+        checkpoint_delta: SourceCheckpointDelta,
+        force_commit: bool,
+    ) -> Self {
+        let delta = docs.iter().map(|doc| doc.num_bytes as i64).sum::<i64>();
+        let _gauge_guard =
+            GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight_data.indexer_mailbox, delta);
+        Self {
+            docs,
+            checkpoint_delta,
+            force_commit,
+            _gauge_guard,
+        }
+    }
 }
 
 impl fmt::Debug for ProcessedDocBatch {
diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
index f890488bbec..19f7826dbfb 100644
--- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
+++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs
@@ -20,13 +20,16 @@ use std::fmt;
 
 use bytes::Bytes;
+use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
 use quickwit_metastore::checkpoint::SourceCheckpointDelta;
 
-#[derive(Default)]
 pub struct RawDocBatch {
+    // Do not directly append documents to this vector; otherwise, in-flight metrics will be
+    // incorrect.
     pub docs: Vec<Bytes>,
     pub checkpoint_delta: SourceCheckpointDelta,
     pub force_commit: bool,
+    _gauge_guard: GaugeGuard,
 }
 
 impl RawDocBatch {
@@ -35,38 +38,23 @@ impl RawDocBatch {
         checkpoint_delta: SourceCheckpointDelta,
         force_commit: bool,
     ) -> Self {
+        let delta = docs.iter().map(|doc| doc.len() as i64).sum::<i64>();
+        let _gauge_guard =
+            GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight_data.doc_processor_mailbox, delta);
+
         Self {
             docs,
             checkpoint_delta,
             force_commit,
+            _gauge_guard,
         }
     }
 
-    pub fn with_capacity(capacity: usize) -> Self {
-        Self {
-            docs: Vec::with_capacity(capacity),
-            checkpoint_delta: SourceCheckpointDelta::default(),
-            force_commit: false,
-        }
-    }
-
-    pub fn num_docs(&self) -> usize {
-        self.docs.len()
-    }
-
-    #[cfg(any(test, feature = "testsuite"))]
-    pub fn for_test(docs: &[&str], range: std::ops::Range<u64>) -> Self {
-        let docs = docs
-            .iter()
-            .map(|doc| Bytes::from(doc.to_string()))
-            .collect();
+    #[cfg(test)]
+    pub fn for_test(docs: &[&[u8]], range: std::ops::Range<u64>) -> Self {
+        let docs = docs.iter().map(|doc| Bytes::from(doc.to_vec())).collect();
         let checkpoint_delta = SourceCheckpointDelta::from_range(range);
-
-        Self {
-            docs,
-            checkpoint_delta,
-            force_commit: false,
-        }
+        Self::new(docs, checkpoint_delta, false)
     }
 }
 
@@ -74,9 +62,23 @@ impl fmt::Debug for RawDocBatch {
     fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
         formatter
             .debug_struct("RawDocBatch")
-            .field("num_docs", &self.num_docs())
+            .field("num_docs", &self.docs.len())
             .field("checkpoint_delta", &self.checkpoint_delta)
             .field("force_commit", &self.force_commit)
             .finish()
     }
 }
+
+impl Default for RawDocBatch {
+    fn default() -> Self {
+        let _gauge_guard =
+            GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight_data.doc_processor_mailbox, 0);
+
+        Self {
+            docs: Vec::new(),
+            checkpoint_delta: SourceCheckpointDelta::default(),
+            force_commit: false,
+            _gauge_guard,
+        }
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index ba61e60cb6f..2c25eec96f8 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -31,13 +31,14 @@ use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_common::uri::Uri;
 use quickwit_config::FileSourceParams;
 use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
+use 
quickwit_proto::metastore::SourceType; use quickwit_proto::types::Position; use serde::Serialize; use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}; use tracing::info; +use super::BatchBuilder; use crate::actors::DocProcessor; -use crate::models::RawDocBatch; use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; /// Number of bytes after which a new batch is cut. @@ -73,7 +74,8 @@ impl Source for FileSource { // We collect batches of documents before sending them to the indexer. let limit_num_bytes = self.counters.previous_offset + BATCH_NUM_BYTES_LIMIT; let mut reached_eof = false; - let mut doc_batch = RawDocBatch::default(); + let mut batch_builder = BatchBuilder::new(SourceType::File); + while self.counters.current_offset < limit_num_bytes { let mut doc_line = String::new(); // guard the zone in case of slow read, such as reading from someone @@ -86,18 +88,18 @@ impl Source for FileSource { reached_eof = true; break; } - doc_batch.docs.push(Bytes::from(doc_line)); + batch_builder.add_doc(Bytes::from(doc_line)); self.counters.current_offset += num_bytes as u64; self.counters.num_lines_processed += 1; } - if !doc_batch.docs.is_empty() { + if !batch_builder.docs.is_empty() { if let Some(filepath) = &self.params.filepath { let filepath_str = filepath .to_str() .context("path is invalid utf-8")? .to_string(); let partition_id = PartitionId::from(filepath_str); - doc_batch + batch_builder .checkpoint_delta .record_partition_delta( partition_id, @@ -107,7 +109,8 @@ impl Source for FileSource { .unwrap(); } self.counters.previous_offset = self.counters.current_offset; - ctx.send_message(doc_processor_mailbox, doc_batch).await?; + ctx.send_message(doc_processor_mailbox, batch_builder.build()) + .await?; } if reached_eof { info!("EOF"); @@ -246,6 +249,7 @@ mod tests { use quickwit_proto::types::IndexUid; use super::*; + use crate::models::RawDocBatch; use crate::source::SourceActor; #[tokio::test] diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs index 1b37fd3cb7a..70263b67739 100644 --- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs +++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs @@ -30,8 +30,9 @@ use google_cloud_pubsub::client::{Client, ClientConfig}; use google_cloud_pubsub::subscription::Subscription; use quickwit_actors::{ActorContext, ActorExitStatus, Mailbox}; use quickwit_common::rand::append_random_suffix; -use quickwit_config::GcpPubSubSourceParams; +use quickwit_config::PubSubSourceParams; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; +use quickwit_proto::metastore::SourceType; use quickwit_proto::types::Position; use serde_json::{json, Value as JsonValue}; use tokio::time; @@ -48,11 +49,11 @@ pub struct GcpPubSubSourceFactory; #[async_trait] impl TypedSourceFactory for GcpPubSubSourceFactory { type Source = GcpPubSubSource; - type Params = GcpPubSubSourceParams; + type Params = PubSubSourceParams; async fn typed_create_source( ctx: Arc, - params: GcpPubSubSourceParams, + params: PubSubSourceParams, _checkpoint: SourceCheckpoint, // TODO: Use checkpoint! 
) -> anyhow::Result { GcpPubSubSource::try_new(ctx, params).await @@ -97,7 +98,7 @@ impl fmt::Debug for GcpPubSubSource { impl GcpPubSubSource { pub async fn try_new( ctx: Arc, - params: GcpPubSubSourceParams, + params: PubSubSourceParams, ) -> anyhow::Result { let subscription_name = params.subscription; let backfill_mode_enabled = params.enable_backfill_mode; @@ -166,18 +167,18 @@ impl Source for GcpPubSubSource { ctx: &SourceContext, ) -> Result { let now = Instant::now(); - let mut batch: BatchBuilder = BatchBuilder::default(); + let mut batch_builder = BatchBuilder::new(SourceType::PubSub); let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); // TODO: ensure we ACK the message after being commit: at least once // TODO: ensure we increase_ack_deadline for the items loop { tokio::select! { - resp = self.pull_message_batch(&mut batch) => { + resp = self.pull_message_batch(&mut batch_builder) => { if let Err(err) = resp { warn!("failed to pull messages from subscription `{}`: {:?}", self.subscription_name, err); } - if batch.num_bytes >= BATCH_NUM_BYTES_LIMIT { + if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } @@ -188,7 +189,7 @@ impl Source for GcpPubSubSource { ctx.record_progress(); } - if batch.num_bytes > 0 { + if batch_builder.num_bytes > 0 { self.state.num_consecutive_empty_batches = 0 } else { self.state.num_consecutive_empty_batches += 1 @@ -200,13 +201,13 @@ impl Source for GcpPubSubSource { ctx.send_exit_with_success(doc_processor_mailbox).await?; return Err(ActorExitStatus::Success); } - if !batch.checkpoint_delta.is_empty() { + if !batch_builder.checkpoint_delta.is_empty() { debug!( - num_bytes=%batch.num_bytes, - num_docs=%batch.docs.len(), + num_bytes=%batch_builder.num_bytes, + num_docs=%batch_builder.docs.len(), num_millis=%now.elapsed().as_millis(), "Sending doc batch to indexer."); - let message = batch.build(); + let message = batch_builder.build(); ctx.send_message(doc_processor_mailbox, message).await?; } Ok(Duration::default()) @@ -316,7 +317,7 @@ mod gcp_pubsub_emulator_tests { desired_num_pipelines: NonZeroUsize::new(1).unwrap(), max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::GcpPubSub(GcpPubSubSourceParams { + source_params: SourceParams::PubSub(PubSubSourceParams { project_id: Some(GCP_TEST_PROJECT.to_string()), enable_backfill_mode: true, subscription: subscription.to_string(), @@ -355,7 +356,7 @@ mod gcp_pubsub_emulator_tests { let index_id = append_random_suffix("test-gcp-pubsub-source--invalid-subscription--index"); let index_uid = IndexUid::new_with_random_ulid(&index_id); let metastore = metastore_for_test(); - let SourceParams::GcpPubSub(params) = source_config.clone().source_params else { + let SourceParams::PubSub(params) = source_config.clone().source_params else { panic!( "Expected `SourceParams::GcpPubSub` source params, got {:?}", source_config.source_params diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 66c3a2346be..551ed632914 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -38,7 +38,9 @@ use quickwit_proto::ingest::ingester::{ TruncateShardsSubrequest, }; use quickwit_proto::ingest::IngestV2Error; -use quickwit_proto::metastore::{AcquireShardsRequest, MetastoreService, MetastoreServiceClient}; +use quickwit_proto::metastore::{ + AcquireShardsRequest, MetastoreService, MetastoreServiceClient, 
SourceType, +}; use quickwit_proto::types::{ NodeId, PipelineUid, Position, PublishToken, ShardId, SourceId, SourceUid, }; @@ -463,7 +465,7 @@ impl Source for IngestSource { doc_processor_mailbox: &Mailbox, ctx: &SourceContext, ) -> Result { - let mut batch_builder = BatchBuilder::default(); + let mut batch_builder = BatchBuilder::new(SourceType::IngestV2); let now = time::Instant::now(); let deadline = now + EMIT_BATCHES_TIMEOUT; diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index 8a20d16fc55..ceb22c78cdf 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -28,13 +28,13 @@ use quickwit_ingest::{ GetPartitionId, IngestApiService, SuggestTruncateRequest, }; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; +use quickwit_proto::metastore::SourceType; use quickwit_proto::types::Position; use serde::Serialize; use serde_json::Value as JsonValue; -use super::{Source, SourceActor, SourceContext, TypedSourceFactory}; +use super::{BatchBuilder, Source, SourceActor, SourceContext, TypedSourceFactory}; use crate::actors::DocProcessor; -use crate::models::RawDocBatch; use crate::source::SourceRuntimeArgs; /// Wait time for SourceActor before pooling for new documents. @@ -168,16 +168,17 @@ impl Source for IngestApiSource { let batch_num_docs = doc_batch.num_docs(); // TODO use a timestamp (in the raw doc batch) given by at ingest time to be more accurate. - let mut raw_doc_batch = RawDocBatch::with_capacity(doc_batch.num_docs()); + let mut batch_builder = + BatchBuilder::with_capacity(doc_batch.num_docs(), SourceType::IngestV1); for doc in doc_batch.into_iter() { match doc { - DocCommand::Ingest { payload } => raw_doc_batch.docs.push(payload), - DocCommand::Commit => raw_doc_batch.force_commit = true, + DocCommand::Ingest { payload } => batch_builder.add_doc(payload), + DocCommand::Commit => batch_builder.force_commit(), } } let current_offset = first_position + batch_num_docs as u64 - 1; let partition_id = self.partition_id.clone(); - raw_doc_batch + batch_builder .checkpoint_delta .record_partition_delta( partition_id, @@ -189,8 +190,8 @@ impl Source for IngestApiSource { ) .map_err(anyhow::Error::from)?; - self.update_counters(current_offset, raw_doc_batch.docs.len() as u64); - ctx.send_message(batch_sink, raw_doc_batch).await?; + self.update_counters(current_offset, batch_builder.docs.len() as u64); + ctx.send_message(batch_sink, batch_builder.build()).await?; Ok(Duration::default()) } @@ -251,6 +252,7 @@ mod tests { use quickwit_proto::types::IndexUid; use super::*; + use crate::models::RawDocBatch; use crate::source::SourceActor; fn make_ingest_request( diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index d243d22c27a..ff435041b1f 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -31,7 +31,7 @@ use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::KafkaSourceParams; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; use quickwit_metastore::IndexMetadataResponseExt; -use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService}; +use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, SourceType}; use quickwit_proto::types::{IndexUid, Position}; use rdkafka::config::{ClientConfig, 
RDKafkaLogLevel}; use rdkafka::consumer::{ @@ -475,7 +475,7 @@ impl Source for KafkaSource { ctx: &SourceContext, ) -> Result { let now = Instant::now(); - let mut batch = BatchBuilder::default(); + let mut batch_builder = BatchBuilder::new(SourceType::Kafka); let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); @@ -484,13 +484,13 @@ impl Source for KafkaSource { event_opt = self.events_rx.recv() => { let event = event_opt.ok_or_else(|| ActorExitStatus::from(anyhow!("consumer was dropped")))?; match event { - KafkaEvent::Message(message) => self.process_message(message, &mut batch).await?, + KafkaEvent::Message(message) => self.process_message(message, &mut batch_builder).await?, KafkaEvent::AssignPartitions { partitions, assignment_tx} => self.process_assign_partitions(ctx, &partitions, assignment_tx).await?, - KafkaEvent::RevokePartitions { ack_tx } => self.process_revoke_partitions(ctx, doc_processor_mailbox, &mut batch, ack_tx).await?, + KafkaEvent::RevokePartitions { ack_tx } => self.process_revoke_partitions(ctx, doc_processor_mailbox, &mut batch_builder, ack_tx).await?, KafkaEvent::PartitionEOF(partition) => self.process_partition_eof(partition), KafkaEvent::Error(error) => Err(ActorExitStatus::from(error))?, } - if batch.num_bytes >= BATCH_NUM_BYTES_LIMIT { + if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } @@ -500,13 +500,14 @@ impl Source for KafkaSource { } ctx.record_progress(); } - if !batch.checkpoint_delta.is_empty() { + if !batch_builder.checkpoint_delta.is_empty() { debug!( - num_docs=%batch.docs.len(), - num_bytes=%batch.num_bytes, + num_docs=%batch_builder.docs.len(), + num_bytes=%batch_builder.num_bytes, num_millis=%now.elapsed().as_millis(), - "Sending doc batch to indexer."); - let message = batch.build(); + "sending doc batch to indexer" + ); + let message = batch_builder.build(); ctx.send_message(doc_processor_mailbox, message).await?; } if self.should_exit() { @@ -1024,7 +1025,7 @@ mod kafka_broker_tests { assert_eq!(kafka_source.state.num_messages_processed, 0); assert_eq!(kafka_source.state.num_invalid_messages, 0); - let mut batch = BatchBuilder::default(); + let mut batch_builder = BatchBuilder::new(SourceType::Kafka); let message = KafkaMessage { doc_opt: None, @@ -1033,12 +1034,12 @@ mod kafka_broker_tests { offset: 0, }; kafka_source - .process_message(message, &mut batch) + .process_message(message, &mut batch_builder) .await .unwrap(); - assert_eq!(batch.docs.len(), 0); - assert_eq!(batch.num_bytes, 0); + assert_eq!(batch_builder.docs.len(), 0); + assert_eq!(batch_builder.num_bytes, 0); assert_eq!( kafka_source.state.current_positions.get(&1).unwrap(), &Position::offset(0u64) @@ -1054,13 +1055,13 @@ mod kafka_broker_tests { offset: 1, }; kafka_source - .process_message(message, &mut batch) + .process_message(message, &mut batch_builder) .await .unwrap(); - assert_eq!(batch.docs.len(), 1); - assert_eq!(batch.docs[0], "test-doc"); - assert_eq!(batch.num_bytes, 8); + assert_eq!(batch_builder.docs.len(), 1); + assert_eq!(batch_builder.docs[0], "test-doc"); + assert_eq!(batch_builder.num_bytes, 8); assert_eq!( kafka_source.state.current_positions.get(&1).unwrap(), &Position::offset(1u64) @@ -1076,13 +1077,13 @@ mod kafka_broker_tests { offset: 42, }; kafka_source - .process_message(message, &mut batch) + .process_message(message, &mut batch_builder) .await .unwrap(); - assert_eq!(batch.docs.len(), 2); - assert_eq!(batch.docs[1], "test-doc"); - assert_eq!(batch.num_bytes, 16); + assert_eq!(batch_builder.docs.len(), 2); + 
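+        // Both payloads are the 8-byte b"test-doc": each `add_doc` call pushes
+        // the doc, bumps `num_bytes` by 8, and moves the Kafka source's
+        // in-flight gauge by the same amount via the builder's `GaugeGuard`.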
assert_eq!(batch_builder.docs[1], "test-doc"); + assert_eq!(batch_builder.num_bytes, 16); assert_eq!( kafka_source.state.current_positions.get(&2).unwrap(), &Position::offset(42u64) @@ -1102,7 +1103,7 @@ mod kafka_broker_tests { Position::offset(42u64), ) .unwrap(); - assert_eq!(batch.checkpoint_delta, expected_checkpoint_delta); + assert_eq!(batch_builder.checkpoint_delta, expected_checkpoint_delta); // Message from unassigned partition let message = KafkaMessage { @@ -1112,7 +1113,7 @@ mod kafka_broker_tests { offset: 42, }; kafka_source - .process_message(message, &mut batch) + .process_message(message, &mut batch_builder) .await .unwrap_err(); } @@ -1212,20 +1213,20 @@ mod kafka_broker_tests { ActorContext::for_test(&universe, source_mailbox, observable_state_tx); let (ack_tx, ack_rx) = oneshot::channel(); - let mut batch = BatchBuilder::default(); - batch.add_doc(Bytes::from_static(b"test-doc")); + let mut batch_builder = BatchBuilder::new(SourceType::Kafka); + batch_builder.add_doc(Bytes::from_static(b"test-doc")); let publish_lock = kafka_source.publish_lock.clone(); assert!(publish_lock.is_alive()); assert_eq!(kafka_source.state.num_rebalances, 0); kafka_source - .process_revoke_partitions(&ctx, &indexer_mailbox, &mut batch, ack_tx) + .process_revoke_partitions(&ctx, &indexer_mailbox, &mut batch_builder, ack_tx) .await .unwrap(); ack_rx.await.unwrap(); - assert!(batch.docs.is_empty()); + assert!(batch_builder.docs.is_empty()); assert!(publish_lock.is_dead()); assert_eq!(kafka_source.state.num_rebalances, 1); diff --git a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs index 6b14a4b2fb8..c542760bf25 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs @@ -31,7 +31,8 @@ use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_aws::get_aws_config; use quickwit_common::retry::RetryParams; use quickwit_config::{KinesisSourceParams, RegionOrEndpoint}; -use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; +use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; +use quickwit_proto::metastore::SourceType; use quickwit_proto::types::Position; use serde_json::{json, Value as JsonValue}; use tokio::sync::mpsc; @@ -41,11 +42,10 @@ use tracing::{info, warn}; use super::api::list_shards; use super::shard_consumer::{ShardConsumer, ShardConsumerHandle, ShardConsumerMessage}; use crate::actors::DocProcessor; -use crate::models::RawDocBatch; use crate::source::kinesis::helpers::get_kinesis_client; use crate::source::{ - Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT, - EMIT_BATCHES_TIMEOUT, + BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, + BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, }; type ShardId = String; @@ -212,10 +212,7 @@ impl Source for KinesisSource { indexer_mailbox: &Mailbox, ctx: &SourceContext, ) -> Result { - let mut batch_num_bytes = 0; - let mut docs = Vec::new(); - let mut checkpoint_delta = SourceCheckpointDelta::default(); - + let mut batch_builder = BatchBuilder::new(SourceType::Kinesis); let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); @@ -245,16 +242,12 @@ impl Source for KinesisSource { stream_name=%self.stream_name, shard_id=%shard_id, sequence_number=%record_sequence_number, - "Record is empty." 
+ "record is empty" ); self.state.num_invalid_records += 1; continue; } - let doc_num_bytes = record_data.len() as u64; - docs.push(Bytes::from(record_data)); - batch_num_bytes += doc_num_bytes; - self.state.num_bytes_processed += doc_num_bytes; - self.state.num_records_processed += 1; + batch_builder.add_doc(Bytes::from(record_data)); if i == num_records - 1 { let shard_consumer_state = self @@ -272,14 +265,14 @@ impl Source for KinesisSource { let current_position = Position::from(record_sequence_number); let previous_position = std::mem::replace(&mut shard_consumer_state.position, current_position.clone()); - checkpoint_delta.record_partition_delta( + batch_builder.checkpoint_delta.record_partition_delta( partition_id, previous_position, current_position, ).context("failed to record partition delta")?; } } - if batch_num_bytes >= BATCH_NUM_BYTES_LIMIT { + if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } @@ -310,13 +303,12 @@ impl Source for KinesisSource { } } } - if !checkpoint_delta.is_empty() { - let batch = RawDocBatch { - docs, - checkpoint_delta, - force_commit: false, - }; - ctx.send_message(indexer_mailbox, batch).await?; + self.state.num_bytes_processed += batch_builder.num_bytes; + self.state.num_records_processed += batch_builder.docs.len() as u64; + + if !batch_builder.checkpoint_delta.is_empty() { + ctx.send_message(indexer_mailbox, batch_builder.build()) + .await?; } if self.state.shard_consumers.is_empty() { info!(stream_name = %self.stream_name, "reached end of stream"); @@ -369,8 +361,10 @@ pub(super) async fn get_region( #[cfg(all(test, feature = "kinesis-localstack-tests"))] mod tests { use quickwit_actors::Universe; + use quickwit_metastore::checkpoint::SourceCheckpointDelta; use super::*; + use crate::models::RawDocBatch; use crate::source::kinesis::helpers::tests::{ make_shard_id, put_records_into_shards, setup, teardown, }; diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 754681f5c2b..7976158439e 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -90,13 +90,14 @@ use once_cell::sync::OnceCell; #[cfg(feature = "pulsar")] pub use pulsar_source::{PulsarSource, PulsarSourceFactory}; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; +use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceConfig, SourceParams}; use quickwit_ingest::IngesterPool; use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta}; use quickwit_proto::indexing::IndexingPipelineId; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::{MetastoreServiceClient, SourceType}; use quickwit_proto::types::{IndexUid, PipelineUid, ShardId}; use quickwit_storage::StorageResolver; use serde_json::Value as JsonValue; @@ -379,13 +380,14 @@ impl Handler for SourceActor { } } +// TODO: Use `SourceType` instead of `&str``. 
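+// A hedged sketch of how the loader below is consumed; the `load_source`
+// call is an assumption about the surrounding crate, not shown in this patch:
+//
+//     let loader = quickwit_supported_sources();
+//     let source = loader.load_source(runtime_args, checkpoint).await?;
+//
+// Note that the Pub/Sub registration key changes from "gcp_pubsub" to
+// "pubsub", matching the new serde tag.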
pub fn quickwit_supported_sources() -> &'static SourceLoader { static SOURCE_LOADER: OnceCell = OnceCell::new(); SOURCE_LOADER.get_or_init(|| { let mut source_factory = SourceLoader::default(); source_factory.add_source("file", FileSourceFactory); #[cfg(feature = "gcp-pubsub")] - source_factory.add_source("gcp_pubsub", GcpPubSubSourceFactory); + source_factory.add_source("pubsub", GcpPubSubSourceFactory); source_factory.add_source("ingest-api", IngestApiSourceFactory); source_factory.add_source("ingest", IngestSourceFactory); #[cfg(feature = "kafka")] @@ -463,27 +465,57 @@ impl Handler for SourceActor { ctx: &SourceContext, ) -> Result<(), ActorExitStatus> { let SuggestTruncate(checkpoint) = suggest_truncate; - if let Err(err) = self.source.suggest_truncate(checkpoint, ctx).await { + + if let Err(error) = self.source.suggest_truncate(checkpoint, ctx).await { // Failing to process suggest truncate does not // kill the source nor the indexing pipeline, but we log the error. - error!(err=?err, "suggest-truncate-error"); + error!(%error, "failed to process suggest truncate"); } Ok(()) } } -#[derive(Debug, Default)] -pub(crate) struct BatchBuilder { +pub(super) struct BatchBuilder { + // Do not directly append documents to this vector; otherwise, in-flight metrics will be + // incorrect. Use `add_doc` instead. docs: Vec, num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, + gauge_guard: GaugeGuard, } impl BatchBuilder { + pub fn new(source_type: SourceType) -> Self { + Self::with_capacity(0, source_type) + } + + pub fn with_capacity(capacity: usize, source_type: SourceType) -> Self { + let gauge = match source_type { + SourceType::File => &MEMORY_METRICS.in_flight_data.sources.file, + SourceType::IngestV2 => &MEMORY_METRICS.in_flight_data.sources.ingest, + SourceType::Kafka => &MEMORY_METRICS.in_flight_data.sources.kafka, + SourceType::Kinesis => &MEMORY_METRICS.in_flight_data.sources.kinesis, + SourceType::PubSub => &MEMORY_METRICS.in_flight_data.sources.pubsub, + SourceType::Pulsar => &MEMORY_METRICS.in_flight_data.sources.pulsar, + _ => &MEMORY_METRICS.in_flight_data.sources.other, + }; + let gauge_guard = GaugeGuard::from_gauge(gauge, 0); + + Self { + docs: Vec::with_capacity(capacity), + num_bytes: 0, + checkpoint_delta: SourceCheckpointDelta::default(), + force_commit: false, + gauge_guard, + } + } + pub fn add_doc(&mut self, doc: Bytes) { - self.num_bytes += doc.len() as u64; + let num_bytes = doc.len(); self.docs.push(doc); + self.gauge_guard.add(num_bytes as i64); + self.num_bytes += num_bytes as u64; } pub fn force_commit(&mut self) { @@ -491,18 +523,15 @@ impl BatchBuilder { } pub fn build(self) -> RawDocBatch { - RawDocBatch { - docs: self.docs, - checkpoint_delta: self.checkpoint_delta, - force_commit: self.force_commit, - } + RawDocBatch::new(self.docs, self.checkpoint_delta, self.force_commit) } #[cfg(feature = "kafka")] pub fn clear(&mut self) { self.docs.clear(); - self.num_bytes = 0; self.checkpoint_delta = SourceCheckpointDelta::default(); + self.gauge_guard.sub(self.num_bytes as i64); + self.num_bytes = 0; } } diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index eba4aa15b8b..6c2c6e1cfc5 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -34,6 +34,7 @@ use pulsar::{ use quickwit_actors::{ActorContext, ActorExitStatus, Mailbox}; use quickwit_config::{PulsarSourceAuth, PulsarSourceParams}; use 
quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; +use quickwit_proto::metastore::SourceType; use quickwit_proto::types::{IndexUid, Position}; use serde_json::{json, Value as JsonValue}; use tokio::time; @@ -212,7 +213,7 @@ impl Source for PulsarSource { ctx: &SourceContext, ) -> Result { let now = Instant::now(); - let mut batch = BatchBuilder::default(); + let mut batch_builder = BatchBuilder::new(SourceType::Pulsar); let deadline = time::sleep(EMIT_BATCHES_TIMEOUT); tokio::pin!(deadline); @@ -226,9 +227,9 @@ impl Source for PulsarSource { .ok_or_else(|| ActorExitStatus::from(anyhow!("consumer was dropped")))? .map_err(|e| ActorExitStatus::from(anyhow!("failed to get message from consumer: {:?}", e)))?; - self.process_message(message, &mut batch).map_err(ActorExitStatus::from)?; + self.process_message(message, &mut batch_builder).map_err(ActorExitStatus::from)?; - if batch.num_bytes >= BATCH_NUM_BYTES_LIMIT { + if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { break; } } @@ -239,16 +240,16 @@ impl Source for PulsarSource { ctx.record_progress(); } - if !batch.checkpoint_delta.is_empty() { + if !batch_builder.checkpoint_delta.is_empty() { debug!( - num_docs=%batch.docs.len(), - num_bytes=%batch.num_bytes, + num_docs=%batch_builder.docs.len(), + num_bytes=%batch_builder.num_bytes, num_millis=%now.elapsed().as_millis(), - "Sending doc batch to indexer."); - let message = batch.build(); + "sending doc batch to indexer" + ); + let message = batch_builder.build(); ctx.send_message(doc_processor_mailbox, message).await?; } - Ok(Duration::default()) } @@ -567,7 +568,7 @@ mod pulsar_broker_tests { merged_batch .checkpoint_delta .extend(batch.checkpoint_delta) - .expect("Merge batches."); + .unwrap(); } merged_batch.docs.sort(); merged_batch @@ -837,7 +838,7 @@ mod pulsar_broker_tests { .expect("Setup pulsar source"); let position = Position::Beginning; - let mut batch = BatchBuilder::default(); + let mut batch = BatchBuilder::new(SourceType::Pulsar); pulsar_source .add_doc_to_batch(&topic, position, Bytes::from_static(b""), &mut batch) .expect("Add batch should not error on empty doc."); @@ -849,7 +850,7 @@ mod pulsar_broker_tests { assert!(batch.docs.is_empty()); let position = Position::offset(1u64); // Used for testing simplicity. - let mut batch = BatchBuilder::default(); + let mut batch = BatchBuilder::new(SourceType::Pulsar); let doc = Bytes::from_static(b"some-demo-data"); pulsar_source .add_doc_to_batch(&topic, position, doc, &mut batch) @@ -866,7 +867,7 @@ mod pulsar_broker_tests { assert_eq!(batch.docs.len(), 1); let position = Position::offset(4u64); // Used for testing simplicity. 
-        let mut batch = BatchBuilder::default();
+        let mut batch = BatchBuilder::new(SourceType::Pulsar);
         let doc = Bytes::from_static(b"some-demo-data-2");
         pulsar_source
             .add_doc_to_batch(&topic, position, doc, &mut batch)
diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs
index 1426bcaf0c6..06b0b84883c 100644
--- a/quickwit/quickwit-indexing/src/source/vec_source.rs
+++ b/quickwit/quickwit-indexing/src/source/vec_source.rs
@@ -25,12 +25,13 @@ use async_trait::async_trait;
 use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_config::VecSourceParams;
 use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta};
+use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::Position;
 use serde_json::Value as JsonValue;
 use tracing::info;

+use super::BatchBuilder;
 use crate::actors::DocProcessor;
-use crate::models::RawDocBatch;
 use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};

 pub struct VecSource {
@@ -92,29 +93,32 @@ impl Source for VecSource {
         batch_sink: &Mailbox<DocProcessor>,
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
-        let mut doc_batch = RawDocBatch::default();
+        let mut batch_builder = BatchBuilder::new(SourceType::Vec);

-        doc_batch.docs.extend(
-            self.params.docs[self.next_item_idx..]
-                .iter()
-                .take(self.params.batch_num_docs)
-                .cloned(),
-        );
-        if doc_batch.docs.is_empty() {
+        for doc in self.params.docs[self.next_item_idx..]
+            .iter()
+            .take(self.params.batch_num_docs)
+            .cloned()
+        {
+            batch_builder.add_doc(doc);
+        }
+        if batch_builder.docs.is_empty() {
             info!("reached end of source");
             ctx.send_exit_with_success(batch_sink).await?;
             return Err(ActorExitStatus::Success);
         }
         let from_item_idx = self.next_item_idx;
-        self.next_item_idx += doc_batch.docs.len();
+        self.next_item_idx += batch_builder.docs.len();
         let to_item_idx = self.next_item_idx;
-        doc_batch.checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
+
+        batch_builder.checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
             self.partition.clone(),
             position_from_offset(from_item_idx),
             position_from_offset(to_item_idx),
         )
         .unwrap();
-        ctx.send_message(batch_sink, doc_batch).await?;
+        ctx.send_message(batch_sink, batch_builder.build()).await?;
+
         Ok(Duration::default())
     }

@@ -142,6 +146,7 @@ mod tests {
     use serde_json::json;

     use super::*;
+    use crate::models::RawDocBatch;
     use crate::source::SourceActor;

     #[tokio::test]
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
index 14708b5b733..1acd5b83f2b 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -67,11 +67,13 @@ impl Default for IngestV2Metrics {
                 "wal_disk_usage_bytes",
                 "WAL disk usage in bytes.",
                 "ingest",
+                &[],
             ),
             wal_memory_usage_bytes: new_gauge(
                 "wal_memory_usage_bytes",
                 "WAL memory usage in bytes.",
                 "ingest",
+                &[],
             ),
         }
     }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 254f2bfa512..390abd96bae 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -25,6 +25,7 @@ use std::time::Duration;
 use async_trait::async_trait;
 use futures::stream::FuturesUnordered;
 use futures::{Future, StreamExt};
+use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
 use quickwit_common::pubsub::{EventBroker, EventSubscriber};
 use quickwit_proto::control_plane::{
ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest, @@ -460,10 +461,16 @@ impl IngestRouterService for IngestRouter { &mut self, ingest_request: IngestRequestV2, ) -> IngestV2Result { + let request_size_bytes = ingest_request.num_bytes(); + + let _gauge_guard = GaugeGuard::from_gauge( + &MEMORY_METRICS.in_flight_data.ingest_router, + request_size_bytes as i64, + ); let _permit = self .ingest_semaphore .clone() - .try_acquire_many_owned(ingest_request.num_bytes() as u32) + .try_acquire_many_owned(request_size_bytes as u32) .map_err(|_| IngestV2Error::TooManyRequests)?; self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 3f088ae2e79..4df779a37cb 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -51,7 +51,12 @@ impl Default for IngestMetrics { "Total number of docs replicated.", "ingest", ), - queue_count: new_gauge("queue_count", "Number of queues currently active", "ingest"), + queue_count: new_gauge( + "queue_count", + "Number of queues currently active", + "ingest", + &[], + ), } } } diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 0067284eb2d..b70636852b6 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -28,12 +28,16 @@ enum SourceType { SOURCE_TYPE_UNSPECIFIED = 0; SOURCE_TYPE_CLI = 1; SOURCE_TYPE_FILE = 2; - SOURCE_TYPE_GCP_PUBSUB = 3; SOURCE_TYPE_INGEST_V1 = 4; SOURCE_TYPE_INGEST_V2 = 5; + // Apache Kafka SOURCE_TYPE_KAFKA = 6; + // Amazon Kinesis SOURCE_TYPE_KINESIS = 7; SOURCE_TYPE_NATS = 8; + // Google Cloud Pub/Sub + SOURCE_TYPE_PUB_SUB = 3; + // Apache Pulsar SOURCE_TYPE_PULSAR = 9; SOURCE_TYPE_VEC = 10; SOURCE_TYPE_VOID = 11; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 3540b8c0292..95a154dc3c7 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -442,12 +442,16 @@ pub enum SourceType { Unspecified = 0, Cli = 1, File = 2, - GcpPubsub = 3, IngestV1 = 4, IngestV2 = 5, + /// Apache Kafka Kafka = 6, + /// Amazon Kinesis Kinesis = 7, Nats = 8, + /// Google Cloud Pub/Sub + PubSub = 3, + /// Apache Pulsar Pulsar = 9, Vec = 10, Void = 11, @@ -462,12 +466,12 @@ impl SourceType { SourceType::Unspecified => "SOURCE_TYPE_UNSPECIFIED", SourceType::Cli => "SOURCE_TYPE_CLI", SourceType::File => "SOURCE_TYPE_FILE", - SourceType::GcpPubsub => "SOURCE_TYPE_GCP_PUBSUB", SourceType::IngestV1 => "SOURCE_TYPE_INGEST_V1", SourceType::IngestV2 => "SOURCE_TYPE_INGEST_V2", SourceType::Kafka => "SOURCE_TYPE_KAFKA", SourceType::Kinesis => "SOURCE_TYPE_KINESIS", SourceType::Nats => "SOURCE_TYPE_NATS", + SourceType::PubSub => "SOURCE_TYPE_PUB_SUB", SourceType::Pulsar => "SOURCE_TYPE_PULSAR", SourceType::Vec => "SOURCE_TYPE_VEC", SourceType::Void => "SOURCE_TYPE_VOID", @@ -479,12 +483,12 @@ impl SourceType { "SOURCE_TYPE_UNSPECIFIED" => Some(Self::Unspecified), "SOURCE_TYPE_CLI" => Some(Self::Cli), "SOURCE_TYPE_FILE" => Some(Self::File), - "SOURCE_TYPE_GCP_PUBSUB" => Some(Self::GcpPubsub), "SOURCE_TYPE_INGEST_V1" => Some(Self::IngestV1), "SOURCE_TYPE_INGEST_V2" => Some(Self::IngestV2), "SOURCE_TYPE_KAFKA" => Some(Self::Kafka), "SOURCE_TYPE_KINESIS" => 
Some(Self::Kinesis),
             "SOURCE_TYPE_NATS" => Some(Self::Nats),
+            "SOURCE_TYPE_PUB_SUB" => Some(Self::PubSub),
             "SOURCE_TYPE_PULSAR" => Some(Self::Pulsar),
             "SOURCE_TYPE_VEC" => Some(Self::Vec),
             "SOURCE_TYPE_VOID" => Some(Self::Void),
diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs
index 16f9a420c76..1c0ee751c2c 100644
--- a/quickwit/quickwit-proto/src/metastore/mod.rs
+++ b/quickwit/quickwit-proto/src/metastore/mod.rs
@@ -212,12 +212,12 @@ impl SourceType {
         match self {
             SourceType::Cli => "ingest-cli",
             SourceType::File => "file",
-            SourceType::GcpPubsub => "gcp_pubsub",
             SourceType::IngestV1 => "ingest-api",
             SourceType::IngestV2 => "ingest",
             SourceType::Kafka => "kafka",
             SourceType::Kinesis => "kinesis",
             SourceType::Nats => "nats",
+            SourceType::PubSub => "pubsub",
             SourceType::Pulsar => "pulsar",
             SourceType::Unspecified => "unspecified",
             SourceType::Vec => "vec",
diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs
index 5a4a677f2b9..b1d89abff57 100644
--- a/quickwit/quickwit-search/src/metrics.rs
+++ b/quickwit/quickwit-search/src/metrics.rs
@@ -36,18 +36,19 @@ impl Default for SearchMetrics {
             leaf_searches_splits_total: new_counter(
                 "leaf_searches_splits_total",
                 "Number of leaf searches (count of splits) started.",
-                "quickwit_search",
+                "search",
             ),
             leaf_search_split_duration_secs: new_histogram(
                 "leaf_search_split_duration_secs",
                 "Number of seconds required to run a leaf search over a single split. The timer \
                  starts after the semaphore is obtained.",
-                "quickwit_search",
+                "search",
             ),
             active_search_threads_count: new_gauge(
                 "active_search_threads_count",
                 "Number of threads in use in the CPU thread pool",
-                "quickwit_search",
+                "search",
+                &[],
             ),
         }
     }
diff --git a/quickwit/quickwit-search/src/thread_pool.rs b/quickwit/quickwit-search/src/thread_pool.rs
index edf9f913c6c..2ff897344cf 100644
--- a/quickwit/quickwit-search/src/thread_pool.rs
+++ b/quickwit/quickwit-search/src/thread_pool.rs
@@ -73,7 +73,7 @@ where
     search_thread_pool().spawn(move || {
         let _guard = span.enter();
         let _active_thread_guard =
-            GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count);
+            GaugeGuard::from_gauge(&crate::SEARCH_METRICS.active_search_threads_count, 1);
         if tx.is_closed() {
             return;
         }
diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs
index c0be92cb214..74208e4ede2 100644
--- a/quickwit/quickwit-serve/src/decompression.rs
+++ b/quickwit/quickwit-serve/src/decompression.rs
@@ -21,6 +21,7 @@ use std::io::Read;

 use bytes::Bytes;
 use flate2::read::GzDecoder;
+use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
 use thiserror::Error;
 use tokio::task;
 use warp::reject::Reject;
@@ -78,10 +79,28 @@ pub(crate) struct UnsupportedEncoding(String);

 impl Reject for UnsupportedEncoding {}

 /// Custom filter for optional decompression
-pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Clone {
+pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Body,), Error = Rejection> + Clone {
     warp::header::optional("content-encoding")
         .and(warp::body::bytes())
         .and_then(|encoding: Option<String>, body: Bytes| async move {
-            decompress_body(encoding, body).await
+            decompress_body(encoding, body).await.map(Body::from)
         })
 }
+
+pub(crate) struct Body {
+    pub content: Bytes,
+    _gauge_guard: GaugeGuard,
+}
+
+impl From<Bytes> for Body {
+    fn from(content: Bytes) -> Self {
+        let _gauge_guard = GaugeGuard::from_gauge(
+            &MEMORY_METRICS.in_flight_data.rest_server,
+            content.len() as i64,
+        );
+        Body {
+            content,
+            _gauge_guard,
+        }
+    }
+}
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
index 38a0e4efd68..3e661d8c4b2 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
@@ -20,7 +20,6 @@ use std::collections::HashMap;
 use std::time::Instant;

-use bytes::Bytes;
 use hyper::StatusCode;
 use quickwit_config::enable_ingest_v2;
 use quickwit_ingest::{
@@ -36,7 +35,7 @@ use crate::elasticsearch_api::make_elastic_api_response;
 use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
 use crate::format::extract_format_from_qs;
 use crate::ingest_api::lines;
-use crate::with_arg;
+use crate::{with_arg, Body};

 /// POST `_elastic/_bulk`
 pub fn es_compat_bulk_handler(
@@ -78,7 +77,7 @@ pub fn es_compat_index_bulk_handler(
 async fn elastic_ingest_bulk(
     default_index_id: Option<String>,
-    body: Bytes,
+    body: Body,
     bulk_options: ElasticBulkOptions,
     mut ingest_service: IngestServiceClient,
     ingest_router: IngestRouterServiceClient,
@@ -88,7 +87,7 @@ async fn elastic_ingest_bulk(
     }
     let now = Instant::now();
     let mut doc_batch_builders = HashMap::new();
-    let mut lines = lines(&body).enumerate();
+    let mut lines = lines(&body.content).enumerate();

     while let Some((line_number, line)) = lines.next() {
         let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
index cdd0a15f23b..d1fccfa33cf 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
@@ -19,7 +19,6 @@
 use std::time::Instant;

-use bytes::Bytes;
 use hyper::StatusCode;
 use quickwit_config::INGEST_V2_SOURCE_ID;
 use quickwit_ingest::IngestRequestV2Builder;
@@ -33,6 +32,7 @@ use tracing::warn;

 use crate::elasticsearch_api::model::{BulkAction, ElasticBulkOptions, ElasticsearchError};
 use crate::ingest_api::lines;
+use crate::Body;

 #[derive(Debug, Default, Serialize, Deserialize)]
 pub(crate) struct ElasticBulkResponse {
@@ -43,13 +43,13 @@ pub(crate) async fn elastic_bulk_ingest_v2(
     default_index_id: Option<String>,
-    body: Bytes,
+    body: Body,
     bulk_options: ElasticBulkOptions,
     mut ingest_router: IngestRouterServiceClient,
 ) -> Result<ElasticBulkResponse, ElasticsearchError> {
     let now = Instant::now();
     let mut ingest_request_builder = IngestRequestV2Builder::default();
-    let mut lines = lines(&body).enumerate();
+    let mut lines = lines(&body.content).enumerate();

     while let Some((line_no, line)) = lines.next() {
         let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
index 1214668fb00..a9f9b0e6c66 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
@@ -32,6 +32,7 @@ use crate::elasticsearch_api::model::{
     ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
 };
 use crate::search_api::{extract_index_id_patterns, extract_index_id_patterns_default};
+use crate::Body;

 const BODY_LENGTH_LIMIT: ByteSize = ByteSize::mib(1);
 const CONTENT_LENGTH_LIMIT: ByteSize = ByteSize::mib(10);
@@ -71,7 +72,7 @@ pub(crate) fn elasticsearch_filter(
     )
 )]
 pub(crate) fn elastic_bulk_filter(
-) -> impl Filter<Extract = (Bytes, ElasticBulkOptions), Error = Rejection> + Clone {
+) -> impl Filter<Extract = (Body, ElasticBulkOptions), Error = Rejection> + Clone {
     warp::path!("_elastic" / "_bulk")
         .and(warp::post())
.and(warp::body::content_length_limit( @@ -94,7 +95,7 @@ pub(crate) fn elastic_bulk_filter( ) )] pub(crate) fn elastic_index_bulk_filter( -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!("_elastic" / String / "_bulk") .and(warp::post()) .and(warp::body::content_length_limit( diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index 828b3556ba3..dab54901702 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -35,7 +35,7 @@ use warp::{Filter, Rejection}; use crate::decompression::get_body_bytes; use crate::format::extract_format_from_qs; use crate::rest_api_response::into_rest_api_response; -use crate::{with_arg, BodyFormat}; +use crate::{with_arg, Body, BodyFormat}; #[derive(utoipa::OpenApi)] #[openapi(paths(ingest, tail_endpoint,))] @@ -75,7 +75,7 @@ pub(crate) fn ingest_api_handlers( fn ingest_filter( config: IngestApiConfig, -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!(String / "ingest") .and(warp::post()) .and(warp::body::content_length_limit( @@ -99,7 +99,7 @@ fn ingest_handler( fn ingest_v2_filter( config: IngestApiConfig, -) -> impl Filter + Clone { +) -> impl Filter + Clone { warp::path!(String / "ingest-v2") .and(warp::post()) .and(warp::body::content_length_limit( @@ -124,13 +124,13 @@ fn ingest_v2_handler( async fn ingest_v2( index_id: IndexId, - body: Bytes, + body: Body, ingest_options: IngestOptions, mut ingest_router: IngestRouterServiceClient, ) -> Result { let mut doc_batch_builder = DocBatchV2Builder::default(); - for doc in lines(&body) { + for doc in lines(&body.content) { doc_batch_builder.add_doc(doc); } let doc_batch_opt = doc_batch_builder.build(); @@ -206,14 +206,14 @@ fn convert_ingest_response_v2( /// Ingest documents async fn ingest( index_id: String, - body: Bytes, + body: Body, ingest_options: IngestOptions, mut ingest_service: IngestServiceClient, ) -> Result { // The size of the body should be an upper bound of the size of the batch. The removal of the // end of line character for each doc compensates the addition of the `DocCommand` header. 
- let mut doc_batch_builder = DocBatchBuilder::with_capacity(index_id, body.remaining()); - for line in lines(&body) { + let mut doc_batch_builder = DocBatchBuilder::with_capacity(index_id, body.content.remaining()); + for line in lines(&body.content) { doc_batch_builder.ingest_doc(line); } let ingest_req = IngestRequest { diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5dd7878f3eb..c997b180693 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -57,6 +57,7 @@ use std::time::Duration; use anyhow::Context; use bytesize::ByteSize; +pub(crate) use decompression::Body; pub use format::BodyFormat; use futures::StreamExt; use itertools::Itertools; diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 338da9c7fd5..6589d3cf133 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -42,10 +42,11 @@ impl Default for StorageMetrics { StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), - shortlived_cache: CacheMetrics::for_component("shortlived"), partial_request_cache: CacheMetrics::for_component("partial_request"), searcher_split_cache: CacheMetrics::for_component("searcher_split"), + shortlived_cache: CacheMetrics::for_component("shortlived"), split_footer_cache: CacheMetrics::for_component("splitfooter"), + object_storage_get_total: new_counter( "object_storage_gets_total", "Number of objects fetched.", @@ -96,14 +97,16 @@ impl CacheMetrics { "in_cache_count", "Count of {component_name} in cache", &namespace, + &[], ), in_cache_num_bytes: new_gauge( "in_cache_num_bytes", "Number of {component_name} bytes in cache", &namespace, + &[], ), hits_num_items: new_counter( - "cache_hit_total", + "cache_hits_total", "Number of {component_name} cache hits", &namespace, ), @@ -113,7 +116,7 @@ impl CacheMetrics { &namespace, ), misses_num_items: new_counter( - "cache_miss_total", + "cache_misses_total", "Number of {component_name} cache misses", &namespace, ),
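Throughout the patch, in-flight byte counts are maintained with `GaugeGuard`, an RAII handle defined in `quickwit-common` but not shown in this diff. A rough sketch of the idea, assuming a prometheus `IntGauge` and guessed field names (not the actual implementation):

    use prometheus::IntGauge;

    // Sketch of an RAII gauge guard: the gauge is bumped while the guard is
    // alive and decremented by the accumulated delta on drop, so in-flight
    // bytes cannot leak on early returns or errors.
    pub struct GaugeGuard {
        gauge: &'static IntGauge,
        delta: i64,
    }

    impl GaugeGuard {
        pub fn from_gauge(gauge: &'static IntGauge, delta: i64) -> Self {
            gauge.add(delta);
            Self { gauge, delta }
        }

        pub fn add(&mut self, delta: i64) {
            self.gauge.add(delta);
            self.delta += delta;
        }

        pub fn sub(&mut self, delta: i64) {
            self.gauge.sub(delta);
            self.delta -= delta;
        }
    }

    impl Drop for GaugeGuard {
        fn drop(&mut self) {
            self.gauge.sub(self.delta);
        }
    }

This matches how the patch uses the type: `BatchBuilder` adds bytes as documents arrive, the ingest router holds a request-sized guard for the duration of a call, and the REST `Body` wrapper ties a guard to the lifetime of the decompressed payload.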