Skip to content

Commit

Permalink
Add independent tests on internal state
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 2, 2024
1 parent 449c6d5 commit e3afe0b
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,55 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_shard_publish_states() {
let mut shard_publish_states = ShardPublishStates::default();
let notifier = Arc::new(Notify::new());

let shard_id_1 = ShardId::from("test-shard-1");
let shard_id_2 = ShardId::from("test-shard-2");
let shard_id_3 = ShardId::from("test-shard-3");
let shard_id_4 = ShardId::from("test-shard-4"); // not tracked

shard_publish_states.shard_tracked(shard_id_1.clone());
shard_publish_states.shard_tracked(shard_id_2.clone());
shard_publish_states.shard_tracked(shard_id_3.clone());

let notifier_receiver = notifier.clone();
let notified_subscription = notifier_receiver.notified();

shard_publish_states.position_persisted(&shard_id_1, &Position::offset(10usize));
assert_eq!(shard_publish_states.awaiting_count, 1);
shard_publish_states.position_persisted(&shard_id_2, &Position::offset(20usize));
assert_eq!(shard_publish_states.awaiting_count, 2);
shard_publish_states.position_published(&shard_id_1, &Position::offset(15usize), &notifier);
assert_eq!(shard_publish_states.awaiting_count, 1);
shard_publish_states.position_published(&shard_id_2, &Position::offset(20usize), &notifier);
assert_eq!(shard_publish_states.awaiting_count, 0);

// check that only the notification that was subscribed before holds a permit
tokio::time::timeout(Duration::from_millis(100), notifier.notified())
.await
.unwrap_err();
tokio::time::timeout(Duration::from_millis(100), notified_subscription)
.await
.unwrap();

let notified_subscription = notifier_receiver.notified();
shard_publish_states.position_published(&shard_id_3, &Position::offset(10usize), &notifier);
assert_eq!(shard_publish_states.awaiting_count, 0);
shard_publish_states.position_persisted(&shard_id_3, &Position::offset(10usize));
assert_eq!(shard_publish_states.awaiting_count, 0);
// no notification expected here as the shard never becomes AwaitingPublish
tokio::time::timeout(Duration::from_millis(100), notified_subscription)
.await
.unwrap_err();
// shard 4 is not tracked
shard_publish_states.position_published(&shard_id_4, &Position::offset(10usize), &notifier);
assert_eq!(shard_publish_states.awaiting_count, 0);
assert!(!shard_publish_states.states.contains_key(&shard_id_4));
}

#[tokio::test]
async fn test_publish_tracker() {
let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
Expand Down

0 comments on commit e3afe0b

Please sign in to comment.