Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict list shards to enabled ingest V2 sources #4067

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ impl SourceConfig {
.expect("`SourceParams` should be JSON serializable")
}

/// Creates an ingest source.
pub fn ingest_default() -> Self {
/// Creates an ingest source v2.
pub fn ingest_v2_default() -> Self {
Self {
source_id: INGEST_SOURCE_ID.to_string(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ mod tests {
.expect_list_indexes_metadata()
.returning(|_| {
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
let source_config = SourceConfig::for_test(INGEST_SOURCE_ID, SourceParams::void());
let mut source_config = SourceConfig::ingest_v2_default();
source_config.enabled = true;
index_metadata.add_source(source_config).unwrap();
Ok(
ListIndexesMetadataResponse::try_from_indexes_metadata(vec![index_metadata])
Expand Down Expand Up @@ -772,7 +773,7 @@ mod tests {
let mut mock_metastore = MetastoreServiceClient::mock();

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
let source = SourceConfig::ingest_default();
let source = SourceConfig::ingest_v2_default();
index_0.add_source(source.clone()).unwrap();

mock_metastore
Expand Down
47 changes: 29 additions & 18 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use fnv::{FnvHashMap, FnvHashSet};
#[cfg(test)]
use itertools::Itertools;
use quickwit_common::Progress;
use quickwit_config::{SourceConfig, INGEST_SOURCE_ID};
use quickwit_config::SourceConfig;
use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt};
use quickwit_proto::control_plane::ControlPlaneResult;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, MetastoreService,
MetastoreServiceClient,
self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
MetastoreService, MetastoreServiceClient, SourceType,
};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use serde::Serialize;
Expand Down Expand Up @@ -117,10 +116,12 @@ impl ControlPlaneModel {
) -> ControlPlaneResult<()> {
let now = Instant::now();
self.clear();

let index_metadatas = progress
.protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all()))
.await?
.deserialize_indexes_metadata()?;

let num_indexes = index_metadatas.len();
self.index_table.reserve(num_indexes);

Expand All @@ -134,16 +135,17 @@ impl ControlPlaneModel {
}

for index_metadata in self.index_table.values() {
for source_id in index_metadata.sources.keys() {
if source_id != INGEST_SOURCE_ID {
for source_config in index_metadata.sources.values() {
num_sources += 1;

if source_config.source_type() != SourceType::IngestV2 || !source_config.enabled {
continue;
}
let request = ListShardsSubrequest {
index_uid: index_metadata.index_uid.clone().into(),
source_id: source_id.to_string(),
source_id: source_config.source_id.clone(),
shard_state: Some(ShardState::Open as i32),
};
num_sources += 1;
subrequests.push(request);
}
}
Expand Down Expand Up @@ -519,7 +521,7 @@ impl ShardTable {

#[cfg(test)]
mod tests {
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::ingest::Shard;
use quickwit_proto::metastore::ListIndexesMetadataResponse;
Expand Down Expand Up @@ -808,13 +810,18 @@ mod tests {
assert_eq!(request, ListIndexesMetadataRequest::all());

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
let source = SourceConfig::ingest_default();
index_0.add_source(source.clone()).unwrap();
let mut source_config = SourceConfig::ingest_v2_default();
source_config.enabled = true;
index_0.add_source(source_config.clone()).unwrap();

let mut index_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1");
index_1.add_source(source).unwrap();
index_1.add_source(source_config.clone()).unwrap();

let mut index_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2");
source_config.enabled = false;
index_2.add_source(source_config.clone()).unwrap();

let indexes = vec![index_0, index_1];
let indexes = vec![index_0, index_1, index_2];
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes).unwrap())
});
mock_metastore.expect_list_shards().returning(|request| {
Expand Down Expand Up @@ -855,14 +862,18 @@ mod tests {
.await
.unwrap();

assert_eq!(model.index_table.len(), 2);
assert_eq!(model.index_table.len(), 3);
assert_eq!(
model.index_uid("test-index-0").unwrap().as_str(),
"test-index-0:0"
);
assert_eq!(
model.index_uid("test-index-0").unwrap(),
IndexUid::from("test-index-0:0".to_string())
model.index_uid("test-index-1").unwrap().as_str(),
"test-index-1:0"
);
assert_eq!(
model.index_uid("test-index-1").unwrap(),
IndexUid::from("test-index-1:0".to_string())
model.index_uid("test-index-2").unwrap().as_str(),
"test-index-2:0"
);

assert_eq!(model.shard_table.table_entries.len(), 2);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl IndexService {
metastore.add_source(add_ingest_api_source_request).await?;
let add_ingest_source_request = AddSourceRequest::try_from_source_config(
index_uid.clone(),
SourceConfig::ingest_default(),
SourceConfig::ingest_v2_default(),
)?;
metastore.add_source(add_ingest_source_request).await?;
let add_ingest_cli_source_request = AddSourceRequest::try_from_source_config(
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-metastore/src/tests/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub async fn test_metastore_open_shards<
let test_index = TestIndex::create_index_with_source(
&mut metastore,
"test-open-shards",
SourceConfig::ingest_default(),
SourceConfig::ingest_v2_default(),
)
.await;

Expand Down Expand Up @@ -158,7 +158,7 @@ pub async fn test_metastore_acquire_shards<
let test_index = TestIndex::create_index_with_source(
&mut metastore,
"test-acquire-shards",
SourceConfig::ingest_default(),
SourceConfig::ingest_v2_default(),
)
.await;

Expand All @@ -178,7 +178,7 @@ pub async fn test_metastore_list_shards<
let test_index = TestIndex::create_index_with_source(
&mut metastore,
"test-open-shards",
SourceConfig::ingest_default(),
SourceConfig::ingest_v2_default(),
)
.await;

Expand All @@ -198,7 +198,7 @@ pub async fn test_metastore_delete_shards<
let test_index = TestIndex::create_index_with_source(
&mut metastore,
"test-open-shards",
SourceConfig::ingest_default(),
SourceConfig::ingest_v2_default(),
)
.await;

Expand Down