Skip to content

Commit

Permalink
Exposing pipeline metrics (#4026)
Browse files Browse the repository at this point in the history
* Exposing pipeline metrics

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

* Moved metrics to proto/indexing

* Removing obsolete pipeline metrics keys.

Updated chitchat, and using the new iter_prefix method.

---------

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Oct 27, 2023
1 parent 0ebb7dd commit 67e3ee9
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 40 deletions.
5 changes: 3 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
byte-unit = { version = "4", default-features = false, features = ["serde", "std"] }
bytes = { version = "1", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat", rev = "bc29598" }
chitchat = "0.6"
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
clap = { version = "4.4.1", features = ["env", "string"] }
colored = "2.0.0"
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ impl<A: Actor> ActorContext<A> {
self.observe_enqueued.swap(true, Ordering::Relaxed)
}

pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
/// Updates the observable state of the actor.
pub fn observe(&self, actor: &mut A) -> A::ObservableState {
let obs_state = actor.observable_state();
self.inner.observe_enqueued.store(false, Ordering::Relaxed);
let _ = self.observable_state_tx.send(obs_state.clone());
Expand Down
59 changes: 39 additions & 20 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -31,7 +31,7 @@ use chitchat::{
};
use futures::Stream;
use itertools::Itertools;
use quickwit_proto::indexing::IndexingTask;
use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics};
use quickwit_proto::types::NodeId;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex, RwLock};
Expand All @@ -43,8 +43,8 @@ use tracing::{info, warn};
use crate::change::{compute_cluster_change_events, ClusterChange};
use crate::member::{
build_cluster_member, ClusterMember, NodeStateExt, ENABLED_SERVICES_KEY,
GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
GRPC_ADVERTISE_ADDR_KEY, INDEXING_TASK_PREFIX, PIPELINE_METRICS_PREFIX, READINESS_KEY,
READINESS_VALUE_NOT_READY, READINESS_VALUE_READY,
};
use crate::ClusterNode;

Expand All @@ -70,7 +70,7 @@ pub struct Cluster {
}

impl Debug for Cluster {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter
.debug_struct("Cluster")
.field("cluster_id", &self.cluster_id)
Expand Down Expand Up @@ -301,9 +301,34 @@ impl Cluster {
tokio::time::sleep(GOSSIP_INTERVAL * 2).await;
}

/// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines.
/// The metrics are exposed as follows:
/// Key: pipeline_metrics:<index_uid>:<source_id>
/// Value: 179‰,76MB/s
pub async fn update_self_node_pipeline_metrics(
&self,
pipeline_metrics: &HashMap<&IndexingPipelineId, PipelineMetrics>,
) {
let chitchat = self.chitchat().await;
let mut chitchat_guard = chitchat.lock().await;
let node_state = chitchat_guard.self_node_state();
let mut current_metrics_keys: HashSet<String> = node_state
.iter_prefix(PIPELINE_METRICS_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
for (pipeline_id, metrics) in pipeline_metrics {
let key = format!("{PIPELINE_METRICS_PREFIX}{pipeline_id}");
current_metrics_keys.remove(&key);
node_state.set(key, metrics.to_string());
}
for obsolete_task_key in current_metrics_keys {
node_state.mark_for_deletion(&obsolete_task_key);
}
}

/// Updates indexing tasks in chitchat state.
/// Tasks are grouped by (index_id, source_id), each group is stored in a key as follows:
/// - key: `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`
/// - key: `{INDEXING_TASK_PREFIX}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`
/// - value: Number of indexing tasks in the group.
/// Keys present in chitchat state but not in the given `indexing_tasks` are marked for
/// deletion.
Expand All @@ -313,24 +338,20 @@ impl Cluster {
) -> anyhow::Result<()> {
let chitchat = self.chitchat().await;
let mut chitchat_guard = chitchat.lock().await;
let mut current_indexing_tasks_keys: HashSet<_> = chitchat_guard
.self_node_state()
.key_values(|key, _| key.starts_with(INDEXING_TASK_PREFIX))
let node_state = chitchat_guard.self_node_state();
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
.collect();
for (indexing_task, indexing_tasks_group) in
indexing_tasks.iter().group_by(|&task| task).into_iter()
{
let key = format!("{INDEXING_TASK_PREFIX}:{}", indexing_task.to_string());
let key = format!("{INDEXING_TASK_PREFIX}{indexing_task}");
current_indexing_tasks_keys.remove(&key);
chitchat_guard
.self_node_state()
.set(key, indexing_tasks_group.count().to_string());
node_state.set(key, indexing_tasks_group.count().to_string());
}
for obsolete_task_key in current_indexing_tasks_keys {
chitchat_guard
.self_node_state()
.mark_for_deletion(&obsolete_task_key);
node_state.mark_for_deletion(&obsolete_task_key);
}
Ok(())
}
Expand Down Expand Up @@ -949,13 +970,11 @@ mod tests {
let chitchat_handle = node.inner.read().await.chitchat_handle.chitchat();
let mut chitchat_guard = chitchat_handle.lock().await;
chitchat_guard.self_node_state().set(
format!(
"{INDEXING_TASK_PREFIX}:my_good_index:my_source:11111111111111111111111111"
),
format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"),
"2".to_string(),
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}:my_bad_index:my_source:11111111111111111111111111"),
format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"),
"malformatted value".to_string(),
);
}
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ use crate::{GenerationId, QuickwitService};
// Keys used to store member's data in chitchat state.
pub(crate) const GRPC_ADVERTISE_ADDR_KEY: &str = "grpc_advertise_addr";
pub(crate) const ENABLED_SERVICES_KEY: &str = "enabled_services";
pub(crate) const PIPELINE_METRICS_PREFIX: &str = "pipeline_metrics:";

// An indexing task key is formatted as
// `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`.
pub(crate) const INDEXING_TASK_PREFIX: &str = "indexing_task";
pub(crate) const INDEXING_TASK_SEPARATOR: char = ':';
pub(crate) const INDEXING_TASK_PREFIX: &str = "indexing_task:";

// Readiness key and values used to store node's readiness in Chitchat state.
pub(crate) const READINESS_KEY: &str = "readiness";
Expand Down Expand Up @@ -163,7 +164,7 @@ pub(crate) fn build_cluster_member(

// Parses indexing task key into the IndexingTask.
fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> {
let (_prefix, reminder) = key.split_once(INDEXING_TASK_SEPARATOR).ok_or_else(|| {
let reminder = key.strip_prefix(INDEXING_TASK_PREFIX).ok_or_else(|| {
anyhow!(
"indexing task must contain the delimiter character `:`: `{}`",
key
Expand All @@ -177,7 +178,7 @@ fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> {
/// ignored, just warnings are emitted.
pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec<IndexingTask> {
node_state
.key_values(|key, _| key.starts_with(INDEXING_TASK_PREFIX))
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, versioned_value)| {
let indexing_task = parse_indexing_task_key(key)?;
let num_tasks: usize = versioned_value.value.parse()?;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ClusterNode {
for (indexing_task, indexing_tasks_group) in
indexing_tasks.iter().group_by(|&task| task).into_iter()
{
let key = format!("{INDEXING_TASK_PREFIX}:{}", indexing_task.to_string());
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task);
node_state.set(key, indexing_tasks_group.count().to_string());
}
Self::try_new(chitchat_id, &node_state, channel, is_self_node).unwrap()
Expand Down
39 changes: 37 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::hash_map::Entry;
use std::num::NonZeroU32;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use anyhow::Context;
use async_trait::async_trait;
Expand All @@ -38,7 +38,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_config::IndexingSettings;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics};
use quickwit_proto::metastore::{
LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient,
};
Expand Down Expand Up @@ -79,6 +79,10 @@ pub struct IndexerCounters {
/// Number of (valid) documents in the current workbench.
/// This value is used to trigger commit and for observation.
pub num_docs_in_workbench: u64,

/// Metrics describing the load and indexing performance of the
/// pipeline. This is only updated for cooperative indexers.
pub pipeline_metrics_opt: Option<PipelineMetrics>,
}

struct IndexerState {
Expand Down Expand Up @@ -190,6 +194,7 @@ impl IndexerState {
} else {
None
};

let last_delete_opstamp_request = LastDeleteOpstampRequest {
index_uid: self.pipeline_id.index_uid.to_string(),
};
Expand Down Expand Up @@ -383,15 +388,25 @@ impl Actor for Indexer {
let Some(indexing_workbench) = &self.indexing_workbench_opt else {
return Ok(());
};

let elapsed = indexing_workbench.create_instant.elapsed();
let uncompressed_num_bytes = indexing_workbench
.indexed_splits
.values()
.map(|split| split.split_attrs.uncompressed_docs_size_in_bytes)
.sum::<u64>();
self.update_pipeline_metrics(elapsed, uncompressed_num_bytes);

self.send_to_serializer(CommitTrigger::Drained, ctx).await?;

let commit_timeout = self.indexer_state.indexing_settings.commit_timeout();
if elapsed >= commit_timeout {
return Ok(());
}

// Time to take a nap.
let sleep_for = commit_timeout - elapsed;

ctx.schedule_self_msg(sleep_for, Command::Resume).await;
self.handle(Command::Pause, ctx).await?;
Ok(())
Expand Down Expand Up @@ -530,6 +545,20 @@ impl Indexer {
}
}

fn update_pipeline_metrics(&mut self, elapsed: Duration, uncompressed_num_bytes: u64) {
let commit_timeout = self.indexer_state.indexing_settings.commit_timeout();
let cpu_thousandth: u16 = if elapsed >= commit_timeout {
1_000
} else {
(elapsed.as_micros() * 1_000 / commit_timeout.as_micros()) as u16
};
self.counters.pipeline_metrics_opt = Some(PipelineMetrics {
cpu_thousandth,
throughput_mb_per_sec: (uncompressed_num_bytes / (elapsed.as_millis() as u64 * 1_000))
as u16,
});
}

fn memory_usage(&self) -> Byte {
if let Some(workbench) = &self.indexing_workbench_opt {
workbench.memory_usage
Expand Down Expand Up @@ -797,6 +826,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 1, //< the num docs in split counter has been reset.
pipeline_metrics_opt: None,
}
);
let messages: Vec<IndexedSplitBatchBuilder> = index_serializer_inbox.drain_for_test_typed();
Expand Down Expand Up @@ -1039,6 +1069,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 0,
pipeline_metrics_opt: None,
}
);
let indexed_split_batches: Vec<IndexedSplitBatchBuilder> =
Expand Down Expand Up @@ -1111,6 +1142,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 0,
pipeline_metrics_opt: None,
}
);
let output_messages: Vec<IndexedSplitBatchBuilder> =
Expand Down Expand Up @@ -1201,6 +1233,7 @@ mod tests {
num_docs_in_workbench: 2,
num_splits_emitted: 0,
num_split_batches_emitted: 0,
pipeline_metrics_opt: None,
}
);
universe.send_exit_with_success(&indexer_mailbox).await?;
Expand All @@ -1212,6 +1245,7 @@ mod tests {
num_docs_in_workbench: 0,
num_splits_emitted: 2,
num_split_batches_emitted: 1,
pipeline_metrics_opt: None,
}
);
let split_batches: Vec<IndexedSplitBatchBuilder> =
Expand Down Expand Up @@ -1557,6 +1591,7 @@ mod tests {
num_splits_emitted: 0,
num_split_batches_emitted: 0,
num_docs_in_workbench: 0, //< the num docs in split counter has been reset.
pipeline_metrics_opt: None,
}
);

Expand Down
12 changes: 8 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ impl Actor for IndexingPipeline {
async fn finalize(
&mut self,
_exit_status: &ActorExitStatus,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
// We update the observation to ensure our last "black box" observation
// is up to date.
self.perform_observe().await;
self.perform_observe(ctx).await;
Ok(())
}
}
Expand Down Expand Up @@ -237,7 +237,7 @@ impl IndexingPipeline {
self.statistics.generation
}

async fn perform_observe(&mut self) {
async fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
let Some(handles) = &self.handles_opt else {
return;
};
Expand All @@ -256,6 +256,9 @@ impl IndexingPipeline {
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt;
self.statistics.pipeline_metrics_opt = pipeline_metrics_opt;
ctx.observe(self);
}

/// Checks if some actors have terminated.
Expand Down Expand Up @@ -490,7 +493,7 @@ impl Handler<SuperviseLoop> for IndexingPipeline {
supervise_loop_token: SuperviseLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.perform_observe().await;
self.perform_observe(ctx).await;
self.perform_health_check(ctx).await?;
ctx.schedule_self_msg(SUPERVISE_INTERVAL, supervise_loop_token)
.await;
Expand Down Expand Up @@ -575,6 +578,7 @@ pub struct IndexingPipelineParams {
pub source_storage_resolver: StorageResolver,
pub ingester_pool: IngesterPool,
pub queues_dir_path: PathBuf,

pub event_broker: EventBroker,
}

Expand Down
Loading

0 comments on commit 67e3ee9

Please sign in to comment.