Skip to content

Commit

Permalink
Added a layer to limit concurrency on the metastore client.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jan 18, 2024
1 parent de5aeb4 commit 83bfeb6
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
6 changes: 1 addition & 5 deletions quickwit/quickwit-actors/src/universe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::time::Duration;

use crate::mailbox::create_mailbox;
use crate::registry::ActorObservation;
use crate::scheduler::{start_scheduler, NoAdvanceTimeGuard};
use crate::scheduler::start_scheduler;
use crate::spawn_builder::{SpawnBuilder, SpawnContext};
use crate::{Actor, ActorExitStatus, Command, Inbox, Mailbox, QueueCapacity};

Expand Down Expand Up @@ -136,10 +136,6 @@ impl Universe {
.values()
.any(|status| matches!(status, ActorExitStatus::Panicked)));
}

pub fn no_advance_guard(&self) -> NoAdvanceTimeGuard {
self.spawn_ctx().scheduler_client.no_advance_time_guard()
}
}

impl Drop for Universe {
Expand Down
19 changes: 15 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -60,8 +62,8 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(3)
};

/// Minimum period between two reschedules.
const DEBOUNCE_COOLDOWN_PERIOD: Duration = Duration::from_secs(5);
/// Minimum period between two rebuild plan operation.
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(5);

#[derive(Debug)]
struct ControlPlanLoop;
Expand All @@ -84,6 +86,12 @@ pub struct ControlPlane {
rebuild_plan_debouncer: Debouncer,
}

impl fmt::Debug for ControlPlane {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("ControlPlane").finish()
}
}

impl ControlPlane {
pub fn spawn(
universe: &Universe,
Expand All @@ -107,7 +115,7 @@ impl ControlPlane {
metastore: metastore.clone(),
indexing_scheduler,
ingest_controller,
rebuild_plan_debouncer: Debouncer::new(DEBOUNCE_COOLDOWN_PERIOD),
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
}
})
}
Expand Down Expand Up @@ -152,6 +160,9 @@ impl Actor for ControlPlane {
}

impl ControlPlane {
/// Rebuilds the indexing plan.
///
/// This method includes debouncing logic. Every call will be followed by a cooldown period.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<_, RebuildPlan>(ctx);
Expand Down Expand Up @@ -231,7 +242,7 @@ impl Handler<RebuildPlan> for ControlPlane {
_message: RebuildPlan,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.indexing_scheduler.rebuild_indexing_plan(&self.model);
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
}
Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ impl IndexingScheduler {

// Should be called whenever a change in the list of index/shard
// has happened.
pub(crate) fn rebuild_indexing_plan(&mut self, model: &ControlPlaneModel) {
//
// Prefer not calling this method directly, and instead call
// `ControlPlane::rebuild_indexing_plan_debounced`.
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let sources = get_sources_to_schedule(model);
Expand Down Expand Up @@ -252,7 +255,7 @@ impl IndexingScheduler {
// If there is no plan, the node is probably starting and the scheduler did not find
// indexers yet. In this case, we want to schedule as soon as possible to find new
// indexers.
self.rebuild_indexing_plan(model);
self.rebuild_plan(model);
return;
};

Expand All @@ -278,7 +281,7 @@ impl IndexingScheduler {
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan");
self.rebuild_indexing_plan(model);
self.rebuild_plan(model);
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may have not received their tasks, apply it again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const READINESS_REPORTING_INTERVAL: Duration = if cfg!(any(test, feature = "test
Duration::from_secs(10)
};

const MAX_CONCURRENT_METASTORE_REQUESTS: usize = 6;

struct QuickwitServices {
pub node_config: Arc<NodeConfig>,
pub cluster: Cluster,
Expand Down Expand Up @@ -296,6 +298,9 @@ pub async fn serve_quickwit(
.stack_add_source_layer(broker_layer.clone())
.stack_delete_source_layer(broker_layer.clone())
.stack_toggle_source_layer(broker_layer)
.stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new(
MAX_CONCURRENT_METASTORE_REQUESTS,
))
.build(metastore);
Some(metastore)
} else {
Expand Down

0 comments on commit 83bfeb6

Please sign in to comment.