Skip to content

Commit

Permalink
shorten apply-plan log (#5214)
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Jul 12, 2024
1 parent 4de8d7a commit afee9de
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 23 deletions.
30 changes: 19 additions & 11 deletions quickwit/quickwit-common/src/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,36 @@
use std::fmt;
use std::time::Duration;

pub struct PrettySample<'a, T>(&'a [T], usize);
pub struct PrettySample<I>(I, usize);

impl<'a, T> PrettySample<'a, T> {
pub fn new(slice: &'a [T], sample_size: usize) -> Self {
impl<I> PrettySample<I> {
pub fn new(slice: I, sample_size: usize) -> Self {
Self(slice, sample_size)
}
}

impl<T> fmt::Debug for PrettySample<'_, T>
where T: fmt::Debug
impl<I, T> fmt::Debug for PrettySample<I>
where
I: IntoIterator<Item = T> + Clone,
T: fmt::Debug,
{
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(formatter, "[")?;
for (i, item) in self.0.iter().enumerate() {
if i == self.1 {
write!(formatter, ", and {} more", self.0.len() - i)?;
break;
}
// in general we will get passed a reference (&[...], &HashMap...) or a Map<_> of them.
// So we either perform a Copy, or a cheap Clone of a simple struct
let mut iter = self.0.clone().into_iter().enumerate();
for (i, item) in &mut iter {
if i > 0 {
write!(formatter, ", ")?;
}
write!(formatter, "{item:?}")?;
if i == self.1 - 1 {
break;
}
}
let left = iter.count();
if left > 0 {
write!(formatter, ", and {left} more")?;
}
write!(formatter, "]")?;
Ok(())
Expand Down Expand Up @@ -83,7 +91,7 @@ mod tests {

#[test]
fn test_pretty_sample() {
let pretty_sample = PrettySample::<'_, usize>::new(&[], 2);
let pretty_sample = PrettySample::<&[usize]>::new(&[], 2);
assert_eq!(format!("{pretty_sample:?}"), "[]");

let pretty_sample = PrettySample::new(&[1], 2);
Expand Down
141 changes: 129 additions & 12 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::{Duration, Instant};
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use quickwit_common::pretty::PrettySample;
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHTPUT,
Expand Down Expand Up @@ -448,38 +449,112 @@ fn get_shard_locality_metrics(
impl<'a> fmt::Debug for IndexingPlansDiff<'a> {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.has_same_nodes() && self.has_same_tasks() {
return write!(formatter, "EmptyIndexingPlanDiff");
return write!(formatter, "EmptyIndexingPlansDiff");
}
write!(formatter, "IndexingPlanDiff(")?;
write!(formatter, "IndexingPlansDiff(")?;
let mut separator = "";
if !self.missing_node_ids.is_empty() {
write!(formatter, "missing_node_ids={:?}, ", self.missing_node_ids)?;
write!(
formatter,
"missing_node_ids={:?}",
PrettySample::new(&self.missing_node_ids, 10)
)?;
separator = ", "
}
if !self.unplanned_node_ids.is_empty() {
write!(
formatter,
"{separator}unplanned_node_ids={:?}",
self.unplanned_node_ids
PrettySample::new(&self.unplanned_node_ids, 10)
)?;
separator = ", "
}
if !self.missing_tasks_by_node_id.is_empty() {
write!(
formatter,
"{separator}missing_tasks_by_node_id={:?}, ",
self.missing_tasks_by_node_id
)?;
write!(formatter, "{separator}missing_tasks_by_node_id=",)?;
format_indexing_task_map(formatter, &self.missing_tasks_by_node_id)?;
separator = ", "
}
if !self.unplanned_tasks_by_node_id.is_empty() {
write!(formatter, "{separator}unplanned_tasks_by_node_id=",)?;
format_indexing_task_map(formatter, &self.unplanned_tasks_by_node_id)?;
}
write!(formatter, ")")
}
}

fn format_indexing_task_map(
formatter: &mut std::fmt::Formatter,
indexing_tasks: &FnvHashMap<&str, Vec<&IndexingTask>>,
) -> std::fmt::Result {
// we show at most 5 nodes, and aggregate the results for the other.
// we show at most 10 indexes, but aggregate results after.
// we always aggregate shard ids
// we hide pipeline id and incarnation id, they are not very useful in most case, but take a
// lot of place
const MAX_NODE: usize = 5;
const MAX_INDEXES: usize = 10;
let mut index_displayed = 0;
write!(formatter, "{{")?;
let mut indexer_iter = indexing_tasks.iter().enumerate();
for (i, (index_name, tasks)) in &mut indexer_iter {
if i != 0 {
write!(formatter, ", ")?;
}
if index_displayed != MAX_INDEXES - 1 {
write!(formatter, "{index_name:?}: [")?;
let mut tasks_iter = tasks.iter().enumerate();
for (i, task) in &mut tasks_iter {
if i != 0 {
write!(formatter, ", ")?;
}
write!(
formatter,
r#"(index_id: "{}", source_id: "{}", shard_count: {})"#,
task.index_uid.as_ref().unwrap().index_id,
task.source_id,
task.shard_ids.len()
)?;
index_displayed += 1;
if index_displayed == MAX_INDEXES - 1 {
let (task_count, shard_count) = tasks_iter.fold((0, 0), |(t, s), (_, task)| {
(t + 1, s + task.shard_ids.len())
});
if task_count > 0 {
write!(
formatter,
" and {task_count} tasks and {shard_count} shards"
)?;
}
break;
}
}
write!(formatter, "]")?;
} else {
write!(
formatter,
"{separator}unplanned_tasks_by_node_id={:?}",
self.unplanned_tasks_by_node_id
"{index_name:?}: [with {} tasks and {} shards]",
tasks.len(),
tasks.iter().map(|task| task.shard_ids.len()).sum::<usize>()
)?;
}
write!(formatter, ")")
if i == MAX_NODE - 1 {
break;
}
}
let (indexer, tasks, shards) = indexer_iter.fold((0, 0, 0), |(i, t, s), (_, (_, task))| {
(
i + 1,
t + task.len(),
s + task.iter().map(|task| task.shard_ids.len()).sum::<usize>(),
)
});
if indexer > 0 {
write!(
formatter,
" and {indexer} more indexers, handling {tasks} tasks and {shards} shards}}"
)
} else {
write!(formatter, "}}")
}
}

Expand Down Expand Up @@ -884,6 +959,48 @@ mod tests {
assert_eq!(indexer_2_tasks.len(), 3);
}

#[test]
fn test_debug_indexing_task_map() {
let mut map = FnvHashMap::default();
let task1 = IndexingTask {
index_uid: Some(IndexUid::for_test("index1", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard1".into()],
};
let task2 = IndexingTask {
index_uid: Some(IndexUid::for_test("index2", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard2".into(), "shard3".into()],
};
let task3 = IndexingTask {
index_uid: Some(IndexUid::for_test("index3", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard6".into()],
};
// order made to map with the debug for lisibility
map.insert("indexer5", vec![&task2]);
map.insert("indexer4", vec![&task1]);
map.insert("indexer3", vec![&task1, &task3]);
map.insert("indexer2", vec![&task2, &task3, &task1, &task2]);
map.insert("indexer1", vec![&task1, &task2, &task3, &task1]);
map.insert("indexer6", vec![&task1, &task2, &task3]);
let plan = IndexingPlansDiff {
missing_node_ids: FnvHashSet::default(),
unplanned_node_ids: FnvHashSet::default(),
missing_tasks_by_node_id: map,
unplanned_tasks_by_node_id: FnvHashMap::default(),
};

let debug = format!("{plan:?}");
assert_eq!(
debug,
r#"IndexingPlansDiff(missing_tasks_by_node_id={"indexer5": [(index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer4": [(index_id: "index1", source_id: "my-source", shard_count: 1)], "indexer3": [(index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index3", source_id: "my-source", shard_count: 1)], "indexer2": [(index_id: "index2", source_id: "my-source", shard_count: 2), (index_id: "index3", source_id: "my-source", shard_count: 1), (index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer1": [(index_id: "index1", source_id: "my-source", shard_count: 1) and 3 tasks and 4 shards] and 1 more indexers, handling 3 tasks and 4 shards})"#
);
}

proptest! {
#[test]
fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) {
Expand Down

0 comments on commit afee9de

Please sign in to comment.