Skip to content

Commit

Permalink
Add logging to see which topic each split is reading from (#33031)
Browse files Browse the repository at this point in the history
Co-authored-by: Naireen <[email protected]>
  • Loading branch information
Naireen and Naireen authored Nov 13, 2024
1 parent 6a45624 commit f4d07c4
Showing 1 changed file with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -398,10 +399,10 @@ public long getSplitBacklogBytes() {
/** watermark before any records have been read. */
private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

// Created in each next batch, and updated at the end.
public KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
private Stopwatch stopwatch = Stopwatch.createUnstarted();
private String kafkaTopic = "";

private Set<String> kafkaTopics;

@Override
public String toString() {
Expand Down Expand Up @@ -510,12 +511,9 @@ Instant updateAndGetWatermark() {
List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());

// Each source has a single unique topic.
for (TopicPartition topicPartition : partitions) {
this.kafkaTopic = topicPartition.topic();
break;
}
this.kafkaTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());

LOG.info("{} is reading from topics {}", this.name, kafkaTopics);
List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());

if (checkpointMark != null) {
Expand Down Expand Up @@ -573,16 +571,14 @@ private void consumerPollLoop() {
while (!closed.get()) {
try {
if (records.isEmpty()) {
// Each source has a single unique topic.
List<TopicPartition> topicPartitions = source.getSpec().getTopicPartitions();
Preconditions.checkStateNotNull(topicPartitions);

stopwatch.start();
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
stopwatch.stop();
kafkaResults.updateSuccessfulRpcMetrics(
kafkaTopic, java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));

for (String kafkaTopic : kafkaTopics) {
kafkaResults.updateSuccessfulRpcMetrics(
kafkaTopic,
java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
}
} else if (availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
records = ConsumerRecords.empty();
Expand Down

0 comments on commit f4d07c4

Please sign in to comment.