Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debouncing reschedule. #4416

Merged
merged 4 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,11 @@ impl<A: Actor> ActorContext<A> {
A: DeferableReplyHandler<M>,
M: Sync + Send + std::fmt::Debug + 'static,
{
let self_mailbox = self.inner.self_mailbox.clone();
let self_mailbox = self.mailbox().clone();
let callback = move || {
let _ = self_mailbox.send_message_with_high_priority(message);
};
self.inner
.spawn_ctx
.scheduler_client
.schedule_event(callback, after_duration);
self.spawn_ctx().schedule_event(callback, after_duration);
}
}

Expand Down
16 changes: 16 additions & 0 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

use anyhow::Context;
use quickwit_common::metrics::IntCounter;
use sync_wrapper::SyncWrapper;
Expand Down Expand Up @@ -71,6 +73,20 @@ impl SpawnContext {
registry: self.registry.clone(),
}
}

/// Schedules `callback` to be run once `timeout` has elapsed.
///
/// This simply forwards `callback` and `timeout` to the scheduler client held by this
/// spawn context.
///
/// `callback` is a plain closure (not a future). It will be executed in the scheduler
/// task, so it is required to be short.
pub fn schedule_event<F: FnOnce() + Send + Sync + 'static>(
    &self,
    callback: F,
    timeout: Duration,
) {
    self.scheduler_client.schedule_event(callback, timeout)
}
}

/// `SpawnBuilder` makes it possible to configure misc parameters before spawning an actor.
Expand Down
86 changes: 58 additions & 28 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -47,6 +49,7 @@ use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
use tracing::error;

use crate::debouncer::Debouncer;
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::ingest::IngestController;
use crate::model::{ControlPlaneModel, ControlPlaneModelMetrics};
Expand All @@ -59,10 +62,15 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(3)
};

/// Cooldown applied to plan rebuilds: the minimum delay separating two consecutive
/// rebuild plan operations.
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_millis(2_000);

#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug)]
#[derive(Debug, Default)]
struct RebuildPlan;

pub struct ControlPlane {
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
Expand All @@ -75,6 +83,13 @@ pub struct ControlPlane {
// the different ingesters.
indexing_scheduler: IndexingScheduler,
ingest_controller: IngestController,
rebuild_plan_debouncer: Debouncer,
}

/// Opaque `Debug` implementation: only the type name is printed, none of the fields.
impl fmt::Debug for ControlPlane {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = formatter.debug_struct("ControlPlane");
        debug_struct.finish()
    }
}

