Skip to content

Commit

Permalink
Issue/4009 shard locality (#4863)
Browse files Browse the repository at this point in the history
* Added shard locality in the control plane pipeline assignment logic.

Closes #4009

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

---------

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Apr 23, 2024
1 parent cfa2191 commit 6fd8dc8
Show file tree
Hide file tree
Showing 12 changed files with 803 additions and 190 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ serde_qs = { version = "0.12", features = ["warp"] }
serde_with = "3.6.0"
serde_yaml = "0.9"
siphasher = "0.3"
smallvec = "1"
sqlx = { version = "0.7", features = [
"migrate",
"postgres",
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ once_cell = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
smallvec = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl PhysicalIndexingPlan {
&self.indexing_tasks_per_indexer_id
}

pub fn num_indexers(&self) -> usize {
self.indexing_tasks_per_indexer_id.len()
}

/// Returns the hashmap of (indexer ID, indexing tasks).
pub fn indexing_tasks_per_indexer_mut(&mut self) -> &mut FnvHashMap<String, Vec<IndexingTask>> {
&mut self.indexing_tasks_per_indexer_id
Expand Down
43 changes: 40 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use tracing::{debug, info, warn};
use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::model::ControlPlaneModel;
use crate::metrics::ShardLocalityMetrics;
use crate::model::{ControlPlaneModel, ShardLocations};
use crate::{IndexerNodeInfo, IndexerPool};

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
Expand Down Expand Up @@ -231,11 +232,16 @@ impl IndexingScheduler {
return;
};

let shard_locations = model.shard_locations();
let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_id_to_cpu_capacities,
self.state.last_applied_physical_plan.as_ref(),
&shard_locations,
);
let shard_locality_metrics =
get_shard_locality_metrics(&new_physical_plan, &shard_locations);
crate::metrics::CONTROL_PLANE_METRICS.set_shard_locality_metrics(shard_locality_metrics);
if let Some(last_applied_plan) = &self.state.last_applied_physical_plan {
let plans_diff = get_indexing_plans_diff(
last_applied_plan.indexing_tasks_per_indexer(),
Expand Down Expand Up @@ -370,6 +376,33 @@ impl<'a> IndexingPlansDiff<'a> {
}
}

fn get_shard_locality_metrics(
physical_plan: &PhysicalIndexingPlan,
shard_locations: &ShardLocations,
) -> ShardLocalityMetrics {
let mut num_local_shards = 0;
let mut num_remote_shards = 0;
for (indexer, tasks) in physical_plan.indexing_tasks_per_indexer() {
for task in tasks {
for shard_id in &task.shard_ids {
if shard_locations
.get_shard_locations(shard_id)
.iter()
.any(|node| node.as_str() == indexer)
{
num_local_shards += 1;
} else {
num_remote_shards += 1;
}
}
}
}
ShardLocalityMetrics {
num_remote_shards,
num_local_shards,
}
}

impl<'a> fmt::Debug for IndexingPlansDiff<'a> {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.has_same_nodes() && self.has_same_tasks() {
Expand Down Expand Up @@ -510,6 +543,7 @@ mod tests {
use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid};

use super::*;
use crate::model::ShardLocations;
#[test]
fn test_indexing_plans_diff() {
let index_uid = IndexUid::from_str("index-1:11111111111111111111111111").unwrap();
Expand Down Expand Up @@ -798,7 +832,9 @@ mod tests {
let mut indexer_max_loads = FnvHashMap::default();
indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000));
indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000));
let physical_plan = build_physical_indexing_plan(&sources[..], &indexer_max_loads, None);
let shard_locations = ShardLocations::default();
let physical_plan =
build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2);
let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap();
assert_eq!(indexing_tasks_1.len(), 2);
Expand Down Expand Up @@ -828,7 +864,8 @@ mod tests {
let indexer_id = format!("indexer-{i}");
indexer_max_loads.insert(indexer_id, mcpu(4_000));
}
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None);
let shard_locations = ShardLocations::default();
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
}
}

Expand Down
Loading

0 comments on commit 6fd8dc8

Please sign in to comment.