diff --git a/quickwit/quickwit-metastore/src/tests/shard.rs b/quickwit/quickwit-metastore/src/tests/shard.rs index 5748975e776..0d3d1f1cb8f 100644 --- a/quickwit/quickwit-metastore/src/tests/shard.rs +++ b/quickwit/quickwit-metastore/src/tests/shard.rs @@ -20,7 +20,8 @@ use quickwit_common::rand::append_random_suffix; use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_proto::metastore::{ - AddSourceRequest, CreateIndexRequest, MetastoreService, OpenShardsRequest, OpenShardsSubrequest, + AddSourceRequest, CreateIndexRequest, EntityKind, MetastoreError, MetastoreService, + OpenShardsRequest, OpenShardsSubrequest, }; use quickwit_proto::types::{IndexUid, SourceId}; @@ -29,18 +30,26 @@ use crate::tests::cleanup_index; use crate::{AddSourceRequestExt, CreateIndexRequestExt, FileBackedMetastore, MetastoreServiceExt}; // TODO: Remove when `PostgresqlMetastore` implements Shard API. -pub trait DisableShardTestSuite { - fn disable_shard_test_suite() -> bool { - false +pub trait RunTests { + fn run_open_shards_test() -> bool { + true + } + + fn run_other_tests() -> bool { + true } } -impl DisableShardTestSuite for FileBackedMetastore {} +impl RunTests for FileBackedMetastore {} #[cfg(feature = "postgres")] -impl DisableShardTestSuite for crate::PostgresqlMetastore { - fn disable_shard_test_suite() -> bool { - true +impl RunTests for crate::PostgresqlMetastore { + fn run_other_tests() -> bool { + false + } + + fn run_open_shards_test() -> bool { + false } } @@ -81,9 +90,9 @@ impl TestIndex { } pub async fn test_metastore_open_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite, + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, >() { - if MetastoreUnderTest::disable_shard_test_suite() { + if !MetastoreUnderTest::run_open_shards_test() { return; } let mut metastore = MetastoreUnderTest::default_for_test().await; @@ -95,12 +104,50 @@ pub async fn test_metastore_open_shards< ) .await; + // Test empty request. let open_shards_request = OpenShardsRequest { subrequests: Vec::new(), }; let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); assert!(open_shards_response.subresponses.is_empty()); + // Test index not found. + let open_shards_request = OpenShardsRequest { + subrequests: vec![OpenShardsSubrequest { + index_uid: "index-does-not-exist:0".to_string(), + source_id: test_index.source_config.source_id.clone(), + leader_id: "test-ingester-foo".to_string(), + next_shard_id: 1, + ..Default::default() + }], + }; + let error = metastore + .open_shards(open_shards_request) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::NotFound(EntityKind::Index { index_id }) if index_id == "index-does-not-exist") + ); + + // Test source not found. + let open_shards_request = OpenShardsRequest { + subrequests: vec![OpenShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: "source-does-not-exist".to_string(), + leader_id: "test-ingester-foo".to_string(), + next_shard_id: 1, + ..Default::default() + }], + }; + let error = metastore + .open_shards(open_shards_request) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::NotFound(EntityKind::Source { source_id, ..}) if source_id == "source-does-not-exist") + ); + + // Test open shard #1. let open_shards_request = OpenShardsRequest { subrequests: vec![OpenShardsSubrequest { index_uid: test_index.index_uid.clone().into(), @@ -125,6 +172,7 @@ pub async fn test_metastore_open_shards< assert_eq!(shard.shard_id, 1); assert_eq!(shard.leader_id, "test-ingester-foo"); + // Test open shard #1 is idempotent. let open_shards_request = OpenShardsRequest { subrequests: vec![OpenShardsSubrequest { index_uid: test_index.index_uid.clone().into(), @@ -138,19 +186,64 @@ pub async fn test_metastore_open_shards< assert_eq!(open_shards_response.subresponses.len(), 1); let subresponse = &open_shards_response.subresponses[0]; + assert_eq!(subresponse.next_shard_id, 2); assert_eq!(subresponse.opened_shards.len(), 1); let shard = &subresponse.opened_shards[0]; assert_eq!(shard.shard_id, 1); assert_eq!(shard.leader_id, "test-ingester-foo"); + // Test open shard #2. + let open_shards_request = OpenShardsRequest { + subrequests: vec![OpenShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_config.source_id.clone(), + leader_id: "test-ingester-qux".to_string(), + next_shard_id: 2, + ..Default::default() + }], + }; + let open_shards_response = metastore.open_shards(open_shards_request).await.unwrap(); + assert_eq!(open_shards_response.subresponses.len(), 1); + + let subresponse = &open_shards_response.subresponses[0]; + assert_eq!(subresponse.index_uid, test_index.index_uid.as_str()); + assert_eq!(subresponse.source_id, test_index.source_config.source_id); + assert_eq!(subresponse.next_shard_id, 3); + assert_eq!(subresponse.opened_shards.len(), 1); + + let shard = &subresponse.opened_shards[0]; + assert_eq!(shard.index_uid, test_index.index_uid.as_str()); + assert_eq!(shard.source_id, test_index.source_config.source_id); + assert_eq!(shard.shard_id, 2); + assert_eq!(shard.leader_id, "test-ingester-qux"); + + // Test open shard should not be called with a lagging `next_shard_id`. + let open_shards_request = OpenShardsRequest { + subrequests: vec![OpenShardsSubrequest { + index_uid: test_index.index_uid.clone().into(), + source_id: test_index.source_config.source_id.clone(), + leader_id: "test-ingester-foo".to_string(), + next_shard_id: 1, + ..Default::default() + }], + }; + let error = metastore + .open_shards(open_shards_request) + .await + .unwrap_err(); + assert!(matches!( + error, + MetastoreError::InconsistentControlPlaneState + )); + cleanup_index(&mut metastore, test_index.index_uid).await; } pub async fn test_metastore_acquire_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite, + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, >() { - if MetastoreUnderTest::disable_shard_test_suite() { + if !MetastoreUnderTest::run_other_tests() { return; } let mut metastore = MetastoreUnderTest::default_for_test().await; @@ -168,9 +261,9 @@ pub async fn test_metastore_acquire_shards< } pub async fn test_metastore_list_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite, + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, >() { - if MetastoreUnderTest::disable_shard_test_suite() { + if !MetastoreUnderTest::run_other_tests() { return; } let mut metastore = MetastoreUnderTest::default_for_test().await; @@ -188,9 +281,9 @@ pub async fn test_metastore_list_shards< } pub async fn test_metastore_delete_shards< - MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + DisableShardTestSuite, + MetastoreUnderTest: MetastoreService + MetastoreServiceExt + DefaultForTest + RunTests, >() { - if MetastoreUnderTest::disable_shard_test_suite() { + if !MetastoreUnderTest::run_other_tests() { return; } let mut metastore = MetastoreUnderTest::default_for_test().await;