From 176b7b6aa833f66d95761cbd1c4c12c248588980 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 22 May 2024 10:03:19 +0400 Subject: [PATCH] ISSUE-299: Fixing first offsets retrieval for compacted topic --- .../io/kafbat/ui/emitter/OffsetsInfo.java | 26 ++++++++++++++++++- .../ui/emitter/RangePollingEmitter.java | 10 ++++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java b/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java index 9fc6c27db..a361834d0 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java +++ b/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java @@ -6,11 +6,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; @Slf4j @Getter @@ -34,7 +36,7 @@ class OffsetsInfo { OffsetsInfo(Consumer consumer, Collection targetPartitions) { this.consumer = consumer; - this.beginOffsets = consumer.beginningOffsets(targetPartitions); + this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions); this.endOffsets = consumer.endOffsets(targetPartitions); endOffsets.forEach((tp, endOffset) -> { var beginningOffset = beginOffsets.get(tp); @@ -46,6 +48,28 @@ class OffsetsInfo { }); } + + private Map firstOffsetsForPolling(Consumer consumer, + Collection partitions) { + try { + // we try to use offsetsForTimes() to find earliest offsets, since for + // some topics (like compacted) beginningOffsets() ruturning 0 offsets + // even when effectively first offset can be very high + var offsets = consumer.offsetsForTimes( + partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L)) + ); + // result of offsetsForTimes() can be null, if message version < 0.10.0 + if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) { + return offsets.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } + } catch (UnsupportedOperationException | UnsupportedVersionException e) { + // offsetsForTimes() not supported + } + //falling back to beginningOffsets() if offsetsForTimes() not supported + return consumer.beginningOffsets(partitions); + } + boolean assignedPartitionsFullyPolled() { for (var tp : consumer.assignment()) { Preconditions.checkArgument(endOffsets.containsKey(tp)); diff --git a/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java b/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java index a5712492e..794c70e57 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java +++ b/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java @@ -3,7 +3,9 @@ import io.kafbat.ui.model.ConsumerPosition; import io.kafbat.ui.model.TopicMessageEventDTO; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; @@ -84,7 +86,8 @@ private List> poll(EnhancedConsumer consumer, range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from)); List> result = new ArrayList<>(); - while (!sink.isCancelled() && consumer.paused().size() < range.size()) { + Set paused = new HashSet<>(); + while (!sink.isCancelled() && paused.size() < range.size()) { var polledRecords = poll(sink, consumer); range.forEach((tp, fromTo) -> { polledRecords.records(tp).stream() @@ -92,12 +95,13 @@ private List> poll(EnhancedConsumer consumer, .forEach(result::add); //next position is out of target range -> pausing partition - if (consumer.position(tp) >= fromTo.to) { + if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) { + paused.add(tp); consumer.pause(List.of(tp)); } }); } - consumer.resume(consumer.paused()); + consumer.resume(paused); return result; } }