Skip to content

Commit

Permalink
Debouncing reschedule.
Browse files Browse the repository at this point in the history
After this change, a new plan is created at most once
per 5s window. Outside of the cooldown period, a (relevant) event
triggers a RebuildPlan instantly.

Limiting concurrency of the Metastore client using a tower Layer.

Closes #4407
  • Loading branch information
fulmicoton committed Jan 18, 2024
1 parent 6d95fae commit 7208edb
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 36 deletions.
7 changes: 2 additions & 5 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,11 @@ impl<A: Actor> ActorContext<A> {
A: DeferableReplyHandler<M>,
M: Sync + Send + std::fmt::Debug + 'static,
{
let self_mailbox = self.inner.self_mailbox.clone();
let self_mailbox = self.mailbox().clone();
let callback = move || {
let _ = self_mailbox.send_message_with_high_priority(message);
};
self.inner
.spawn_ctx
.scheduler_client
.schedule_event(callback, after_duration);
self.spawn_ctx().schedule_event(callback, after_duration);
}
}

Expand Down
16 changes: 16 additions & 0 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

use anyhow::Context;
use quickwit_common::metrics::IntCounter;
use sync_wrapper::SyncWrapper;
Expand Down Expand Up @@ -71,6 +73,20 @@ impl SpawnContext {
registry: self.registry.clone(),
}
}

/// Schedules `callback` to be run once `timeout` has elapsed.
///
/// This forwards the request to the scheduler client held by this context.
///
/// `callback` will be executed in the scheduler task, so it is
/// required to be short.
pub fn schedule_event<F>(&self, callback: F, timeout: Duration)
where
    F: FnOnce() + Send + Sync + 'static,
{
    self.scheduler_client.schedule_event(callback, timeout);
}
}

/// `SpawnBuilder` makes it possible to configure misc parameters before spawning an actor.
Expand Down
86 changes: 58 additions & 28 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -47,6 +49,7 @@ use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
use tracing::error;

use crate::debouncer::Debouncer;
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::ingest::IngestController;
use crate::model::{ControlPlaneModel, ControlPlaneModelMetrics};
Expand All @@ -59,10 +62,15 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(3)
};

/// Minimum period between two consecutive rebuild plan operations.
///
/// This value is handed to the `Debouncer` built in the `ControlPlane` constructor.
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(5);

#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug)]
#[derive(Debug, Default)]
struct RebuildPlan;

pub struct ControlPlane {
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
Expand All @@ -75,6 +83,13 @@ pub struct ControlPlane {
// the different ingesters.
indexing_scheduler: IndexingScheduler,
ingest_controller: IngestController,
rebuild_plan_debouncer: Debouncer,
}

/// Opaque `Debug` implementation: only the type name is printed, no field is exposed.
impl fmt::Debug for ControlPlane {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = formatter.debug_struct("ControlPlane");
        debug_struct.finish()
    }
}

