Skip to content

Commit

Permalink
Removed the notify index change RPC method.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 4, 2023
1 parent 7e1715a commit 6b1fb7b
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 287 deletions.
20 changes: 1 addition & 19 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_proto::control_plane::{
CloseShardsRequest, CloseShardsResponse, ControlPlaneError, ControlPlaneResult,
GetOpenShardsRequest, GetOpenShardsResponse, NotifyIndexChangeRequest,
NotifyIndexChangeResponse,
GetOpenShardsRequest, GetOpenShardsResponse,
};
use quickwit_proto::metastore::events::{DeleteSourceEvent, ToggleSourceEvent};
use quickwit_proto::metastore::{
Expand All @@ -39,7 +38,6 @@ use quickwit_proto::metastore::{
ToggleSourceRequest,
};
use quickwit_proto::{IndexUid, NodeId};
use tracing::debug;

use crate::ingest::IngestController;
use crate::scheduler::IndexingScheduler;
Expand Down Expand Up @@ -272,22 +270,6 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
}
}

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for ControlPlane {
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
_: NotifyIndexChangeRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
debug!("Index change notification: schedule indexing plan.");
self.indexing_scheduler.on_index_change().await?;
// TODO right now this kills the control plane on error. Is this what we want?
Ok(Ok(NotifyIndexChangeResponse {}))
}
}

#[async_trait]
impl Handler<GetOpenShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<GetOpenShardsResponse>;
Expand Down
20 changes: 0 additions & 20 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::{SourceConfig, INGEST_SOURCE_ID};
use quickwit_metastore::{ListIndexesQuery, Metastore};
use quickwit_proto::control_plane::{
ControlPlaneResult, NotifyIndexChangeRequest, NotifyIndexChangeResponse,
};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use quickwit_proto::metastore::{ListShardsRequest, ListShardsSubrequest};
use quickwit_proto::{NodeId, ShardId};
Expand Down Expand Up @@ -374,23 +371,6 @@ impl IndexingScheduler {
}
}

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for IndexingScheduler {
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
_: NotifyIndexChangeRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
debug!("Index change notification: schedule indexing plan.");
self.schedule_indexing_plan_if_needed()
.await
.context("error when scheduling indexing plan")?;
Ok(Ok(NotifyIndexChangeResponse {}))
}
}

#[derive(Debug)]
struct ControlPlanLoop;

Expand Down
13 changes: 0 additions & 13 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,8 @@ service ControlPlaneService {
rpc GetOpenShards(GetOpenShardsRequest) returns (GetOpenShardsResponse);

rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse);

// Notify the Control Plane that a change on an index occurred. The change
// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update.
// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification.
// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the
// type of change. The index ID and/or source ID could also be added.
// However, these attributes will not be used by the Control Plane, at least at short term.
rpc NotifyIndexChange(NotifyIndexChangeRequest) returns (NotifyIndexChangeResponse);

}

message NotifyIndexChangeRequest {}

message NotifyIndexChangeResponse {}

// Shard API

message GetOpenShardsRequest {
Expand Down
Loading

0 comments on commit 6b1fb7b

Please sign in to comment.