CR comments
fulmicoton committed Oct 31, 2023
1 parent 83e55bb commit c7303e3
Showing 6 changed files with 147 additions and 149 deletions.
quickwit/quickwit-control-plane/README.md (16 changes: 6 additions & 10 deletions)
@@ -2,11 +2,13 @@

The Control Plane is responsible for scheduling indexing tasks to indexers. Its role is to ensure that the cluster is correctly running all indexing tasks on each indexer.

-An indexing task is simply identified by a couple `(IndexId, SourceId)`.
+An indexing task is identified by the triplet `(IndexId, SourceId, Option<Vec<ShardId>>)`.

## Scheduling algorithm

-The control plane subscribes to all metastore events to keep up to date the list of indexing tasks that should be running on the cluster.
+The control plane keeps an up-to-date partial view of the metastore.
+This is enforced by routing all index-, shard-, and source-altering
+commands through the control plane.

On startup, or when a metastore event is received, the scheduler computes the list of indexing tasks.
It then applies a placement algorithm to decide which indexer should run each indexing task. The result of this placement is called the physical indexing plan; it associates each indexer with a list of indexing tasks.
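The hunk above widens the task identifier from a pair to a triplet with an optional shard set. As a rough illustration only (hypothetical field names and a numeric shard ID, not Quickwit's actual protobuf-generated `IndexingTask`), such a key could look like this:

```rust
/// Hypothetical sketch of the widened task identifier described above.
/// Field names and the numeric shard ID are assumptions for illustration;
/// Quickwit's real `IndexingTask` is a protobuf-generated type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct IndexingTaskKey {
    index_id: String,
    source_id: String,
    /// `None` for sources with no notion of shards; `Some` pins the task
    /// to a specific subset of shards.
    shard_ids: Option<Vec<u64>>,
}

fn main() {
    let task = IndexingTaskKey {
        index_id: "logs".to_string(),
        source_id: "ingest-source".to_string(),
        shard_ids: Some(vec![1, 2, 3]),
    };
    println!("{task:?}");
}
```

Wrapping the shard list in an `Option` keeps the identifier compatible with sources that are not shard-based.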
@@ -15,15 +17,9 @@ The control plane then emits gRPC to the indexers that are not already following

```mermaid
flowchart TB
StartScheduling(Start scheduling)--"(Sources, Nodes)"-->Define
StartScheduling(Start scheduling)--"(Sources, Nodes)"-->BuildPhysical
style StartScheduling fill:#ff0026,fill-opacity:0.5,stroke:#ff0026,stroke-width:4px
-Define[Define indexing tasks] --IndexingTasks--> AssignTasks
-subgraph AssignTasks[Assign each indexing task]
-direction LR
-Filter[Filter nodes]--Node candidates-->Score
-Score[Score each node]--Nodes with score-->Select[Select best node]
-end
-AssignTasks--PhysicalPlan-->Apply
+BuildPhysical[Build Physical Plan]--PhysicalPlan-->Apply
Apply[Apply plan to each indexer] --IndexerPlan--> Indexer1
Apply --IndexerPlan--> Indexer2
Apply --IndexerPlan--> Indexer...
```
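The reworked flowchart collapses scheduling into two steps: build the physical plan from the sources and nodes, then apply that plan to each indexer. A self-contained toy version of the build step is sketched below; it uses naive round-robin placement over stand-in types, whereas the real `build_physical_indexing_plan` also weighs per-indexer load (see the tests further down):

```rust
use std::collections::BTreeMap;

/// Stand-in for an indexing task; illustrative only.
#[derive(Debug, Clone)]
struct IndexingTask {
    index_id: String,
    source_id: String,
}

/// Stand-in physical plan: indexer ID -> tasks assigned to that indexer.
type PhysicalPlan = BTreeMap<String, Vec<IndexingTask>>;

/// Toy build step: every task is assigned to exactly one indexer,
/// round-robin. The real algorithm also accounts for indexer capacity.
fn build_physical_plan(tasks: Vec<IndexingTask>, indexers: &[String]) -> PhysicalPlan {
    let mut plan = PhysicalPlan::new();
    for (i, task) in tasks.into_iter().enumerate() {
        let indexer = indexers[i % indexers.len()].clone();
        plan.entry(indexer).or_default().push(task);
    }
    plan
}

fn main() {
    let tasks = vec![
        IndexingTask { index_id: "logs".into(), source_id: "kafka".into() },
        IndexingTask { index_id: "traces".into(), source_id: "kafka".into() },
        IndexingTask { index_id: "logs".into(), source_id: "file".into() },
    ];
    let indexers = vec!["indexer1".to_string(), "indexer2".to_string()];
    let plan = build_physical_plan(tasks, &indexers);
    // Apply step: in Quickwit this is a gRPC call per indexer; here we
    // just print each indexer's assignment.
    for (indexer_id, tasks) in &plan {
        println!("{indexer_id}: {tasks:?}");
    }
}
```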
quickwit/quickwit-control-plane/src/indexing_plan.rs (24 changes: 12 additions & 12 deletions)
@@ -27,31 +27,31 @@ use serde::Serialize;
/// to identify if the plan is up to date with the metastore.
#[derive(Debug, PartialEq, Clone, Serialize, Default)]
pub struct PhysicalIndexingPlan {
-indexing_tasks_per_node_id: FnvHashMap<String, Vec<IndexingTask>>,
+indexing_tasks_per_indexer_id: FnvHashMap<String, Vec<IndexingTask>>,
}

impl PhysicalIndexingPlan {
-pub fn add_indexing_task(&mut self, node_id: &str, indexing_task: IndexingTask) {
-self.indexing_tasks_per_node_id
-.entry(node_id.to_string())
+pub fn add_indexing_task(&mut self, indexer_id: &str, indexing_task: IndexingTask) {
+self.indexing_tasks_per_indexer_id
+.entry(indexer_id.to_string())
.or_default()
.push(indexing_task);
}

-/// Returns the hashmap of (node ID, indexing tasks).
-pub fn indexing_tasks_per_node(&self) -> &FnvHashMap<String, Vec<IndexingTask>> {
-&self.indexing_tasks_per_node_id
+/// Returns the hashmap of (indexer ID, indexing tasks).
+pub fn indexing_tasks_per_indexer(&self) -> &FnvHashMap<String, Vec<IndexingTask>> {
+&self.indexing_tasks_per_indexer_id
}

-/// Returns the hashmap of (node ID, indexing tasks).
-pub fn node(&self, node_id: &str) -> Option<&[IndexingTask]> {
-self.indexing_tasks_per_node_id
-.get(node_id)
+/// Returns the indexing tasks assigned to the given indexer, if any.
+pub fn indexer(&self, indexer_id: &str) -> Option<&[IndexingTask]> {
+self.indexing_tasks_per_indexer_id
+.get(indexer_id)
.map(Vec::as_slice)
}

pub fn normalize(&mut self) {
-for tasks in self.indexing_tasks_per_node_id.values_mut() {
+for tasks in self.indexing_tasks_per_indexer_id.values_mut() {
tasks.sort_by(|left, right| {
left.index_uid
.cmp(&right.index_uid)
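To read the renamed `PhysicalIndexingPlan` API in one place, here is a minimal, compilable re-creation of the struct as it stands after this commit, with a tiny usage example. The `IndexingTask` stand-in and the std `HashMap` (in place of `FnvHashMap`) are simplifications, not the real Quickwit types:

```rust
use std::collections::HashMap; // the real code uses FnvHashMap

/// Stand-in for Quickwit's protobuf-generated `IndexingTask`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct IndexingTask {
    index_uid: String,
    source_id: String,
}

#[derive(Debug, Default)]
struct PhysicalIndexingPlan {
    indexing_tasks_per_indexer_id: HashMap<String, Vec<IndexingTask>>,
}

impl PhysicalIndexingPlan {
    fn add_indexing_task(&mut self, indexer_id: &str, indexing_task: IndexingTask) {
        self.indexing_tasks_per_indexer_id
            .entry(indexer_id.to_string())
            .or_default()
            .push(indexing_task);
    }

    fn indexer(&self, indexer_id: &str) -> Option<&[IndexingTask]> {
        self.indexing_tasks_per_indexer_id
            .get(indexer_id)
            .map(Vec::as_slice)
    }

    /// Sort each indexer's task list so two plans compare deterministically.
    fn normalize(&mut self) {
        for tasks in self.indexing_tasks_per_indexer_id.values_mut() {
            tasks.sort();
        }
    }
}

fn main() {
    let mut plan = PhysicalIndexingPlan::default();
    plan.add_indexing_task(
        "indexer1",
        IndexingTask { index_uid: "logs".into(), source_id: "kafka-source".into() },
    );
    plan.normalize();
    assert_eq!(plan.indexer("indexer1").unwrap().len(), 1);
    assert!(plan.indexer("indexer2").is_none());
}
```

Note that the real `normalize` sorts with an explicit comparator starting from `index_uid`, as the hunk above shows; deriving `Ord` achieves the same effect for this simplified type.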
quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs (16 changes: 8 additions & 8 deletions)
@@ -215,8 +215,8 @@ impl IndexingScheduler {
);
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
let plans_diff = get_indexing_plans_diff(
-last_applied_plan.indexing_tasks_per_node(),
-new_physical_plan.indexing_tasks_per_node(),
+last_applied_plan.indexing_tasks_per_indexer(),
+new_physical_plan.indexing_tasks_per_indexer(),
);
// No need to apply the new plan as it is the same as the old one.
if plans_diff.is_empty() {
@@ -259,7 +259,7 @@

let indexing_plans_diff = get_indexing_plans_diff(
&running_indexing_tasks_by_node_id,
-last_applied_plan.indexing_tasks_per_node(),
+last_applied_plan.indexing_tasks_per_indexer(),
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "Running plan and last applied plan node IDs differ: schedule an indexing plan.");
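`get_indexing_plans_diff`, whose call sites change in the hunks above, compares the running plan against the last applied one. Below is a self-contained sketch of that comparison under a deliberately simplified plan shape; the real function returns a richer diff structure (hence `has_same_nodes`) rather than three vectors:

```rust
use std::collections::BTreeMap;

/// Simplified plan shape: indexer ID -> task IDs (stand-ins).
type Plan = BTreeMap<String, Vec<String>>;

/// Sketch of a plan diff: indexers that appear, indexers that disappear,
/// and indexers that keep running but with a different task set.
fn diff_plans(running: &Plan, desired: &Plan) -> (Vec<String>, Vec<String>, Vec<String>) {
    let mut added = Vec::new();
    let mut changed = Vec::new();
    for (indexer, tasks) in desired {
        match running.get(indexer) {
            None => added.push(indexer.clone()),
            Some(running_tasks) if running_tasks != tasks => changed.push(indexer.clone()),
            Some(_) => {}
        }
    }
    let removed = running
        .keys()
        .filter(|indexer| !desired.contains_key(*indexer))
        .cloned()
        .collect();
    (added, removed, changed)
}

fn main() {
    let running: Plan = BTreeMap::from([("indexer1".into(), vec!["t1".into()])]);
    let desired: Plan = BTreeMap::from([
        ("indexer1".into(), vec!["t1".into(), "t2".into()]),
        ("indexer2".into(), vec!["t3".into()]),
    ]);
    let (added, removed, changed) = diff_plans(&running, &desired);
    assert_eq!(added, vec!["indexer2"]);
    assert!(removed.is_empty());
    assert_eq!(changed, vec!["indexer1"]);
}
```

An empty diff means the new plan does not need to be re-applied, which is exactly the early return in the first hunk of this file.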
@@ -281,7 +281,7 @@
new_physical_plan: PhysicalIndexingPlan,
) {
debug!("Apply physical indexing plan: {:?}", new_physical_plan);
-for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_node() {
+for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
// We don't want to block on a slow indexer so we apply this change asynchronously
// TODO not blocking is cool, but we need to make sure there is not accumulation
// possible here.
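The comment above flags the key design choice in plan application: pushing the plan to indexers is fire-and-forget so one slow indexer cannot stall the others. A self-contained sketch of that pattern, assuming a tokio runtime (which Quickwit uses); the client type and the RPC are stand-ins:

```rust
// Assumed dependency: tokio = { version = "1", features = ["full"] }
use std::collections::HashMap;
use std::time::Duration;

/// Stand-in for a per-indexer gRPC client.
#[derive(Clone)]
struct IndexerClient {
    id: String,
}

impl IndexerClient {
    /// Stand-in for the real apply-plan RPC.
    async fn apply_indexing_plan(&self, tasks: Vec<String>) {
        tokio::time::sleep(Duration::from_millis(10)).await; // simulate network latency
        println!("{} now runs {:?}", self.id, tasks);
    }
}

#[tokio::main]
async fn main() {
    let plan: HashMap<String, Vec<String>> = HashMap::from([
        ("indexer1".to_string(), vec!["logs:kafka".to_string()]),
        ("indexer2".to_string(), vec!["traces:kafka".to_string()]),
    ]);
    let mut handles = Vec::new();
    for (indexer_id, tasks) in plan {
        let client = IndexerClient { id: indexer_id };
        // Spawn so a slow indexer cannot block the apply loop. As the
        // TODO above warns, unbounded spawning can accumulate work if
        // plans are produced faster than indexers acknowledge them.
        handles.push(tokio::spawn(async move {
            client.apply_indexing_plan(tasks).await;
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```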
@@ -751,10 +751,10 @@ mod tests {
indexer_max_loads.insert("indexer1".to_string(), 3_000);
indexer_max_loads.insert("indexer2".to_string(), 3_000);
let physical_plan = build_physical_indexing_plan(&sources[..], &indexer_max_loads, None);
-assert_eq!(physical_plan.indexing_tasks_per_node().len(), 2);
-let indexing_tasks_1 = physical_plan.node("indexer1").unwrap();
+assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2);
+let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap();
assert_eq!(indexing_tasks_1.len(), 2);
-let indexer_2_tasks = physical_plan.node("indexer2").unwrap();
+let indexer_2_tasks = physical_plan.indexer("indexer2").unwrap();
assert_eq!(indexer_2_tasks.len(), 3);
}
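The test above exercises load-aware placement: each indexer advertises a maximum load (3,000 here) and the builder packs tasks under those caps. A self-contained greedy sketch of the idea (names and the load model are assumptions; the real `build_physical_indexing_plan` is considerably more careful):

```rust
use std::collections::BTreeMap;

/// (task name, load it adds): a stand-in for Quickwit's source load model.
type Task = (String, u32);

/// Greedy placement: give each task to the indexer with the most remaining
/// capacity; returns `None` if some task fits nowhere.
fn place(
    tasks: &[Task],
    max_loads: &BTreeMap<String, u32>,
) -> Option<BTreeMap<String, Vec<String>>> {
    let mut remaining = max_loads.clone();
    let mut plan: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (task, load) in tasks {
        // Pick the indexer with the largest remaining capacity.
        let (indexer, capacity) = remaining
            .iter()
            .max_by_key(|(_, capacity)| **capacity)
            .map(|(id, capacity)| (id.clone(), *capacity))?;
        if capacity < *load {
            return None; // no indexer can absorb this task
        }
        *remaining.get_mut(&indexer).unwrap() -= load;
        plan.entry(indexer).or_default().push(task.clone());
    }
    Some(plan)
}

fn main() {
    let tasks: Vec<Task> = vec![
        ("task1".to_string(), 2_000),
        ("task2".to_string(), 2_000),
        ("task3".to_string(), 1_000),
    ];
    let max_loads = BTreeMap::from([
        ("indexer1".to_string(), 3_000),
        ("indexer2".to_string(), 3_000),
    ]);
    let plan = place(&tasks, &max_loads).expect("tasks should fit");
    println!("{plan:?}");
}
```

This toy version ignores ties and affinity; it only demonstrates the capacity bookkeeping that the assertions above rely on.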

@@ -785,7 +785,7 @@ mod tests {
.iter()
.map(|source| (&source.source_uid, source))
.collect();
-for (node_id, tasks) in physical_indexing_plan.indexing_tasks_per_node() {
+for (node_id, tasks) in physical_indexing_plan.indexing_tasks_per_indexer() {
let mut load_in_node = 0u32;
for task in tasks {
let source_uid = SourceUid {