From 6b1fb7b806564298500db005f6a38f5701f745cd Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 4 Oct 2023 14:04:16 +0900 Subject: [PATCH] Removed the notify index change RPC method. --- .../src/control_plane.rs | 20 +- .../quickwit-control-plane/src/scheduler.rs | 20 -- .../protos/quickwit/control_plane.proto | 13 - .../quickwit/quickwit.control_plane.rs | 236 +----------------- 4 files changed, 2 insertions(+), 287 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index bd634c7570d..42777ab4233 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -29,8 +29,7 @@ use quickwit_ingest::IngesterPool; use quickwit_metastore::Metastore; use quickwit_proto::control_plane::{ CloseShardsRequest, CloseShardsResponse, ControlPlaneError, ControlPlaneResult, - GetOpenShardsRequest, GetOpenShardsResponse, NotifyIndexChangeRequest, - NotifyIndexChangeResponse, + GetOpenShardsRequest, GetOpenShardsResponse, }; use quickwit_proto::metastore::events::{DeleteSourceEvent, ToggleSourceEvent}; use quickwit_proto::metastore::{ @@ -39,7 +38,6 @@ use quickwit_proto::metastore::{ ToggleSourceRequest, }; use quickwit_proto::{IndexUid, NodeId}; -use tracing::debug; use crate::ingest::IngestController; use crate::scheduler::IndexingScheduler; @@ -272,22 +270,6 @@ impl Handler for ControlPlane { } } -#[async_trait] -impl Handler for ControlPlane { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - _: NotifyIndexChangeRequest, - _: &ActorContext, - ) -> Result { - debug!("Index change notification: schedule indexing plan."); - self.indexing_scheduler.on_index_change().await?; - // TODO right now this kills the control plane on error. Is this what we want? - Ok(Ok(NotifyIndexChangeResponse {})) - } -} - #[async_trait] impl Handler for ControlPlane { type Reply = ControlPlaneResult; diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs index dea2a1650e7..8b4ee037428 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/scheduler.rs @@ -29,9 +29,6 @@ use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler}; use quickwit_config::{SourceConfig, INGEST_SOURCE_ID}; use quickwit_metastore::{ListIndexesQuery, Metastore}; -use quickwit_proto::control_plane::{ - ControlPlaneResult, NotifyIndexChangeRequest, NotifyIndexChangeResponse, -}; use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask}; use quickwit_proto::metastore::{ListShardsRequest, ListShardsSubrequest}; use quickwit_proto::{NodeId, ShardId}; @@ -374,23 +371,6 @@ impl IndexingScheduler { } } -#[async_trait] -impl Handler for IndexingScheduler { - type Reply = ControlPlaneResult; - - async fn handle( - &mut self, - _: NotifyIndexChangeRequest, - _: &ActorContext, - ) -> Result { - debug!("Index change notification: schedule indexing plan."); - self.schedule_indexing_plan_if_needed() - .await - .context("error when scheduling indexing plan")?; - Ok(Ok(NotifyIndexChangeResponse {})) - } -} - #[derive(Debug)] struct ControlPlanLoop; diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index 38d30d084f5..7d6c7ca25ba 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -58,21 +58,8 @@ service ControlPlaneService { rpc GetOpenShards(GetOpenShardsRequest) returns (GetOpenShardsResponse); rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse); - - // Notify the Control Plane that a change on an index occurred. The change - // can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - // Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - // This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - // type of change. The index ID and/or source ID could also be added. - // However, these attributes will not be used by the Control Plane, at least at short term. - rpc NotifyIndexChange(NotifyIndexChangeRequest) returns (NotifyIndexChangeResponse); - } -message NotifyIndexChangeRequest {} - -message NotifyIndexChangeResponse {} - // Shard API message GetOpenShardsRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index d51f4844745..d1d9e815561 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -1,14 +1,6 @@ #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct NotifyIndexChangeRequest {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct NotifyIndexChangeResponse {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct GetOpenShardsRequest { #[prost(message, repeated, tag = "1")] pub subrequests: ::prost::alloc::vec::Vec, @@ -109,16 +101,6 @@ pub trait ControlPlaneService: std::fmt::Debug + dyn_clone::DynClone + Send + Sy &mut self, request: CloseShardsRequest, ) -> crate::control_plane::ControlPlaneResult; - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult; } dyn_clone::clone_trait_object!(ControlPlaneService); #[cfg(any(test, feature = "testsuite"))] @@ -231,12 +213,6 @@ impl ControlPlaneService for ControlPlaneServiceClient { ) -> crate::control_plane::ControlPlaneResult { self.inner.close_shards(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.notify_index_change(request).await - } } #[cfg(any(test, feature = "testsuite"))] pub mod control_plane_service_mock { @@ -299,12 +275,6 @@ pub mod control_plane_service_mock { ) -> crate::control_plane::ControlPlaneResult { self.inner.lock().await.close_shards(request).await } - async fn notify_index_change( - &mut self, - request: super::NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner.lock().await.notify_index_change(request).await - } } impl From for ControlPlaneServiceClient { fn from(mock: MockControlPlaneService) -> Self { @@ -435,22 +405,6 @@ impl tower::Service for Box { Box::pin(fut) } } -impl tower::Service for Box { - type Response = NotifyIndexChangeResponse; - type Error = crate::control_plane::ControlPlaneError; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, request: NotifyIndexChangeRequest) -> Self::Future { - let mut svc = self.clone(); - let fut = async move { svc.notify_index_change(request).await }; - Box::pin(fut) - } -} /// A tower block is a set of towers. Each tower is stack of layers (middlewares) that are applied to a service. #[derive(Debug)] struct ControlPlaneServiceTowerBlock { @@ -489,11 +443,6 @@ struct ControlPlaneServiceTowerBlock { CloseShardsResponse, crate::control_plane::ControlPlaneError, >, - notify_index_change_svc: quickwit_common::tower::BoxService< - NotifyIndexChangeRequest, - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, } impl Clone for ControlPlaneServiceTowerBlock { fn clone(&self) -> Self { @@ -505,7 +454,6 @@ impl Clone for ControlPlaneServiceTowerBlock { delete_source_svc: self.delete_source_svc.clone(), get_open_shards_svc: self.get_open_shards_svc.clone(), close_shards_svc: self.close_shards_svc.clone(), - notify_index_change_svc: self.notify_index_change_svc.clone(), } } } @@ -555,12 +503,6 @@ impl ControlPlaneService for ControlPlaneServiceTowerBlock { ) -> crate::control_plane::ControlPlaneResult { self.close_shards_svc.ready().await?.call(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.notify_index_change_svc.ready().await?.call(request).await - } } #[derive(Debug, Default)] pub struct ControlPlaneServiceTowerBlockBuilder { @@ -627,15 +569,6 @@ pub struct ControlPlaneServiceTowerBlockBuilder { crate::control_plane::ControlPlaneError, >, >, - #[allow(clippy::type_complexity)] - notify_index_change_layer: Option< - quickwit_common::tower::BoxLayer< - Box, - NotifyIndexChangeRequest, - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, - >, } impl ControlPlaneServiceTowerBlockBuilder { pub fn shared_layer(mut self, layer: L) -> Self @@ -693,12 +626,6 @@ impl ControlPlaneServiceTowerBlockBuilder { Error = crate::control_plane::ControlPlaneError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, - L::Service: tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, { self .create_index_layer = Some( @@ -724,14 +651,7 @@ impl ControlPlaneServiceTowerBlockBuilder { .get_open_shards_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); - self - .close_shards_layer = Some( - quickwit_common::tower::BoxLayer::new(layer.clone()), - ); - self - .notify_index_change_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); + self.close_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn create_index_layer(mut self, layer: L) -> Self @@ -835,22 +755,6 @@ impl ControlPlaneServiceTowerBlockBuilder { self.close_shards_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn notify_index_change_layer(mut self, layer: L) -> Self - where - L: tower::Layer> + Send + Sync + 'static, - L::Service: tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - { - self - .notify_index_change_layer = Some( - quickwit_common::tower::BoxLayer::new(layer), - ); - self - } pub fn build(self, instance: T) -> ControlPlaneServiceClient where T: ControlPlaneService, @@ -924,12 +828,6 @@ impl ControlPlaneServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; - let notify_index_change_svc = if let Some(layer) = self.notify_index_change_layer - { - layer.layer(boxed_instance.clone()) - } else { - quickwit_common::tower::BoxService::new(boxed_instance.clone()) - }; let tower_block = ControlPlaneServiceTowerBlock { create_index_svc, delete_index_svc, @@ -938,7 +836,6 @@ impl ControlPlaneServiceTowerBlockBuilder { delete_source_svc, get_open_shards_svc, close_shards_svc, - notify_index_change_svc, }; ControlPlaneServiceClient::new(tower_block) } @@ -1077,15 +974,6 @@ where CloseShardsResponse, crate::control_plane::ControlPlaneError, >, - > - + tower::Service< - NotifyIndexChangeRequest, - Response = NotifyIndexChangeResponse, - Error = crate::control_plane::ControlPlaneError, - Future = BoxFuture< - NotifyIndexChangeResponse, - crate::control_plane::ControlPlaneError, - >, >, { async fn create_index( @@ -1132,12 +1020,6 @@ where ) -> crate::control_plane::ControlPlaneResult { self.call(request).await } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.call(request).await - } } #[derive(Debug, Clone)] pub struct ControlPlaneServiceGrpcClientAdapter { @@ -1233,16 +1115,6 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } - async fn notify_index_change( - &mut self, - request: NotifyIndexChangeRequest, - ) -> crate::control_plane::ControlPlaneResult { - self.inner - .notify_index_change(request) - .await - .map(|response| response.into_inner()) - .map_err(|error| error.into()) - } } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServerAdapter { @@ -1336,17 +1208,6 @@ for ControlPlaneServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } - async fn notify_index_change( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.inner - .clone() - .notify_index_change(request.into_inner()) - .await - .map(tonic::Response::new) - .map_err(|error| error.into()) - } } /// Generated client implementations. pub mod control_plane_service_grpc_client { @@ -1656,42 +1517,6 @@ pub mod control_plane_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. - pub async fn notify_index_change( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.control_plane.ControlPlaneService/NotifyIndexChange", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new( - "quickwit.control_plane.ControlPlaneService", - "NotifyIndexChange", - ), - ); - self.inner.unary(req, path, codec).await - } } } /// Generated server implementations. @@ -1757,19 +1582,6 @@ pub mod control_plane_service_grpc_server { tonic::Response, tonic::Status, >; - /// Notify the Control Plane that a change on an index occurred. The change - /// can be an index creation, deletion, or update that includes a source creation/deletion/num pipeline update. - /// Note(fmassot): it's not very clear for a user to know which change triggers a control plane notification. - /// This can be explicited in the attributes of `NotifyIndexChangeRequest` with an enum that describes the - /// type of change. The index ID and/or source ID could also be added. - /// However, these attributes will not be used by the Control Plane, at least at short term. - async fn notify_index_change( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; } #[derive(Debug)] pub struct ControlPlaneServiceGrpcServer { @@ -2186,52 +1998,6 @@ pub mod control_plane_service_grpc_server { }; Box::pin(fut) } - "/quickwit.control_plane.ControlPlaneService/NotifyIndexChange" => { - #[allow(non_camel_case_types)] - struct NotifyIndexChangeSvc(pub Arc); - impl< - T: ControlPlaneServiceGrpc, - > tonic::server::UnaryService - for NotifyIndexChangeSvc { - type Response = super::NotifyIndexChangeResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - (*inner).notify_index_change(request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = NotifyIndexChangeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } _ => { Box::pin(async move { Ok(