From cdd98ea5893c3aaa1925af799c176801ee08e878 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 5 Oct 2023 11:38:23 +0900 Subject: [PATCH] Added remove source --- .../quickwit-control-plane/src/control_plane.rs | 13 ++++--------- .../src/ingest/ingest_controller.rs | 12 ++++++++++-- quickwit/quickwit-control-plane/src/scheduler.rs | 2 +- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 4d4049daaf3..f38de7a6ef1 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -32,7 +32,7 @@ use quickwit_proto::control_plane::{ CloseShardsRequest, CloseShardsResponse, ControlPlaneError, ControlPlaneResult, GetOpenShardsRequest, GetOpenShardsResponse, }; -use quickwit_proto::metastore::events::{DeleteSourceEvent, ToggleSourceEvent}; +use quickwit_proto::metastore::events::ToggleSourceEvent; use quickwit_proto::metastore::{ serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, @@ -267,9 +267,7 @@ impl Handler for ControlPlane { return Ok(Err(ControlPlaneError::from(error))); }; - self.ingest_controller - .add_source(&index_uid, &source_id) - .await; + self.ingest_controller.add_source(&index_uid, &source_id); // TODO: Refine the event. Notify index will have the effect to reload the entire state from // the metastore. We should update the state of the control plane. @@ -330,12 +328,9 @@ impl Handler for ControlPlane { { return Ok(Err(ControlPlaneError::from(error))); }; - let _event = DeleteSourceEvent { - index_uid, - source_id: request.source_id, - }; - // the metastore. We should update the state of the control plane. + self.ingest_controller + .delete_source(&index_uid, &request.source_id); self.indexing_scheduler.on_index_change().await?; let response = EmptyResponse {}; Ok(Ok(response)) diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 0f8eda4d95f..f52dc57fdd1 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -144,6 +144,11 @@ impl ShardTable { } } + fn remove_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + let key = (index_uid.clone(), source_id.clone()); + self.table_entries.remove(&key); + } + /// Clears the shard table. fn clear(&mut self) { self.table_entries.clear(); @@ -215,7 +220,6 @@ impl ShardTable { /// table. fn remove_shards(&mut self, index_uid: &IndexUid, source_id: &SourceId, shard_ids: &[ShardId]) { let key = (index_uid.clone(), source_id.clone()); - if let Some(table_entry) = self.table_entries.get_mut(&key) { table_entry .shard_entries @@ -595,10 +599,14 @@ impl IngestController { self.shard_table.remove_index(index_uid.index_id()); } - pub(crate) async fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + pub(crate) fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { self.shard_table.add_source(index_uid, source_id); } + pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + self.shard_table.remove_source(index_uid, source_id); + } + pub fn observable_state(&self) -> IngestControllerState { IngestControllerState { num_indexes: self.index_table.len(), diff --git a/quickwit/quickwit-control-plane/src/scheduler.rs b/quickwit/quickwit-control-plane/src/scheduler.rs index 003993b4696..4009c6e1c55 100644 --- a/quickwit/quickwit-control-plane/src/scheduler.rs +++ b/quickwit/quickwit-control-plane/src/scheduler.rs @@ -101,7 +101,7 @@ pub struct IndexingScheduler { } impl fmt::Debug for IndexingScheduler { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("IndexingScheduler") .field("cluster_id", &self.cluster_id) .field("node_id", &self.self_node_id)