Skip to content

Commit

Permalink
improve placing algorithm (#5051)
Browse files Browse the repository at this point in the history
* improve split cost estimate

* change placing algorithm

* add tests

* improve split cost heuristic and refactor job placing
  • Loading branch information
trinity-1686a authored Jun 11, 2024
1 parent 2bf586e commit 48c55cf
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 12 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
index_uid: index_uid.clone(),
split_id: split_id.to_string(),
partition_id: 13u64,
num_docs: 10,
num_docs: if split_id == "split1" { 1_000_000 } else { 10 },
uncompressed_docs_size_in_bytes: 256,
time_range: Some(121000..=130198),
create_timestamp: 0,
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1406,9 +1406,10 @@ async fn assign_client_fetch_docs_jobs(
}

// Measure the cost associated to searching in a given split metadata.
fn compute_split_cost(_split_metadata: &SplitMetadata) -> usize {
// TODO: Have a smarter cost, by smoothing the number of docs.
1
/// Estimates the cost of searching one split, used to balance jobs across searchers.
///
/// The estimate is a fixed per-split overhead plus a component that grows linearly
/// with the number of documents in the split.
// TODO this formula could be tuned a lot more. The general idea is that there is a fixed
// cost to searching a split, plus a somewhat-linear cost depending on the size of the split
fn compute_split_cost(split_metadata: &SplitMetadata) -> usize {
    // Baseline cost paid for every split, regardless of its size.
    const FIXED_COST: usize = 5;
    // One extra cost unit per 100k documents (integer division, so small splits add 0).
    const DOCS_PER_COST_UNIT: usize = 100_000;
    FIXED_COST + split_metadata.num_docs / DOCS_PER_COST_UNIT
}

/// Builds a LeafSearchRequest to one node, from a list of [`SearchJob`].
Expand Down
87 changes: 79 additions & 8 deletions quickwit/quickwit-search/src/search_job_placer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use async_trait::async_trait;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use tracing::warn;

use crate::{SearchJob, SearchServiceClient, SearcherPool, SEARCH_METRICS};

Expand Down Expand Up @@ -177,13 +178,29 @@ impl SearchJobPlacer {
let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<J>)> =
HashMap::with_capacity(num_nodes);

let total_load: usize = jobs.iter().map(|job| job.cost()).sum();

// allow around 5% disparity. Round up so we never end up in a case where
// target_load * num_nodes < total_load
// some of our tests need 2 splits to be put on 2 different searchers. It makes sense for
// these tests to keep doing so (testing root merge). Either we can make the allowed
// difference stricter, find the right split names ("split6" instead of "split2" works),
// or modify mock_split_meta() so that not all splits have the same job cost.
// For now I went with the mock_split_meta() changes.
const ALLOWED_DIFFERENCE: usize = 105;
let target_load = (total_load * ALLOWED_DIFFERENCE).div_ceil(num_nodes * 100);
for job in jobs {
sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
// Select the least loaded node.
let chosen_node_idx = if candidate_nodes.len() >= 2 {
usize::from(candidate_nodes[0].load > candidate_nodes[1].load)

let (chosen_node_idx, chosen_node) = if let Some((idx, node)) = candidate_nodes
.iter_mut()
.enumerate()
.find(|(_pos, node)| node.load < target_load)
{
(idx, node)
} else {
0
warn!("found no lightly loaded searcher for split, this should never happen");
(0, &mut candidate_nodes[0])
};
let metric_node_idx = match chosen_node_idx {
0 => "0",
Expand All @@ -194,8 +211,6 @@ impl SearchJobPlacer {
.job_assigned_total
.with_label_values([metric_node_idx])
.inc();

let chosen_node = &mut candidate_nodes[chosen_node_idx];
chosen_node.load += job.cost();

job_assignments
Expand Down Expand Up @@ -406,19 +421,75 @@ mod tests {
vec![
SearchJob::for_test("split5", 5),
SearchJob::for_test("split4", 4),
SearchJob::for_test("split2", 2),
SearchJob::for_test("split3", 3),
],
),
(
expected_searcher_addr_2,
vec![
SearchJob::for_test("split6", 6),
SearchJob::for_test("split3", 3),
SearchJob::for_test("split2", 2),
SearchJob::for_test("split1", 1),
],
),
];
assert_eq!(assigned_jobs, expected_assigned_jobs);
}
{
let searcher_pool = searcher_pool_for_test([
("127.0.0.1:1001", MockSearchService::new()),
("127.0.0.1:1002", MockSearchService::new()),
]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let jobs = vec![
SearchJob::for_test("split1", 1000),
SearchJob::for_test("split2", 1),
];
let mut assigned_jobs: Vec<(SocketAddr, Vec<SearchJob>)> = search_job_placer
.assign_jobs(jobs, &HashSet::default())
.await
.unwrap()
.map(|(client, jobs)| (client.grpc_addr(), jobs))
.collect();
assigned_jobs.sort_unstable_by_key(|(node_uid, _)| *node_uid);

let expected_searcher_addr_1: SocketAddr = ([127, 0, 0, 1], 1001).into();
let expected_searcher_addr_2: SocketAddr = ([127, 0, 0, 1], 1002).into();
let expected_assigned_jobs = vec![
(
expected_searcher_addr_1,
vec![SearchJob::for_test("split1", 1000)],
),
(
expected_searcher_addr_2,
vec![SearchJob::for_test("split2", 1)],
),
];
assert_eq!(assigned_jobs, expected_assigned_jobs);
}
}

#[tokio::test]
async fn test_search_job_placer_many_splits() {
    // Five identical mock searchers and 1000 unit-cost jobs: the placer should
    // spread the load so that no single node exceeds the allowed 5% disparity.
    let searcher_pool = searcher_pool_for_test([
        ("127.0.0.1:1001", MockSearchService::new()),
        ("127.0.0.1:1002", MockSearchService::new()),
        ("127.0.0.1:1003", MockSearchService::new()),
        ("127.0.0.1:1004", MockSearchService::new()),
        ("127.0.0.1:1005", MockSearchService::new()),
    ]);
    let search_job_placer = SearchJobPlacer::new(searcher_pool);
    let jobs = (0..1000)
        .map(|id| SearchJob::for_test(&format!("split{id}"), 1))
        .collect();
    let assignment_sizes: Vec<usize> = search_job_placer
        .assign_jobs(jobs, &HashSet::default())
        .await
        .unwrap()
        .map(|(_client, assigned_jobs)| assigned_jobs.len())
        .collect();
    // Total load is 1000; with a 5% allowance each of the 5 nodes may take
    // at most 1050 / 5 = 210 jobs.
    for assignment_size in assignment_sizes {
        assert!(assignment_size <= 1050 / 5);
    }
}
}

0 comments on commit 48c55cf

Please sign in to comment.