From afee9de3bae12e0b4a3a53028161dbc896b50143 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 12 Jul 2024 10:29:11 +0200
Subject: [PATCH] shorten apply-plan log (#5214)

---
 quickwit/quickwit-common/src/pretty.rs        |  30 ++--
 .../src/indexing_scheduler/mod.rs             | 141 ++++++++++++++++--
 2 files changed, 148 insertions(+), 23 deletions(-)

diff --git a/quickwit/quickwit-common/src/pretty.rs b/quickwit/quickwit-common/src/pretty.rs
index 04f75bb7265..dcc5e3b1536 100644
--- a/quickwit/quickwit-common/src/pretty.rs
+++ b/quickwit/quickwit-common/src/pretty.rs
@@ -20,28 +20,36 @@
 use std::fmt;
 use std::time::Duration;
 
-pub struct PrettySample<'a, T>(&'a [T], usize);
+pub struct PrettySample<I>(I, usize);
 
-impl<'a, T> PrettySample<'a, T> {
-    pub fn new(slice: &'a [T], sample_size: usize) -> Self {
+impl<I> PrettySample<I> {
+    pub fn new(slice: I, sample_size: usize) -> Self {
         Self(slice, sample_size)
     }
 }
 
-impl<T> fmt::Debug for PrettySample<'_, T>
-where T: fmt::Debug
+impl<I, T> fmt::Debug for PrettySample<I>
+where
+    I: IntoIterator<Item = T> + Clone,
+    T: fmt::Debug,
 {
     fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(formatter, "[")?;
-        for (i, item) in self.0.iter().enumerate() {
-            if i == self.1 {
-                write!(formatter, ", and {} more", self.0.len() - i)?;
-                break;
-            }
+        // In general we are passed a reference (&[...], &HashMap, ...) or a Map<_> over one,
+        // so we either perform a Copy, or a cheap Clone of a simple struct.
+        let mut iter = self.0.clone().into_iter().enumerate();
+        for (i, item) in &mut iter {
             if i > 0 {
                 write!(formatter, ", ")?;
             }
             write!(formatter, "{item:?}")?;
+            if i == self.1 - 1 {
+                break;
+            }
+        }
+        let left = iter.count();
+        if left > 0 {
+            write!(formatter, ", and {left} more")?;
         }
         write!(formatter, "]")?;
         Ok(())
@@ -83,7 +91,7 @@ mod tests {
 
     #[test]
     fn test_pretty_sample() {
-        let pretty_sample = PrettySample::<'_, usize>::new(&[], 2);
+        let pretty_sample = PrettySample::<&[usize]>::new(&[], 2);
         assert_eq!(format!("{pretty_sample:?}"), "[]");
 
         let pretty_sample = PrettySample::new(&[1], 2);
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 110e08a5139..a94d5de92a3 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -29,6 +29,7 @@ use std::time::{Duration, Instant};
 use fnv::{FnvHashMap, FnvHashSet};
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
+use quickwit_common::pretty::PrettySample;
 use quickwit_proto::indexing::{
     ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
     PIPELINE_THROUGHTPUT,
 };
@@ -448,38 +449,112 @@ fn get_shard_locality_metrics(
 
 impl<'a> fmt::Debug for IndexingPlansDiff<'a> {
     fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
         if self.has_same_nodes() && self.has_same_tasks() {
-            return write!(formatter, "EmptyIndexingPlanDiff");
+            return write!(formatter, "EmptyIndexingPlansDiff");
         }
-        write!(formatter, "IndexingPlanDiff(")?;
+        write!(formatter, "IndexingPlansDiff(")?;
         let mut separator = "";
         if !self.missing_node_ids.is_empty() {
-            write!(formatter, "missing_node_ids={:?}, ", self.missing_node_ids)?;
+            write!(
+                formatter,
+                "missing_node_ids={:?}",
+                PrettySample::new(&self.missing_node_ids, 10)
+            )?;
             separator = ", "
         }
         if !self.unplanned_node_ids.is_empty() {
             write!(
                 formatter,
                 "{separator}unplanned_node_ids={:?}",
-                self.unplanned_node_ids
+                PrettySample::new(&self.unplanned_node_ids, 10)
             )?;
separator = ", " } if !self.missing_tasks_by_node_id.is_empty() { - write!( - formatter, - "{separator}missing_tasks_by_node_id={:?}, ", - self.missing_tasks_by_node_id - )?; + write!(formatter, "{separator}missing_tasks_by_node_id=",)?; + format_indexing_task_map(formatter, &self.missing_tasks_by_node_id)?; separator = ", " } if !self.unplanned_tasks_by_node_id.is_empty() { + write!(formatter, "{separator}unplanned_tasks_by_node_id=",)?; + format_indexing_task_map(formatter, &self.unplanned_tasks_by_node_id)?; + } + write!(formatter, ")") + } +} + +fn format_indexing_task_map( + formatter: &mut std::fmt::Formatter, + indexing_tasks: &FnvHashMap<&str, Vec<&IndexingTask>>, +) -> std::fmt::Result { + // we show at most 5 nodes, and aggregate the results for the other. + // we show at most 10 indexes, but aggregate results after. + // we always aggregate shard ids + // we hide pipeline id and incarnation id, they are not very useful in most case, but take a + // lot of place + const MAX_NODE: usize = 5; + const MAX_INDEXES: usize = 10; + let mut index_displayed = 0; + write!(formatter, "{{")?; + let mut indexer_iter = indexing_tasks.iter().enumerate(); + for (i, (index_name, tasks)) in &mut indexer_iter { + if i != 0 { + write!(formatter, ", ")?; + } + if index_displayed != MAX_INDEXES - 1 { + write!(formatter, "{index_name:?}: [")?; + let mut tasks_iter = tasks.iter().enumerate(); + for (i, task) in &mut tasks_iter { + if i != 0 { + write!(formatter, ", ")?; + } + write!( + formatter, + r#"(index_id: "{}", source_id: "{}", shard_count: {})"#, + task.index_uid.as_ref().unwrap().index_id, + task.source_id, + task.shard_ids.len() + )?; + index_displayed += 1; + if index_displayed == MAX_INDEXES - 1 { + let (task_count, shard_count) = tasks_iter.fold((0, 0), |(t, s), (_, task)| { + (t + 1, s + task.shard_ids.len()) + }); + if task_count > 0 { + write!( + formatter, + " and {task_count} tasks and {shard_count} shards" + )?; + } + break; + } + } + write!(formatter, "]")?; + } else { write!( formatter, - "{separator}unplanned_tasks_by_node_id={:?}", - self.unplanned_tasks_by_node_id + "{index_name:?}: [with {} tasks and {} shards]", + tasks.len(), + tasks.iter().map(|task| task.shard_ids.len()).sum::() )?; } - write!(formatter, ")") + if i == MAX_NODE - 1 { + break; + } + } + let (indexer, tasks, shards) = indexer_iter.fold((0, 0, 0), |(i, t, s), (_, (_, task))| { + ( + i + 1, + t + task.len(), + s + task.iter().map(|task| task.shard_ids.len()).sum::(), + ) + }); + if indexer > 0 { + write!( + formatter, + " and {indexer} more indexers, handling {tasks} tasks and {shards} shards}}" + ) + } else { + write!(formatter, "}}") } } @@ -884,6 +959,48 @@ mod tests { assert_eq!(indexer_2_tasks.len(), 3); } + #[test] + fn test_debug_indexing_task_map() { + let mut map = FnvHashMap::default(); + let task1 = IndexingTask { + index_uid: Some(IndexUid::for_test("index1", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard1".into()], + }; + let task2 = IndexingTask { + index_uid: Some(IndexUid::for_test("index2", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard2".into(), "shard3".into()], + }; + let task3 = IndexingTask { + index_uid: Some(IndexUid::for_test("index3", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard6".into()], + }; + // order made to map with the debug for lisibility + map.insert("indexer5", 
+        map.insert("indexer4", vec![&task1]);
+        map.insert("indexer3", vec![&task1, &task3]);
+        map.insert("indexer2", vec![&task2, &task3, &task1, &task2]);
+        map.insert("indexer1", vec![&task1, &task2, &task3, &task1]);
+        map.insert("indexer6", vec![&task1, &task2, &task3]);
+        let plan = IndexingPlansDiff {
+            missing_node_ids: FnvHashSet::default(),
+            unplanned_node_ids: FnvHashSet::default(),
+            missing_tasks_by_node_id: map,
+            unplanned_tasks_by_node_id: FnvHashMap::default(),
+        };
+
+        let debug = format!("{plan:?}");
+        assert_eq!(
+            debug,
+            r#"IndexingPlansDiff(missing_tasks_by_node_id={"indexer5": [(index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer4": [(index_id: "index1", source_id: "my-source", shard_count: 1)], "indexer3": [(index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index3", source_id: "my-source", shard_count: 1)], "indexer2": [(index_id: "index2", source_id: "my-source", shard_count: 2), (index_id: "index3", source_id: "my-source", shard_count: 1), (index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer1": [(index_id: "index1", source_id: "my-source", shard_count: 1) and 3 tasks and 4 shards] and 1 more indexers, handling 3 tasks and 4 shards})"#
        );
+    }
+
     proptest! {
         #[test]
         fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) {
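
A minimal usage sketch of the new PrettySample API, separate from the patch itself and grounded only in the quickwit_common::pretty code shown above: the type is now generic over any cheaply clonable IntoIterator rather than borrowing a slice, so call sites can sample map keys or iterator adapters directly, which is what the scheduler does for its node-id sets.

    use quickwit_common::pretty::PrettySample;
    use std::collections::BTreeMap;

    fn main() {
        // Arrays and slices work as before: the first `sample_size` items
        // are printed, and the remainder is summarized as a count.
        let sample = PrettySample::new(&[1, 2, 3, 4, 5], 2);
        assert_eq!(format!("{sample:?}"), "[1, 2, and 3 more]");

        // Any Clone + IntoIterator now works too, e.g. map keys, without
        // collecting into an intermediate Vec. BTreeMap is used here only
        // to make the iteration order deterministic for the assert.
        let map: BTreeMap<&str, usize> = [("a", 1), ("b", 2), ("c", 3)].into();
        let sample = PrettySample::new(map.keys(), 2);
        assert_eq!(format!("{sample:?}"), r#"["a", "b", and 1 more]"#);
    }

Capping the sample at a fixed size is what keeps the apply-plan log line short on large clusters, independent of how many nodes or tasks the plans diff touches.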