Skip to content

Commit

Permalink
Bugfix (#4043)
Browse files Browse the repository at this point in the history
* Bugfix
- Update the control plane model on toggle source.
- Delete source from index config in control plane model
on delete source.

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fulmicoton and guilload authored Oct 28, 2023
1 parent 742347b commit ee33492
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 19 deletions.
64 changes: 47 additions & 17 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,21 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
request: ToggleSourceRequest,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid.clone().into();
let source_id = request.source_id.clone();
let enable = request.enable;

if let Err(error) = self.metastore.toggle_source(request).await {
return Ok(Err(ControlPlaneError::from(error)));
};

// TODO update the internal view.
// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler.on_index_change(&self.model).await?;
let has_changed = self.model.toggle_source(&index_uid, &source_id, enable)?;

let response = EmptyResponse {};
Ok(Ok(response))
if has_changed {
self.indexing_scheduler.on_index_change(&self.model).await?;
}

Ok(Ok(EmptyResponse {}))
}
}

Expand Down Expand Up @@ -557,6 +561,7 @@ mod tests {

#[tokio::test]
async fn test_control_plane_toggle_source() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();

let cluster_id = "test-cluster".to_string();
Expand All @@ -565,20 +570,36 @@ mod tests {
let ingester_pool = IngesterPool::default();

let mut mock_metastore = MetastoreServiceClient::mock();
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://toto");
let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
index_metadata.add_source(test_source_config).unwrap();
mock_metastore
.expect_list_indexes_metadata()
.return_once(|_| {
Ok(
ListIndexesMetadataResponse::try_from_indexes_metadata(vec![index_metadata])
.unwrap(),
)
});

mock_metastore
.expect_toggle_source()
.withf(|toggle_source_request| {
.times(1)
.return_once(|toggle_source_request| {
assert_eq!(toggle_source_request.index_uid, "test-index:0");
assert_eq!(toggle_source_request.source_id, "test-source");
assert!(toggle_source_request.enable);
true
})
.returning(|_| Ok(EmptyResponse {}));
Ok(EmptyResponse {})
});
mock_metastore
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
.expect_toggle_source()
.times(1)
.return_once(|toggle_source_request| {
assert_eq!(toggle_source_request.index_uid, "test-index:0");
assert_eq!(toggle_source_request.source_id, "test-source");
assert!(!toggle_source_request.enable);
Ok(EmptyResponse {})
});

let replication_factor = 1;

let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
Expand All @@ -590,17 +611,26 @@ mod tests {
MetastoreServiceClient::from(mock_metastore),
replication_factor,
);
let toggle_source_request = ToggleSourceRequest {

let enabling_source_req = ToggleSourceRequest {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
enable: true,
};
control_plane_mailbox
.ask_for_res(toggle_source_request)
.ask_for_res(enabling_source_req)
.await
.unwrap();

// TODO: Test that delete index event is properly sent to ingest controller.
let disabling_source_req = ToggleSourceRequest {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
enable: false,
};
control_plane_mailbox
.ask_for_res(disabling_source_req)
.await
.unwrap();

universe.assert_quit().await;
}
Expand Down
79 changes: 77 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::hash_map::Entry;
use std::time::Instant;

use anyhow::bail;
use fnv::{FnvHashMap, FnvHashSet};
#[cfg(test)]
use itertools::Itertools;
Expand All @@ -37,7 +38,7 @@ use quickwit_proto::metastore::{
};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use serde::Serialize;
use tracing::{error, info};
use tracing::{error, info, warn};

use crate::SourceUid;

Expand Down Expand Up @@ -245,7 +246,35 @@ impl ControlPlaneModel {
}

pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) {
// Removing shards from shard table.
self.shard_table.delete_source(index_uid, source_id);
// Remove source from index config.
let Some(index_model) = self.index_table.get_mut(index_uid) else {
warn!(index_uid=%index_uid, source_id=%source_id, "delete source: index not found");
return;
};
if index_model.sources.remove(source_id).is_none() {
warn!(index_uid=%index_uid, source_id=%source_id, "delete source: source not found");
};
}

/// Returns `true` if the source status has changed, `false` otherwise.
/// Returns an error if the source could not be found.
pub(crate) fn toggle_source(
&mut self,
index_uid: &IndexUid,
source_id: &SourceId,
enable: bool,
) -> anyhow::Result<bool> {
let Some(index_model) = self.index_table.get_mut(index_uid) else {
bail!("index `{index_uid}` not found");
};
let Some(source_config) = index_model.sources.get_mut(source_id) else {
bail!("source `{source_id}` not found.");
};
let has_changed = source_config.enabled != enable;
source_config.enabled = enable;
Ok(has_changed)
}

/// Removes the shards identified by their index UID, source ID, and shard IDs.
Expand Down Expand Up @@ -492,7 +521,7 @@ impl ShardTable {

#[cfg(test)]
mod tests {
use quickwit_config::SourceConfig;
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::ListIndexesMetadataResponse;
Expand Down Expand Up @@ -859,4 +888,50 @@ mod tests {
assert_eq!(shards.len(), 0);
assert_eq!(table_entry.next_shard_id, 1);
}

#[test]
fn test_control_plane_model_toggle_source() {
let mut model = ControlPlaneModel::default();
let index_metadata = IndexMetadata::for_test("test-index", "ram://");
let index_uid = index_metadata.index_uid.clone();
model.add_index(index_metadata);
let source_config = SourceConfig::for_test("test-source", SourceParams::void());
model.add_source(&index_uid, source_config).unwrap();
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), true)
.unwrap();
assert!(!has_changed);
}
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), true)
.unwrap();
assert!(!has_changed);
}
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), false)
.unwrap();
assert!(has_changed);
}
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), false)
.unwrap();
assert!(!has_changed);
}
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), true)
.unwrap();
assert!(has_changed);
}
{
let has_changed = model
.toggle_source(&index_uid, &"test-source".to_string(), true)
.unwrap();
assert!(!has_changed);
}
}
}

0 comments on commit ee33492

Please sign in to comment.