From 5699660d61e00ea48524ef3bee6d6aa0d3f21fb1 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 6 Oct 2023 09:49:32 +0900 Subject: [PATCH] Removing planing loop (#3913) * Refactoring of the control plane. The two internal actors are replaced by two internal states. Their execution is not concurrent anymore, but we the code and the flow is simplified and we avoid some race conditions. The control plane is not getting the update notify signal through the broker anymore. The Notify RPC is entirely removed. Most of the test in the indexer scheduler become Control plane tests. * Removing RefreshPlan loop. The RefreshPlan loop is not necessary since after we moved to the control plane as a metastore proxy world, the control plan cannot miss any Index/Source event. --- .../src/control_plane.rs | 46 ++++--------------- quickwit/quickwit-control-plane/src/tests.rs | 6 +-- 2 files changed, 13 insertions(+), 39 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index f38de7a6ef1..5128f3812ec 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -54,19 +54,6 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur Duration::from_secs(3) }; -/// Interval between two scheduling of indexing plans. No need to be faster than the -/// control plan loop. -// Note: it's currently not possible to define a const duration with -// `CONTROL_PLAN_LOOP_INTERVAL * number`. -pub(crate) const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_secs(3) -} else { - Duration::from_secs(60) -}; - -#[derive(Debug)] -struct RefreshPlanLoop; - #[derive(Debug)] struct ControlPlanLoop; @@ -134,7 +121,15 @@ impl Actor for ControlPlane { .await .context("failed to initialize ingest controller")?; - self.handle(RefreshPlanLoop, ctx).await?; + if let Err(error) = self + .indexing_scheduler + .schedule_indexing_plan_if_needed() + .await + { + // TODO inspect error. + error!("Error when scheduling indexing plan: `{}`.", error); + } + ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop) .await; @@ -218,28 +213,6 @@ impl Handler for ControlPlane { } } -#[async_trait] -impl Handler for ControlPlane { - type Reply = (); - - async fn handle( - &mut self, - _message: RefreshPlanLoop, - ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - if let Err(error) = self - .indexing_scheduler - .schedule_indexing_plan_if_needed() - .await - { - error!("Error when scheduling indexing plan: `{}`.", error); - } - ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop) - .await; - Ok(()) - } -} - #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult; @@ -331,6 +304,7 @@ impl Handler for ControlPlane { self.ingest_controller .delete_source(&index_uid, &request.source_id); + self.indexing_scheduler.on_index_change().await?; let response = EmptyResponse {}; Ok(Ok(response)) diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 7ed4f4521b3..9d910457dc9 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -37,7 +37,7 @@ use quickwit_proto::metastore::ListShardsResponse; use quickwit_proto::NodeId; use serde_json::json; -use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL, REFRESH_PLAN_LOOP_INTERVAL}; +use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL}; use crate::scheduler::MIN_DURATION_BETWEEN_SCHEDULING; use crate::IndexerNodeInfo; @@ -270,9 +270,9 @@ async fn test_scheduler_scheduling_no_indexer() { assert_eq!(scheduler_state.num_schedule_indexing_plan, 0); assert!(scheduler_state.last_applied_physical_plan.is_none()); - // Wait REFRESH_PLAN_LOOP_INTERVAL * 2, as there is no indexer, we should observe no + // There is no indexer, we should observe no // scheduling. - universe.sleep(REFRESH_PLAN_LOOP_INTERVAL * 2).await; + universe.sleep(Duration::from_secs(60)).await; let scheduler_state = scheduler_handler .process_pending_and_observe() .await