From b3849c8c37c94acaddbd80e9bf478aa8c2d849bf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 4 Oct 2023 17:48:20 +0900 Subject: [PATCH] Readding tests --- quickwit/quickwit-actors/src/actor_context.rs | 3 +- quickwit/quickwit-common/src/progress.rs | 9 + .../src/control_plane.rs | 63 ++- .../src/ingest/ingest_controller.rs | 90 ++-- quickwit/quickwit-control-plane/src/lib.rs | 3 + .../quickwit-control-plane/src/scheduler.rs | 379 +--------------- quickwit/quickwit-control-plane/src/tests.rs | 412 ++++++++++++++++++ 7 files changed, 494 insertions(+), 465 deletions(-) create mode 100644 quickwit/quickwit-control-plane/src/tests.rs diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index b0201d92a9d..b690bae2c50 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -151,8 +151,7 @@ impl ActorContext { /// Executes a future in a protected zone. pub async fn protect_future(&self, future: Fut) -> T where Fut: Future { - let _guard = self.protect_zone(); - future.await + self.progress.protect_future(future).await } /// Cooperatively yields, while keeping the actor protected. diff --git a/quickwit/quickwit-common/src/progress.rs b/quickwit/quickwit-common/src/progress.rs index 01293b3bb90..4d8e759aa03 100644 --- a/quickwit/quickwit-common/src/progress.rs +++ b/quickwit/quickwit-common/src/progress.rs @@ -20,6 +20,8 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use futures::Future; + /// Progress makes it possible to register some progress. /// It is used in lieu of healthcheck. /// @@ -83,6 +85,13 @@ impl Progress { .fetch_max(ProgressState::Updated.into(), Ordering::Relaxed); } + /// Executes a future in a protected zone. + pub async fn protect_future(&self, future: Fut) -> T + where Fut: Future { + let _guard = self.protect_zone(); + future.await + } + pub fn protect_zone(&self) -> ProtectedZoneGuard { loop { let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into(); diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 1e64f701dbd..7a2973477c6 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -48,7 +48,7 @@ use crate::scheduler::{IndexingScheduler, IndexingSchedulerState}; use crate::IndexerPool; /// Interval between two controls (or checks) of the desired plan VS running plan. -const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { +pub const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { Duration::from_millis(500) } else { Duration::from_secs(3) @@ -58,7 +58,7 @@ const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsu /// control plan loop. // Note: it's currently not possible to define a const duration with // `CONTROL_PLAN_LOOP_INTERVAL * number`. 
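A minimal usage sketch of the new `Progress::protect_future` helper added in `progress.rs` above. The generic bounds (`Fut: Future<Output = T>`) are assumed from the body shown in the hunk, and the `list_indexes_protected` function below is purely illustrative, not part of the patch:

    use quickwit_common::Progress;

    // Any async call wrapped in `protect_future` keeps the owner of the
    // `Progress` reported as alive (in a protected zone) while the future
    // is pending, instead of requiring an `ActorContext`.
    async fn list_indexes_protected(progress: &Progress) -> Vec<String> {
        progress
            .protect_future(async {
                // stand-in for a slow call, e.g. a metastore request
                vec!["test-index".to_string()]
            })
            .await
    }

    #[tokio::main]
    async fn main() {
        let progress = Progress::default();
        let indexes = list_indexes_protected(&progress).await;
        assert_eq!(indexes.len(), 1);
    }
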
-const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { +pub const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { Duration::from_secs(3) } else { Duration::from_secs(60) @@ -106,24 +106,6 @@ pub struct ControlPlane { ingest_controller: IngestController, } -#[async_trait] -impl Handler for ControlPlane { - type Reply = (); - - async fn handle( - &mut self, - _message: ControlPlanLoop, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if let Err(error) = self.indexing_scheduler.control_running_plan().await { - error!("Error when controlling the running plan: `{}`.", error); - } - ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) - .await; - Ok(()) - } -} - impl ControlPlane { pub fn spawn( universe: &Universe, @@ -147,10 +129,10 @@ impl ControlPlane { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Default)] pub struct ControlPlaneObservableState { - ingester_controller: IngestControllerState, - indexing_scheduler: IndexingSchedulerState, + pub ingester_controller: IngestControllerState, + pub indexing_scheduler: IndexingSchedulerState, } #[async_trait] @@ -170,7 +152,7 @@ impl Actor for ControlPlane { async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { self.ingest_controller - .load_state(ctx) + .load_state(ctx.progress()) .await .context("failed to initialize ingest controller")?; @@ -182,15 +164,23 @@ impl Actor for ControlPlane { } } -// macro_rules! handle_ask_res { -// ($ask_res:expr) => { -// match $ask_res { -// Ok(response) => response, -// Err(AskError::ErrorReply(error)) => return Ok(Err(error)), -// Err(error) => return Err(ActorExitStatus::Failure(anyhow::anyhow!(error).into())), -// } -// }; -// } +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + _message: ControlPlanLoop, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + if let Err(error) = self.indexing_scheduler.control_running_plan().await { + error!("Error when controlling the running plan: `{}`.", error); + } + ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) + .await; + Ok(()) + } +} #[async_trait] impl Handler for ControlPlane { @@ -361,7 +351,10 @@ impl Handler for ControlPlane { request: GetOpenShardsRequest, ctx: &ActorContext, ) -> Result { - Ok(self.ingest_controller.get_open_shards(request, ctx).await) + Ok(self + .ingest_controller + .get_open_shards(request, ctx.progress()) + .await) } } @@ -377,7 +370,7 @@ impl Handler for ControlPlane { // TODO decide on what the error should be. 
let close_shards_resp = self .ingest_controller - .close_shards(request, ctx) + .close_shards(request, ctx.progress()) .await .context("Failed to close shards")?; Ok(Ok(close_shards_resp)) diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 5a681cbf515..0f8eda4d95f 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use itertools::Itertools; -use quickwit_actors::ActorContext; +use quickwit_common::Progress; use quickwit_config::INGEST_SOURCE_ID; use quickwit_ingest::IngesterPool; use quickwit_metastore::{ListIndexesQuery, Metastore}; @@ -44,8 +44,6 @@ use serde::Serialize; use tokio::time::timeout; use tracing::{error, info}; -use crate::control_plane::ControlPlane; - const PING_LEADER_TIMEOUT: Duration = if cfg!(test) { Duration::from_millis(50) } else { @@ -267,17 +265,14 @@ impl IngestController { } } - pub(crate) async fn load_state( - &mut self, - ctx: &ActorContext, - ) -> ControlPlaneResult<()> { + pub(crate) async fn load_state(&mut self, progress: &Progress) -> ControlPlaneResult<()> { info!("syncing internal state with metastore"); let now = Instant::now(); self.index_table.clear(); self.shard_table.clear(); - let indexes = ctx + let indexes = progress .protect_future(self.metastore.list_indexes_metadatas(ListIndexesQuery::All)) .await?; @@ -307,7 +302,7 @@ impl IngestController { } if !subrequests.is_empty() { let list_shards_request = metastore::ListShardsRequest { subrequests }; - let list_shard_response = ctx + let list_shard_response = progress .protect_future(self.metastore.list_shards(list_shards_request)) .await?; @@ -352,7 +347,7 @@ impl IngestController { &mut self, leader_id: &NodeId, follower_id_opt: Option<&NodeId>, - ctx: &ActorContext, + progress: &Progress, ) -> Result<(), PingError> { let mut leader_ingester = self .ingester_pool @@ -366,7 +361,7 @@ impl IngestController { .cloned() .map(|follower_id| follower_id.into()), }; - ctx.protect_future(timeout( + progress.protect_future(timeout( PING_LEADER_TIMEOUT, leader_ingester.ping(ping_request), )) @@ -388,7 +383,7 @@ impl IngestController { async fn find_leader_and_follower( &mut self, unavailable_ingesters: &mut HashSet, - ctx: &ActorContext, + progress: &Progress, ) -> Option<(NodeId, Option)> { let mut candidates: Vec = self.ingester_pool.keys().await; candidates.retain(|node_id| !unavailable_ingesters.contains(node_id)); @@ -403,7 +398,7 @@ impl IngestController { continue; } if self - .ping_leader_and_follower(&leader_id, None, ctx) + .ping_leader_and_follower(&leader_id, None, progress) .await .is_ok() { @@ -420,7 +415,7 @@ impl IngestController { continue; } match self - .ping_leader_and_follower(&leader_id, Some(&follower_id), ctx) + .ping_leader_and_follower(&leader_id, Some(&follower_id), progress) .await { Ok(_) => return Some((leader_id, Some(follower_id))), @@ -446,7 +441,7 @@ impl IngestController { pub(crate) async fn get_open_shards( &mut self, get_open_shards_request: GetOpenShardsRequest, - ctx: &ActorContext, + progress: &Progress, ) -> ControlPlaneResult { let mut get_open_shards_subresponses = Vec::with_capacity(get_open_shards_request.subrequests.len()); @@ -493,7 +488,7 @@ impl IngestController { // TODO: Find leaders in batches. // TODO: Round-robin leader-follower pairs or choose according to load. 
let (leader_id, follower_id) = self - .find_leader_and_follower(&mut unavailable_ingesters, ctx) + .find_leader_and_follower(&mut unavailable_ingesters, progress) .await .ok_or_else(|| { ControlPlaneError::Unavailable("no available ingester".to_string()) @@ -512,7 +507,7 @@ impl IngestController { let open_shards_request = metastore::OpenShardsRequest { subrequests: open_shards_subrequests, }; - let open_shards_response = ctx + let open_shards_response = progress .protect_future(self.metastore.open_shards(open_shards_request)) .await?; for open_shards_subresponse in &open_shards_response.subresponses { @@ -542,7 +537,7 @@ impl IngestController { pub(crate) async fn close_shards( &mut self, close_shards_request: CloseShardsRequest, - ctx: &ActorContext, + progress: &Progress, ) -> ControlPlaneResult { let mut close_shards_subrequests = Vec::with_capacity(close_shards_request.subrequests.len()); @@ -560,7 +555,7 @@ impl IngestController { let metastore_close_shards_request = metastore::CloseShardsRequest { subrequests: close_shards_subrequests, }; - let close_shards_response = ctx + let close_shards_response = progress .protect_future(self.metastore.close_shards(metastore_close_shards_request)) .await?; for close_shards_success in close_shards_response.successes { @@ -582,9 +577,9 @@ enum PingError { FollowerUnavailable, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Default)] pub struct IngestControllerState { - num_indexes: usize, + pub num_indexes: usize, } impl IngestController { @@ -614,7 +609,6 @@ impl IngestController { #[cfg(test)] mod tests { - use quickwit_actors::Universe; use quickwit_config::SourceConfig; use quickwit_metastore::{IndexMetadata, MockMetastore}; use quickwit_proto::control_plane::GetOpenShardsSubrequest; @@ -622,7 +616,6 @@ mod tests { IngesterServiceClient, MockIngesterService, PingResponse, }; use quickwit_proto::ingest::{IngestV2Error, Shard}; - use tokio::sync::watch; use super::*; @@ -823,10 +816,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_load_shard_table() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mut mock_metastore = MockMetastore::default(); mock_metastore @@ -881,7 +871,7 @@ mod tests { let mut ingest_controller = IngestController::new(metastore, ingester_pool, replication_factor); - ingest_controller.load_state(&ctx).await.unwrap(); + ingest_controller.load_state(&progress).await.unwrap(); assert_eq!(ingest_controller.index_table.len(), 2); assert_eq!( @@ -919,10 +909,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_ping_leader() { - let universe = Universe::new(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -933,7 +920,7 @@ mod tests { let leader_id: NodeId = "test-ingester-0".into(); let error = ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap_err(); assert!(matches!(error, PingError::LeaderUnavailable)); @@ -951,7 +938,7 @@ mod tests { 
.await; ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap(); @@ -971,7 +958,7 @@ mod tests { .await; let error = ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, None) + .ping_leader_and_follower(&leader_id, None, &progress) .await .unwrap_err(); assert!(matches!(error, PingError::LeaderUnavailable)); @@ -993,7 +980,7 @@ mod tests { let follower_id: NodeId = "test-ingester-1".into(); let error = ingest_controller - .ping_leader_and_follower(&ctx, &leader_id, Some(&follower_id)) + .ping_leader_and_follower(&leader_id, Some(&follower_id), &progress) .await .unwrap_err(); assert!(matches!(error, PingError::FollowerUnavailable)); @@ -1001,10 +988,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_find_leader_replication_factor_1() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -1014,7 +998,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1031,7 +1015,7 @@ mod tests { .await; let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1048,7 +1032,7 @@ mod tests { .await; let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await .unwrap(); assert_eq!(leader_id.as_str(), "test-ingester-1"); @@ -1057,10 +1041,7 @@ mod tests { #[tokio::test] async fn test_ingest_controller_find_leader_replication_factor_2() { - let universe = Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mock_metastore = MockMetastore::default(); let metastore = Arc::new(mock_metastore); @@ -1070,7 +1051,7 @@ mod tests { IngestController::new(metastore, ingester_pool.clone(), replication_factor); let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1098,7 +1079,7 @@ mod tests { .await; let leader_follower_pair = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await; assert!(leader_follower_pair.is_none()); @@ -1132,7 +1113,7 @@ mod tests { .await; let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&ctx, &mut HashSet::new()) + .find_leader_and_follower(&mut HashSet::new(), &progress) .await .unwrap(); assert_eq!(leader_id.as_str(), "test-ingester-0"); @@ -1149,10 +1130,7 @@ mod tests { let source_id = "test-source".to_string(); - let universe = 
Universe::with_accelerated_time(); - let (mailbox, _inbox) = universe.create_test_mailbox(); - let (observable_state_tx, _observable_state_rx) = watch::channel(json!({})); - let ctx = ActorContext::for_test(&universe, mailbox, observable_state_tx); + let progress = Progress::default(); let mut mock_metastore = MockMetastore::default(); mock_metastore @@ -1237,7 +1215,7 @@ mod tests { unavailable_ingesters: Vec::new(), }; let response = ingest_controller - .get_open_shards(&ctx, request) + .get_open_shards(request, &progress) .await .unwrap(); assert_eq!(response.subresponses.len(), 0); @@ -1258,7 +1236,7 @@ mod tests { unavailable_ingesters, }; let response = ingest_controller - .get_open_shards(&ctx, request) + .get_open_shards(request, &progress) .await .unwrap(); assert_eq!(response.subresponses.len(), 2); diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index a4bdc118a69..21958f3bb89 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -33,3 +33,6 @@ pub struct IndexerNodeInfo { } pub type IndexerPool = Pool; + +#[cfg(test)] +mod tests; diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs index 1338564cf06..8a3c6ba5ece 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/scheduler.rs @@ -38,11 +38,12 @@ use crate::indexing_plan::{ }; use crate::{IndexerNodeInfo, IndexerPool}; -const MIN_DURATION_BETWEEN_SCHEDULING: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_millis(50) -} else { - Duration::from_secs(30) -}; +pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = + if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(50) + } else { + Duration::from_secs(30) + }; #[derive(Debug, Clone, Default, Serialize)] pub struct IndexingSchedulerState { @@ -497,373 +498,7 @@ fn get_indexing_tasks_diff<'a>( #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; - use std::num::NonZeroUsize; - use std::sync::Arc; - use std::time::Duration; - - use chitchat::transport::ChannelTransport; - use futures::{Stream, StreamExt}; - use quickwit_actors::{ActorHandle, Inbox, Mailbox, Universe}; - use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; - use quickwit_common::test_utils::wait_until_predicate; - use quickwit_common::tower::{Change, Pool}; - use quickwit_config::service::QuickwitService; - use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_indexing::IndexingService; - use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore}; - use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient, IndexingTask}; - use quickwit_proto::metastore::ListShardsResponse; - use quickwit_proto::NodeId; - use serde_json::json; - - use super::{IndexingScheduler, CONTROL_PLAN_LOOP_INTERVAL}; - use crate::scheduler::{ - get_indexing_plans_diff, MIN_DURATION_BETWEEN_SCHEDULING, REFRESH_PLAN_LOOP_INTERVAL, - }; - use crate::IndexerNodeInfo; - - fn index_metadata_for_test( - index_id: &str, - source_id: &str, - desired_num_pipelines: usize, - max_num_pipelines_per_indexer: usize, - ) -> IndexMetadata { - let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); - let source_config = SourceConfig { - enabled: true, - source_id: source_id.to_string(), - max_num_pipelines_per_indexer: 
NonZeroUsize::new(max_num_pipelines_per_indexer) - .unwrap(), - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), - source_params: SourceParams::Kafka(KafkaSourceParams { - topic: "topic".to_string(), - client_log_level: None, - client_params: json!({ - "bootstrap.servers": "localhost:9092", - }), - enable_backfill_mode: true, - }), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - index_metadata - .sources - .insert(source_id.to_string(), source_config); - index_metadata - } - - pub fn test_indexer_change_stream( - cluster_change_stream: impl Stream + Send + 'static, - indexing_clients: HashMap>, - ) -> impl Stream> + Send + 'static { - cluster_change_stream.filter_map(move |cluster_change| { - let indexing_clients = indexing_clients.clone(); - Box::pin(async move { - match cluster_change { - ClusterChange::Add(node) - if node.enabled_services().contains(&QuickwitService::Indexer) => - { - let node_id = node.node_id().to_string(); - let indexing_tasks = node.indexing_tasks().to_vec(); - let client_mailbox = indexing_clients.get(&node_id).unwrap().clone(); - let client = IndexingServiceClient::from_mailbox(client_mailbox); - Some(Change::Insert( - node_id, - IndexerNodeInfo { - client, - indexing_tasks, - }, - )) - } - ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), - _ => None, - } - }) - }) - } - - async fn start_scheduler( - cluster: Cluster, - indexers: &[&Cluster], - universe: &Universe, - ) -> (Vec>, ActorHandle) { - let index_1 = "test-indexing-plan-1"; - let source_1 = "source-1"; - let index_2 = "test-indexing-plan-2"; - let source_2 = "source-2"; - let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); - let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); - index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; - let mut metastore = MockMetastore::default(); - metastore.expect_list_indexes_metadatas().returning( - move |_list_indexes_query: ListIndexesQuery| { - Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]) - }, - ); - metastore.expect_list_shards().returning(|_| { - Ok(ListShardsResponse { - subresponses: Vec::new(), - }) - }); - let mut indexer_inboxes = Vec::new(); - let indexer_pool = Pool::default(); - let change_stream = cluster.ready_nodes_change_stream().await; - let mut indexing_clients = HashMap::new(); - for indexer in indexers { - let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox(); - indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox); - indexer_inboxes.push(indexing_service_inbox); - } - let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients); - indexer_pool.listen_for_changes(indexer_change_stream); - - let self_node_id: NodeId = cluster.self_node_id().to_string().into(); - let indexing_scheduler = IndexingScheduler::new( - cluster.cluster_id().to_string(), - self_node_id, - Arc::new(metastore), - indexer_pool, - ); - let (_, scheduler_handler) = universe.spawn_builder().spawn(indexing_scheduler); - (indexer_inboxes, scheduler_handler) - } - - #[tokio::test] - async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = - create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true) - .await - .unwrap(); - cluster - .wait_for_ready_members(|members| members.len() == 
1, Duration::from_secs(5)) - .await - .unwrap(); - let universe = Universe::with_accelerated_time(); - let (indexing_service_inboxes, scheduler_handler) = - start_scheduler(cluster.clone(), &[&cluster.clone()], &universe).await; - let indexing_service_inbox = indexing_service_inboxes[0].clone(); - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert!(scheduler_state.last_applied_physical_plan.is_some()); - assert_eq!(indexing_service_inbox_messages.len(), 1); - - // After a CONTROL_PLAN_LOOP_INTERVAL, the control loop will check if the desired plan is - // running on the indexer. As chitchat state of the indexer is not updated (we did - // not instantiate a indexing service for that), the control loop will apply again - // the same plan. - // Check first the plan is not updated before `MIN_DURATION_BETWEEN_SCHEDULING`. - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.5)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - - // After `MIN_DURATION_BETWEEN_SCHEDULING`, we should see a plan update. - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.7)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); - assert_eq!(indexing_service_inbox_messages.len(), 1); - let indexing_tasks = indexing_service_inbox_messages - .first() - .unwrap() - .indexing_tasks - .clone(); - - // Update the indexer state and check that the indexer does not receive any new - // `ApplyIndexingPlanRequest`. - cluster - .update_self_node_indexing_tasks(&indexing_tasks) - .await - .unwrap(); - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages.len(), 0); - - // Update the indexer state with a different plan and check that the indexer does now - // receive a new `ApplyIndexingPlanRequest`. 
- cluster - .update_self_node_indexing_tasks(&[indexing_tasks[0].clone()]) - .await - .unwrap(); - tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 3); - let indexing_service_inbox_messages = - indexing_service_inbox.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages.len(), 1); - universe.assert_quit().await; - } - - #[tokio::test] - async fn test_scheduler_scheduling_no_indexer() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) - .await - .unwrap(); - let universe = Universe::with_accelerated_time(); - let (indexing_service_inboxes, scheduler_handler) = - start_scheduler(cluster.clone(), &[], &universe).await; - assert_eq!(indexing_service_inboxes.len(), 0); - - // No indexer. - universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - - // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no - // scheduling. - universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; - let scheduler_state = scheduler_handler.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - universe.assert_quit().await; - } - - #[tokio::test] - async fn test_scheduler_scheduling_multiple_indexers() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) - .await - .unwrap(); - let cluster_indexer_1 = create_cluster_for_test( - vec![cluster.gossip_advertise_addr().to_string()], - &["indexer"], - &transport, - true, - ) - .await - .unwrap(); - let cluster_indexer_2 = create_cluster_for_test( - vec![cluster.gossip_advertise_addr().to_string()], - &["indexer"], - &transport, - true, - ) - .await - .unwrap(); - let universe = Universe::new(); - let (indexing_service_inboxes, scheduler_handler) = start_scheduler( - cluster.clone(), - &[&cluster_indexer_1, &cluster_indexer_2], - &universe, - ) - .await; - let indexing_service_inbox_1 = indexing_service_inboxes[0].clone(); - let indexing_service_inbox_2 = indexing_service_inboxes[1].clone(); - let scheduler_handler_arc = Arc::new(scheduler_handler); - - // No indexer. 
- let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - let indexing_service_inbox_messages = - indexing_service_inbox_1.drain_for_test_typed::(); - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); - assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); - assert!(scheduler_state.last_applied_physical_plan.is_none()); - assert_eq!(indexing_service_inbox_messages.len(), 0); - - cluster - .wait_for_ready_members( - |members| { - members - .iter() - .any(|member| member.enabled_services.contains(&QuickwitService::Indexer)) - }, - Duration::from_secs(5), - ) - .await - .unwrap(); - - // Wait for chitchat update, sheduler will detect new indexers and schedule a plan. - wait_until_predicate( - || { - let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); - async move { - let scheduler_state = scheduler_handler_arc_clone - .process_pending_and_observe() - .await; - scheduler_state.num_schedule_indexing_plan == 1 - } - }, - CONTROL_PLAN_LOOP_INTERVAL * 4, - Duration::from_millis(100), - ) - .await - .unwrap(); - let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); - let indexing_service_inbox_messages_1 = - indexing_service_inbox_1.drain_for_test_typed::(); - let indexing_service_inbox_messages_2 = - indexing_service_inbox_2.drain_for_test_typed::(); - assert_eq!(indexing_service_inbox_messages_1.len(), 1); - assert_eq!(indexing_service_inbox_messages_2.len(), 1); - cluster_indexer_1 - .update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks) - .await - .unwrap(); - cluster_indexer_2 - .update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks) - .await - .unwrap(); - - // Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan - // several times. - universe.sleep(CONTROL_PLAN_LOOP_INTERVAL * 2).await; - let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await; - assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); - - // Shutdown cluster and wait until the new scheduling. 
- cluster_indexer_2.shutdown().await; - - cluster - .wait_for_ready_members( - |members| { - members - .iter() - .filter(|member| { - member.enabled_services.contains(&QuickwitService::Indexer) - }) - .count() - == 1 - }, - Duration::from_secs(5), - ) - .await - .unwrap(); - - wait_until_predicate( - || { - let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); - async move { - let scheduler_state = scheduler_handler_arc_clone - .process_pending_and_observe() - .await; - scheduler_state.num_schedule_indexing_plan == 2 - } - }, - CONTROL_PLAN_LOOP_INTERVAL * 10, - Duration::from_millis(100), - ) - .await - .unwrap(); - - universe.assert_quit().await; - } + use super::*; #[test] fn test_indexing_plans_diff() { diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs new file mode 100644 index 00000000000..0d355b370f1 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -0,0 +1,412 @@ +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; + +use chitchat::transport::ChannelTransport; +use futures::{Stream, StreamExt}; +use quickwit_actors::{ActorHandle, Inbox, Mailbox, Universe}; +use quickwit_cluster::{create_cluster_for_test, Cluster, ClusterChange}; +use quickwit_common::test_utils::wait_until_predicate; +use quickwit_common::tower::{Change, Pool}; +use quickwit_config::service::QuickwitService; +use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams}; +use quickwit_indexing::IndexingService; +use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore}; +use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient}; +use quickwit_proto::metastore::ListShardsResponse; +use quickwit_proto::NodeId; +use serde_json::json; + +use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL, REFRESH_PLAN_LOOP_INTERVAL}; +use crate::scheduler::MIN_DURATION_BETWEEN_SCHEDULING; +use crate::IndexerNodeInfo; + +fn index_metadata_for_test( + index_id: &str, + source_id: &str, + desired_num_pipelines: usize, + max_num_pipelines_per_indexer: usize, +) -> IndexMetadata { + let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); + let source_config = SourceConfig { + enabled: true, + source_id: source_id.to_string(), + max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), + desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), + source_params: SourceParams::Kafka(KafkaSourceParams { + topic: "topic".to_string(), + client_log_level: None, + client_params: json!({ + "bootstrap.servers": "localhost:9092", + }), + enable_backfill_mode: true, + }), + transform_config: None, + input_format: SourceInputFormat::Json, + }; + index_metadata + .sources + .insert(source_id.to_string(), source_config); + index_metadata +} + +pub fn test_indexer_change_stream( + cluster_change_stream: impl Stream + Send + 'static, + indexing_clients: HashMap>, +) -> impl Stream> + Send + 'static { + cluster_change_stream.filter_map(move |cluster_change| { + let indexing_clients = indexing_clients.clone(); + Box::pin(async move { + match cluster_change { + ClusterChange::Add(node) + if node.enabled_services().contains(&QuickwitService::Indexer) => + { + let node_id = node.node_id().to_string(); + let indexing_tasks = node.indexing_tasks().to_vec(); + let client_mailbox = indexing_clients.get(&node_id).unwrap().clone(); + let client = 
IndexingServiceClient::from_mailbox(client_mailbox); + Some(Change::Insert( + node_id, + IndexerNodeInfo { + client, + indexing_tasks, + }, + )) + } + ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())), + _ => None, + } + }) + }) +} + +async fn start_control_plane( + cluster: Cluster, + indexers: &[&Cluster], + universe: &Universe, +) -> (Vec>, ActorHandle) { + let index_1 = "test-indexing-plan-1"; + let source_1 = "source-1"; + let index_2 = "test-indexing-plan-2"; + let source_2 = "source-2"; + let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); + let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); + index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; + let mut metastore = MockMetastore::default(); + metastore.expect_list_indexes_metadatas().returning( + move |_list_indexes_query: ListIndexesQuery| { + Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]) + }, + ); + metastore.expect_list_shards().returning(|_| { + Ok(ListShardsResponse { + subresponses: Vec::new(), + }) + }); + let mut indexer_inboxes = Vec::new(); + + let indexer_pool = Pool::default(); + let ingester_pool = Pool::default(); + let change_stream = cluster.ready_nodes_change_stream().await; + let mut indexing_clients = HashMap::new(); + + for indexer in indexers { + let (indexing_service_mailbox, indexing_service_inbox) = universe.create_test_mailbox(); + indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox); + indexer_inboxes.push(indexing_service_inbox); + } + let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients); + indexer_pool.listen_for_changes(indexer_change_stream); + + let self_node_id: NodeId = cluster.self_node_id().to_string().into(); + let (_, control_plane_handle) = ControlPlane::spawn( + universe, + cluster.cluster_id().to_string(), + self_node_id, + indexer_pool, + ingester_pool, + Arc::new(metastore), + 1, + ); + + (indexer_inboxes, control_plane_handle) +} + +#[tokio::test] +async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = + create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true) + .await + .unwrap(); + cluster + .wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) + .await + .unwrap(); + let universe = Universe::with_accelerated_time(); + let (indexing_service_inboxes, control_plane_handler) = + start_control_plane(cluster.clone(), &[&cluster.clone()], &universe).await; + let indexing_service_inbox = indexing_service_inboxes[0].clone(); + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + assert!(scheduler_state.last_applied_physical_plan.is_some()); + assert_eq!(indexing_service_inbox_messages.len(), 1); + + // After a CONTROL_PLAN_LOOP_INTERVAL, the control loop will check if the desired plan is + // running on the indexer. As chitchat state of the indexer is not updated (we did + // not instantiate a indexing service for that), the control loop will apply again + // the same plan. + // Check first the plan is not updated before `MIN_DURATION_BETWEEN_SCHEDULING`. 
+ tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.5)).await; + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); + + // After `MIN_DURATION_BETWEEN_SCHEDULING`, we should see a plan update. + tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.7)).await; + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); + assert_eq!(indexing_service_inbox_messages.len(), 1); + let indexing_tasks = indexing_service_inbox_messages + .first() + .unwrap() + .indexing_tasks + .clone(); + + // Update the indexer state and check that the indexer does not receive any new + // `ApplyIndexingPlanRequest`. + cluster + .update_self_node_indexing_tasks(&indexing_tasks) + .await + .unwrap(); + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 2); + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages.len(), 0); + + // Update the indexer state with a different plan and check that the indexer does now + // receive a new `ApplyIndexingPlanRequest`. + cluster + .update_self_node_indexing_tasks(&[indexing_tasks[0].clone()]) + .await + .unwrap(); + tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(1.2)).await; + let scheduler_state = control_plane_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 3); + let indexing_service_inbox_messages = + indexing_service_inbox.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages.len(), 1); + universe.assert_quit().await; +} + +#[tokio::test] +async fn test_scheduler_scheduling_no_indexer() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) + .await + .unwrap(); + let universe = Universe::with_accelerated_time(); + let (indexing_service_inboxes, scheduler_handler) = + start_control_plane(cluster.clone(), &[], &universe).await; + assert_eq!(indexing_service_inboxes.len(), 0); + + // No indexer. + universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await; + let scheduler_state = scheduler_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + + // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no + // scheduling. 
+ universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; + let scheduler_state = scheduler_handler + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + universe.assert_quit().await; +} + +#[tokio::test] +async fn test_scheduler_scheduling_multiple_indexers() { + quickwit_common::setup_logging_for_tests(); + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) + .await + .unwrap(); + let cluster_indexer_1 = create_cluster_for_test( + vec![cluster.gossip_advertise_addr().to_string()], + &["indexer"], + &transport, + true, + ) + .await + .unwrap(); + let cluster_indexer_2 = create_cluster_for_test( + vec![cluster.gossip_advertise_addr().to_string()], + &["indexer"], + &transport, + true, + ) + .await + .unwrap(); + let universe = Universe::new(); + let (indexing_service_inboxes, scheduler_handler) = start_control_plane( + cluster.clone(), + &[&cluster_indexer_1, &cluster_indexer_2], + &universe, + ) + .await; + let indexing_service_inbox_1 = indexing_service_inboxes[0].clone(); + let indexing_service_inbox_2 = indexing_service_inboxes[1].clone(); + let scheduler_handler_arc = Arc::new(scheduler_handler); + + // No indexer. + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + let indexing_service_inbox_messages = + indexing_service_inbox_1.drain_for_test_typed::(); + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 0); + assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); + assert!(scheduler_state.last_applied_physical_plan.is_none()); + assert_eq!(indexing_service_inbox_messages.len(), 0); + + cluster + .wait_for_ready_members( + |members| { + members + .iter() + .any(|member| member.enabled_services.contains(&QuickwitService::Indexer)) + }, + Duration::from_secs(5), + ) + .await + .unwrap(); + + // Wait for chitchat update, sheduler will detect new indexers and schedule a plan. + wait_until_predicate( + || { + let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); + async move { + let scheduler_state = scheduler_handler_arc_clone + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + scheduler_state.num_schedule_indexing_plan == 1 + } + }, + CONTROL_PLAN_LOOP_INTERVAL * 4, + Duration::from_millis(100), + ) + .await + .unwrap(); + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_applied_physical_indexing_plan, 1); + let indexing_service_inbox_messages_1 = + indexing_service_inbox_1.drain_for_test_typed::(); + let indexing_service_inbox_messages_2 = + indexing_service_inbox_2.drain_for_test_typed::(); + assert_eq!(indexing_service_inbox_messages_1.len(), 1); + assert_eq!(indexing_service_inbox_messages_2.len(), 1); + cluster_indexer_1 + .update_self_node_indexing_tasks(&indexing_service_inbox_messages_1[0].indexing_tasks) + .await + .unwrap(); + cluster_indexer_2 + .update_self_node_indexing_tasks(&indexing_service_inbox_messages_2[0].indexing_tasks) + .await + .unwrap(); + + // Wait 2 CONTROL_PLAN_LOOP_INTERVAL again and check the scheduler will not apply the plan + // several times. 
+ universe.sleep(CONTROL_PLAN_LOOP_INTERVAL * 2).await; + let scheduler_state = scheduler_handler_arc + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + assert_eq!(scheduler_state.num_schedule_indexing_plan, 1); + + // Shutdown cluster and wait until the new scheduling. + cluster_indexer_2.shutdown().await; + + cluster + .wait_for_ready_members( + |members| { + members + .iter() + .filter(|member| member.enabled_services.contains(&QuickwitService::Indexer)) + .count() + == 1 + }, + Duration::from_secs(5), + ) + .await + .unwrap(); + + wait_until_predicate( + || { + let scheduler_handler_arc_clone = scheduler_handler_arc.clone(); + async move { + let scheduler_state = scheduler_handler_arc_clone + .process_pending_and_observe() + .await + .state + .indexing_scheduler; + scheduler_state.num_schedule_indexing_plan == 2 + } + }, + CONTROL_PLAN_LOOP_INTERVAL * 10, + Duration::from_millis(100), + ) + .await + .unwrap(); + + universe.assert_quit().await; +}
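
A hedged sketch of the pattern this patch applies throughout the control plane: controller methods take a `&Progress` instead of an `&ActorContext<ControlPlane>`, so the actor handler forwards `ctx.progress()` while unit tests call the method directly with `Progress::default()` and no actor universe. The `DemoController` type below is illustrative only; it mirrors the shape of `IngestController::load_state` but is not part of the patch:

    use quickwit_common::Progress;

    struct DemoController {
        num_indexes: usize,
    }

    impl DemoController {
        // Every await point that may block for a while is wrapped in
        // `protect_future`, so liveness is still reported without an actor.
        async fn load_state(&mut self, progress: &Progress) -> Result<(), String> {
            let indexes: Vec<String> = progress
                .protect_future(async {
                    // stand-in for `metastore.list_indexes_metadatas(...)`
                    vec!["test-index".to_string()]
                })
                .await;
            self.num_indexes = indexes.len();
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_load_state_without_an_actor_context() {
        let progress = Progress::default();
        let mut controller = DemoController { num_indexes: 0 };
        controller.load_state(&progress).await.unwrap();
        assert_eq!(controller.num_indexes, 1);
    }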