diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 391aeccb642..89c021d7f58 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -53,7 +53,7 @@ import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault; import org.apache.storm.kafka.spout.internal.OffsetManager; import org.apache.storm.kafka.spout.internal.Timer; -import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric; +import org.apache.storm.kafka.spout.metrics2.KafkaOffsetMetricManager; import org.apache.storm.kafka.spout.subscription.TopicAssigner; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -102,7 +102,7 @@ public class KafkaSpout extends BaseRichSpout { private transient Timer refreshAssignmentTimer; private transient TopologyContext context; private transient CommitMetadataManager commitMetadataManager; - private transient KafkaOffsetMetric kafkaOffsetMetric; + private transient KafkaOffsetMetricManager kafkaOffsetMetricManager; private transient KafkaSpoutConsumerRebalanceListener rebalanceListener; public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) { @@ -147,19 +147,12 @@ public void open(Map conf, TopologyContext context, SpoutOutputC consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps()); tupleListener.open(conf, context); - if (canRegisterMetrics()) { - registerMetric(); - } + this.kafkaOffsetMetricManager + = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context); LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } - private void registerMetric() { - LOG.info("Registering Spout Metrics"); - kafkaOffsetMetric = new 
KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer); - context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs()); - } - private boolean canRegisterMetrics() { try { KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class); @@ -632,7 +625,11 @@ private void refreshAssignment() { Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE); Set assignedPartitions = kafkaSpoutConfig.getTopicPartitioner() .getPartitionsForThisTask(allPartitionsSorted, context); - topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener); + boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener); + if (partitionChanged && canRegisterMetrics()) { + LOG.info("Partitions assignments has changed, updating metrics."); + kafkaOffsetMetricManager.registerMetricsForNewTopicPartitions(assignedPartitions); + } } @Override @@ -741,7 +738,7 @@ public boolean shouldPoll() { } @VisibleForTesting - KafkaOffsetMetric getKafkaOffsetMetric() { - return kafkaOffsetMetric; + KafkaOffsetMetricManager getKafkaOffsetMetricManager() { + return kafkaOffsetMetricManager; } } diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java deleted file mode 100644 index 04951f0a5f8..00000000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.metrics; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.function.Supplier; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RetriableException; -import org.apache.storm.kafka.spout.internal.OffsetManager; -import org.apache.storm.metric.api.IMetric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is used compute the partition and topic level offset metrics. - *

- * Partition level metrics are: - * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition - * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition - * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout - * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout - * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset - * topicName/partition_{number}/recordsInPartition // total number of records in the partition - *

- *

- * Topic level metrics are: - * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout - * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout - * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout - * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout - * topicName/spoutLag // total spout lag of all the associated partitions of this spout - * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout - *

