Skip to content

Commit

Permalink
Extend Shard API test suite (#4179)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Nov 23, 2023
1 parent 2986019 commit 5afd086
Showing 1 changed file with 109 additions and 16 deletions.
125 changes: 109 additions & 16 deletions quickwit/quickwit-metastore/src/tests/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_proto::metastore::{
AddSourceRequest, CreateIndexRequest, MetastoreService, OpenShardsRequest, OpenShardsSubrequest,
AddSourceRequest, CreateIndexRequest, EntityKind, MetastoreError, MetastoreService,
OpenShardsRequest, OpenShardsSubrequest,
};
use quickwit_proto::types::{IndexUid, SourceId};

Expand All @@ -29,18 +30,26 @@ use crate::tests::cleanup_index;
use crate::{AddSourceRequestExt, CreateIndexRequestExt, FileBackedMetastore, MetastoreServiceExt};

// TODO: Remove when `PostgresqlMetastore` implements Shard API.
pub trait DisableShardTestSuite {
fn disable_shard_test_suite() -> bool {
false
pub trait RunTests {
fn run_open_shards_test() -> bool {
true
}

fn run_other_tests() -> bool {
true
}
}

impl DisableShardTestSuite for FileBackedMetastore {}
impl RunTests for FileBackedMetastore {}

#[cfg(feature = "postgres")]
impl DisableShardTestSuite for crate::PostgresqlMetastore {
fn disable_shard_test_suite() -> bool {
true
impl RunTests for crate::PostgresqlMetastore {
fn run_other_tests() -> bool {
false
}

fn run_open_shards_test() -> bool {
false
}
}

Expand Down Expand Up @@ -81,9 +90,9 @@ impl TestIndex {
}

pub async fn test_metastore_open_shards<
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite,
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests,
>() {
if MetastoreUnderTest::disable_shard_test_suite() {
if !MetastoreUnderTest::run_open_shards_test() {
return;
}
let mut metastore = MetastoreUnderTest::default_for_test().await;
Expand All @@ -95,12 +104,50 @@ pub async fn test_metastore_open_shards<
)
.await;

// Test empty request.
let open_shards_request = OpenShardsRequest {
subrequests: Vec::new(),
};
let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap();
assert!(open_shards_response.subresponses.is_empty());

// Test index not found.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: "index-does-not-exist:0".to_string(),
source_id: test_index.source_config.source_id.clone(),
leader_id: "test-ingester-foo".to_string(),
next_shard_id: 1,
..Default::default()
}],
};
let error = metastore
.open_shards(open_shards_request)
.await
.unwrap_err();
assert!(
matches!(error, MetastoreError::NotFound(EntityKind::Index { index_id }) if index_id == "index-does-not-exist")
);

// Test source not found.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: test_index.index_uid.clone().into(),
source_id: "source-does-not-exist".to_string(),
leader_id: "test-ingester-foo".to_string(),
next_shard_id: 1,
..Default::default()
}],
};
let error = metastore
.open_shards(open_shards_request)
.await
.unwrap_err();
assert!(
matches!(error, MetastoreError::NotFound(EntityKind::Source { source_id, ..}) if source_id == "source-does-not-exist")
);

// Test open shard #1.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: test_index.index_uid.clone().into(),
Expand All @@ -125,6 +172,7 @@ pub async fn test_metastore_open_shards<
assert_eq!(shard.shard_id, 1);
assert_eq!(shard.leader_id, "test-ingester-foo");

// Test open shard #1 is idempotent.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: test_index.index_uid.clone().into(),
Expand All @@ -138,19 +186,64 @@ pub async fn test_metastore_open_shards<
assert_eq!(open_shards_response.subresponses.len(), 1);

let subresponse = &open_shards_response.subresponses[0];
assert_eq!(subresponse.next_shard_id, 2);
assert_eq!(subresponse.opened_shards.len(), 1);

let shard = &subresponse.opened_shards[0];
assert_eq!(shard.shard_id, 1);
assert_eq!(shard.leader_id, "test-ingester-foo");

// Test open shard #2.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: test_index.index_uid.clone().into(),
source_id: test_index.source_config.source_id.clone(),
leader_id: "test-ingester-qux".to_string(),
next_shard_id: 2,
..Default::default()
}],
};
let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap();
assert_eq!(open_shards_response.subresponses.len(), 1);

let subresponse = &open_shards_response.subresponses[0];
assert_eq!(subresponse.index_uid, test_index.index_uid.as_str());
assert_eq!(subresponse.source_id, test_index.source_config.source_id);
assert_eq!(subresponse.next_shard_id, 3);
assert_eq!(subresponse.opened_shards.len(), 1);

let shard = &subresponse.opened_shards[0];
assert_eq!(shard.index_uid, test_index.index_uid.as_str());
assert_eq!(shard.source_id, test_index.source_config.source_id);
assert_eq!(shard.shard_id, 2);
assert_eq!(shard.leader_id, "test-ingester-qux");

// Test open shard should not be called with a lagging `next_shard_id`.
let open_shards_request = OpenShardsRequest {
subrequests: vec![OpenShardsSubrequest {
index_uid: test_index.index_uid.clone().into(),
source_id: test_index.source_config.source_id.clone(),
leader_id: "test-ingester-foo".to_string(),
next_shard_id: 1,
..Default::default()
}],
};
let error = metastore
.open_shards(open_shards_request)
.await
.unwrap_err();
assert!(matches!(
error,
MetastoreError::InconsistentControlPlaneState
));

cleanup_index(&mut metastore, test_index.index_uid).await;
}

pub async fn test_metastore_acquire_shards<
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite,
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests,
>() {
if MetastoreUnderTest::disable_shard_test_suite() {
if !MetastoreUnderTest::run_other_tests() {
return;
}
let mut metastore = MetastoreUnderTest::default_for_test().await;
Expand All @@ -168,9 +261,9 @@ pub async fn test_metastore_acquire_shards<
}

pub async fn test_metastore_list_shards<
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite,
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests,
>() {
if MetastoreUnderTest::disable_shard_test_suite() {
if !MetastoreUnderTest::run_other_tests() {
return;
}
let mut metastore = MetastoreUnderTest::default_for_test().await;
Expand All @@ -188,9 +281,9 @@ pub async fn test_metastore_list_shards<
}

pub async fn test_metastore_delete_shards<
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite,
MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests,
>() {
if MetastoreUnderTest::disable_shard_test_suite() {
if !MetastoreUnderTest::run_other_tests() {
return;
}
let mut metastore = MetastoreUnderTest::default_for_test().await;
Expand Down

0 comments on commit 5afd086

Please sign in to comment.