Skip to content

Commit

Permalink
Removing Actor impl for ingest controller and indexing scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 4, 2023
1 parent 6b1fb7b commit a079bbb
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 118 deletions.
102 changes: 92 additions & 10 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
Expand All @@ -38,11 +39,59 @@ use quickwit_proto::metastore::{
ToggleSourceRequest,
};
use quickwit_proto::{IndexUid, NodeId};
use serde::Serialize;
use tracing::error;

use crate::ingest::ingest_controller::IngestControllerState;
use crate::ingest::IngestController;
use crate::scheduler::IndexingScheduler;
use crate::scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::IndexerPool;

/// Interval between two controls (or checks) of the desired plan VS running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
} else {
Duration::from_secs(3)
};

/// Interval between two scheduling of indexing plans. No need to be faster than the
/// control plan loop.
// Note: it's currently not possible to define a const duration with
// `CONTROL_PLAN_LOOP_INTERVAL * number`.
const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_secs(3)
} else {
Duration::from_secs(60)
};

#[derive(Debug)]
struct RefreshPlanLoop;

#[derive(Debug)]
struct ControlPlanLoop;

#[async_trait]
impl Handler<RefreshPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: RefreshPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self
.indexing_scheduler
.schedule_indexing_plan_if_needed()
.await
{
error!("Error when scheduling indexing plan: `{}`.", error);
}
ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop)
.await;
Ok(())
}
}

#[derive(Debug)]
pub struct ControlPlane {
metastore: Arc<dyn Metastore>,
Expand All @@ -57,6 +106,24 @@ pub struct ControlPlane {
ingest_controller: IngestController,
}

#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

impl ControlPlane {
pub fn spawn(
universe: &Universe,
Expand All @@ -80,23 +147,38 @@ impl ControlPlane {
}
}

#[derive(Debug, Clone, Serialize)]
pub struct ControlPlaneObservableState {
ingester_controller: IngestControllerState,
indexing_scheduler: IndexingSchedulerState,
}

#[async_trait]
impl Actor for ControlPlane {
type ObservableState = (
<IndexingScheduler as Actor>::ObservableState,
<IngestController as Actor>::ObservableState,
);
type ObservableState = ControlPlaneObservableState;

fn name(&self) -> String {
"ControlPlane".to_string()
}

fn observable_state(&self) -> Self::ObservableState {
todo!()
// (
// self.indexing_scheduler_handle.last_observation(),
// self.ingest_controller_handle.last_observation(),
// )
ControlPlaneObservableState {
ingester_controller: self.ingest_controller.observable_state(),
indexing_scheduler: self.indexing_scheduler.observable_state(),
}
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.ingest_controller
.load_state(ctx)
.await
.context("failed to initialize ingest controller")?;

self.handle(RefreshPlanLoop, ctx).await?;
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;

Ok(())
}
}

Expand Down
40 changes: 15 additions & 25 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus};
use quickwit_actors::ActorContext;
use quickwit_config::INGEST_SOURCE_ID;
use quickwit_ingest::IngesterPool;
use quickwit_metastore::{ListIndexesQuery, Metastore};
Expand All @@ -42,7 +40,7 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError};
use quickwit_proto::types::{IndexId, NodeId, SourceId};
use quickwit_proto::{metastore, IndexUid, NodeIdRef, ShardId};
use rand::seq::SliceRandom;
use serde_json::{json, Value as JsonValue};
use serde::Serialize;
use tokio::time::timeout;
use tracing::{error, info};

Expand Down Expand Up @@ -269,7 +267,10 @@ impl IngestController {
}
}