impl ControlPlane {
Expand All @@ -100,6 +115,7 @@ impl ControlPlane {
metastore: metastore.clone(),
indexing_scheduler,
ingest_controller,
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
}
})
}
Expand Down Expand Up @@ -133,8 +149,7 @@ impl Actor for ControlPlane {
.await
.context("failed to initialize the model")?;

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

self.ingest_controller.sync_with_all_ingesters(&self.model);

Expand All @@ -145,13 +160,22 @@ impl Actor for ControlPlane {
}

impl ControlPlane {
/// Requests a rebuild of the indexing plan, going through the rebuild-plan debouncer.
///
/// The plan is not rebuilt inline: this method only delegates to
/// `rebuild_plan_debouncer.self_send_with_cooldown::<_, RebuildPlan>(ctx)`. The actual
/// rebuild happens in the `Handler<RebuildPlan>` implementation of this actor.
///
/// NOTE(review): the debouncing logic itself lives in `Debouncer`, which is not visible
/// here. Presumably a `RebuildPlan` message is self-sent at most once per
/// `REBUILD_PLAN_COOLDOWN_PERIOD`, and every call is followed by a cooldown period —
/// confirm against the `debouncer` module.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<_, RebuildPlan>(ctx);
}

/// Deletes a set of shards from the metastore and the control plane model.
///
/// If the shards were already absent this operation is considered successful.
async fn delete_shards(
&mut self,
source_uid: &SourceUid,
shards: &[ShardId],
ctx: &ActorContext<ControlPlane>,
) -> anyhow::Result<()> {
let delete_shards_subrequest = DeleteShardsSubrequest {
index_uid: source_uid.index_uid.to_string(),
Expand All @@ -173,8 +197,7 @@ impl ControlPlane {
.await
.context("failed to delete shards in metastore")?;
self.model.delete_shards(source_uid, shards);
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(())
}

Expand Down Expand Up @@ -210,14 +233,28 @@ impl ControlPlane {
}
}

/// Handles `RebuildPlan` messages by asking the indexing scheduler to rebuild the plan
/// from the current state of the control plane model.
#[async_trait]
impl Handler<RebuildPlan> for ControlPlane {
    type Reply = ();

    async fn handle(
        &mut self,
        _rebuild_plan: RebuildPlan,
        _ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        // The message carries no payload: the scheduler only needs the current model.
        let model = &self.model;
        self.indexing_scheduler.rebuild_plan(model);
        Ok(())
    }
}

#[async_trait]
impl Handler<ShardPositionsUpdate> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
shard_positions_update: ShardPositionsUpdate,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let Some(shard_entries) = self
.model
Expand All @@ -240,7 +277,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
if shard_ids_to_close.is_empty() {
return Ok(());
}
self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close)
self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close, ctx)
.await?;
Ok(())
}
Expand Down Expand Up @@ -323,7 +360,7 @@ impl Handler<CreateIndexRequest> for ControlPlane {
async fn handle(
&mut self,
request: CreateIndexRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_config = match metastore_serde_utils::from_json_str(&request.index_config_json) {
Ok(index_config) => index_config,
Expand All @@ -341,8 +378,7 @@ impl Handler<CreateIndexRequest> for ControlPlane {

self.model.add_index(index_metadata);

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = CreateIndexResponse {
index_uid: index_uid.into(),
Expand All @@ -361,7 +397,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
async fn handle(
&mut self,
request: DeleteIndexRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();

Expand All @@ -382,8 +418,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand All @@ -399,7 +434,7 @@ impl Handler<AddSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: AddSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_config: SourceConfig =
Expand All @@ -419,8 +454,7 @@ impl Handler<AddSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand All @@ -436,7 +470,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: ToggleSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_id = request.source_id.clone();
Expand All @@ -449,8 +483,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let has_changed = self.model.toggle_source(&index_uid, &source_id, enable)?;

if has_changed {
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
}

Ok(Ok(EmptyResponse {}))
Expand All @@ -466,7 +499,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: DeleteSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_id = request.source_id.clone();
Expand Down Expand Up @@ -498,8 +531,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {

self.model.delete_source(&source_uid);

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
let response = EmptyResponse {};

Ok(Ok(response))
Expand Down Expand Up @@ -529,9 +561,7 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
return Ok(Err(control_plane_error));
}
};
// TODO: Why do we return an error if the indexing scheduler fails?
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
}
}
Expand All @@ -548,8 +578,7 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
self.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
}
Expand Down Expand Up @@ -1113,7 +1142,7 @@ mod tests {
#[tokio::test]
async fn test_delete_shard_on_eof() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::default();
let universe = Universe::with_accelerated_time();
let node_id = NodeId::new("control-plane-node".to_string());
let indexer_pool = IndexerPool::default();
let (client_mailbox, client_inbox) = universe.create_test_mailbox();
Expand Down Expand Up @@ -1215,6 +1244,7 @@ mod tests {
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);
let _ = client_inbox.drain_for_test();

universe.sleep(Duration::from_secs(30)).await;
// This update should trigger the deletion of the shard and a new indexing plan.
control_plane_mailbox
.ask(ShardPositionsUpdate {
Expand Down
Loading

0 comments on commit 7208edb

Please sign in to comment.