From a06affd09e989b80706a128bee4248a2cf1eadff Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 18 Jan 2024 12:02:14 +0900 Subject: [PATCH] Debouncing reschedule. After this change, the creation of a new plan will happen at most once by window of 5s. Outside of cooldown period, a (relevant) event triggers a RebuildPlan instant. Limiting concurrency of the Metastore client using a tower Layer. Closes #4407 --- quickwit/quickwit-actors/src/actor_context.rs | 7 +- quickwit/quickwit-actors/src/spawn_builder.rs | 16 ++ .../src/control_plane.rs | 86 +++++--- .../quickwit-control-plane/src/debouncer.rs | 185 ++++++++++++++++++ .../src/indexing_scheduler/mod.rs | 9 +- quickwit/quickwit-control-plane/src/lib.rs | 1 + quickwit/quickwit-serve/src/lib.rs | 5 + 7 files changed, 273 insertions(+), 36 deletions(-) create mode 100644 quickwit/quickwit-control-plane/src/debouncer.rs diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index 2c052931f02..aff023ad602 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -354,14 +354,11 @@ impl ActorContext { A: DeferableReplyHandler, M: Sync + Send + std::fmt::Debug + 'static, { - let self_mailbox = self.inner.self_mailbox.clone(); + let self_mailbox = self.mailbox().clone(); let callback = move || { let _ = self_mailbox.send_message_with_high_priority(message); }; - self.inner - .spawn_ctx - .scheduler_client - .schedule_event(callback, after_duration); + self.spawn_ctx().schedule_event(callback, after_duration); } } diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index 652816909f5..f7641fb9413 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -17,6 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::time::Duration; + use anyhow::Context; use quickwit_common::metrics::IntCounter; use sync_wrapper::SyncWrapper; @@ -71,6 +73,20 @@ impl SpawnContext { registry: self.registry.clone(), } } + + /// Schedules a new event. + /// Once `timeout` is elapsed, the future `fut` is + /// executed. + /// + /// `fut` will be executed in the scheduler task, so it is + /// required to be short. + pub fn schedule_event( + &self, + callback: F, + timeout: Duration, + ) { + self.scheduler_client.schedule_event(callback, timeout) + } } /// `SpawnBuilder` makes it possible to configure misc parameters before spawning an actor. diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 568263fce25..933a93fc669 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -18,6 +18,8 @@ // along with this program. If not, see . use std::collections::BTreeSet; +use std::fmt; +use std::fmt::Formatter; use std::time::Duration; use anyhow::Context; @@ -47,6 +49,7 @@ use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; use serde::Serialize; use tracing::error; +use crate::debouncer::Debouncer; use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState}; use crate::ingest::IngestController; use crate::model::{ControlPlaneModel, ControlPlaneModelMetrics}; @@ -59,10 +62,15 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur Duration::from_secs(3) }; +/// Minimum period between two rebuild plan operation. +const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(5); + #[derive(Debug)] struct ControlPlanLoop; -#[derive(Debug)] +#[derive(Debug, Default)] +struct RebuildPlan; + pub struct ControlPlane { metastore: MetastoreServiceClient, model: ControlPlaneModel, @@ -75,6 +83,13 @@ pub struct ControlPlane { // the different ingesters. indexing_scheduler: IndexingScheduler, ingest_controller: IngestController, + rebuild_plan_debouncer: Debouncer, +} + +impl fmt::Debug for ControlPlane { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("ControlPlane").finish() + } } impl ControlPlane { @@ -100,6 +115,7 @@ impl ControlPlane { metastore: metastore.clone(), indexing_scheduler, ingest_controller, + rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD), } }) } @@ -133,8 +149,7 @@ impl Actor for ControlPlane { .await .context("failed to initialize the model")?; - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); self.ingest_controller.sync_with_all_ingesters(&self.model); @@ -145,6 +160,14 @@ impl Actor for ControlPlane { } impl ControlPlane { + /// Rebuilds the indexing plan. + /// + /// This method includes debouncing logic. Every call will be followed by a cooldown period. + fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) { + self.rebuild_plan_debouncer + .self_send_with_cooldown::<_, RebuildPlan>(ctx); + } + /// Deletes a set of shards from the metastore and the control plane model. /// /// If the shards were already absent this operation is considered successful. @@ -152,6 +175,7 @@ impl ControlPlane { &mut self, source_uid: &SourceUid, shards: &[ShardId], + ctx: &ActorContext, ) -> anyhow::Result<()> { let delete_shards_subrequest = DeleteShardsSubrequest { index_uid: source_uid.index_uid.to_string(), @@ -173,8 +197,7 @@ impl ControlPlane { .await .context("failed to delete shards in metastore")?; self.model.delete_shards(source_uid, shards); - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); Ok(()) } @@ -210,6 +233,20 @@ impl ControlPlane { } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + _message: RebuildPlan, + _ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + self.indexing_scheduler.rebuild_plan(&self.model); + Ok(()) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = (); @@ -217,7 +254,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, shard_positions_update: ShardPositionsUpdate, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { let Some(shard_entries) = self .model @@ -240,7 +277,7 @@ impl Handler for ControlPlane { if shard_ids_to_close.is_empty() { return Ok(()); } - self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close) + self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close, ctx) .await?; Ok(()) } @@ -323,7 +360,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: CreateIndexRequest, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result { let index_config = match metastore_serde_utils::from_json_str(&request.index_config_json) { Ok(index_config) => index_config, @@ -341,8 +378,7 @@ impl Handler for ControlPlane { self.model.add_index(index_metadata); - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); let response = CreateIndexResponse { index_uid: index_uid.into(), @@ -361,7 +397,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: DeleteIndexRequest, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result { let index_uid: IndexUid = request.index_uid.clone().into(); @@ -382,8 +418,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -399,7 +434,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: AddSourceRequest, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result { let index_uid: IndexUid = request.index_uid.clone().into(); let source_config: SourceConfig = @@ -419,8 +454,7 @@ impl Handler for ControlPlane { // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -436,7 +470,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: ToggleSourceRequest, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result { let index_uid: IndexUid = request.index_uid.clone().into(); let source_id = request.source_id.clone(); @@ -449,8 +483,7 @@ impl Handler for ControlPlane { let has_changed = self.model.toggle_source(&index_uid, &source_id, enable)?; if has_changed { - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); } Ok(Ok(EmptyResponse {})) @@ -466,7 +499,7 @@ impl Handler for ControlPlane { async fn handle( &mut self, request: DeleteSourceRequest, - _ctx: &ActorContext, + ctx: &ActorContext, ) -> Result, ActorExitStatus> { let index_uid: IndexUid = request.index_uid.clone().into(); let source_id = request.source_id.clone(); @@ -498,8 +531,7 @@ impl Handler for ControlPlane { self.model.delete_source(&source_uid); - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); let response = EmptyResponse {}; Ok(Ok(response)) @@ -529,9 +561,7 @@ impl Handler for ControlPlane { return Ok(Err(control_plane_error)); } }; - // TODO: Why do we return an error if the indexing scheduler fails? - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); Ok(Ok(response)) } } @@ -548,8 +578,7 @@ impl Handler for ControlPlane { self.ingest_controller .handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress()) .await; - self.indexing_scheduler - .schedule_indexing_plan_if_needed(&self.model); + self.rebuild_plan_debounced(ctx); Ok(Ok(())) } } @@ -1113,7 +1142,7 @@ mod tests { #[tokio::test] async fn test_delete_shard_on_eof() { quickwit_common::setup_logging_for_tests(); - let universe = Universe::default(); + let universe = Universe::with_accelerated_time(); let node_id = NodeId::new("control-plane-node".to_string()); let indexer_pool = IndexerPool::default(); let (client_mailbox, client_inbox) = universe.create_test_mailbox(); @@ -1215,6 +1244,7 @@ mod tests { assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]); let _ = client_inbox.drain_for_test(); + universe.sleep(Duration::from_secs(30)).await; // This update should trigger the deletion of the shard and a new indexing plan. control_plane_mailbox .ask(ShardPositionsUpdate { diff --git a/quickwit/quickwit-control-plane/src/debouncer.rs b/quickwit/quickwit-control-plane/src/debouncer.rs new file mode 100644 index 00000000000..270b2469b17 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/debouncer.rs @@ -0,0 +1,185 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use quickwit_actors::{Actor, ActorContext, DeferableReplyHandler, Handler}; + +#[derive(Clone)] +pub struct Debouncer { + cool_down_period: Duration, + cooldown_state: Arc, +} + +impl Debouncer { + pub fn new(cool_down_period: Duration) -> Debouncer { + Debouncer { + cool_down_period, + cooldown_state: Arc::new(AtomicU8::new(NO_COOLDOWN)), + } + } +} + +const NO_COOLDOWN: u8 = 0u8; + +// We are possibly within the cooldown period, and no event has been scheduled right after +// the cooldown yet. +const COOLDOWN_NOT_SCHEDULED: u8 = 1u8; + +// We are within the cooldown period, and an event has been scheduled right after +// the cooldown. +const COOLDOWN_SCHEDULED: u8 = 2u8; + +impl Debouncer { + pub fn self_send_with_cooldown(&self, ctx: &ActorContext) + where + A: Actor + Handler + DeferableReplyHandler, + M: Default + std::fmt::Debug + Send + Sync + 'static, + { + let cooldown_state = self.cooldown_state.load(Ordering::SeqCst); + if cooldown_state != NO_COOLDOWN { + self.cooldown_state + .store(COOLDOWN_SCHEDULED, Ordering::SeqCst); + return; + } + let ctx_clone = ctx.clone(); + let self_clone = self.clone(); + let callback = move || { + let is_scheduled = self_clone.cooldown_state.load(Ordering::SeqCst); + self_clone + .cooldown_state + .store(NO_COOLDOWN, Ordering::SeqCst); + if is_scheduled == COOLDOWN_SCHEDULED { + self_clone.self_send_with_cooldown(&ctx_clone); + } + }; + ctx.spawn_ctx() + .schedule_event(callback, self.cool_down_period); + let _ = ctx.mailbox().send_message_with_high_priority(M::default()); + self.cooldown_state + .store(COOLDOWN_NOT_SCHEDULED, Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use async_trait::async_trait; + use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe}; + + use crate::debouncer::Debouncer; + + struct DebouncingActor { + count: usize, + debouncer: Debouncer, + } + + impl DebouncingActor { + pub fn new(cooldown_duration: Duration) -> DebouncingActor { + DebouncingActor { + count: 0, + debouncer: Debouncer::new(cooldown_duration), + } + } + } + + #[derive(Debug, Default)] + struct Increment; + + #[derive(Debug)] + struct DebouncedIncrement; + + #[async_trait] + impl Actor for DebouncingActor { + type ObservableState = usize; + + fn observable_state(&self) -> Self::ObservableState { + self.count + } + } + + #[async_trait] + impl Handler for DebouncingActor { + type Reply = (); + + async fn handle( + &mut self, + _message: Increment, + _ctx: &ActorContext, + ) -> Result { + self.count += 1; + Ok(()) + } + } + + #[async_trait] + impl Handler for DebouncingActor { + type Reply = (); + + async fn handle( + &mut self, + _message: DebouncedIncrement, + ctx: &ActorContext, + ) -> Result { + self.debouncer.self_send_with_cooldown::<_, Increment>(ctx); + Ok(()) + } + } + + #[tokio::test] + async fn test_debouncer() { + let universe = Universe::default(); + let cooldown_period = Duration::from_millis(1_000); + let debouncer = DebouncingActor::new(cooldown_period); + let (debouncer_mailbox, debouncer_handle) = universe.spawn_builder().spawn(debouncer); + { + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 0); + } + { + let _ = debouncer_mailbox.ask(DebouncedIncrement).await; + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 1); + } + for _ in 0..10 { + let _ = debouncer_mailbox.ask(DebouncedIncrement).await; + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 1); + } + { + universe.sleep(cooldown_period.mul_f32(1.2f32)).await; + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 2); + } + { + let _ = debouncer_mailbox.ask(DebouncedIncrement).await; + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 2); + } + { + universe.sleep(cooldown_period * 2).await; + let count = *debouncer_handle.process_pending_and_observe().await; + assert_eq!(count, 3); + } + universe.assert_quit().await; + } +} diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 57f77e655c5..a241cbbede4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -196,7 +196,10 @@ impl IndexingScheduler { // Should be called whenever a change in the list of index/shard // has happened. - pub(crate) fn schedule_indexing_plan_if_needed(&mut self, model: &ControlPlaneModel) { + // + // Prefer not calling this method directly, and instead call + // `ControlPlane::rebuild_indexing_plan_debounced`. + pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) { crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc(); let sources = get_sources_to_schedule(model); @@ -252,7 +255,7 @@ impl IndexingScheduler { // If there is no plan, the node is probably starting and the scheduler did not find // indexers yet. In this case, we want to schedule as soon as possible to find new // indexers. - self.schedule_indexing_plan_if_needed(model); + self.rebuild_plan(model); return; }; @@ -278,7 +281,7 @@ impl IndexingScheduler { ); if !indexing_plans_diff.has_same_nodes() { info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan"); - self.schedule_indexing_plan_if_needed(model); + self.rebuild_plan(model); } else if !indexing_plans_diff.has_same_tasks() { // Some nodes may have not received their tasks, apply it again. info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan"); diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index ac0e3cdecb3..65ac176af8d 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -37,5 +37,6 @@ pub struct IndexerNodeInfo { pub type IndexerPool = Pool; +mod debouncer; #[cfg(test)] mod tests; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index d0f4f5d2b40..633dd0931e3 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -120,6 +120,8 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test Duration::from_secs(10) }; +const MAX_CONCURRENT_METASTORE_REQUESTS: usize = 6; + struct QuickwitServices { pub node_config: Arc, pub cluster: Cluster, @@ -296,6 +298,9 @@ pub async fn serve_quickwit( .stack_add_source_layer(broker_layer.clone()) .stack_delete_source_layer(broker_layer.clone()) .stack_toggle_source_layer(broker_layer) + .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( + MAX_CONCURRENT_METASTORE_REQUESTS, + )) .build(metastore); Some(metastore) } else {