diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle b/runners/google-cloud-dataflow-java/worker/build.gradle index b7e6e981effe..92beccd067e2 100644 --- a/runners/google-cloud-dataflow-java/worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/build.gradle @@ -54,6 +54,7 @@ def sdk_provided_project_dependencies = [ ":runners:google-cloud-dataflow-java", ":sdks:java:extensions:avro", ":sdks:java:extensions:google-cloud-platform-core", + ":sdks:java:io:kafka", // For metric propagation into worker ":sdks:java:io:google-cloud-platform", ] diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java index 30e920119120..77f867793ae2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java @@ -32,13 +32,15 @@ import java.util.Map.Entry; import java.util.Optional; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; +import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.util.HistogramData; /** * Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support - * converting metrics from {@link BigQuerySinkMetrics} with this converter. + * converting metrics from {@link BigQuerySinkMetrics} and from {@link KafkaSinkMetrics} with this + * converter. 
*/ public class MetricsToPerStepNamespaceMetricsConverter { @@ -65,7 +67,10 @@ private static Optional<MetricValue> convertCounterToMetricValue( MetricName metricName, Long value, Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) { - if (value == 0 || !metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)) { + + if (value == 0 + || (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE) + && !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) { return Optional.empty(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 524906023722..c478341c1c39 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -93,6 +93,7 @@ import org.apache.beam.sdk.fn.JvmInitializers; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; +import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -663,6 +664,10 @@ public static void main(String[] args) throws Exception { enableBigQueryMetrics(); } + if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) { + KafkaSinkMetrics.setSupportKafkaMetrics(true); + } + JvmInitializers.runBeforeProcessing(options); worker.startStatusPages(); worker.start(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java index a18ca8cfd6dc..525464ef2e1f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; +import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; /** Contains a few of the stage-specific fields, e.g. metrics container registry, counters, etc. 
*/ @@ -118,7 +119,9 @@ public List<PerStepNamespaceMetrics> extractPerWorkerMetricValues() { private void translateKnownPerWorkerCounters(List<PerStepNamespaceMetrics> metrics) { for (PerStepNamespaceMetrics perStepnamespaceMetrics : metrics) { if (!BigQuerySinkMetrics.METRICS_NAMESPACE.equals( - perStepnamespaceMetrics.getMetricsNamespace())) { + perStepnamespaceMetrics.getMetricsNamespace()) + && !KafkaSinkMetrics.METRICS_NAMESPACE.equals( + perStepnamespaceMetrics.getMetricsNamespace())) { continue; } for (MetricValue metric : perStepnamespaceMetrics.getMetricValues()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 2d5a8d8266ae..37c5ad261280 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -366,7 +366,6 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() { .setMetricsNamespace("BigQuerySink") .setMetricValues(Collections.singletonList(expectedCounter)); - // Expected histogram metric List<Long> bucketCounts = Collections.singletonList(1L); Linear linearOptions = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0); @@ -393,6 +392,44 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() { assertThat(updates, containsInAnyOrder(histograms, counters)); } + @Test + public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() { + StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true); + + MetricName histogramMetricName = MetricName.named("KafkaSink", "histogram"); + HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10); + c2.getPerWorkerHistogram(histogramMetricName, linearBuckets).update(5.0); + + Iterable<PerStepNamespaceMetrics> updates = + StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry); + + // Expected histogram metric + List<Long> bucketCounts = Collections.singletonList(1L); + + Linear linearOptions = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0); + BucketOptions bucketOptions = new BucketOptions().setLinear(linearOptions); + + DataflowHistogramValue linearHistogram = + new DataflowHistogramValue() + .setCount(1L) + .setBucketOptions(bucketOptions) + .setBucketCounts(bucketCounts); + + MetricValue expectedHistogram = + new MetricValue() + .setMetric("histogram") + .setMetricLabels(new HashMap<>()) + .setValueHistogram(linearHistogram); + + PerStepNamespaceMetrics histograms = + new PerStepNamespaceMetrics() + .setOriginalStep("s2") + .setMetricsNamespace("KafkaSink") + .setMetricValues(Collections.singletonList(expectedHistogram)); + + assertThat(updates, containsInAnyOrder(histograms)); + } + @Test public void testExtractPerWorkerMetricUpdates_emptyMetrics() { StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java new file mode 100644 index 000000000000..147a30dcdd1a --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.value.AutoValue; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Stores and exports metrics for a batch of Kafka client RPCs. */ +public interface KafkaMetrics { + + void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime); + + void updateKafkaMetrics(); + + /** No-op implementation of {@code KafkaMetrics}. */ + class NoOpKafkaMetrics implements KafkaMetrics { + private NoOpKafkaMetrics() {} + + @Override + public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {} + + @Override + public void updateKafkaMetrics() {} + + private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics(); + + static NoOpKafkaMetrics getInstance() { + return singleton; + } + } + + /** + * Metrics of a batch of RPCs. Member variables are thread safe; however, this class does not have + * atomicity across member variables. + * + *
<p>Expected usage: A number of threads record metrics in an instance of this class with the + * member methods. Afterwards, a single thread should call {@code updateKafkaMetrics}, + * which will export all counter metrics and RPC latency distribution metrics to the underlying + * {@code perWorkerMetrics} container. Afterwards, metrics should not be written to or read from + * this object. + */ + @AutoValue + abstract class KafkaMetricsImpl implements KafkaMetrics { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class); + + static HashMap<String, Histogram> latencyHistograms = new HashMap<String, Histogram>(); + + abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies(); + + abstract AtomicBoolean isWritable(); + + public static KafkaMetricsImpl create() { + return new AutoValue_KafkaMetrics_KafkaMetricsImpl( + new HashMap<String, ConcurrentLinkedQueue<Duration>>(), new AtomicBoolean(true)); + } + + /** Record the latency of a successful Kafka poll RPC call. */ + @Override + public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) { + if (isWritable().get()) { + ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies().get(topic); + if (latencies == null) { + latencies = new ConcurrentLinkedQueue<Duration>(); + latencies.add(elapsedTime); + perTopicRpcLatencies().put(topic, latencies); + } else { + latencies.add(elapsedTime); + } + } + } + + /** Record RPC latency histogram metrics for all recorded topics. */ + private void recordRpcLatencyMetrics() { + for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies : + perTopicRpcLatencies().entrySet()) { + Histogram topicHistogram; + if (latencyHistograms.containsKey(topicLatencies.getKey())) { + topicHistogram = latencyHistograms.get(topicLatencies.getKey()); + } else { + topicHistogram = + KafkaSinkMetrics.createRPCLatencyHistogram( + KafkaSinkMetrics.RpcMethod.POLL, topicLatencies.getKey()); + latencyHistograms.put(topicLatencies.getKey(), topicHistogram); + } + // Update the histogram with all the latencies recorded for this topic. + for (Duration d : topicLatencies.getValue()) { + Preconditions.checkArgumentNotNull(topicHistogram); + topicHistogram.update(d.toMillis()); + } + } + } + + /** + * Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics} + * containers. This function will only report metrics once per instance. Subsequent calls to + * this function are no-ops. + */ + @Override + public void updateKafkaMetrics() { + if (!isWritable().compareAndSet(true, false)) { + LOG.warn("Updating stale Kafka metrics container"); + return; + } + recordRpcLatencyMetrics(); + } + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java new file mode 100644 index 000000000000..f71926f97d27 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import org.apache.beam.sdk.metrics.DelegatingHistogram; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.util.HistogramData; + +/** + * Helper class to create per worker metrics for Kafka Sink stages. + * + *
<p>Metrics will be in the namespace 'KafkaSink' and have their name formatted as: + * + * <p>'{baseName}*{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};' */ + +// TODO, refactor out common parts for the BQ sink, so it can be reused with other sinks, e.g., GCS? +// @SuppressWarnings("unused") +public class KafkaSinkMetrics { + private static boolean supportKafkaMetrics = false; + + public static final String METRICS_NAMESPACE = "KafkaSink"; + + // Base Metric names + private static final String RPC_LATENCY = "RpcLatency"; + + // Kafka Consumer Method names + enum RpcMethod { + POLL, + } + + // Metric labels + private static final String TOPIC_LABEL = "topic_name"; + private static final String RPC_METHOD = "rpc_method"; + + /** + * Creates a Histogram metric to record RPC latency. The metric will have the name: + * + *
<p>'RpcLatency*rpc_method:{method};topic_name:{topic};' + * + * @param method Kafka method associated with this metric. + * @param topic Kafka topic associated with this metric. + * @return Histogram with exponential buckets with a sqrt(2) growth factor. + */ + public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic) { + LabeledMetricNameUtils.MetricNameBuilder nameBuilder = + LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY); + nameBuilder.addLabel(RPC_METHOD, method.toString()); + nameBuilder.addLabel(TOPIC_LABEL, topic); + + MetricName metricName = nameBuilder.build(METRICS_NAMESPACE); + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17); + + return new DelegatingHistogram(metricName, buckets, false, true); + } + + /** + * Returns a container to store metrics for Kafka RPCs made by an unbounded reader. If these + * metrics are disabled, then we return a no-op container. + */ + static KafkaMetrics kafkaMetrics() { + if (supportKafkaMetrics) { + return KafkaMetrics.KafkaMetricsImpl.create(); + } else { + return KafkaMetrics.NoOpKafkaMetrics.getInstance(); + } + } + + public static void setSupportKafkaMetrics(boolean supportKafkaMetrics) { + KafkaSinkMetrics.supportKafkaMetrics = supportKafkaMetrics; + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index fed03047cf16..6ce6c7d5d233 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; @@ -144,7 +145,6 @@ public boolean start() throws IOException { offsetFetcherThread.scheduleAtFixedRate( this::updateLatestOffsets, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); - return advance(); } @@ -158,6 +158,9 @@ public boolean advance() throws IOException { */ while (true) { if (curBatch.hasNext()) { + // Initialize metrics container. + kafkaResults = KafkaSinkMetrics.kafkaMetrics(); + PartitionState<K, V> pState = curBatch.next(); if (!pState.recordIter.hasNext()) { // -- (c) @@ -228,8 +231,10 @@ public boolean advance() throws IOException { for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) { backlogBytesOfSplit.set(backlogSplit.getValue()); } - return true; + // Pass metrics to container. + kafkaResults.updateKafkaMetrics(); + return true; } else { // -- (b) nextBatch(); @@ -377,6 +382,7 @@ public long getSplitBacklogBytes() { .setDaemon(true) .setNameFormat("KafkaConsumerPoll-thread") .build()); + private AtomicReference<Exception> consumerPollException = new AtomicReference<>(); private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue = new SynchronousQueue<>(); @@ -399,6 +405,11 @@ public long getSplitBacklogBytes() { /** watermark before any records have been read. 
*/ private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + // Created for each new batch, and flushed at the end of the batch. + public KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics(); + private Stopwatch stopwatch = Stopwatch.createUnstarted(); + private String kafkaTopic = ""; + @Override public String toString() { return name; } @@ -509,6 +520,13 @@ String name() { List<TopicPartition> partitions = Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions()); + + // Each source has a single unique topic. + for (TopicPartition topicPartition : partitions) { + this.kafkaTopic = topicPartition.topic(); + break; + } + List<PartitionState<K, V>> states = new ArrayList<>(partitions.size()); if (checkpointMark != null) { @@ -568,7 +586,16 @@ private void consumerPollLoop() { while (!closed.get()) { try { if (records.isEmpty()) { + // Each source has a single unique topic. + List<TopicPartition> topicPartitions = source.getSpec().getTopicPartitions(); + Preconditions.checkStateNotNull(topicPartitions); + + stopwatch.start(); records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); + stopwatch.stop(); + kafkaResults.updateSuccessfulRpcMetrics( + kafkaTopic, java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + } else if (availableRecordsQueue.offer( records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) { records = ConsumerRecords.empty(); @@ -592,7 +619,6 @@ private void consumerPollLoop() { private void commitCheckpointMark() { KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null); - if (checkpointMark != null) { LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark); Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java new file mode 100644 index 000000000000..b84e143be773 --- /dev/null +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link KafkaMetrics}. */ +// TODO:Naireen - Refactor to remove duplicate code between the two sinks +@RunWith(JUnit4.class) +public class KafkaMetricsTest { + public static class TestHistogram implements Histogram { + public List<Double> values = Lists.newArrayList(); + private MetricName metricName = MetricName.named("KafkaSink", "name"); + + @Override + public void update(double value) { + values.add(value); + } + + @Override + public MetricName getName() { + return metricName; + } + } + + public static class TestMetricsContainer extends MetricsContainerImpl { + public ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram> + perWorkerHistograms = + new ConcurrentHashMap<KV<MetricName, HistogramData.BucketType>, TestHistogram>(); + + public TestMetricsContainer() { + super("TestStep"); + } + + @Override + public Histogram getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + perWorkerHistograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogram()); + return perWorkerHistograms.get(KV.of(metricName, bucketType)); + } + + @Override + public void reset() { + perWorkerHistograms.clear(); + } + } + + @Test + public void testNoOpKafkaMetrics() throws Exception { + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + + KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance(); + results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); + + results.updateKafkaMetrics(); + + assertThat(testContainer.perWorkerHistograms.size(), equalTo(0)); + } + + @Test + public void testKafkaRPCLatencyMetrics() throws Exception { + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + + KafkaSinkMetrics.setSupportKafkaMetrics(true); + + KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics(); + + results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); + + results.updateKafkaMetrics(); + // RpcLatency*rpc_method:POLL;topic_name:test-topic + MetricName histogramName = + MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:test-topic;"); + HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 17); + + assertThat(testContainer.perWorkerHistograms.size(), equalTo(1)); + assertThat( + testContainer.perWorkerHistograms.get(KV.of(histogramName, bucketType)).values, + containsInAnyOrder(Double.valueOf(10.0))); + } + + @Test + public void testKafkaRPCLatencyMetricsAreNotRecorded() throws Exception { + TestMetricsContainer testContainer = new TestMetricsContainer(); + MetricsEnvironment.setCurrentContainer(testContainer); + + KafkaSinkMetrics.setSupportKafkaMetrics(false); + + KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics(); + 
+ results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10)); + + results.updateKafkaMetrics(); + assertThat(testContainer.perWorkerHistograms.size(), equalTo(0)); + } +} diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java new file mode 100644 index 000000000000..625a75c5316b --- /dev/null +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetricsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.apache.beam.sdk.metrics.Histogram; +import org.apache.beam.sdk.metrics.MetricName; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link KafkaSinkMetrics}. */ +// TODO:Naireen - Refactor to remove duplicate code between the Kafka and BigQuery sinks +@RunWith(JUnit4.class) +public class KafkaSinkMetricsTest { + @Test + public void testCreatingHistogram() throws Exception { + + Histogram histogram = + KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, "topic1"); + + MetricName histogramName = + MetricName.named("KafkaSink", "RpcLatency*rpc_method:POLL;topic_name:topic1;"); + assertThat(histogram.getName(), equalTo(histogramName)); + } +}
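Taken together, the pieces above wire up as follows. A minimal sketch of the intended lifecycle, assuming the classes from this change are on the classpath (the class name and topic below are illustrative only; kafkaMetrics() is package-private, so the sketch lives in org.apache.beam.sdk.io.kafka):

package org.apache.beam.sdk.io.kafka;

import java.time.Duration;

/** Hypothetical sketch of the KafkaMetrics lifecycle introduced in this change. */
public class KafkaMetricsUsageSketch {
  public static void main(String[] args) {
    // StreamingDataflowWorker.main flips this on via the "enable_kafka_metrics"
    // experiment; set it directly here so kafkaMetrics() returns a real container.
    KafkaSinkMetrics.setSupportKafkaMetrics(true);

    // KafkaUnboundedReader.advance() creates one container per batch.
    KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();

    // The consumer poll loop records one latency per successful poll().
    kafkaResults.updateSuccessfulRpcMetrics("my-topic", Duration.ofMillis(12));

    // At the end of the batch the reader flushes once; a second call would
    // log a warning and no-op, because the container is no longer writable.
    kafkaResults.updateKafkaMetrics();
  }
}

The exported histogram is named 'RpcLatency*rpc_method:POLL;topic_name:my-topic;' in the 'KafkaSink' namespace, which is the name and namespace the worker-side converter and StageInfo changes above match on.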