Skip to content

Commit

Permalink
Add workbench test for publish wait
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 29, 2024
1 parent 9141168 commit 11ac6a7
Showing 1 changed file with 60 additions and 31 deletions.
91 changes: 60 additions & 31 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,15 +679,19 @@ mod tests {
}

#[tokio::test]
async fn test_ingest_workbench_with_publish_tracking() {
let event_broker = EventBroker::default();
async fn test_workbench_publish_tracking_empty() {
let workbench =
IngestWorkbench::new_with_publish_tracking(Vec::new(), 1, event_broker.clone());
IngestWorkbench::new_with_publish_tracking(Vec::new(), 1, EventBroker::default());
assert!(workbench.is_complete());
assert_eq!(
workbench.into_ingest_result().await,
IngestResponseV2::default()
);
}

#[tokio::test]
async fn test_workbench_publish_tracking_happy_path() {
let event_broker = EventBroker::default();
let shard_id_1 = ShardId::from("test-shard-1");
let shard_id_2 = ShardId::from("test-shard-2");
let ingest_subrequests = vec![
Expand All @@ -713,41 +717,13 @@ mod tests {
};
workbench.record_persist_success(persist_success);

assert_eq!(workbench.num_successes, 1);
assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 1);
assert_eq!(
pending_subrequests(&workbench.subworkbenches)
.next()
.unwrap()
.subrequest_id,
1
);

let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert_eq!(subworkbench.num_attempts, 1);
assert!(!subworkbench.is_pending());

let persist_failure = PersistFailure {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
..Default::default()
};
workbench.record_persist_failure(&persist_failure);

assert_eq!(workbench.num_successes, 1);
assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 1);
assert_eq!(
pending_subrequests(&workbench.subworkbenches)
.next()
.unwrap()
.subrequest_id,
1
);

let subworkbench = workbench.subworkbenches.get(&1).unwrap();
assert_eq!(subworkbench.num_attempts, 1);
assert!(subworkbench.last_failure_opt.is_some());

let persist_success = PersistSuccess {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
Expand Down Expand Up @@ -778,6 +754,59 @@ mod tests {
assert_eq!(ingest_response.failures.len(), 0);
}

#[tokio::test]
async fn test_workbench_publish_tracking_waits() {
let event_broker = EventBroker::default();
let shard_id_1 = ShardId::from("test-shard-1");
let shard_id_2 = ShardId::from("test-shard-2");
let ingest_subrequests = vec![
IngestSubrequest {
subrequest_id: 0,
..Default::default()
},
IngestSubrequest {
subrequest_id: 1,
..Default::default()
},
];
let mut workbench =
IngestWorkbench::new_with_publish_tracking(ingest_subrequests, 1, event_broker.clone());

let persist_success = PersistSuccess {
subrequest_id: 0,
shard_id: Some(shard_id_1.clone()),
replication_position_inclusive: Some(Position::offset(42usize)),
..Default::default()
};
workbench.record_persist_success(persist_success);

let persist_success = PersistSuccess {
subrequest_id: 1,
shard_id: Some(shard_id_2.clone()),
replication_position_inclusive: Some(Position::offset(66usize)),
..Default::default()
};
workbench.record_persist_success(persist_success);

assert!(workbench.is_complete());
assert_eq!(workbench.num_successes, 2);
assert_eq!(pending_subrequests(&workbench.subworkbenches).count(), 0);

event_broker.publish(ShardPositionsUpdate {
source_uid: SourceUid {
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
},
updated_shard_positions: vec![(shard_id_2, Position::offset(66usize))]
.into_iter()
.collect(),
});
// still waits for shard 1 to be published
tokio::time::timeout(Duration::from_millis(200), workbench.into_ingest_result())
.await
.unwrap_err();
}

#[test]
fn test_ingest_workbench_record_get_or_create_open_shards_failure() {
let ingest_subrequests = vec![IngestSubrequest {
Expand Down

0 comments on commit 11ac6a7

Please sign in to comment.