Call truncate on leaders AND followers (#3908)
guilload authored Oct 8, 2023
1 parent e05b708 commit 9676b12
Showing 8 changed files with 149 additions and 200 deletions.
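The core of the change: when the source truncates its shards, the truncate subrequests are now grouped per ingester and queued for the shard's leader and, when the shard is replicated, its follower as well, so both replicas release the truncated records. Below is a minimal, self-contained sketch of that fan-out; the type aliases and the helper function are simplified placeholders for illustration, not the actual Quickwit API.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real Quickwit types (hypothetical).
type NodeId = String;
type ShardId = u64;
type Position = u64;

#[derive(Clone, Debug)]
struct TruncateSubrequest {
    shard_id: ShardId,
    to_position_inclusive: Position,
}

struct AssignedShard {
    leader_id: NodeId,
    follower_id_opt: Option<NodeId>,
}

/// Groups truncate subrequests per ingester, targeting the leader and,
/// when the shard is replicated, its follower too.
fn per_ingester_truncate_subrequests(
    assigned_shards: &HashMap<ShardId, AssignedShard>,
    truncation_point: &[(ShardId, Position)],
) -> HashMap<NodeId, Vec<TruncateSubrequest>> {
    let mut per_ingester: HashMap<NodeId, Vec<TruncateSubrequest>> = HashMap::new();

    for (shard_id, to_position_inclusive) in truncation_point {
        // Skip shards that are no longer assigned to this source.
        let Some(shard) = assigned_shards.get(shard_id) else {
            continue;
        };
        let subrequest = TruncateSubrequest {
            shard_id: *shard_id,
            to_position_inclusive: *to_position_inclusive,
        };
        if let Some(follower_id) = &shard.follower_id_opt {
            per_ingester
                .entry(follower_id.clone())
                .or_default()
                .push(subrequest.clone());
        }
        per_ingester
            .entry(shard.leader_id.clone())
            .or_default()
            .push(subrequest);
    }
    per_ingester
}

fn main() {
    let mut assigned_shards = HashMap::new();
    assigned_shards.insert(
        1,
        AssignedShard {
            leader_id: "ingester-0".to_string(),
            follower_id_opt: Some("ingester-1".to_string()),
        },
    );
    let per_ingester = per_ingester_truncate_subrequests(&assigned_shards, &[(1, 11)]);
    // Both the leader and the follower receive a subrequest for shard 1.
    assert_eq!(per_ingester.len(), 2);
}
```

In the real source, each per-ingester batch is then sent as a `TruncateRequest`, whose target field this commit renames from `leader_id` to `ingester_id` since the recipient may now be a follower.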
80 changes: 57 additions & 23 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -96,6 +96,7 @@ impl ClientId {
#[derive(Debug, Eq, PartialEq)]
struct AssignedShard {
leader_id: NodeId,
follower_id_opt: Option<NodeId>,
partition_id: PartitionId,
current_position_inclusive: Position,
}
@@ -180,18 +181,14 @@ impl IngestSource {
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_leader_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
HashMap::new();

for (shard_id, truncate_position) in truncation_point {
if matches!(truncate_position, Position::Beginning) {
continue;
}
let Some(leader_id) = self
.assigned_shards
.get(shard_id)
.map(|shard| &shard.leader_id)
else {
let Some(shard) = self.assigned_shards.get(shard_id) else {
warn!(
"failed to truncate shard: shard `{}` is no longer assigned",
shard_id
@@ -208,21 +205,27 @@
shard_id: *shard_id,
to_position_inclusive,
};
per_leader_truncate_subrequests
.entry(leader_id)
if let Some(follower_id) = &shard.follower_id_opt {
per_ingester_truncate_subrequests
.entry(follower_id)
.or_default()
.push(truncate_subrequest.clone());
}
per_ingester_truncate_subrequests
.entry(&shard.leader_id)
.or_default()
.push(truncate_subrequest);
}
for (leader_id, truncate_subrequests) in per_leader_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(leader_id) else {
for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
leader_id
ingester_id
);
continue;
};
let truncate_request = TruncateRequest {
leader_id: leader_id.clone().into(),
ingester_id: ingester_id.clone().into(),
subrequests: truncate_subrequests,
};
let truncate_future = async move {
@@ -349,9 +352,7 @@ impl Source for IngestSource {

for acquired_shard in acquire_shards_subresponse.acquired_shards {
let leader_id: NodeId = acquired_shard.leader_id.into();
let follower_id: Option<NodeId> = acquired_shard
.follower_id
.map(|follower_id| follower_id.into());
let follower_id_opt: Option<NodeId> = acquired_shard.follower_id.map(Into::into);
let index_uid: IndexUid = acquired_shard.index_uid.into();
let source_id: SourceId = acquired_shard.source_id;
let shard_id = acquired_shard.shard_id;
@@ -364,7 +365,7 @@
if let Err(error) = ctx
.protect_future(self.fetch_stream.subscribe(
leader_id.clone(),
follower_id.clone(),
follower_id_opt.clone(),
index_uid,
source_id,
shard_id,
@@ -380,6 +381,7 @@

let assigned_shard = AssignedShard {
leader_id,
follower_id_opt,
partition_id,
current_position_inclusive,
};
@@ -511,7 +513,7 @@ mod tests {
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.ingester_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
@@ -583,6 +585,7 @@ mod tests {
let assigned_shard = source.assigned_shards.get(&1).unwrap();
let expected_assigned_shard = AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
};
@@ -631,6 +634,7 @@ mod tests {
1,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
},
@@ -639,6 +643,7 @@
2,
AssignedShard {
leader_id: "test-ingester-1".into(),
follower_id_opt: None,
partition_id: 2u64.into(),
current_position_inclusive: Position::from(22u64),
},
@@ -721,8 +726,8 @@ mod tests {
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 2);
assert_eq!(request.ingester_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 3);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 1);
@@ -732,6 +737,10 @@
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(subrequest_1.to_position_inclusive, 22);

let subrequest_2 = &request.subrequests[2];
assert_eq!(subrequest_2.shard_id, 3);
assert_eq!(subrequest_2.to_position_inclusive, 33);

Ok(TruncateResponse {})
});
let ingester_0: IngesterServiceClient = ingester_mock_0.into();
@@ -742,18 +751,39 @@
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-1");
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.ingester_id, "test-ingester-1");
assert_eq!(request.subrequests.len(), 2);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 3);
assert_eq!(subrequest_0.to_position_inclusive, 33);
assert_eq!(subrequest_0.shard_id, 1);
assert_eq!(subrequest_0.to_position_inclusive, 11);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(subrequest_1.to_position_inclusive, 22);

Ok(TruncateResponse {})
});
let ingester_1: IngesterServiceClient = ingester_mock_1.into();
ingester_pool.insert("test-ingester-1".into(), ingester_1.clone());

let mut ingester_mock_3 = IngesterServiceClient::mock();
ingester_mock_3
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-3");
assert_eq!(request.subrequests.len(), 1);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 4);
assert_eq!(subrequest_0.to_position_inclusive, 44);

Ok(TruncateResponse {})
});
let ingester_3: IngesterServiceClient = ingester_mock_3.into();
ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());

let runtime_args = Arc::new(SourceRuntimeArgs {
pipeline_id,
source_config,
@@ -778,6 +808,7 @@
1,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
},
@@ -786,6 +817,7 @@
2,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: Some("test-ingester-1".into()),
partition_id: 2u64.into(),
current_position_inclusive: Position::from(22u64),
},
@@ -794,6 +826,7 @@
3,
AssignedShard {
leader_id: "test-ingester-1".into(),
follower_id_opt: Some("test-ingester-0".into()),
partition_id: 3u64.into(),
current_position_inclusive: Position::from(33u64),
},
@@ -802,6 +835,7 @@
4,
AssignedShard {
leader_id: "test-ingester-2".into(),
follower_id_opt: Some("test-ingester-3".into()),
partition_id: 4u64.into(),
current_position_inclusive: Position::from(44u64),
},
3 changes: 2 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/gc.rs
@@ -202,7 +202,8 @@ mod tests {
mrecordlog.clone(),
state.clone(),
);
tokio::time::sleep(REMOVAL_GRACE_PERIOD * 2).await;
// Wait for the removal task to run.
tokio::time::sleep(Duration::from_millis(100)).await;

let state_guard = state.read().await;
assert!(state_guard.primary_shards.is_empty());