diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs new file mode 100644 index 00000000000..5e47c0d837d --- /dev/null +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -0,0 +1,68 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use anyhow::Context; +use async_trait::async_trait; +use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; +use tracing::debug; + +use crate::scheduler::IndexingScheduler; +use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse}; + +#[derive(Debug)] +pub struct ControlPlane { + index_scheduler_mailbox: Mailbox, +} + +#[async_trait] +impl Actor for ControlPlane { + type ObservableState = (); + + fn observable_state(&self) -> Self::ObservableState {} + + fn name(&self) -> String { + "ControlPlane".to_string() + } +} + +impl ControlPlane { + pub fn new(index_scheduler_mailbox: Mailbox) -> Self { + Self { + index_scheduler_mailbox, + } + } +} + +#[async_trait] +impl Handler for ControlPlane { + type Reply = crate::Result; + + async fn handle( + &mut self, + request: NotifyIndexChangeRequest, + _: &ActorContext, + ) -> Result { + debug!("Index change notification: schedule indexing plan."); + self.index_scheduler_mailbox + .send_message(request) + .await + .context("Error sending index change notification to index scheduler.")?; + Ok(Ok(NotifyIndexChangeResponse {})) + } +} diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index 6dc9d6c05a2..3ccdeb83b1d 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +pub mod control_plane; #[path = "codegen/control_plane_service.rs"] mod control_plane_service; pub mod indexing_plan; @@ -90,7 +91,7 @@ impl From> for ControlPlaneError { } /// Starts the Control Plane. -pub async fn start_control_plane_service( +pub async fn start_indexing_scheduler( cluster_id: String, self_node_id: String, universe: &Universe, diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 9e7b3cf396a..d4ea62446ce 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -62,9 +62,10 @@ use quickwit_common::tower::{ }; use quickwit_config::service::QuickwitService; use quickwit_config::{QuickwitConfig, SearcherConfig}; +use quickwit_control_plane::control_plane::ControlPlane; use quickwit_control_plane::scheduler::IndexingScheduler; use quickwit_control_plane::{ - start_control_plane_service, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool, + start_indexing_scheduler, ControlPlaneServiceClient, IndexerNodeInfo, IndexerPool, }; use quickwit_core::{IndexService, IndexServiceError}; use quickwit_indexing::actors::IndexingService; @@ -274,7 +275,7 @@ pub async fn serve_quickwit( .contains(&QuickwitService::ControlPlane) { let cluster_change_stream = cluster.ready_nodes_change_stream().await; - let control_plane_mailbox = setup_control_plane_service( + let control_plane_mailbox = start_control_plane( &universe, cluster.cluster_id().to_string(), cluster.self_node_id().to_string(), @@ -522,7 +523,29 @@ async fn setup_searcher( Ok((search_job_placer, search_service)) } -async fn setup_control_plane_service( +async fn start_control_plane( + universe: &Universe, + cluster_id: String, + self_node_id: String, + cluster_change_stream: impl Stream + Send + 'static, + indexing_service: Option>, + metastore: Arc, +) -> anyhow::Result> { + let scheduler = setup_indexing_scheduler( + universe, + cluster_id, + self_node_id, + cluster_change_stream, + indexing_service, + metastore, + ) + .await?; + let control_plane = ControlPlane::new(scheduler); + let (control_plane_mailbox, _) = universe.spawn_builder().spawn(control_plane); + Ok(control_plane_mailbox) +} + +async fn setup_indexing_scheduler( universe: &Universe, cluster_id: String, self_node_id: String, @@ -531,7 +554,7 @@ async fn setup_control_plane_service( metastore: Arc, ) -> anyhow::Result> { let indexer_pool = IndexerPool::default(); - let indexing_scheduler = start_control_plane_service( + let indexing_scheduler = start_indexing_scheduler( cluster_id, self_node_id, universe,