diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 2310258e725..de278b96d92 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -1182,8 +1182,9 @@ dependencies = [ [[package]] name = "chitchat" -version = "0.5.0" -source = "git+https://github.com/quickwit-oss/chitchat?rev=bc29598#bc295980ac2e00f389dfa7e87cf6dc7995061206" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e5c649e5309f040f18aa0d660e63fe913d3d642e4771dd3cb66515ad75d239d" dependencies = [ "anyhow", "async-trait", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index cee159ab6b3..261add432f8 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -49,7 +49,7 @@ base64 = "0.21" byte-unit = { version = "4", default-features = false, features = ["serde", "std"] } bytes = { version = "1", features = ["serde"] } bytestring = "1.3.0" -chitchat = { git = "https://github.com/quickwit-oss/chitchat", rev = "bc29598" } +chitchat = "0.6" chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] } clap = { version = "4.4.1", features = ["env", "string"] } colored = "2.0.0" diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index b0201d92a9d..b4391ee7574 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -214,7 +214,8 @@ impl<A: Actor> ActorContext<A> { self.observe_enqueued.swap(true, Ordering::Relaxed) } - pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState { + /// Updates the observable state of the actor. + pub fn observe(&self, actor: &mut A) -> A::ObservableState { let obs_state = actor.observable_state(); self.inner.observe_enqueued.store(false, Ordering::Relaxed); let _ = self.observable_state_tx.send(obs_state.clone()); diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index df5ec0c3f4e..8abe200c64b 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>.
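A side note on the dependency change above: replacing the pinned chitchat git revision with chitchat 0.6 from crates.io is what makes the prefix-based key iteration (`iter_prefix`) used throughout the rest of this diff available, in place of the closure-filtered `key_values` calls that are removed below. The following stand-alone sketch only illustrates the prefix-scan semantics the cluster code now relies on; it uses a plain `BTreeMap` as a stand-in for a chitchat node state, and the sample keys and values are made up:

```rust
use std::collections::BTreeMap;

// Stand-in for a node's gossiped key-value state: a sorted map lets us emulate
// "iterate over every entry whose key starts with a prefix" with a range scan.
fn iter_prefix<'a>(
    state: &'a BTreeMap<String, String>,
    prefix: &'a str,
) -> impl Iterator<Item = (&'a str, &'a str)> + 'a {
    state
        .range(prefix.to_string()..)
        .take_while(move |(key, _)| key.starts_with(prefix))
        .map(|(key, value)| (key.as_str(), value.as_str()))
}

fn main() {
    let mut state = BTreeMap::new();
    state.insert("enabled_services".to_string(), "indexer".to_string());
    state.insert("indexing_task:idx:src".to_string(), "2".to_string());
    state.insert("pipeline_metrics:idx:src".to_string(), "179‰,76MB/s".to_string());

    // Only the entries under the requested prefix are visited.
    let metrics: Vec<_> = iter_prefix(&state, "pipeline_metrics:").collect();
    assert_eq!(metrics, vec![("pipeline_metrics:idx:src", "179‰,76MB/s")]);
}
```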
-use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::net::SocketAddr; use std::sync::Arc; @@ -31,7 +31,7 @@ use chitchat::{ }; use futures::Stream; use itertools::Itertools; -use quickwit_proto::indexing::IndexingTask; +use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics}; use quickwit_proto::types::NodeId; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch, Mutex, RwLock}; @@ -43,8 +43,8 @@ use tracing::{info, warn}; use crate::change::{compute_cluster_change_events, ClusterChange}; use crate::member::{ build_cluster_member, ClusterMember, NodeStateExt, ENABLED_SERVICES_KEY, - GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY, - READINESS_VALUE_READY, + GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, PIPELINE_METRICS_PREFIX, READINESS_KEY, + READINESS_VALUE_NOT_READY, READINESS_VALUE_READY, }; use crate::ClusterNode; @@ -70,7 +70,7 @@ pub struct Cluster { } impl Debug for Cluster { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { formatter .debug_struct("Cluster") .field("cluster_id", &self.cluster_id) @@ -301,9 +301,34 @@ impl Cluster { tokio::time::sleep(GOSSIP_INTERVAL * 2).await; } + /// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines. + /// The metrics are exposed as follows: + /// Key: pipeline_metrics:<index_uid>:<source_id> + /// Value: 179‰,76MB/s + pub async fn update_self_node_pipeline_metrics( + &self, + pipeline_metrics: &HashMap<&IndexingPipelineId, PipelineMetrics>, + ) { + let chitchat = self.chitchat().await; + let mut chitchat_guard = chitchat.lock().await; + let node_state = chitchat_guard.self_node_state(); + let mut current_metrics_keys: HashSet<String> = node_state + .iter_prefix(PIPELINE_METRICS_PREFIX) + .map(|(key, _)| key.to_string()) + .collect(); + for (pipeline_id, metrics) in pipeline_metrics { + let key = format!("{PIPELINE_METRICS_PREFIX}{pipeline_id}"); + current_metrics_keys.remove(&key); + node_state.set(key, metrics.to_string()); + } + for obsolete_task_key in current_metrics_keys { + node_state.mark_for_deletion(&obsolete_task_key); + } + } + /// Updates indexing tasks in chitchat state. /// Tasks are grouped by (index_id, source_id), each group is stored in a key as follows: - /// - key: `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}` + /// - key: `{INDEXING_TASK_PREFIX}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}` /// - value: Number of indexing tasks in the group. /// Keys present in chitchat state but not in the given `indexing_tasks` are marked for /// deletion.
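The new `update_self_node_pipeline_metrics` above publishes one chitchat entry per cooperative pipeline, keyed `pipeline_metrics:<index_uid>:<source_id>` and valued with the compact string produced by the `Display` impl of `PipelineMetrics` (e.g. `179‰,76MB/s`), and prunes entries for pipelines that no longer report metrics. The consuming side is not part of this diff; the helper below is only a sketch of how such a value could be decoded, and its name and tuple return type are assumptions:

```rust
// Hypothetical reader for the gossiped value format; not part of this PR.
fn parse_pipeline_metrics(value: &str) -> Option<(u16, u16)> {
    // Values look like "179‰,76MB/s": CPU share in thousandths, then throughput in MB/s.
    let (cpu, throughput) = value.split_once(',')?;
    let cpu_thousandth: u16 = cpu.strip_suffix('‰')?.parse().ok()?;
    let throughput_mb_per_sec: u16 = throughput.strip_suffix("MB/s")?.parse().ok()?;
    Some((cpu_thousandth, throughput_mb_per_sec))
}

fn main() {
    assert_eq!(parse_pipeline_metrics("179‰,76MB/s"), Some((179, 76)));
    assert_eq!(parse_pipeline_metrics("not a metric"), None);
}
```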
@@ -313,24 +338,20 @@ impl Cluster { ) -> anyhow::Result<()> { let chitchat = self.chitchat().await; let mut chitchat_guard = chitchat.lock().await; - let mut current_indexing_tasks_keys: HashSet<_> = chitchat_guard - .self_node_state() - .key_values(|key, _| key.starts_with(INDEXING_TASK_PREFIX)) + let node_state = chitchat_guard.self_node_state(); + let mut current_indexing_tasks_keys: HashSet<String> = node_state + .iter_prefix(INDEXING_TASK_PREFIX) .map(|(key, _)| key.to_string()) .collect(); for (indexing_task, indexing_tasks_group) in indexing_tasks.iter().group_by(|&task| task).into_iter() { - let key = format!("{INDEXING_TASK_PREFIX}:{}", indexing_task.to_string()); + let key = format!("{INDEXING_TASK_PREFIX}{indexing_task}"); current_indexing_tasks_keys.remove(&key); - chitchat_guard - .self_node_state() - .set(key, indexing_tasks_group.count().to_string()); + node_state.set(key, indexing_tasks_group.count().to_string()); } for obsolete_task_key in current_indexing_tasks_keys { - chitchat_guard - .self_node_state() - .mark_for_deletion(&obsolete_task_key); + node_state.mark_for_deletion(&obsolete_task_key); } Ok(()) } @@ -949,13 +970,11 @@ mod tests { let chitchat_handle = node.inner.read().await.chitchat_handle.chitchat(); let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( - format!( - "{INDEXING_TASK_PREFIX}:my_good_index:my_source:11111111111111111111111111" - ), + format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"), "2".to_string(), ); chitchat_guard.self_node_state().set( - format!("{INDEXING_TASK_PREFIX}:my_bad_index:my_source:11111111111111111111111111"), + format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"), "malformatted value".to_string(), ); } diff --git a/quickwit/quickwit-cluster/src/member.rs b/quickwit/quickwit-cluster/src/member.rs index 4f9c8aec83e..728546134e7 100644 --- a/quickwit/quickwit-cluster/src/member.rs +++ b/quickwit/quickwit-cluster/src/member.rs @@ -32,10 +32,11 @@ use crate::{GenerationId, QuickwitService}; // Keys used to store member's data in chitchat state. pub(crate) const GRPC_ADVERTISE_ADDR_KEY: &str = "grpc_advertise_addr"; pub(crate) const ENABLED_SERVICES_KEY: &str = "enabled_services"; +pub(crate) const PIPELINE_METRICS_PREFIX: &str = "pipeline_metrics:"; + // An indexing task key is formatted as // `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`. -pub(crate) const INDEXING_TASK_PREFIX: &str = "indexing_task"; -pub(crate) const INDEXING_TASK_SEPARATOR: char = ':'; +pub(crate) const INDEXING_TASK_PREFIX: &str = "indexing_task:"; // Readiness key and values used to store node's readiness in Chitchat state. pub(crate) const READINESS_KEY: &str = "readiness"; @@ -163,7 +164,7 @@ pub(crate) fn build_cluster_member( // Parses indexing task key into the IndexingTask. fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> { - let (_prefix, reminder) = key.split_once(INDEXING_TASK_SEPARATOR).ok_or_else(|| { + let reminder = key.strip_prefix(INDEXING_TASK_PREFIX).ok_or_else(|| { anyhow!( "indexing task must contain the delimiter character `:`: `{}`", key ) @@ -177,7 +178,7 @@ fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> { /// ignored, just warnings are emitted.
pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec<IndexingTask> { node_state - .key_values(|key, _| key.starts_with(INDEXING_TASK_PREFIX)) + .iter_prefix(INDEXING_TASK_PREFIX) .map(|(key, versioned_value)| { let indexing_task = parse_indexing_task_key(key)?; let num_tasks: usize = versioned_value.value.parse()?; diff --git a/quickwit/quickwit-cluster/src/node.rs b/quickwit/quickwit-cluster/src/node.rs index 8ca4d41abe4..1091ade7068 100644 --- a/quickwit/quickwit-cluster/src/node.rs +++ b/quickwit/quickwit-cluster/src/node.rs @@ -82,7 +82,7 @@ impl ClusterNode { for (indexing_task, indexing_tasks_group) in indexing_tasks.iter().group_by(|&task| task).into_iter() { - let key = format!("{INDEXING_TASK_PREFIX}:{}", indexing_task.to_string()); + let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task); node_state.set(key, indexing_tasks_group.count().to_string()); } Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap() diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index eca434b5c43..70ab08492f8 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -21,7 +21,7 @@ use std::collections::hash_map::Entry; use std::num::NonZeroU32; use std::ops::RangeInclusive; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Context; use async_trait::async_trait; @@ -38,7 +38,7 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_config::IndexingSettings; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta}; -use quickwit_proto::indexing::IndexingPipelineId; +use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics}; use quickwit_proto::metastore::{ LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient, }; @@ -79,6 +79,10 @@ pub struct IndexerCounters { /// Number of (valid) documents in the current workbench. /// This value is used to trigger commit and for observation. pub num_docs_in_workbench: u64, + + /// Metrics describing the load and indexing performance of the + /// pipeline. This is only updated for cooperative indexers. + pub pipeline_metrics_opt: Option<PipelineMetrics>, } struct IndexerState { @@ -190,6 +194,7 @@ impl IndexerState { } else { None }; + let last_delete_opstamp_request = LastDeleteOpstampRequest { index_uid: self.pipeline_id.index_uid.to_string(), }; @@ -383,15 +388,25 @@ impl Actor for Indexer { let Some(indexing_workbench) = &self.indexing_workbench_opt else { return Ok(()); }; + let elapsed = indexing_workbench.create_instant.elapsed(); + let uncompressed_num_bytes = indexing_workbench + .indexed_splits + .values() + .map(|split| split.split_attrs.uncompressed_docs_size_in_bytes) + .sum::<u64>(); + self.update_pipeline_metrics(elapsed, uncompressed_num_bytes); + self.send_to_serializer(CommitTrigger::Drained, ctx).await?; let commit_timeout = self.indexer_state.indexing_settings.commit_timeout(); if elapsed >= commit_timeout { return Ok(()); } + // Time to take a nap.
let sleep_for = commit_timeout - elapsed; + ctx.schedule_self_msg(sleep_for, Command::Resume).await; self.handle(Command::Pause, ctx).await?; Ok(()) @@ -530,6 +545,20 @@ impl Indexer { } } + fn update_pipeline_metrics(&mut self, elapsed: Duration, uncompressed_num_bytes: u64) { + let commit_timeout = self.indexer_state.indexing_settings.commit_timeout(); + let cpu_thousandth: u16 = if elapsed >= commit_timeout { + 1_000 + } else { + (elapsed.as_micros() * 1_000 / commit_timeout.as_micros()) as u16 + }; + self.counters.pipeline_metrics_opt = Some(PipelineMetrics { + cpu_thousandth, + throughput_mb_per_sec: (uncompressed_num_bytes / (elapsed.as_millis() as u64 * 1_000)) + as u16, + }); + } + fn memory_usage(&self) -> Byte { if let Some(workbench) = &self.indexing_workbench_opt { workbench.memory_usage @@ -797,6 +826,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 1, //< the num docs in split counter has been reset. + pipeline_metrics_opt: None, } ); let messages: Vec = index_serializer_inbox.drain_for_test_typed(); @@ -1039,6 +1069,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + pipeline_metrics_opt: None, } ); let indexed_split_batches: Vec = @@ -1111,6 +1142,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + pipeline_metrics_opt: None, } ); let output_messages: Vec = @@ -1201,6 +1233,7 @@ mod tests { num_docs_in_workbench: 2, num_splits_emitted: 0, num_split_batches_emitted: 0, + pipeline_metrics_opt: None, } ); universe.send_exit_with_success(&indexer_mailbox).await?; @@ -1212,6 +1245,7 @@ mod tests { num_docs_in_workbench: 0, num_splits_emitted: 2, num_split_batches_emitted: 1, + pipeline_metrics_opt: None, } ); let split_batches: Vec = @@ -1557,6 +1591,7 @@ mod tests { num_splits_emitted: 0, num_split_batches_emitted: 0, num_docs_in_workbench: 0, //< the num docs in split counter has been reset. + pipeline_metrics_opt: None, } ); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 98bef9ac3f2..5e9b9864888 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -142,11 +142,11 @@ impl Actor for IndexingPipeline { async fn finalize( &mut self, _exit_status: &ActorExitStatus, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> anyhow::Result<()> { // We update the observation to ensure our last "black box" observation // is up to date. - self.perform_observe().await; + self.perform_observe(ctx).await; Ok(()) } } @@ -237,7 +237,7 @@ impl IndexingPipeline { self.statistics.generation } - async fn perform_observe(&mut self) { + async fn perform_observe(&mut self, ctx: &ActorContext) { let Some(handles) = &self.handles_opt else { return; }; @@ -256,6 +256,9 @@ impl IndexingPipeline { ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts); + let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt; + self.statistics.pipeline_metrics_opt = pipeline_metrics_opt; + ctx.observe(self); } /// Checks if some actors have terminated. 
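To make the units in `update_pipeline_metrics` above explicit: the CPU figure is the fraction of the commit window actually spent indexing, expressed in thousandths and capped at 1,000, and the throughput figure is uncompressed MB/s derived from bytes over elapsed milliseconds. A self-contained sketch of the same arithmetic, with sample inputs picked so the result matches the `179‰,76MB/s` example quoted in the cluster doc comment (the free function is illustrative, not code from the PR):

```rust
use std::time::Duration;

// Stand-alone version of the arithmetic in `Indexer::update_pipeline_metrics`.
fn pipeline_metrics(
    elapsed: Duration,
    commit_timeout: Duration,
    uncompressed_num_bytes: u64,
) -> (u16, u16) {
    // Share of the commit window spent indexing, in thousandths (capped at 1_000).
    let cpu_thousandth: u16 = if elapsed >= commit_timeout {
        1_000
    } else {
        (elapsed.as_micros() * 1_000 / commit_timeout.as_micros()) as u16
    };
    // bytes / (ms * 1_000) == (bytes per second) / 1_000_000, i.e. decimal MB/s.
    let throughput_mb_per_sec =
        (uncompressed_num_bytes / (elapsed.as_millis() as u64 * 1_000)) as u16;
    (cpu_thousandth, throughput_mb_per_sec)
}

fn main() {
    // 10.74s of indexing work inside a 60s commit window, 820 MB of uncompressed docs.
    let (cpu, throughput) =
        pipeline_metrics(Duration::from_millis(10_740), Duration::from_secs(60), 820_000_000);
    assert_eq!((cpu, throughput), (179, 76)); // gossiped as "179‰,76MB/s"
}
```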
@@ -490,7 +493,7 @@ impl Handler<SuperviseLoop> for IndexingPipeline { supervise_loop_token: SuperviseLoop, ctx: &ActorContext<Self>, ) -> Result<(), ActorExitStatus> { - self.perform_observe().await; + self.perform_observe(ctx).await; self.perform_health_check(ctx).await?; ctx.schedule_self_msg(SUPERVISE_INTERVAL, supervise_loop_token) .await; @@ -575,6 +578,7 @@ pub struct IndexingPipelineParams { pub source_storage_resolver: StorageResolver, pub ingester_pool: IngesterPool, pub queues_dir_path: PathBuf, + pub event_broker: EventBroker, } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 42c22d30f00..519a296e8c6 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -46,7 +46,7 @@ use quickwit_metastore::{ }; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId, - IndexingTask, + IndexingTask, PipelineMetrics, }; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -303,6 +303,7 @@ impl IndexingService { let max_concurrent_split_uploads_index = (self.max_concurrent_split_uploads / 2).max(1); let max_concurrent_split_uploads_merge = (self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1); + let pipeline_params = IndexingPipelineParams { pipeline_id: pipeline_id.clone(), metastore: self.metastore.clone(), @@ -416,6 +417,19 @@ impl IndexingService { }); self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len(); self.update_cluster_running_indexing_tasks().await; + + let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self + .indexing_pipelines + .iter() + .flat_map(|(pipeline, (_, pipeline_handle))| { + let indexer_metrics = pipeline_handle.last_observation(); + let pipeline_metrics = indexer_metrics.pipeline_metrics_opt?; + Some((pipeline, pipeline_metrics)) + }) + .collect(); + self.cluster + .update_self_node_pipeline_metrics(&pipeline_metrics) + .await; Ok(()) } diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index 8d9c67acda7..bd404021d45 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -19,6 +19,7 @@ use std::sync::atomic::Ordering; +use quickwit_proto::indexing::PipelineMetrics; use serde::Serialize; use crate::actors::{DocProcessorCounters, IndexerCounters, PublisherCounters, UploaderCounters}; @@ -48,6 +49,8 @@ pub struct IndexingStatistics { pub generation: usize, /// Number of successive pipeline spawn attempts. pub num_spawn_attempts: usize, + // Pipeline metrics. + pub pipeline_metrics_opt: Option<PipelineMetrics>, } impl IndexingStatistics { diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 2ed07cd10c2..9923edea31a 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -17,8 +17,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>.
+use std::fmt::Formatter; use std::hash::Hash; -use std::io; +use std::{fmt, io}; use anyhow::anyhow; use quickwit_actors::AskError; @@ -149,9 +150,15 @@ pub struct IndexingPipelineId { pub pipeline_ord: usize, } -impl ToString for IndexingTask { - fn to_string(&self) -> String { - format!("{}:{}", self.index_uid, self.source_id) +impl fmt::Display for IndexingPipelineId { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "{}:{}", self.index_uid, &self.source_id) + } +} + +impl fmt::Display for IndexingTask { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "{}:{}", self.index_uid, &self.source_id) } } @@ -200,6 +207,22 @@ impl TryFrom<&str> for IndexingTask { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +pub struct PipelineMetrics { + pub cpu_thousandth: u16, + pub throughput_mb_per_sec: u16, +} + +impl fmt::Display for PipelineMetrics { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}‰,{}MB/s", + self.cpu_thousandth, self.throughput_mb_per_sec + ) + } +} + #[cfg(test)] mod tests { use super::*;
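Since the exact gossiped value hinges on the new `Display` impl for `PipelineMetrics`, a small unit test along the following lines could pin it down inside the `mod tests` block above; this is only a sketch, not a test present in the diff:

```rust
#[test]
fn test_pipeline_metrics_to_string() {
    let metrics = PipelineMetrics {
        cpu_thousandth: 179,
        throughput_mb_per_sec: 76,
    };
    // Exactly the string stored under `pipeline_metrics:<index_uid>:<source_id>` in chitchat.
    assert_eq!(metrics.to_string(), "179‰,76MB/s");
}
```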