Skip to content

Commit

Permalink
Avoid calling truncate on ingest v2 for shard where the position is B…
Browse files Browse the repository at this point in the history
…eginning (#4676)
  • Loading branch information
fulmicoton authored Mar 6, 2024
1 parent 735685b commit 6cd6379
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,14 +608,17 @@ impl Source for IngestSource {
checkpoint: SourceCheckpoint,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
let mut truncate_up_to_positions: Vec<(ShardId, Position)> =
Vec::with_capacity(checkpoint.num_partitions());

for (partition_id, position) in checkpoint.iter() {
let shard_id = ShardId::from(partition_id.as_str());
truncate_up_to_positions.push((shard_id, position));
let truncate_up_to_positions: Vec<(ShardId, Position)> = checkpoint
.iter()
.filter(|(_, position)| !matches!(position, Position::Beginning))
.map(|(partition_id, position)| {
let shard_id = ShardId::from(partition_id.as_str());
(shard_id, position)
})
.collect();
if !truncate_up_to_positions.is_empty() {
self.truncate(truncate_up_to_positions).await;
}
self.truncate(truncate_up_to_positions).await;
Ok(())
}

Expand Down Expand Up @@ -1789,7 +1792,6 @@ mod tests {
(ShardId::from(2u64), Position::offset(22u64)),
(ShardId::from(3u64), Position::eof(33u64)),
(ShardId::from(4u64), Position::offset(44u64)),
(ShardId::from(5u64), Position::Beginning),
(ShardId::from(6u64), Position::offset(66u64)),
],
);
Expand Down

0 comments on commit 6cd6379

Please sign in to comment.