diff --git a/quickwit/quickwit-actors/src/universe.rs b/quickwit/quickwit-actors/src/universe.rs
index e2d77cb86ec..04869a351a8 100644
--- a/quickwit/quickwit-actors/src/universe.rs
+++ b/quickwit/quickwit-actors/src/universe.rs
@@ -23,7 +23,7 @@ use std::time::Duration;
use crate::mailbox::create_mailbox;
use crate::registry::ActorObservation;
-use crate::scheduler::{start_scheduler, NoAdvanceTimeGuard};
+use crate::scheduler::start_scheduler;
use crate::spawn_builder::{SpawnBuilder, SpawnContext};
use crate::{Actor, ActorExitStatus, Command, Inbox, Mailbox, QueueCapacity};
@@ -136,10 +136,6 @@ impl Universe {
.values()
.any(|status| matches!(status, ActorExitStatus::Panicked)));
}
-
- pub fn no_advance_guard(&self) -> NoAdvanceTimeGuard {
- self.spawn_ctx().scheduler_client.no_advance_time_guard()
- }
}
impl Drop for Universe {
diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 80cb240f07b..933a93fc669 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -18,6 +18,8 @@
// along with this program. If not, see .
use std::collections::BTreeSet;
+use std::fmt;
+use std::fmt::Formatter;
use std::time::Duration;
use anyhow::Context;
@@ -60,8 +62,8 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(3)
};
-/// Minimum period between two reschedules.
-const DEBOUNCE_COOLDOWN_PERIOD: Duration = Duration::from_secs(5);
+/// Minimum period between two rebuild plan operation.
+const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(5);
#[derive(Debug)]
struct ControlPlanLoop;
@@ -84,6 +86,12 @@ pub struct ControlPlane {
rebuild_plan_debouncer: Debouncer,
}
+impl fmt::Debug for ControlPlane {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("ControlPlane").finish()
+ }
+}
+
impl ControlPlane {
pub fn spawn(
universe: &Universe,
@@ -107,7 +115,7 @@ impl ControlPlane {
metastore: metastore.clone(),
indexing_scheduler,
ingest_controller,
- rebuild_plan_debouncer: Debouncer::new(DEBOUNCE_COOLDOWN_PERIOD),
+ rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
}
})
}
@@ -152,6 +160,9 @@ impl Actor for ControlPlane {
}
impl ControlPlane {
+ /// Rebuilds the indexing plan.
+ ///
+ /// This method includes debouncing logic. Every call will be followed by a cooldown period.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<_, RebuildPlan>(ctx);
@@ -231,7 +242,7 @@ impl Handler for ControlPlane {
_message: RebuildPlan,
_ctx: &ActorContext,
) -> Result<(), ActorExitStatus> {
- self.indexing_scheduler.rebuild_indexing_plan(&self.model);
+ self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
}
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 2a586255691..a241cbbede4 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -196,7 +196,10 @@ impl IndexingScheduler {
// Should be called whenever a change in the list of index/shard
// has happened.
- pub(crate) fn rebuild_indexing_plan(&mut self, model: &ControlPlaneModel) {
+ //
+ // Prefer not calling this method directly, and instead call
+ // `ControlPlane::rebuild_indexing_plan_debounced`.
+ pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let sources = get_sources_to_schedule(model);
@@ -252,7 +255,7 @@ impl IndexingScheduler {
// If there is no plan, the node is probably starting and the scheduler did not find
// indexers yet. In this case, we want to schedule as soon as possible to find new
// indexers.
- self.rebuild_indexing_plan(model);
+ self.rebuild_plan(model);
return;
};
@@ -278,7 +281,7 @@ impl IndexingScheduler {
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan");
- self.rebuild_indexing_plan(model);
+ self.rebuild_plan(model);
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may have not received their tasks, apply it again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index d0f4f5d2b40..633dd0931e3 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -120,6 +120,8 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test
Duration::from_secs(10)
};
+const MAX_CONCURRENT_METASTORE_REQUESTS: usize = 6;
+
struct QuickwitServices {
pub node_config: Arc,
pub cluster: Cluster,
@@ -296,6 +298,9 @@ pub async fn serve_quickwit(
.stack_add_source_layer(broker_layer.clone())
.stack_delete_source_layer(broker_layer.clone())
.stack_toggle_source_layer(broker_layer)
+ .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new(
+ MAX_CONCURRENT_METASTORE_REQUESTS,
+ ))
.build(metastore);
Some(metastore)
} else {