Skip to content

Commit

Permalink
added display control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Dec 22, 2023
1 parent 718eeb1 commit 977bb07
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 5 deletions.
3 changes: 2 additions & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ impl Handler<ControlPlanLoop> for ControlPlane {
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.indexing_scheduler.schedule_indexing_plan_if_needed(&self.model);
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.indexing_scheduler.control_running_plan(&self.model);
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Expand Down
34 changes: 33 additions & 1 deletion quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{NodeId, ShardId};
use quickwit_proto::types::{IndexUid, NodeId, ShardId};
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, error, info, warn};
use ulid::Ulid;

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
Expand Down Expand Up @@ -212,13 +213,24 @@ impl IndexingScheduler {
&indexer_id_to_cpu_capacities,
self.state.last_applied_physical_plan.as_ref(),
);
println!("\n\n\n\n\n\n\n===============");
println!("# CURRENT PLAN");
println!("---------------");
display_plan(&new_physical_plan);
println!("# PREVIOUS PLAN");
println!("---------------");
if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() {
display_plan(last_applied_plan);
}
println!("===============");
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
let plans_diff = get_indexing_plans_diff(
last_applied_plan.indexing_tasks_per_indexer(),
new_physical_plan.indexing_tasks_per_indexer(),
);
// No need to apply the new plan as it is the same as the old one.
if plans_diff.is_empty() {
info!("no difference");
return;
}
}
Expand Down Expand Up @@ -315,6 +327,26 @@ impl IndexingScheduler {
}
}

fn display_plan(plan: &PhysicalIndexingPlan) {
for (node, tasks) in plan.indexing_tasks_per_indexer() {
println!("{node}");
for task in tasks {
if task.source_id == "_ingest-source" {
let index_uid = IndexUid::parse(&task.index_uid).unwrap();
println!(
" {:.5}:{} => {:?}",
task.pipeline_uid
.as_ref()
.map(|pipeline_uid| pipeline_uid.0)
.unwrap_or(Ulid::new()),
index_uid.index_id(),
task.shard_ids
);
}
}
}
}

struct IndexingPlansDiff<'a> {
pub missing_node_ids: FnvHashSet<&'a str>,
pub unplanned_node_ids: FnvHashSet<&'a str>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ fn convert_scheduling_solution_to_physical_plan(
}

new_physical_plan.normalize();

new_physical_plan
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ impl IngesterService for Ingester {
info!(shard=%shard, "currently in shard hashmap");
}
for queue in state_guard.mrecordlog.list_queues() {
info!(queue=queue, "currently in mrecordlog");
info!(queue = queue, "currently in mrecordlog");
}
info!(queues=?remove_queue_ids, "removing queues");
for queue_id in remove_queue_ids {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/src/types/pipeline_uid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const ULID_SIZE: usize = 16;

/// A pipeline uid identify an indexing pipeline and an indexing task.
#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct PipelineUid(Ulid);
pub struct PipelineUid(pub Ulid);

impl std::fmt::Debug for PipelineUid {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
Expand Down

0 comments on commit 977bb07

Please sign in to comment.