- */ -public class KafkaOffsetMetric implements IMetric { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); - private final Supplier> offsetManagerSupplier; - private final Supplier> consumerSupplier; - - public KafkaOffsetMetric(Supplier> offsetManagerSupplier, - Supplier> consumerSupplier) { - this.offsetManagerSupplier = offsetManagerSupplier; - this.consumerSupplier = consumerSupplier; - } - - @Override - public Object getValueAndReset() { - - Map offsetManagers = offsetManagerSupplier.get(); - Consumer consumer = consumerSupplier.get(); - - if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) { - LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null."); - return null; - } - - Map topicMetricsMap = new HashMap<>(); - Set topicPartitions = offsetManagers.keySet(); - - Map beginningOffsets; - Map endOffsets; - - try { - beginningOffsets = consumer.beginningOffsets(topicPartitions); - endOffsets = consumer.endOffsets(topicPartitions); - } catch (RetriableException e) { - LOG.warn("Failed to get offsets from Kafka! 
Will retry on next metrics tick.", e); - return null; - } - - //map to hold partition level and topic level metrics - Map result = new HashMap<>(); - - for (Map.Entry entry : offsetManagers.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - OffsetManager offsetManager = entry.getValue(); - - long latestTimeOffset = endOffsets.get(topicPartition); - long earliestTimeOffset = beginningOffsets.get(topicPartition); - - long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); - long latestCompletedOffset = offsetManager.getCommittedOffset(); - long spoutLag = latestTimeOffset - latestCompletedOffset; - long recordsInPartition = latestTimeOffset - earliestTimeOffset; - - String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); - result.put(metricPath + "/" + "spoutLag", spoutLag); - result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); - result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); - result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); - result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); - result.put(metricPath + "/" + "recordsInPartition", recordsInPartition); - - TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); - if (topicMetrics == null) { - topicMetrics = new TopicMetrics(); - topicMetricsMap.put(topicPartition.topic(), topicMetrics); - } - - topicMetrics.totalSpoutLag += spoutLag; - topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; - topicMetrics.totalLatestTimeOffset += latestTimeOffset; - topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; - topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; - topicMetrics.totalRecordsInPartitions += recordsInPartition; - } - - for (Map.Entry e : topicMetricsMap.entrySet()) { - String topic = e.getKey(); - TopicMetrics topicMetrics = e.getValue(); - result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); - 
result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); - result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); - result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); - result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); - result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions); - } - - LOG.debug("Metrics Tick: value : {}", result); - return result; - } - - private class TopicMetrics { - long totalSpoutLag = 0; - long totalEarliestTimeOffset = 0; - long totalLatestTimeOffset = 0; - long totalLatestEmittedOffset = 0; - long totalLatestCompletedOffset = 0; - long totalRecordsInPartitions = 0; - } -} diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java new file mode 100644 index 00000000000..f552cad358f --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates and registers the Kafka offset metric sets of a spout: one topic level set per topic
+ * and one partition level set per topic partition, all under the name "kafkaOffset".
+ */
+public class KafkaOffsetMetricManager<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetricManager.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private final Supplier<Consumer<K, V>> consumerSupplier;
+    private final TopologyContext topologyContext;
+
+    private final Map<String, KafkaOffsetTopicMetrics> topicMetricsMap;
+    private final Map<TopicPartition, KafkaOffsetPartitionMetrics> topicPartitionMetricsMap;
+
+    /**
+     * Stores the suppliers and the context; no metric is registered until
+     * {@link #registerMetricsForNewTopicPartitions(Set)} is called.
+     *
+     * @param offsetManagerSupplier supplies the offset managers keyed by topic partition
+     * @param consumerSupplier supplies the Kafka consumer handed to the partition metrics
+     * @param topologyContext context the metric sets are registered with
+     */
+    public KafkaOffsetMetricManager(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
+                                    Supplier<Consumer<K, V>> consumerSupplier,
+                                    TopologyContext topologyContext) {
+        this.topicMetricsMap = new HashMap<>();
+        this.topicPartitionMetricsMap = new HashMap<>();
+
+        this.topologyContext = topologyContext;
+        this.consumerSupplier = consumerSupplier;
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        LOG.info("Running KafkaOffsetMetricManager");
+    }
+
+    /**
+     * Registers partition level metrics for every partition of the assignment that has none yet,
+     * creating the topic level metrics of its topic first when they are missing.
+     * Partitions that already have metrics are skipped; nothing is removed here.
+     *
+     * @param newAssignment the partitions currently assigned
+     */
+    public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignment) {
+        for (TopicPartition assigned : newAssignment) {
+            if (topicPartitionMetricsMap.containsKey(assigned)) {
+                continue;
+            }
+            LOG.info("Registering metric for topicPartition: {}", assigned);
+            KafkaOffsetTopicMetrics perTopic = topicMetricsFor(assigned.topic());
+
+            KafkaOffsetPartitionMetrics<K, V> perPartition
+                = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, consumerSupplier, assigned, perTopic);
+            topicPartitionMetricsMap.put(assigned, perPartition);
+            topologyContext.registerMetricSet("kafkaOffset", perPartition);
+        }
+    }
+
+    /** Returns the topic level metrics of the topic, creating and registering them if absent. */
+    private KafkaOffsetTopicMetrics topicMetricsFor(String topic) {
+        KafkaOffsetTopicMetrics known = topicMetricsMap.get(topic);
+        if (known != null) {
+            return known;
+        }
+        KafkaOffsetTopicMetrics created = new KafkaOffsetTopicMetrics(topic);
+        topicMetricsMap.put(topic, created);
+        topologyContext.registerMetricSet("kafkaOffset", created);
+        return created;
+    }
+
+    public Map<TopicPartition, KafkaOffsetPartitionMetrics> getTopicPartitionMetricsMap() {
+        return topicPartitionMetricsMap;
+    }
+
+    public Map<String, KafkaOffsetTopicMetrics> getTopicMetricsMap() {
+        return topicMetricsMap;
+    }
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
new file mode 100644
index 00000000000..28a20d1cd79
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.storm.kafka.spout.metrics2; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Partition level metrics. + *

+ * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition + * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition + * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout + * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout + * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset + * topicName/partition_{number}/recordsInPartition // total number of records in the partition + *

+ */
+public class KafkaOffsetPartitionMetrics<K, V> implements MetricSet {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetPartitionMetrics.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private final Supplier<Consumer<K, V>> consumerSupplier;
+
+    private final TopicPartition topicPartition;
+    private final KafkaOffsetTopicMetrics topicMetrics;
+
+    // Value each gauge last folded into the shared topic level totals. Only the difference to the
+    // previous reading is added on the next read, so a topic total stays equal to the sum of the
+    // latest per-partition values instead of growing on every poll.
+    private long lastSpoutLag;
+    private long lastEarliestTimeOffset;
+    private long lastLatestTimeOffset;
+    private long lastLatestEmittedOffset;
+    private long lastLatestCompletedOffset;
+    private long lastRecordsInPartition;
+
+    /**
+     * Creates the metric set of one topic partition.
+     *
+     * @param offsetManagerSupplier supplies the offset managers keyed by topic partition
+     * @param consumerSupplier supplies the Kafka consumer used to look up beginning and end offsets
+     * @param topicPartition the partition these metrics describe
+     * @param topicMetrics topic level totals that the gauges of this partition contribute to
+     */
+    public KafkaOffsetPartitionMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
+                                       Supplier<Consumer<K, V>> consumerSupplier,
+                                       TopicPartition topicPartition,
+                                       KafkaOffsetTopicMetrics topicMetrics) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
+        this.topicPartition = topicPartition;
+        this.topicMetrics = topicMetrics;
+
+        LOG.info("Running KafkaOffsetMetricSet");
+    }
+
+    /**
+     * Builds the six partition level gauges, keyed by "topic/partition_N/metricName".
+     * Each gauge reports 0 when the value it needs is unavailable (no consumer, no offsets
+     * returned for the partition, or no offset manager for the partition); in that case the
+     * topic level total is left untouched.
+     *
+     * @return a new map from metric name to gauge
+     */
+    @Override
+    public Map<String, Metric> getMetrics() {
+        // NOTE(review): the gauges call the Kafka consumer from whichever thread reads the metric.
+        // KafkaConsumer is documented as not safe for multi-threaded access - confirm the
+        // reporting thread model before relying on this.
+        String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+
+        Map<String, Metric> metrics = new HashMap<>();
+        metrics.put(metricPath + "/" + "spoutLag", (Gauge<Long>) this::readSpoutLag);
+        metrics.put(metricPath + "/" + "earliestTimeOffset", (Gauge<Long>) this::readEarliestTimeOffset);
+        metrics.put(metricPath + "/" + "latestTimeOffset", (Gauge<Long>) this::readLatestTimeOffset);
+        metrics.put(metricPath + "/" + "latestEmittedOffset", (Gauge<Long>) this::readLatestEmittedOffset);
+        metrics.put(metricPath + "/" + "latestCompletedOffset", (Gauge<Long>) this::readLatestCompletedOffset);
+        metrics.put(metricPath + "/" + "recordsInPartition", (Gauge<Long>) this::readRecordsInPartition);
+        return metrics;
+    }
+
+    /** End offset minus committed offset of the partition. */
+    private Long readSpoutLag() {
+        Long endOffset = fetchEndOffset();
+        if (endOffset == null) {
+            return 0L;
+        }
+        OffsetManager offsetManager = currentOffsetManager();
+        if (offsetManager == null) {
+            return 0L;
+        }
+        long spoutLag = endOffset - offsetManager.getCommittedOffset();
+        topicMetrics.totalSpoutLag += spoutLag - lastSpoutLag;
+        lastSpoutLag = spoutLag;
+        return spoutLag;
+    }
+
+    /** Beginning offset of the partition. */
+    private Long readEarliestTimeOffset() {
+        Long beginningOffset = fetchBeginningOffset();
+        if (beginningOffset == null) {
+            return 0L;
+        }
+        topicMetrics.totalEarliestTimeOffset += beginningOffset - lastEarliestTimeOffset;
+        lastEarliestTimeOffset = beginningOffset;
+        return beginningOffset;
+    }
+
+    /** End offset of the partition. */
+    private Long readLatestTimeOffset() {
+        Long endOffset = fetchEndOffset();
+        if (endOffset == null) {
+            return 0L;
+        }
+        topicMetrics.totalLatestTimeOffset += endOffset - lastLatestTimeOffset;
+        lastLatestTimeOffset = endOffset;
+        return endOffset;
+    }
+
+    /** Latest offset emitted by the spout for the partition. */
+    private Long readLatestEmittedOffset() {
+        OffsetManager offsetManager = currentOffsetManager();
+        if (offsetManager == null) {
+            return 0L;
+        }
+        long latestEmitted = offsetManager.getLatestEmittedOffset();
+        topicMetrics.totalLatestEmittedOffset += latestEmitted - lastLatestEmittedOffset;
+        lastLatestEmittedOffset = latestEmitted;
+        return latestEmitted;
+    }
+
+    /** Committed offset of the partition as tracked by its offset manager. */
+    private Long readLatestCompletedOffset() {
+        OffsetManager offsetManager = currentOffsetManager();
+        if (offsetManager == null) {
+            return 0L;
+        }
+        long latestCompleted = offsetManager.getCommittedOffset();
+        topicMetrics.totalLatestCompletedOffset += latestCompleted - lastLatestCompletedOffset;
+        lastLatestCompletedOffset = latestCompleted;
+        return latestCompleted;
+    }
+
+    /** End offset minus beginning offset of the partition. */
+    private Long readRecordsInPartition() {
+        Long endOffset = fetchEndOffset();
+        if (endOffset == null) {
+            return 0L;
+        }
+        Long beginningOffset = fetchBeginningOffset();
+        if (beginningOffset == null) {
+            return 0L;
+        }
+        long recordsInPartition = endOffset - beginningOffset;
+        topicMetrics.totalRecordsInPartitions += recordsInPartition - lastRecordsInPartition;
+        lastRecordsInPartition = recordsInPartition;
+        return recordsInPartition;
+    }
+
+    /** Returns the offset manager of this partition, or null when the supplier has none for it. */
+    private OffsetManager currentOffsetManager() {
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        OffsetManager offsetManager = offsetManagers == null ? null : offsetManagers.get(topicPartition);
+        if (offsetManager == null) {
+            LOG.debug("No OffsetManager available for topic partition: {}.", topicPartition);
+        }
+        return offsetManager;
+    }
+
+    /** Returns the end offset of this partition, or null (after logging) when it cannot be obtained. */
+    private Long fetchEndOffset() {
+        Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
+        Long endOffset = endOffsets == null ? null : endOffsets.get(topicPartition);
+        if (endOffset == null) {
+            LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+        }
+        return endOffset;
+    }
+
+    /** Returns the beginning offset of this partition, or null (after logging) when it cannot be obtained. */
+    private Long fetchBeginningOffset() {
+        Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition));
+        Long beginningOffset = beginningOffsets == null ? null : beginningOffsets.get(topicPartition);
+        if (beginningOffset == null) {
+            LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
+        }
+        return beginningOffset;
+    }
+
+    /** Queries the consumer for beginning offsets; an empty map means no consumer or a retriable failure. */
+    private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
+        Consumer<K, V> consumer = consumerSupplier.get();
+        if (consumer == null) {
+            LOG.error("Kafka consumer object is null, returning 0.");
+            return Collections.emptyMap();
+        }
+
+        try {
+            return consumer.beginningOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
+            return Collections.emptyMap();
+        }
+    }
+
+    /** Queries the consumer for end offsets; an empty map means no consumer or a retriable failure. */
+    private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
+        Consumer<K, V> consumer = consumerSupplier.get();
+        if (consumer == null) {
+            LOG.error("Kafka consumer object is null, returning 0.");
+            return Collections.emptyMap();
+        }
+
+        try {
+            return consumer.endOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
+            return Collections.emptyMap();
+        }
+    }
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
new file mode 100644
index 00000000000..cd0fbfc6b2c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF)
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.metrics2; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Topic level metrics. + *

+ * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
+ * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
+ * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
+ * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
+ * topicName/totalSpoutLag // total spout lag of all the associated partitions of this spout
+ * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
+ *

+ */
+public class KafkaOffsetTopicMetrics implements MetricSet {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
+
+    private final String topic;
+
+    // Topic level totals. They are package-private and not final on purpose: the partition level
+    // gauges in this package update them directly. All of them start at 0.
+    long totalSpoutLag;
+    long totalEarliestTimeOffset;
+    long totalLatestTimeOffset;
+    long totalLatestEmittedOffset;
+    long totalLatestCompletedOffset;
+    long totalRecordsInPartitions;
+
+    /**
+     * Creates the topic level metric set with all totals at zero.
+     *
+     * @param topic name of the topic, used as prefix of every metric name
+     */
+    public KafkaOffsetTopicMetrics(String topic) {
+        this.topic = topic;
+        LOG.info("Create KafkaOffsetTopicMetrics for topic: {}", topic);
+    }
+
+    /**
+     * Builds the six topic level gauges, keyed by "topic/metricName".
+     * Every gauge reads the current value of its total field at the time it is queried.
+     *
+     * @return a new map from metric name to gauge
+     */
+    @Override
+    public Map<String, Metric> getMetrics() {
+        Map<String, Metric> metrics = new HashMap<>();
+        metrics.put(topic + "/" + "totalSpoutLag", (Gauge<Long>) () -> totalSpoutLag);
+        metrics.put(topic + "/" + "totalEarliestTimeOffset", (Gauge<Long>) () -> totalEarliestTimeOffset);
+        metrics.put(topic + "/" + "totalLatestTimeOffset", (Gauge<Long>) () -> totalLatestTimeOffset);
+        metrics.put(topic + "/" + "totalLatestEmittedOffset", (Gauge<Long>) () -> totalLatestEmittedOffset);
+        metrics.put(topic + "/" + "totalLatestCompletedOffset", (Gauge<Long>) () -> totalLatestCompletedOffset);
+        metrics.put(topic + "/" + "totalRecordsInPartitions", (Gauge<Long>) () -> totalRecordsInPartitions);
+        return metrics;
+    }
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
index 300adececb4..b8f931364fd 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
@@ -36,15 +36,18 @@ public class TopicAssigner implements Serializable {
      * @param consumer The Kafka consumer to assign partitions to
      * @param newAssignment The partitions to assign.
* @param listener The rebalance listener to call back on when the assignment changes + * @return a boolean value indicating whether the partition assignment changed */ - public void assignPartitions(Consumer consumer, Set newAssignment, + public boolean assignPartitions(Consumer consumer, Set newAssignment, ConsumerRebalanceListener listener) { Set currentAssignment = consumer.assignment(); if (!newAssignment.equals(currentAssignment)) { listener.onPartitionsRevoked(currentAssignment); consumer.assign(newAssignment); listener.onPartitionsAssigned(newAssignment); + return true; } + return false; } } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java index 9f74f5208b6..7180c45c29c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.Map; + import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -146,21 +147,4 @@ public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exc //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset. 
doReactivationTest(FirstPollOffsetStrategy.EARLIEST); } - - @Test - public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception { - //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this - prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST); - - for (int i = 0; i < 5; i++) { - KafkaSpoutMessageId msgId = emitOne(); - spout.ack(msgId); - } - - spout.deactivate(); - - Map offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); - assertThat(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag"), is(5L)); - } - } diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java index 322724dedcd..e648faf93bd 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java @@ -20,8 +20,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; @@ -36,7 +34,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Time; @@ -392,48 +389,4 @@ public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws 
Exception { spout.nextTuple(); verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); } - - @Test - public void testOffsetMetrics() throws Exception { - final int messageCount = 10; - prepareSpout(messageCount); - - Map offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue(), 0); - // the offset of the last available message + 1. - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue(), 10); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue(), 10); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue(), 0); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 0); - //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 10); - - //Emit all messages and check that they are emitted. 
Ack the messages too - for (int i = 0; i < messageCount; i++) { - nextTuple_verifyEmitted_ack_resetCollector(i); - } - - commitAndVerifyAllMessagesCommitted(messageCount); - - offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue(), 0); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue(), 10); - //latest offset - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue(), 9); - // offset where processing will resume upon spout restart - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10); - assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0); - } - - @Test - public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception { - final int messageCount = 10; - prepareSpout(messageCount); - - // Ensure a timeout exception results in the return value being null - when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class); - - Map offsetMetric = (Map) spout.getKafkaOffsetMetric().getValueAndReset(); - assertNull(offsetMetric); - } }