STORM-3759 Additional Trident Kafka Spout Metrics #3385
KafkaTridentSpoutEmitter.java

@@ -25,6 +25,8 @@
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_TIMESTAMP;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.ArrayList;

@@ -38,10 +40,13 @@
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.RecordTranslator;

@@ -60,6 +65,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);

// Metrics
public static final String UNDERSCORE = "_";
public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
protected transient Gauge<Double> kafkaClientMaxLag;
public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
Review comment: I would remove "Rate" from the metric name and variable name. A meter has metrics like …
protected transient Meter eventEmitRate;
Review comment: Is the builtin emitted metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count

Review comment: This builtin might be sufficient. Our metric names seem to be reporting values like …

Review comment: With the recent change on metrics in Storm (converting to v2 metrics), the stream name will be appended to the metric name. DimensionalReporter can be used to separate dimensions (stream_name, componentId, etc.) from metrics.
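Following the naming suggestion above, here is a minimal sketch of what dropping the "Rate" suffix could look like. It reuses the `topologyContext.registerMeter` call already present in this diff; the name `eventsEmitted`, the helper methods, and the wrapper class are illustrative assumptions, not the PR's final naming.

```java
import com.codahale.metrics.Meter;
import org.apache.storm.task.TopologyContext;

// Sketch only: "eventsEmitted" is an assumed name, not what this PR registers.
class EmitMeterSketch {
    static final String EVENT_EMIT_METRIC_NAME = "eventsEmitted";

    private transient Meter eventsEmitted;

    void register(TopologyContext topologyContext, String configGroupId) {
        // A Dropwizard Meter already carries a count plus 1/5/15-minute rates,
        // so the registered name only needs to describe what is being counted;
        // reporters typically surface eventsEmitted.count, eventsEmitted.m1_rate, etc.
        eventsEmitted = topologyContext.registerMeter(configGroupId + EVENT_EMIT_METRIC_NAME);
    }

    void onEmit() {
        eventsEmitted.mark(); // one mark per emitted tuple
    }
}
```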
// Kafka
private final Consumer<K, V> consumer;
private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;

@@ -87,7 +100,7 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,

@VisibleForTesting
KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
this.topologyContext = topologyContext;

@@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
LOG.debug("Created {}", this.toString());

registerMetric();
}

/**
* Acquires metric instances through registration with the TopologyContext.
*/
private void registerMetric() {
LOG.info("Registering Spout Metrics");

String configGroupId = "";
if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
}

eventEmitRate = topologyContext.registerMeter(
configGroupId + EVENT_EMIT_METRIC_NAME);
kafkaClientMaxLag = topologyContext.registerGauge(
configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
new Gauge<Double>() {
@Override
public Double getValue() {
if (consumer == null) {
return 0.0;
}
// Extract spout lag from consumer's internal metrics
for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
Metric metric = metricKeyVal.getValue();
if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {
Review comment: If I understand correctly, there are actually two types of "records-lag-max" metrics: one is partition level and the other is consumer level. They use the same name, "records-lag-max", but their tags are different. In this case, it is hard to tell which of them the first matching entry from consumer.metrics() refers to. I also noticed that older Kafka versions expose this differently, so we will need to take care of compatibility issues here. (A hedged sketch of tag-based disambiguation follows this method.)
return metric.value();
}
}
return 0.0;
}
});
}
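On the point above: one hedged way for the gauge to prefer the consumer-level "records-lag-max" is to inspect the tags on each MetricName. This is a sketch only; it reuses the `consumer` field and `INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC` constant from this class, and the tag keys `topic`/`partition` plus the use of `metricValue()` are assumptions about newer Kafka client versions that would need checking against the clients Storm supports.

```java
// Sketch: skip partition-level entries (tagged with topic/partition) and
// return the consumer-level records-lag-max if one is present.
private double consumerLevelMaxLag() {
    if (consumer == null) {
        return 0.0;
    }
    for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
        MetricName name = entry.getKey();
        boolean partitionLevel = name.tags().containsKey("topic") || name.tags().containsKey("partition");
        if (name.name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC) && !partitionLevel) {
            // metricValue() assumes a client new enough to have it; older clients only expose value().
            Object value = entry.getValue().metricValue();
            return value instanceof Number ? ((Number) value).doubleValue() : 0.0;
        }
    }
    return 0.0;
}
```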
/**
* Emit a batch that has already been emitted.
*/
public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> currBatch) {

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();

@@ -115,12 +163,12 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
if (!topologyContext.getStormId().equals(currBatchMeta.getTopologyId())
&& isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
LOG.debug("Skipping re-emit of batch that was originally emitted by another topology,"
+ " because the current first poll offset strategy ignores committed offsets.");
return;
}

LOG.debug("Re-emitting batch: [transaction= {}], [currBatchPartition = {}], [currBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, currBatch, collector);

try {
// pause other topic-partitions to only poll from current topic-partition

@@ -129,9 +177,9 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
long seekOffset = currBatchMeta.getFirstOffset();
if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
LOG.debug("Skipping re-emit of batch with negative starting offset."
+ " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+ " It is not expected that Trident will replay such an empty batch,"
+ " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
return;
}
LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);

@@ -145,9 +193,11 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
break;
}
if (record.offset() > currBatchMeta.getLastOffset()) {
throw new RuntimeException(String.format("Error when re-emitting batch. Overshot the end of the batch."
throw new RuntimeException(String.format(
"Error when re-emitting batch. Overshot the end of the batch."
+ " The batch end offset was [{%d}], but received [{%d}]."
+ " Ensure log compaction is disabled in Kafka, since it is incompatible with non-opaque transactional spouts.",
+ " Ensure log compaction is disabled in Kafka, since it is"
+ " incompatible with non-opaque transactional spouts.",
currBatchMeta.getLastOffset(), record.offset()));
}
emitTuple(collector, record);

@@ -157,17 +207,17 @@ && isFirstPollOffsetStrategyIgnoringCommittedOffsets()) {
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Re-emitted batch: [transaction = {}], [currBatchPartition = {}], [currBatchMetadata = {}], "
+ "[collector = {}]", tx, currBatchPartition, currBatchMeta, collector);
}

/**
* Emit a new batch.
*/
public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);

final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
@@ -208,20 +258,21 @@ public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentC
LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);

return currentBatch.toMap();
}

private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
|| firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
}

private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
final Set<TopicPartition> assignments = consumer.assignment();
if (!assignments.contains(currBatchTp)) {
throw new IllegalStateException("The spout is asked to emit tuples on a partition it is not assigned."
throw new IllegalStateException(
"The spout is asked to emit tuples on a partition it is not assigned."
+ " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+ " The current partition is [" + currBatchTp + "], the assigned partitions are [" + assignments + "].");
}

@@ -230,6 +281,9 @@ private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record) {
final List<Object> tuple = translator.apply(record);
collector.emit(tuple);

// Track the number of records emitted
eventEmitRate.mark(tuple.size());
LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
}

@@ -247,8 +301,8 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
*/
private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
if (isFirstPollSinceExecutorStarted(tp)) {
boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));

@@ -269,7 +323,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
consumer.seekToEnd(Collections.singleton(tp));
} else if (firstPollOffsetStrategy == UNCOMMITTED_TIMESTAMP) {
LOG.debug("First poll for topic partition [{}] with no last batch metadata, "
+ "seeking to partition based on startTimeStamp", tp);
seekOffsetByStartTimeStamp(tp);
}
tpToFirstSeekOffset.put(tp, consumer.position(tp));

@@ -284,7 +338,7 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMet
long initialFetchOffset = tpToFirstSeekOffset.get(tp);
consumer.seek(tp, initialFetchOffset);
LOG.debug("First poll for topic partition [{}], no last batch metadata present."
+ " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
}

final long fetchOffset = consumer.position(tp);

@@ -327,15 +381,15 @@ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map
.collect(Collectors.toList());
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}

/**
* Get the partitions that should be handled by this task.
*/
public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
List<TopicPartition> tps = allPartitionInfoSorted.stream()
.map(kttp -> kttp.getTopicPartition())
.collect(Collectors.toList());

@@ -377,8 +431,8 @@ public void close() {
@Override
public final String toString() {
return super.toString()
+ "{kafkaSpoutConfig=" + kafkaSpoutConfig
+ '}';
}

/**

@@ -389,13 +443,13 @@ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceLi
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]",
consumer, partitions);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]",
consumer, partitions);
}
}
}
Review comment: Can all these variables be private? If some are used in unit tests, use the modifiers with the smallest scope possible and add the @VisibleForTesting annotation to indicate that.
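A minimal sketch of what that could look like, assuming only tests need to read the emit meter; the accessor name is hypothetical and not part of this PR.

```java
// Sketch: keep the metric fields private and expose only the narrowest test hook.
private transient Meter eventEmitRate;
private transient Gauge<Double> kafkaClientMaxLag;

@VisibleForTesting
Meter getEventEmitRate() { // package-private accessor, intended for unit tests only
    return eventEmitRate;
}
```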