async fn load_state(&mut self, ctx: &ActorContext<Self>) -> ControlPlaneResult<()> {
pub(crate) async fn load_state(
&mut self,
ctx: &ActorContext<ControlPlane>,
) -> ControlPlaneResult<()> {
info!("syncing internal state with metastore");
let now = Instant::now();

Expand Down Expand Up @@ -581,26 +582,9 @@ enum PingError {
FollowerUnavailable,
}

#[async_trait]
impl Actor for IngestController {
type ObservableState = JsonValue;

fn observable_state(&self) -> Self::ObservableState {
json!({
"num_indexes": self.index_table.len(),
})
}

fn name(&self) -> String {
"IngestController".to_string()
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.load_state(ctx)
.await
.context("failed to initialize ingest controller")?;
Ok(())
}
#[derive(Debug, Clone, Serialize)]
pub struct IngestControllerState {
num_indexes: usize,
}

impl IngestController {
Expand All @@ -619,6 +603,12 @@ impl IngestController {
pub(crate) async fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
self.shard_table.add_source(index_uid, source_id);
}

pub fn observable_state(&self) -> IngestControllerState {
IngestControllerState {
num_indexes: self.index_table.len(),
}
}
}

#[cfg(test)]
Expand Down
89 changes: 6 additions & 83 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::{SourceConfig, INGEST_SOURCE_ID};
use quickwit_metastore::{ListIndexesQuery, Metastore};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
Expand All @@ -40,23 +38,6 @@ use crate::indexing_plan::{
};
use crate::{IndexerNodeInfo, IndexerPool};

/// Interval between two controls (or checks) of the desired plan VS running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
} else {
Duration::from_secs(3)
};

/// Interval between two scheduling of indexing plans. No need to be faster than the
/// control plan loop.
// Note: it's currently not possible to define a const duration with
// `CONTROL_PLAN_LOOP_INTERVAL * number`.
const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_secs(3)
} else {
Duration::from_secs(60)
};

const MIN_DURATION_BETWEEN_SCHEDULING: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(50)
} else {
Expand Down Expand Up @@ -128,26 +109,6 @@ impl fmt::Debug for IndexingScheduler {
}
}

#[async_trait]
impl Actor for IndexingScheduler {
type ObservableState = IndexingSchedulerState;

fn observable_state(&self) -> Self::ObservableState {
self.state.clone()
}

fn name(&self) -> String {
"IndexingScheduler".to_string()
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.handle(RefreshPlanLoop, ctx).await?;
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

impl IndexingScheduler {
pub fn new(
cluster_id: String,
Expand All @@ -164,7 +125,11 @@ impl IndexingScheduler {
}
}

async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> {
pub fn observable_state(&self) -> IndexingSchedulerState {
self.state.clone()
}

pub(crate) async fn schedule_indexing_plan_if_needed(&mut self) -> anyhow::Result<()> {
let mut indexers = self.get_indexers_from_indexer_pool().await;
if indexers.is_empty() {
warn!("No indexer available, cannot schedule an indexing plan.");
Expand Down Expand Up @@ -280,7 +245,7 @@ impl IndexingScheduler {
/// chitchat cluster state. If true, do nothing.
/// - If node IDs differ, schedule a new indexing plan.
/// - If indexing tasks differ, apply again the last plan.
async fn control_running_plan(&mut self) -> anyhow::Result<()> {
pub(crate) async fn control_running_plan(&mut self) -> anyhow::Result<()> {
let last_applied_plan =
if let Some(last_applied_plan) = self.state.last_applied_physical_plan.as_ref() {
last_applied_plan
Expand Down Expand Up @@ -371,48 +336,6 @@ impl IndexingScheduler {
}
}

#[derive(Debug)]
struct ControlPlanLoop;

#[async_trait]
impl Handler<ControlPlanLoop> for IndexingScheduler {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

#[derive(Debug)]
struct RefreshPlanLoop;

#[async_trait]
impl Handler<RefreshPlanLoop> for IndexingScheduler {
type Reply = ();

async fn handle(
&mut self,
_message: RefreshPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.schedule_indexing_plan_if_needed().await {
error!("Error when scheduling indexing plan: `{}`.", error);
}
ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop)
.await;
Ok(())
}
}

struct IndexingPlansDiff<'a> {
pub missing_node_ids: HashSet<&'a str>,
pub unplanned_node_ids: HashSet<&'a str>,
Expand Down

0 comments on commit a079bbb

Please sign in to comment.