From d1022d62321fb7f122903fbe9153955c2b265908 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 5 Jul 2024 10:34:37 +0900 Subject: [PATCH] Revert "Consider ingesters returning ResourceExhausted temporarily unavailable (#5155)" (#5191) This reverts commit de2e1503e40f5fa94ca879e8427f4f16bd97daae. --- .../quickwit-ingest/src/ingest_v2/ingester.rs | 26 +- .../quickwit-ingest/src/ingest_v2/router.rs | 309 ++++++------------ .../src/ingest_v2/routing_table.rs | 68 +--- 3 files changed, 111 insertions(+), 292 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 17b03b7864e..163a9f5cab4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -920,17 +920,6 @@ impl Ingester { if state_guard.status() != IngesterStatus::Ready { return Err(IngestV2Error::Internal("node decommissioned".to_string())); } - // If the WAL disk usage is too high, we reject the request. - let wal_usage = state_guard.mrecordlog.resource_usage(); - report_wal_usage(wal_usage); - - let disk_used = wal_usage.disk_used_bytes as u64; - - if disk_used >= self.disk_capacity.as_u64() * 95 / 100 { - return Err(IngestV2Error::Internal( - "WAL disk usage too high".to_string(), - )); - } let mut successes = Vec::with_capacity(init_shards_request.subrequests.len()); let mut failures = Vec::new(); let now = Instant::now(); @@ -1692,10 +1681,7 @@ mod tests { doc_mapping_json, }], }; - let response = ingester - .init_shards(init_shards_request.clone()) - .await - .unwrap(); + let response = ingester.init_shards(init_shards_request).await.unwrap(); assert_eq!(response.successes.len(), 1); assert_eq!(response.failures.len(), 0); @@ -1703,7 +1689,7 @@ mod tests { assert_eq!(init_shard_success.subrequest_id, 0); assert_eq!(init_shard_success.shard, Some(shard)); - let mut state_guard = ingester.state.lock_fully().await.unwrap(); + let state_guard = ingester.state.lock_fully().await.unwrap(); let queue_id = queue_id(&index_uid, "test-source", &ShardId::from(1)); let shard = state_guard.shards.get(&queue_id).unwrap(); @@ -1714,14 +1700,6 @@ mod tests { assert!(state_guard.rate_trackers.contains_key(&queue_id)); assert!(state_guard.mrecordlog.queue_exists(&queue_id)); - - state_guard.set_status(IngesterStatus::Decommissioned); - drop(state_guard); - - let error = ingester.init_shards(init_shards_request).await.unwrap_err(); - assert!( - matches!(error, IngestV2Error::Internal(message) if message.contains("decommissioned")) - ); } #[tokio::test] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 32f6cbd3960..4eaf7803efb 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::{Arc, OnceLock, Weak}; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -34,8 +34,7 @@ use quickwit_proto::control_plane::{ }; use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::ingest::ingester::{ - IngesterService, PersistFailure, PersistFailureReason, PersistRequest, PersistResponse, - PersistSubrequest, + IngesterService, PersistFailureReason, PersistRequest, PersistResponse, PersistSubrequest, }; use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService}; use 
quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; @@ -71,9 +70,6 @@ fn ingest_request_timeout() -> Duration { }) } -const RESOURCE_EXHAUSTED_LEADER_EXCLUSION_TIMEOUT: Duration = - Duration::from_millis(if cfg!(test) { 100 } else { 500 }); - const MAX_PERSIST_ATTEMPTS: usize = 5; type PersistResult = (PersistRequestSummary, IngestV2Result); @@ -94,9 +90,6 @@ struct RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer, // Holds the routing table mapping index and source IDs to shards. routing_table: RoutingTable, - // Tracks the leaders that recently returned a `ResourceExhausted` failure and stores the - // deadline after which they are considered available again. - resource_exhausted_leaders: HashMap, } impl fmt::Debug for IngestRouter { @@ -119,9 +112,8 @@ impl IngestRouter { debouncer: GetOrCreateOpenShardsRequestDebouncer::default(), routing_table: RoutingTable { self_node_id: self_node_id.clone(), - table: HashMap::new(), + table: HashMap::default(), }, - resource_exhausted_leaders: HashMap::new(), })); let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize; let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits)); @@ -161,14 +153,6 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; - // Evict the leaders that should be considered available again. - if !state_guard.resource_exhausted_leaders.is_empty() { - let now = Instant::now(); - - state_guard - .resource_exhausted_leaders - .retain(|_leader_id, deadline| now < *deadline); - } for subrequest in workbench.subworkbenches.values().filter_map(|subworbench| { if subworbench.is_pending() { Some(&subworbench.subrequest) @@ -182,7 +166,6 @@ impl IngestRouter { ingester_pool, &mut debounced_request.closed_shards, unavailable_leaders, - &state_guard.resource_exhausted_leaders, ) { let acquire_result = state_guard .debouncer @@ -203,25 +186,12 @@ impl IngestRouter { } } } - if debounced_request.is_empty() { - return debounced_request; - } - // Add the resource exhausted leaders to the list of unavailable leaders. It will force the - // control plane to open new shards on other ingesters. 
- if !state_guard.resource_exhausted_leaders.is_empty() { - for resource_exhausted_leader in state_guard.resource_exhausted_leaders.keys() { - debounced_request - .unavailable_leaders - .push(resource_exhausted_leader.to_string()); - } - info!(resource_exhausted_leaders=?debounced_request.unavailable_leaders , "reporting out of resources leader(s) to control plane"); - } drop(state_guard); - if !debounced_request.closed_shards.is_empty() { + if !debounced_request.is_empty() && !debounced_request.closed_shards.is_empty() { info!(closed_shards=?debounced_request.closed_shards, "reporting closed shard(s) to control plane"); } - if !unavailable_leaders.is_empty() { + if !debounced_request.is_empty() && !unavailable_leaders.is_empty() { info!(unvailable_leaders=?unavailable_leaders, "reporting unavailable leader(s) to control plane"); for unavailable_leader in unavailable_leaders.iter() { @@ -290,12 +260,13 @@ impl IngestRouter { } } - async fn process_batch_persist_results( + async fn process_persist_results( &self, workbench: &mut IngestWorkbench, mut persist_futures: FuturesUnordered>, - ) -> BatchPersistOutcome { - let mut outcome = BatchPersistOutcome::default(); + ) { + let mut closed_shards: HashMap<(IndexUid, SourceId), Vec> = HashMap::new(); + let mut deleted_shards: HashMap<(IndexUid, SourceId), Vec> = HashMap::new(); while let Some((persist_summary, persist_result)) = persist_futures.next().await { match persist_result { @@ -304,8 +275,25 @@ impl IngestRouter { workbench.record_persist_success(persist_success); } for persist_failure in persist_response.failures { - outcome.record_persist_failure(&persist_failure, &persist_summary); workbench.record_persist_failure(&persist_failure); + + if persist_failure.reason() == PersistFailureReason::ShardClosed { + let shard_id = persist_failure.shard_id().clone(); + let index_uid: IndexUid = persist_failure.index_uid().clone(); + let source_id: SourceId = persist_failure.source_id; + closed_shards + .entry((index_uid, source_id)) + .or_default() + .push(shard_id); + } else if persist_failure.reason() == PersistFailureReason::ShardNotFound { + let shard_id = persist_failure.shard_id().clone(); + let index_uid: IndexUid = persist_failure.index_uid().clone(); + let source_id: SourceId = persist_failure.source_id; + deleted_shards + .entry((index_uid, source_id)) + .or_default() + .push(shard_id); + } } } Err(persist_error) => { @@ -326,32 +314,18 @@ impl IngestRouter { } }; } - outcome - } - - async fn process_batch_persist_outcome(&self, outcome: BatchPersistOutcome) { - if outcome.is_empty() { - return; - } - let mut state_guard = self.state.lock().await; + if !closed_shards.is_empty() || !deleted_shards.is_empty() { + let mut state_guard = self.state.lock().await; - for ((index_uid, source_id), shard_ids) in outcome.closed_shards { - state_guard - .routing_table - .close_shards(&index_uid, &source_id, &shard_ids); - } - for ((index_uid, source_id), shard_ids) in outcome.deleted_shards { - state_guard - .routing_table - .delete_shards(&index_uid, &source_id, &shard_ids); - } - if !outcome.resource_exhausted_leaders.is_empty() { - let deadline = Instant::now() + RESOURCE_EXHAUSTED_LEADER_EXCLUSION_TIMEOUT; - - for leader_id in outcome.resource_exhausted_leaders { + for ((index_uid, source_id), shard_ids) in closed_shards { + state_guard + .routing_table + .close_shards(&index_uid, source_id, &shard_ids); + } + for ((index_uid, source_id), shard_ids) in deleted_shards { state_guard - .resource_exhausted_leaders - .insert(leader_id, deadline); + 
.routing_table + .delete_shards(&index_uid, source_id, &shard_ids); } } } @@ -372,16 +346,15 @@ impl IngestRouter { let state_guard = self.state.lock().await; + // TODO: Here would be the most optimal place to split the body of the HTTP request into + // lines, validate, transform and then pack the docs into compressed batches routed + // to the right shards. + for subrequest in workbench.pending_subrequests() { let Some(shard) = state_guard .routing_table .find_entry(&subrequest.index_id, &subrequest.source_id) - .and_then(|entry| { - entry.next_open_shard_round_robin( - &self.ingester_pool, - &state_guard.resource_exhausted_leaders, - ) - }) + .and_then(|entry| entry.next_open_shard_round_robin(&self.ingester_pool)) else { no_shards_available_subrequest_ids.push(subrequest.subrequest_id); continue; @@ -441,11 +414,8 @@ impl IngestRouter { for subrequest_id in no_shards_available_subrequest_ids { workbench.record_no_shards_available(subrequest_id); } - let outcome = self - .process_batch_persist_results(workbench, persist_futures) + self.process_persist_results(workbench, persist_futures) .await; - - self.process_batch_persist_outcome(outcome).await; } async fn retry_batch_persist( @@ -580,56 +550,6 @@ pub(super) struct PersistRequestSummary { pub subrequest_ids: Vec, } -/// Tracks the outcome of a batch persist operation. -#[derive(Default)] -pub(super) struct BatchPersistOutcome { - pub closed_shards: HashMap<(IndexUid, SourceId), Vec>, - pub deleted_shards: HashMap<(IndexUid, SourceId), Vec>, - pub resource_exhausted_leaders: HashSet, -} - -impl BatchPersistOutcome { - fn is_empty(&self) -> bool { - self.closed_shards.is_empty() - && self.deleted_shards.is_empty() - && self.resource_exhausted_leaders.is_empty() - } - - fn record_persist_failure( - &mut self, - persist_failure: &PersistFailure, - persist_summary: &PersistRequestSummary, - ) { - match persist_failure.reason() { - PersistFailureReason::ResourceExhausted => { - self.resource_exhausted_leaders - .insert(persist_summary.leader_id.clone()); - } - PersistFailureReason::ShardClosed => { - let shard_id = persist_failure.shard_id().clone(); - let index_uid = persist_failure.index_uid().clone(); - let source_id = persist_failure.source_id.clone(); - - self.closed_shards - .entry((index_uid, source_id)) - .or_default() - .push(shard_id); - } - PersistFailureReason::ShardNotFound => { - let shard_id = persist_failure.shard_id().clone(); - let index_uid = persist_failure.index_uid().clone(); - let source_id = persist_failure.source_id.clone(); - - self.deleted_shards - .entry((index_uid, source_id)) - .or_default() - .push(shard_id); - } - _ => {} - } - } -} - #[cfg(test)] mod tests { use std::collections::BTreeSet; @@ -702,10 +622,6 @@ mod tests { ..Default::default() }, ); - state_guard.resource_exhausted_leaders.insert( - "test-ingester-1".into(), - Instant::now() + RESOURCE_EXHAUSTED_LEADER_EXCLUSION_TIMEOUT, - ); drop(state_guard); let ingest_subrequests: Vec = vec![ @@ -753,20 +669,14 @@ mod tests { ); assert_eq!( get_or_create_open_shard_request.unavailable_leaders.len(), - 2 + 1 ); assert_eq!( get_or_create_open_shard_request.unavailable_leaders[0], - "test-ingester-1" - ); - assert_eq!( - get_or_create_open_shard_request.unavailable_leaders[1], "test-ingester-0" ); assert_eq!(workbench.unavailable_leaders.len(), 1); - tokio::time::sleep(RESOURCE_EXHAUSTED_LEADER_EXCLUSION_TIMEOUT * 2).await; - let (get_or_create_open_shard_request_opt, rendezvous_2) = router .make_get_or_create_open_shard_request(&mut workbench, 
&ingester_pool) .await @@ -1103,7 +1013,7 @@ mod tests { } #[tokio::test] - async fn test_router_process_batch_persist_results_records_persist_successes() { + async fn test_router_process_persist_results_record_persist_successes() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); @@ -1142,10 +1052,9 @@ mod tests { }); (persist_summary, persist_result) }); - let outcome = router - .process_batch_persist_results(&mut workbench, persist_futures) + router + .process_persist_results(&mut workbench, persist_futures) .await; - assert!(outcome.is_empty()); let subworkbench = workbench.subworkbenches.get(&0).unwrap(); assert!(matches!( @@ -1155,7 +1064,7 @@ mod tests { } #[tokio::test] - async fn test_router_process_batch_persist_results_records_persist_failures() { + async fn test_router_process_persist_results_record_persist_failures() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); @@ -1184,64 +1093,21 @@ mod tests { let persist_result = Ok::<_, IngestV2Error>(PersistResponse { leader_id: "test-ingester-0".to_string(), successes: Vec::new(), - failures: vec![ - PersistFailure { - subrequest_id: 0, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::RateLimited as i32, - }, - PersistFailure { - subrequest_id: 1, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - reason: PersistFailureReason::ResourceExhausted as i32, - }, - PersistFailure { - subrequest_id: 1, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(3)), - reason: PersistFailureReason::ShardClosed as i32, - }, - PersistFailure { - subrequest_id: 1, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(4)), - reason: PersistFailureReason::ShardNotFound as i32, - }, - ], + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + reason: PersistFailureReason::RateLimited as i32, + }], }); (persist_summary, persist_result) }); - let outcome = router - .process_batch_persist_results(&mut workbench, persist_futures) + router + .process_persist_results(&mut workbench, persist_futures) .await; - let source_uid = ( - IndexUid::for_test("test-index-0", 0), - "test-source".to_string(), - ); - assert_eq!(outcome.closed_shards.len(), 1); - let closed_shards = outcome.closed_shards.get(&source_uid).unwrap(); - assert_eq!(closed_shards.len(), 1); - assert_eq!(closed_shards[0], ShardId::from(3)); - - assert_eq!(outcome.deleted_shards.len(), 1); - let deleted_shards = outcome.deleted_shards.get(&source_uid).unwrap(); - assert_eq!(deleted_shards.len(), 1); - assert_eq!(deleted_shards[0], ShardId::from(4)); - - assert_eq!(outcome.resource_exhausted_leaders.len(), 1); - assert!(outcome - .resource_exhausted_leaders - .contains("test-ingester-0")); - let subworkbench = workbench.subworkbenches.get(&0).unwrap(); - assert!(matches!( subworkbench.last_failure_opt, Some(SubworkbenchFailure::Persist { .. 
}) @@ -1249,7 +1115,7 @@ mod tests { } #[tokio::test] - async fn test_router_process_batch_persist_outcome() { + async fn test_router_process_persist_results_closes_and_deletes_shards() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); @@ -1284,23 +1150,41 @@ mod tests { ); drop(state_guard); - let mut batch_outcome = BatchPersistOutcome::default(); - batch_outcome.closed_shards.insert( - (index_uid.clone(), "test-source".to_string()), - vec![ShardId::from(1)], - ); - batch_outcome.deleted_shards.insert( - (index_uid.clone(), "test-source".to_string()), - vec![ShardId::from(2)], - ); - batch_outcome - .resource_exhausted_leaders - .insert("test-ingester-0".into()); + let mut workbench = IngestWorkbench::new(Vec::new(), 2); + let persist_futures = FuturesUnordered::new(); - router.process_batch_persist_outcome(batch_outcome).await; + persist_futures.push(async { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let persist_result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![ + PersistFailure { + subrequest_id: 0, + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + reason: PersistFailureReason::ShardNotFound as i32, + }, + PersistFailure { + subrequest_id: 1, + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(2)), + reason: PersistFailureReason::ShardClosed as i32, + }, + ], + }); + (persist_summary, persist_result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; let state_guard = router.state.lock().await; - let routing_table_entry = state_guard .routing_table .find_entry("test-index-0", "test-source") @@ -1308,17 +1192,12 @@ mod tests { assert_eq!(routing_table_entry.len(), 1); let shard = routing_table_entry.all_shards()[0]; - assert_eq!(shard.shard_id, ShardId::from(1)); + assert_eq!(shard.shard_id, ShardId::from(2)); assert_eq!(shard.shard_state, ShardState::Closed); - - assert_eq!(state_guard.resource_exhausted_leaders.len(), 1); - assert!(state_guard - .resource_exhausted_leaders - .contains_key(&NodeId::from("test-ingester-0"))); } #[tokio::test] - async fn test_router_process_batch_persist_results_does_not_remove_unavailable_leaders() { + async fn test_router_process_persist_results_does_not_remove_unavailable_leaders() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); @@ -1360,7 +1239,7 @@ mod tests { (persist_summary, persist_result) }); router - .process_batch_persist_results(&mut workbench, persist_futures) + .process_persist_results(&mut workbench, persist_futures) .await; let subworkbench = workbench.subworkbenches.get(&0).unwrap(); @@ -1383,7 +1262,7 @@ mod tests { (persist_summary, persist_result) }); router - .process_batch_persist_results(&mut workbench, persist_futures) + .process_persist_results(&mut workbench, persist_futures) .await; // We do not remove the leader from the pool. 
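For context, the router-side mechanism removed by this revert boils down to a deadline-keyed exclusion map: when a persist attempt comes back with `PersistFailureReason::ResourceExhausted`, the leader is recorded with a deadline, excluded from routing until that deadline passes, and reported to the control plane so it opens new shards elsewhere. What follows is a minimal, self-contained sketch of that idea only, not the actual Quickwit code: it uses plain `String` leader IDs and a free-standing struct in place of `NodeId` and `RouterState`, and reuses the timeout values quoted in the hunks above (500 ms, or 100 ms under `cfg!(test)`).

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// How long a leader stays excluded after reporting `ResourceExhausted`
/// (the reverted code used 500 ms, or 100 ms under cfg!(test)).
const EXCLUSION_TIMEOUT: Duration = Duration::from_millis(500);

/// Tracks leaders that recently returned `ResourceExhausted`, keyed by the
/// deadline after which they are considered available again.
/// (Illustrative stand-in for the removed `resource_exhausted_leaders` field.)
#[derive(Default)]
struct ResourceExhaustedLeaders {
    deadlines: HashMap<String, Instant>,
}

impl ResourceExhaustedLeaders {
    /// Records a leader as exhausted until `now + EXCLUSION_TIMEOUT`.
    fn record(&mut self, leader_id: String) {
        self.deadlines
            .insert(leader_id, Instant::now() + EXCLUSION_TIMEOUT);
    }

    /// Drops leaders whose exclusion deadline has passed; the reverted router
    /// did this at the top of `make_get_or_create_open_shard_request`.
    fn evict_expired(&mut self) {
        let now = Instant::now();
        self.deadlines.retain(|_leader_id, deadline| now < *deadline);
    }

    /// True if the router should currently skip shards led by this leader.
    fn is_excluded(&self, leader_id: &str) -> bool {
        self.deadlines.contains_key(leader_id)
    }
}

fn main() {
    let mut exhausted = ResourceExhaustedLeaders::default();
    exhausted.record("test-ingester-1".to_string());
    assert!(exhausted.is_excluded("test-ingester-1"));
    // Once the deadline passes, eviction makes the leader routable again.
    std::thread::sleep(EXCLUSION_TIMEOUT);
    exhausted.evict_expired();
    assert!(!exhausted.is_excluded("test-ingester-1"));
}

Besides excluding these leaders locally, the reverted code also appended them to `unavailable_leaders` in the get-or-create-open-shards request, which (as the removed comment says) forces the control plane to open new shards on other ingesters.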
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 78e54f9703d..7c4ed6f7fed 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -20,7 +20,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Instant; use quickwit_proto::ingest::{Shard, ShardIds, ShardState}; use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId}; @@ -116,7 +115,6 @@ impl RoutingTableEntry { ingester_pool: &IngesterPool, closed_shard_ids: &mut Vec, unavailable_leaders: &mut HashSet, - resource_exhausted_leaders: &HashMap, ) -> bool { let shards = self.local_shards.iter().chain(self.remote_shards.iter()); @@ -133,9 +131,6 @@ impl RoutingTableEntry { if unavailable_leaders.contains(&shard.leader_id) { continue; } - if resource_exhausted_leaders.contains_key(&shard.leader_id) { - continue; - } if ingester_pool.contains_key(&shard.leader_id) { return true; } else { @@ -152,7 +147,6 @@ impl RoutingTableEntry { pub fn next_open_shard_round_robin( &self, ingester_pool: &IngesterPool, - resource_exhausted_leaders: &HashMap, ) -> Option<&RoutingEntry> { for (shards, round_robin_idx) in [ (&self.local_shards, &self.local_round_robin_idx), @@ -165,10 +159,7 @@ impl RoutingTableEntry { let shard_idx = round_robin_idx.fetch_add(1, Ordering::Relaxed); let shard = &shards[shard_idx % shards.len()]; - if shard.shard_state.is_open() - && ingester_pool.contains_key(&shard.leader_id) - && !resource_exhausted_leaders.contains_key(&shard.leader_id) - { + if shard.shard_state.is_open() && ingester_pool.contains_key(&shard.leader_id) { return Some(shard); } } @@ -363,19 +354,14 @@ impl RoutingTable { ingester_pool: &IngesterPool, closed_shards: &mut Vec, unavailable_leaders: &mut HashSet, - resource_exhausted_leaders: &HashMap, ) -> bool { let Some(entry) = self.find_entry(index_id, source_id) else { return false; }; let mut closed_shard_ids: Vec = Vec::new(); - let result = entry.has_open_shards( - ingester_pool, - &mut closed_shard_ids, - unavailable_leaders, - resource_exhausted_leaders, - ); + let result = + entry.has_open_shards(ingester_pool, &mut closed_shard_ids, unavailable_leaders); if !closed_shard_ids.is_empty() { closed_shards.push(ShardIds { @@ -572,19 +558,17 @@ mod tests { #[test] fn test_routing_table_entry_has_open_shards() { let index_uid = IndexUid::for_test("test-index", 0); - let source_id = SourceId::from("test-source"); + let source_id: SourceId = "test-source".into(); let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); let mut closed_shard_ids = Vec::new(); let ingester_pool = IngesterPool::default(); let mut unavailable_leaders = HashSet::new(); - let mut resource_exhausted_leaders = HashMap::new(); assert!(!table_entry.has_open_shards( &ingester_pool, &mut closed_shard_ids, - &mut unavailable_leaders, - &resource_exhausted_leaders + &mut unavailable_leaders )); assert!(closed_shard_ids.is_empty()); assert!(unavailable_leaders.is_empty()); @@ -618,8 +602,7 @@ mod tests { assert!(table_entry.has_open_shards( &ingester_pool, &mut closed_shard_ids, - &mut unavailable_leaders, - &resource_exhausted_leaders + &mut unavailable_leaders )); assert_eq!(closed_shard_ids.len(), 1); assert_eq!(closed_shard_ids[0], ShardId::from(1)); @@ -660,22 +643,12 @@ mod tests { assert!(table_entry.has_open_shards( &ingester_pool, &mut 
closed_shard_ids, - &mut unavailable_leaders, - &resource_exhausted_leaders + &mut unavailable_leaders )); assert_eq!(closed_shard_ids.len(), 1); assert_eq!(closed_shard_ids[0], ShardId::from(1)); assert_eq!(unavailable_leaders.len(), 1); assert!(unavailable_leaders.contains("test-ingester-2")); - - resource_exhausted_leaders.insert(NodeId::from("test-ingester-1"), Instant::now()); - - assert!(!table_entry.has_open_shards( - &ingester_pool, - &mut closed_shard_ids, - &mut unavailable_leaders, - &resource_exhausted_leaders - )); } #[test] @@ -684,10 +657,8 @@ mod tests { let source_id: SourceId = "test-source".into(); let table_entry = RoutingTableEntry::empty(index_uid.clone(), source_id.clone()); let ingester_pool = IngesterPool::default(); - let mut resource_exhausted_leaders = HashMap::new(); - let shard_opt = - table_entry.next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders); + let shard_opt = table_entry.next_open_shard_round_robin(&ingester_pool); assert!(shard_opt.is_none()); ingester_pool.insert("test-ingester-0".into(), IngesterServiceClient::mocked()); @@ -716,7 +687,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: ShardId::from(3), shard_state: ShardState::Open, - leader_id: "test-ingester-1".into(), + leader_id: "test-ingester-0".into(), }, ], local_round_robin_idx: AtomicUsize::default(), @@ -724,29 +695,20 @@ mod tests { remote_round_robin_idx: AtomicUsize::default(), }; let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(2)); let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(3)); let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) - .unwrap(); - assert_eq!(shard.shard_id, ShardId::from(2)); - - resource_exhausted_leaders.insert(NodeId::from("test-ingester-1"), Instant::now()); - - let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(2)); - resource_exhausted_leaders.clear(); - let table_entry = RoutingTableEntry { index_uid: index_uid.clone(), source_id: source_id.clone(), @@ -791,17 +753,17 @@ mod tests { remote_round_robin_idx: AtomicUsize::default(), }; let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(2)); let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(5)); let shard = table_entry - .next_open_shard_round_robin(&ingester_pool, &resource_exhausted_leaders) + .next_open_shard_round_robin(&ingester_pool) .unwrap(); assert_eq!(shard.shard_id, ShardId::from(2)); }
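Both before and after this revert, shard selection in the routing table is a round-robin scan that skips shards which are not open or whose leader is absent from the ingester pool; the reverted version additionally skipped resource-exhausted leaders. Below is a standalone sketch of that selection, under simplifying assumptions: a single shard list stands in for the separate local/remote lists, a bool stands in for `ShardState`, and `String` leader IDs replace `NodeId`. The shared `AtomicUsize` cursor and the "try each shard at most once" loop mirror `next_open_shard_round_robin` above.

use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};

struct ShardEntry {
    shard_id: u64,
    is_open: bool,
    leader_id: String,
}

/// Picks the next open shard whose leader is available, advancing a shared
/// round-robin cursor and trying every shard at most once before giving up.
fn next_open_shard_round_robin<'a>(
    shards: &'a [ShardEntry],
    available_leaders: &HashSet<String>,
    round_robin_idx: &AtomicUsize,
) -> Option<&'a ShardEntry> {
    for _attempt in 0..shards.len() {
        let shard_idx = round_robin_idx.fetch_add(1, Ordering::Relaxed);
        let shard = &shards[shard_idx % shards.len()];
        if shard.is_open && available_leaders.contains(&shard.leader_id) {
            return Some(shard);
        }
    }
    None
}

fn main() {
    let shards = vec![
        ShardEntry { shard_id: 1, is_open: false, leader_id: "test-ingester-0".into() },
        ShardEntry { shard_id: 2, is_open: true, leader_id: "test-ingester-0".into() },
        ShardEntry { shard_id: 3, is_open: true, leader_id: "test-ingester-0".into() },
    ];
    let available: HashSet<String> = ["test-ingester-0".to_string()].into_iter().collect();
    let idx = AtomicUsize::new(0);
    // The closed shard 1 is skipped; the open shards 2 and 3 alternate,
    // matching the 2, 3, 2 sequence expected by the test above.
    assert_eq!(next_open_shard_round_robin(&shards, &available, &idx).unwrap().shard_id, 2);
    assert_eq!(next_open_shard_round_robin(&shards, &available, &idx).unwrap().shard_id, 3);
    assert_eq!(next_open_shard_round_robin(&shards, &available, &idx).unwrap().shard_id, 2);
}

Advancing the cursor with `Ordering::Relaxed` keeps selection lock-free: concurrent callers simply spread their writes across the open shards, which is why the tests expect a strict rotation rather than a fixed pick.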