impl ControlPlane {
Expand All @@ -100,6 +115,7 @@ impl ControlPlane {
metastore: metastore.clone(),
indexing_scheduler,
ingest_controller,
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
}
})
}
Expand Down Expand Up @@ -133,8 +149,7 @@ impl Actor for ControlPlane {
.await
.context("failed to initialize the model")?;

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

self.ingest_controller.sync_with_all_ingesters(&self.model);

Expand All @@ -145,13 +160,22 @@ impl Actor for ControlPlane {
}

impl ControlPlane {
/// Requests a rebuild of the indexing plan.
///
/// The plan is not rebuilt inline. This method hands `ctx` to `rebuild_plan_debouncer`,
/// which is parameterized with the `RebuildPlan` message type; the rebuild itself is
/// performed by the `Handler<RebuildPlan>` implementation of this actor.
///
/// This method includes debouncing logic. Every call will be followed by a cooldown period
/// (the debouncer is constructed with `REBUILD_PLAN_COOLDOWN_PERIOD`).
///
/// NOTE(review): how calls made during the cooldown are coalesced is decided by `Debouncer`,
/// which is not visible here — confirm against its implementation.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<_, RebuildPlan>(ctx);
}

/// Deletes a set of shards from the metastore and the control plane model.
///
/// If the shards were already absent this operation is considered successful.
async fn delete_shards(
&mut self,
source_uid: &SourceUid,
shards: &[ShardId],
ctx: &ActorContext<ControlPlane>,
) -> anyhow::Result<()> {
let delete_shards_subrequest = DeleteShardsSubrequest {
index_uid: source_uid.index_uid.to_string(),
Expand All @@ -173,8 +197,7 @@ impl ControlPlane {
.await
.context("failed to delete shards in metastore")?;
self.model.delete_shards(source_uid, shards);
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(())
}

Expand Down Expand Up @@ -210,14 +233,28 @@ impl ControlPlane {
}
}

/// Handles a `RebuildPlan` message by asking the indexing scheduler to rebuild the
/// indexing plan from the current control plane model.
#[async_trait]
impl Handler<RebuildPlan> for ControlPlane {
    type Reply = ();

    async fn handle(
        &mut self,
        _rebuild_plan: RebuildPlan,
        _ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        // The message carries no payload: the plan is always rebuilt from the model as it
        // stands when the message is processed.
        let model = &self.model;
        self.indexing_scheduler.rebuild_plan(model);
        Ok(())
    }
}

#[async_trait]
impl Handler<ShardPositionsUpdate> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
shard_positions_update: ShardPositionsUpdate,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let Some(shard_entries) = self
.model
Expand All @@ -240,7 +277,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
if shard_ids_to_close.is_empty() {
return Ok(());
}
self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close)
self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close, ctx)
.await?;
Ok(())
}
Expand Down Expand Up @@ -323,7 +360,7 @@ impl Handler<CreateIndexRequest> for ControlPlane {
async fn handle(
&mut self,
request: CreateIndexRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_config = match metastore_serde_utils::from_json_str(&request.index_config_json) {
Ok(index_config) => index_config,
Expand All @@ -341,8 +378,7 @@ impl Handler<CreateIndexRequest> for ControlPlane {

self.model.add_index(index_metadata);

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = CreateIndexResponse {
index_uid: index_uid.into(),
Expand All @@ -361,7 +397,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
async fn handle(
&mut self,
request: DeleteIndexRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();

Expand All @@ -382,8 +418,7 @@ impl Handler<DeleteIndexRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand All @@ -399,7 +434,7 @@ impl Handler<AddSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: AddSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_config: SourceConfig =
Expand All @@ -419,8 +454,7 @@ impl Handler<AddSourceRequest> for ControlPlane {

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);

let response = EmptyResponse {};
Ok(Ok(response))
Expand All @@ -436,7 +470,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: ToggleSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_id = request.source_id.clone();
Expand All @@ -449,8 +483,7 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let has_changed = self.model.toggle_source(&index_uid, &source_id, enable)?;

if has_changed {
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
}

Ok(Ok(EmptyResponse {}))
Expand All @@ -466,7 +499,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
async fn handle(
&mut self,
request: DeleteSourceRequest,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_id = request.source_id.clone();
Expand Down Expand Up @@ -498,8 +531,7 @@ impl Handler<DeleteSourceRequest> for ControlPlane {

self.model.delete_source(&source_uid);

self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
let response = EmptyResponse {};

Ok(Ok(response))
Expand Down Expand Up @@ -529,9 +561,7 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
return Ok(Err(control_plane_error));
}
};
// TODO: Why do we return an error if the indexing scheduler fails?
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
}
}
Expand All @@ -548,8 +578,7 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
self.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
self.indexing_scheduler
.schedule_indexing_plan_if_needed(&self.model);
self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
}
Expand Down Expand Up @@ -1113,7 +1142,7 @@ mod tests {
#[tokio::test]
async fn test_delete_shard_on_eof() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::default();
let universe = Universe::with_accelerated_time();
let node_id = NodeId::new("control-plane-node".to_string());
let indexer_pool = IndexerPool::default();
let (client_mailbox, client_inbox) = universe.create_test_mailbox();
Expand Down Expand Up @@ -1215,6 +1244,7 @@ mod tests {
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);
let _ = client_inbox.drain_for_test();

universe.sleep(Duration::from_secs(30)).await;
// This update should trigger the deletion of the shard and a new indexing plan.
control_plane_mailbox
.ask(ShardPositionsUpdate {
Expand Down
Loading