Skip to content

Commit

Permalink
Readding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 4, 2023
1 parent a079bbb commit ba4f319
Show file tree
Hide file tree
Showing 6 changed files with 534 additions and 485 deletions.
9 changes: 9 additions & 0 deletions quickwit/quickwit-common/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use futures::Future;

/// Progress makes it possible to register some progress.
/// It is used in lieu of healthcheck.
///
Expand Down Expand Up @@ -83,6 +85,13 @@ impl Progress {
.fetch_max(ProgressState::Updated.into(), Ordering::Relaxed);
}

/// Executes a future in a protected zone.
pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
where Fut: Future<Output = T> {
let _guard = self.protect_zone();
future.await
}

pub fn protect_zone(&self) -> ProtectedZoneGuard {
loop {
let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into();
Expand Down
107 changes: 50 additions & 57 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::IndexerPool;

/// Interval between two controls (or checks) of the desired plan VS running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
} else {
Duration::from_secs(3)
Expand All @@ -58,7 +58,7 @@ const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsu
/// control plan loop.
// Note: it's currently not possible to define a const duration with
// `CONTROL_PLAN_LOOP_INTERVAL * number`.
const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
pub(crate) const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_secs(3)
} else {
Duration::from_secs(60)
Expand All @@ -70,28 +70,6 @@ struct RefreshPlanLoop;
#[derive(Debug)]
struct ControlPlanLoop;

#[async_trait]
impl Handler<RefreshPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: RefreshPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self
.indexing_scheduler
.schedule_indexing_plan_if_needed()
.await
{
error!("Error when scheduling indexing plan: `{}`.", error);
}
ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop)
.await;
Ok(())
}
}

#[derive(Debug)]
pub struct ControlPlane {
metastore: Arc<dyn Metastore>,
Expand All @@ -106,24 +84,6 @@ pub struct ControlPlane {
ingest_controller: IngestController,
}

#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

impl ControlPlane {
pub fn spawn(
universe: &Universe,
Expand All @@ -147,10 +107,10 @@ impl ControlPlane {
}
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Default)]
pub struct ControlPlaneObservableState {
ingester_controller: IngestControllerState,
indexing_scheduler: IndexingSchedulerState,
pub ingester_controller: IngestControllerState,
pub indexing_scheduler: IndexingSchedulerState,
}

#[async_trait]
Expand All @@ -170,7 +130,7 @@ impl Actor for ControlPlane {

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.ingest_controller
.load_state(ctx)
.load_state(ctx.progress())
.await
.context("failed to initialize ingest controller")?;

Expand All @@ -182,15 +142,23 @@ impl Actor for ControlPlane {
}
}

// macro_rules! handle_ask_res {
// ($ask_res:expr) => {
// match $ask_res {
// Ok(response) => response,
// Err(AskError::ErrorReply(error)) => return Ok(Err(error)),
// Err(error) => return Err(ActorExitStatus::Failure(anyhow::anyhow!(error).into())),
// }
// };
// }
#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

#[async_trait]
impl Handler<CreateIndexRequest> for ControlPlane {
Expand Down Expand Up @@ -250,6 +218,28 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
}
}

#[async_trait]
impl Handler<RefreshPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: RefreshPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self
.indexing_scheduler
.schedule_indexing_plan_if_needed()
.await
{
error!("Error when scheduling indexing plan: `{}`.", error);
}
ctx.schedule_self_msg(REFRESH_PLAN_LOOP_INTERVAL, RefreshPlanLoop)
.await;
Ok(())
}
}

#[async_trait]
impl Handler<AddSourceRequest> for ControlPlane {
type Reply = ControlPlaneResult<EmptyResponse>;
Expand Down Expand Up @@ -361,7 +351,10 @@ impl Handler<GetOpenShardsRequest> for ControlPlane {
request: GetOpenShardsRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(self.ingest_controller.get_open_shards(request, ctx).await)
Ok(self
.ingest_controller
.get_open_shards(request, ctx.progress())
.await)
}
}

Expand All @@ -377,7 +370,7 @@ impl Handler<CloseShardsRequest> for ControlPlane {
// TODO decide on what the error should be.
let close_shards_resp = self
.ingest_controller
.close_shards(request, ctx)
.close_shards(request, ctx.progress())
.await
.context("Failed to close shards")?;
Ok(Ok(close_shards_resp))
Expand Down
Loading

0 comments on commit ba4f319

Please sign in to comment.