Skip to content

Commit

Permalink
[STORM-3782] Refactor KafkaOffsetMetric to use V2 metrics (#3404)
Browse files Browse the repository at this point in the history
* [STORM-3782] Refactor KafkaOffsetMetric to use V2 metrics
  • Loading branch information
RuiLi8080 authored Aug 15, 2023
1 parent 9259006 commit 40b9822
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.kafka.spout.metrics2.KafkaOffsetMetricManager;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
Expand Down Expand Up @@ -102,7 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient Timer refreshAssignmentTimer;
private transient TopologyContext context;
private transient CommitMetadataManager commitMetadataManager;
private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
private transient KafkaOffsetMetricManager<K, V> kafkaOffsetMetricManager;
private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;

public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
Expand Down Expand Up @@ -147,19 +147,12 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());

tupleListener.open(conf, context);
if (canRegisterMetrics()) {
registerMetric();
}
this.kafkaOffsetMetricManager
= new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);

LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}

private void registerMetric() {
LOG.info("Registering Spout Metrics");
kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer);
context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
}

private boolean canRegisterMetrics() {
try {
KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
Expand Down Expand Up @@ -632,7 +625,11 @@ private void refreshAssignment() {
Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
.getPartitionsForThisTask(allPartitionsSorted, context);
topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
if (partitionChanged && canRegisterMetrics()) {
LOG.info("Partitions assignments has changed, updating metrics.");
kafkaOffsetMetricManager.registerMetricsForNewTopicPartitions(assignedPartitions);
}
}

@Override
Expand Down Expand Up @@ -741,7 +738,7 @@ public boolean shouldPoll() {
}

@VisibleForTesting
KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
return kafkaOffsetMetric;
KafkaOffsetMetricManager<K, V> getKafkaOffsetMetricManager() {
return kafkaOffsetMetricManager;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.kafka.spout.metrics2;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to manage both the partition and topic level offset metrics.
*/
public class KafkaOffsetMetricManager<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetricManager.class);
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
private final Supplier<Consumer<K, V>> consumerSupplier;
private TopologyContext topologyContext;

private Map<String, KafkaOffsetTopicMetrics> topicMetricsMap;
private Map<TopicPartition, KafkaOffsetPartitionMetrics> topicPartitionMetricsMap;

public KafkaOffsetMetricManager(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
Supplier<Consumer<K, V>> consumerSupplier,
TopologyContext topologyContext) {
this.offsetManagerSupplier = offsetManagerSupplier;
this.consumerSupplier = consumerSupplier;
this.topologyContext = topologyContext;

this.topicMetricsMap = new HashMap<>();
this.topicPartitionMetricsMap = new HashMap<>();
LOG.info("Running KafkaOffsetMetricManager");
}

public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignment) {
for (TopicPartition topicPartition : newAssignment) {
if (!topicPartitionMetricsMap.containsKey(topicPartition)) {
LOG.info("Registering metric for topicPartition: {}", topicPartition);
// create topic level metrics for given topic if absent
String topic = topicPartition.topic();
KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic);
if (topicMetrics == null) {
topicMetrics = new KafkaOffsetTopicMetrics(topic);
topicMetricsMap.put(topic, topicMetrics);
topologyContext.registerMetricSet("kafkaOffset", topicMetrics);
}

KafkaOffsetPartitionMetrics topicPartitionMetricSet
= new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, consumerSupplier, topicPartition, topicMetrics);
topicPartitionMetricsMap.put(topicPartition, topicPartitionMetricSet);
topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
}
}
}

public Map<TopicPartition, KafkaOffsetPartitionMetrics> getTopicPartitionMetricsMap() {
return topicPartitionMetricsMap;
}

public Map<String, KafkaOffsetTopicMetrics> getTopicMetricsMap() {
return topicMetricsMap;
}
}
Loading

0 comments on commit 40b9822

Please sign in to comment.