Skip to content

Commit

Permalink
Readding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 4, 2023
1 parent a079bbb commit b3849c8
Show file tree
Hide file tree
Showing 7 changed files with 494 additions and 465 deletions.
3 changes: 1 addition & 2 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ impl<A: Actor> ActorContext<A> {
/// Executes a future in a protected zone.
pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
where Fut: Future<Output = T> {
let _guard = self.protect_zone();
future.await
self.progress.protect_future(future).await
}

/// Cooperatively yields, while keeping the actor protected.
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-common/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use futures::Future;

/// Progress makes it possible to register some progress.
/// It is used in lieu of healthcheck.
///
Expand Down Expand Up @@ -83,6 +85,13 @@ impl Progress {
.fetch_max(ProgressState::Updated.into(), Ordering::Relaxed);
}

/// Executes a future in a protected zone.
pub async fn protect_future<Fut, T>(&self, future: Fut) -> T
where Fut: Future<Output = T> {
let _guard = self.protect_zone();
future.await
}

pub fn protect_zone(&self) -> ProtectedZoneGuard {
loop {
let previous_state: ProgressState = self.0.load(Ordering::SeqCst).into();
Expand Down
63 changes: 28 additions & 35 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::IndexerPool;

/// Interval between two controls (or checks) of the desired plan VS running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
pub const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
} else {
Duration::from_secs(3)
Expand All @@ -58,7 +58,7 @@ const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsu
/// control plan loop.
// Note: it's currently not possible to define a const duration with
// `CONTROL_PLAN_LOOP_INTERVAL * number`.
const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
pub const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_secs(3)
} else {
Duration::from_secs(60)
Expand Down Expand Up @@ -106,24 +106,6 @@ pub struct ControlPlane {
ingest_controller: IngestController,
}

#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

impl ControlPlane {
pub fn spawn(
universe: &Universe,
Expand All @@ -147,10 +129,10 @@ impl ControlPlane {
}
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Default)]
pub struct ControlPlaneObservableState {
ingester_controller: IngestControllerState,
indexing_scheduler: IndexingSchedulerState,
pub ingester_controller: IngestControllerState,
pub indexing_scheduler: IndexingSchedulerState,
}

#[async_trait]
Expand All @@ -170,7 +152,7 @@ impl Actor for ControlPlane {

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.ingest_controller
.load_state(ctx)
.load_state(ctx.progress())
.await
.context("failed to initialize ingest controller")?;

Expand All @@ -182,15 +164,23 @@ impl Actor for ControlPlane {
}
}

// macro_rules! handle_ask_res {
// ($ask_res:expr) => {
// match $ask_res {
// Ok(response) => response,
// Err(AskError::ErrorReply(error)) => return Ok(Err(error)),
// Err(error) => return Err(ActorExitStatus::Failure(anyhow::anyhow!(error).into())),
// }
// };
// }
#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
_message: ControlPlanLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Err(error) = self.indexing_scheduler.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}

#[async_trait]
impl Handler<CreateIndexRequest> for ControlPlane {
Expand Down Expand Up @@ -361,7 +351,10 @@ impl Handler<GetOpenShardsRequest> for ControlPlane {
request: GetOpenShardsRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(self.ingest_controller.get_open_shards(request, ctx).await)
Ok(self
.ingest_controller
.get_open_shards(request, ctx.progress())
.await)
}
}

Expand All @@ -377,7 +370,7 @@ impl Handler<CloseShardsRequest> for ControlPlane {
// TODO decide on what the error should be.
let close_shards_resp = self
.ingest_controller
.close_shards(request, ctx)
.close_shards(request, ctx.progress())
.await
.context("Failed to close shards")?;
Ok(Ok(close_shards_resp))
Expand Down
Loading

0 comments on commit b3849c8

Please sign in to comment.