diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
new file mode 100644
index 00000000000..5e47c0d837d
--- /dev/null
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -0,0 +1,68 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use anyhow::Context;
+use async_trait::async_trait;
+use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
+use tracing::debug;
+
+use crate::scheduler::IndexingScheduler;
+use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
+
+#[derive(Debug)]
+pub struct ControlPlane {
+ index_scheduler_mailbox: Mailbox,
+}
+
+#[async_trait]
+impl Actor for ControlPlane {
+ type ObservableState = ();
+
+ fn observable_state(&self) -> Self::ObservableState {}
+
+ fn name(&self) -> String {
+ "ControlPlane".to_string()
+ }
+}
+
+impl ControlPlane {
+ pub fn new(index_scheduler_mailbox: Mailbox) -> Self {
+ Self {
+ index_scheduler_mailbox,
+ }
+ }
+}
+
+#[async_trait]
+impl Handler for ControlPlane {
+ type Reply = crate::Result;
+
+ async fn handle(
+ &mut self,
+ request: NotifyIndexChangeRequest,
+ _: &ActorContext,
+ ) -> Result {
+ debug!("Index change notification: schedule indexing plan.");
+ self.index_scheduler_mailbox
+ .send_message(request)
+ .await
+ .context("Error sending index change notification to index scheduler.")?;
+ Ok(Ok(NotifyIndexChangeResponse {}))
+ }
+}
diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs
index 6dc9d6c05a2..3ccdeb83b1d 100644
--- a/quickwit/quickwit-control-plane/src/lib.rs
+++ b/quickwit/quickwit-control-plane/src/lib.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
+pub mod control_plane;
#[path = "codegen/control_plane_service.rs"]
mod control_plane_service;
pub mod indexing_plan;
@@ -90,7 +91,7 @@ impl From> for ControlPlaneError {
}
/// Starts the Control Plane.
-pub async fn start_control_plane_service(
+pub async fn start_indexing_scheduler(
cluster_id: String,
self_node_id: String,
universe: &Universe,
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 9e7b3cf396a..c8b754906a5 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -62,9 +62,10 @@ use quickwit_common::tower::{
};
use quickwit_config::service::QuickwitService;
use quickwit_config::{QuickwitConfig, SearcherConfig};
+use quickwit_control_plane::control_plane::ControlPlane;
use quickwit_control_plane::scheduler::IndexingScheduler;
use quickwit_control_plane::{
- start_control_plane_service, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool,
+ start_indexing_scheduler, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool,
};
use quickwit_core::{IndexService, IndexServiceError};
use quickwit_indexing::actors::IndexingService;
@@ -274,7 +275,7 @@ pub async fn serve_quickwit(
.contains(&QuickwitService::ControlPlane)
{
let cluster_change_stream = cluster.ready_nodes_change_stream().await;
- let control_plane_mailbox = setup_control_plane_service(
+ let control_plane_mailbox = start_control_plane(
&universe,
cluster.cluster_id().to_string(),
cluster.self_node_id().to_string(),
@@ -522,7 +523,29 @@ async fn setup_searcher(
Ok((search_job_placer, search_service))
}
-async fn setup_control_plane_service(
+async fn start_control_plane(
+ universe: &Universe,
+ cluster_id: String,
+ self_node_id: String,
+ cluster_change_stream: impl Stream- + Send + 'static,
+ indexing_service: Option>,
+ metastore: Arc,
+) -> anyhow::Result> {
+ let scheduler = setup_indexer_scheduler(
+ universe,
+ cluster_id,
+ self_node_id,
+ cluster_change_stream,
+ indexing_service,
+ metastore,
+ )
+ .await?;
+ let control_plane = ControlPlane::new(scheduler);
+ let (control_plane_mailbox, _) = universe.spawn_builder().spawn(control_plane);
+ Ok(control_plane_mailbox)
+}
+
+async fn setup_indexer_scheduler(
universe: &Universe,
cluster_id: String,
self_node_id: String,
@@ -531,7 +554,7 @@ async fn setup_control_plane_service(
metastore: Arc,
) -> anyhow::Result> {
let indexer_pool = IndexerPool::default();
- let indexing_scheduler = start_control_plane_service(
+ let indexing_scheduler = start_indexing_scheduler(
cluster_id,
self_node_id,
universe,