diff --git a/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs index bf826c4e107..64f9c907722 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs @@ -186,6 +186,55 @@ mod tests { use super::*; + #[tokio::test] + async fn test_shard_publish_states() { + let mut shard_publish_states = ShardPublishStates::default(); + let notifier = Arc::new(Notify::new()); + + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let shard_id_3 = ShardId::from("test-shard-3"); + let shard_id_4 = ShardId::from("test-shard-4"); // not tracked + + shard_publish_states.shard_tracked(shard_id_1.clone()); + shard_publish_states.shard_tracked(shard_id_2.clone()); + shard_publish_states.shard_tracked(shard_id_3.clone()); + + let notifier_receiver = notifier.clone(); + let notified_subscription = notifier_receiver.notified(); + + shard_publish_states.position_persisted(&shard_id_1, &Position::offset(10usize)); + assert_eq!(shard_publish_states.awaiting_count, 1); + shard_publish_states.position_persisted(&shard_id_2, &Position::offset(20usize)); + assert_eq!(shard_publish_states.awaiting_count, 2); + shard_publish_states.position_published(&shard_id_1, &Position::offset(15usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 1); + shard_publish_states.position_published(&shard_id_2, &Position::offset(20usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + + // check that only the notification that was subscribed before holds a permit + tokio::time::timeout(Duration::from_millis(100), notifier.notified()) + .await + .unwrap_err(); + tokio::time::timeout(Duration::from_millis(100), notified_subscription) + .await + .unwrap(); + + let notified_subscription = notifier_receiver.notified(); + shard_publish_states.position_published(&shard_id_3, &Position::offset(10usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + shard_publish_states.position_persisted(&shard_id_3, &Position::offset(10usize)); + assert_eq!(shard_publish_states.awaiting_count, 0); + // no notification expected here as the shard never becomes AwaitingPublish + tokio::time::timeout(Duration::from_millis(100), notified_subscription) + .await + .unwrap_err(); + // shard 4 is not tracked + shard_publish_states.position_published(&shard_id_4, &Position::offset(10usize), ¬ifier); + assert_eq!(shard_publish_states.awaiting_count, 0); + assert!(!shard_publish_states.states.contains_key(&shard_id_4)); + } + #[tokio::test] async fn test_publish_tracker() { let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);