Skip to content

Commit

Permalink
Added remove source
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 5, 2023
1 parent 793644f commit cdd98ea
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
13 changes: 4 additions & 9 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use quickwit_proto::control_plane::{
CloseShardsRequest, CloseShardsResponse, ControlPlaneError, ControlPlaneResult,
GetOpenShardsRequest, GetOpenShardsResponse,
};
use quickwit_proto::metastore::events::{DeleteSourceEvent, ToggleSourceEvent};
use quickwit_proto::metastore::events::ToggleSourceEvent;
use quickwit_proto::metastore::{
serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse,
Expand Down Expand Up @@ -267,9 +267,7 @@ impl Handler<AddSourceRequest> for ControlPlane {
return Ok(Err(ControlPlaneError::from(error)));
};

self.ingest_controller
.add_source(&index_uid, &source_id)
.await;
self.ingest_controller.add_source(&index_uid, &source_id);

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
Expand Down Expand Up @@ -330,12 +328,9 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
{
return Ok(Err(ControlPlaneError::from(error)));
};
let _event = DeleteSourceEvent {
index_uid,
source_id: request.source_id,
};

// the metastore. We should update the state of the control plane.
self.ingest_controller
.delete_source(&index_uid, &request.source_id);
self.indexing_scheduler.on_index_change().await?;
let response = EmptyResponse {};
Ok(Ok(response))
Expand Down
12 changes: 10 additions & 2 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ impl ShardTable {
}
}

fn remove_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
let key = (index_uid.clone(), source_id.clone());
self.table_entries.remove(&key);
}

/// Clears the shard table.
fn clear(&mut self) {
self.table_entries.clear();
Expand Down Expand Up @@ -215,7 +220,6 @@ impl ShardTable {
/// table.
fn remove_shards(&mut self, index_uid: &IndexUid, source_id: &SourceId, shard_ids: &[ShardId]) {
let key = (index_uid.clone(), source_id.clone());

if let Some(table_entry) = self.table_entries.get_mut(&key) {
table_entry
.shard_entries
Expand Down Expand Up @@ -595,10 +599,14 @@ impl IngestController {
self.shard_table.remove_index(index_uid.index_id());
}

pub(crate) async fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
pub(crate) fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
self.shard_table.add_source(index_uid, source_id);
}

pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
self.shard_table.remove_source(index_uid, source_id);
}

pub fn observable_state(&self) -> IngestControllerState {
IngestControllerState {
num_indexes: self.index_table.len(),
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct IndexingScheduler {
}

impl fmt::Debug for IndexingScheduler {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("IndexingScheduler")
.field("cluster_id", &self.cluster_id)
.field("node_id", &self.self_node_id)
Expand Down

0 comments on commit cdd98ea

Please sign in to comment.