diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 209dee14da1..069607955c6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -398,10 +399,10 @@ public long getSplitBacklogBytes() {
   /** watermark before any records have been read. */
   private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
-  // Created in each next batch, and updated at the end.
   public KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
   private Stopwatch stopwatch = Stopwatch.createUnstarted();
-  private String kafkaTopic = "";
+
+  private Set<String> kafkaTopics;
 
   @Override
   public String toString() {
@@ -510,12 +511,9 @@ Instant updateAndGetWatermark() {
     List<TopicPartition> partitions =
         Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
 
-    // Each source has a single unique topic.
-    for (TopicPartition topicPartition : partitions) {
-      this.kafkaTopic = topicPartition.topic();
-      break;
-    }
+    this.kafkaTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    LOG.info("{} is reading from topics {}", this.name, kafkaTopics);
 
     List<PartitionState<K, V>> states = new ArrayList<>(partitions.size());
 
     if (checkpointMark != null) {
@@ -573,16 +571,14 @@ private void consumerPollLoop() {
     while (!closed.get()) {
       try {
         if (records.isEmpty()) {
-          // Each source has a single unique topic.
-          List<TopicPartition> topicPartitions = source.getSpec().getTopicPartitions();
-          Preconditions.checkStateNotNull(topicPartitions);
-
          stopwatch.start();
           records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
           stopwatch.stop();
-          kafkaResults.updateSuccessfulRpcMetrics(
-              kafkaTopic, java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-
+          for (String kafkaTopic : kafkaTopics) {
+            kafkaResults.updateSuccessfulRpcMetrics(
+                kafkaTopic,
+                java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+          }
         } else if (availableRecordsQueue.offer(
             records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
           records = ConsumerRecords.empty();