From 11ac6a7d0c102b548a2dccafa86e91e3cbe1c72b Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 29 Aug 2024 11:20:14 +0200 Subject: [PATCH] Add workbench test for publish wait --- .../src/ingest_v2/workbench.rs | 91 ++++++++++++------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 56365ecd9b1..388a990086b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -679,15 +679,19 @@ mod tests { } #[tokio::test] - async fn test_ingest_workbench_with_publish_tracking() { - let event_broker = EventBroker::default(); + async fn test_workbench_publish_tracking_empty() { let workbench = - IngestWorkbench::new_with_publish_tracking(Vec::new(), 1, event_broker.clone()); + IngestWorkbench::new_with_publish_tracking(Vec::new(), 1, EventBroker::default()); assert!(workbench.is_complete()); assert_eq!( workbench.into_ingest_result().await, IngestResponseV2::default() ); + } + + #[tokio::test] + async fn test_workbench_publish_tracking_happy_path() { + let event_broker = EventBroker::default(); let shard_id_1 = ShardId::from("test-shard-1"); let shard_id_2 = ShardId::from("test-shard-2"); let ingest_subrequests = vec![ @@ -713,20 +717,6 @@ mod tests { }; workbench.record_persist_success(persist_success); - assert_eq!(workbench.num_successes, 1); - assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 1); - assert_eq!( - pending_subrequests(&workbench.subworkbenches) - .next() - .unwrap() - .subrequest_id, - 1 - ); - - let subworkbench = workbench.subworkbenches.get(&0).unwrap(); - assert_eq!(subworkbench.num_attempts, 1); - assert!(!subworkbench.is_pending()); - let persist_failure = PersistFailure { subrequest_id: 1, shard_id: Some(shard_id_2.clone()), @@ -734,20 +724,6 @@ mod tests { }; workbench.record_persist_failure(&persist_failure); - assert_eq!(workbench.num_successes, 1); - assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 1); - assert_eq!( - pending_subrequests(&workbench.subworkbenches) - .next() - .unwrap() - .subrequest_id, - 1 - ); - - let subworkbench = workbench.subworkbenches.get(&1).unwrap(); - assert_eq!(subworkbench.num_attempts, 1); - assert!(subworkbench.last_failure_opt.is_some()); - let persist_success = PersistSuccess { subrequest_id: 1, shard_id: Some(shard_id_2.clone()), @@ -778,6 +754,59 @@ mod tests { assert_eq!(ingest_response.failures.len(), 0); } + #[tokio::test] + async fn test_workbench_publish_tracking_waits() { + let event_broker = EventBroker::default(); + let shard_id_1 = ShardId::from("test-shard-1"); + let shard_id_2 = ShardId::from("test-shard-2"); + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + ..Default::default() + }, + ]; + let mut workbench = + IngestWorkbench::new_with_publish_tracking(ingest_subrequests, 1, event_broker.clone()); + + let persist_success = PersistSuccess { + subrequest_id: 0, + shard_id: Some(shard_id_1.clone()), + replication_position_inclusive: Some(Position::offset(42usize)), + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + let persist_success = PersistSuccess { + subrequest_id: 1, + shard_id: Some(shard_id_2.clone()), + replication_position_inclusive: Some(Position::offset(66usize)), + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + assert!(workbench.is_complete()); + assert_eq!(workbench.num_successes, 2); + assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 0); + + event_broker.publish(ShardPositionsUpdate { + source_uid: SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: "test-source".to_string(), + }, + updated_shard_positions: vec![(shard_id_2, Position::offset(66usize))] + .into_iter() + .collect(), + }); + // still waits for shard 1 to be published + tokio::time::timeout(Duration::from_millis(200), workbench.into_ingest_result()) + .await + .unwrap_err(); + } + #[test] fn test_ingest_workbench_record_get_or_create_open_shards_failure() { let ingest_subrequests = vec![IngestSubrequest {