Kafka metrics (#32402)
* Add kafka poll latency metrics

* Address Sam's comments

[Dataflow Streaming] Use isolated windmill streams based on job settings (#32503)

* Add kafka poll latency metrics

* address comments

* Ensure this is disabled for now until flag to enable it is explicitly
passed

---------

Co-authored-by: Naireen <[email protected]>
Naireen and Naireen authored Oct 23, 2024
1 parent 68b600d commit 0ee13b2
Showing 10 changed files with 476 additions and 7 deletions.
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
@@ -54,6 +54,7 @@ def sdk_provided_project_dependencies = [
":runners:google-cloud-dataflow-java",
":sdks:java:extensions:avro",
":sdks:java:extensions:google-cloud-platform-core",
":sdks:java:io:kafka", // For metric propagation into worker
":sdks:java:io:google-cloud-platform",
]

@@ -32,13 +32,15 @@
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

/**
* Converts metric updates to {@link PerStepNamespaceMetrics} protos. Currently we only support
- * converting metrics from {@link BigQuerySinkMetrics} with this converter.
+ * converting metrics from {@link BigQuerySinkMetrics} and from {@link KafkaSinkMetrics} with this
+ * converter.
*/
public class MetricsToPerStepNamespaceMetricsConverter {

@@ -65,7 +67,10 @@ private static Optional<MetricValue> convertCounterToMetricValue(
MetricName metricName,
Long value,
Map<MetricName, LabeledMetricNameUtils.ParsedMetricName> parsedPerWorkerMetricsCache) {
- if (value == 0 || !metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)) {
+
+ if (value == 0
+     || (!metricName.getNamespace().equals(BigQuerySinkMetrics.METRICS_NAMESPACE)
+         && !metricName.getNamespace().equals(KafkaSinkMetrics.METRICS_NAMESPACE))) {
return Optional.empty();
}

@@ -93,6 +93,7 @@
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -663,6 +664,10 @@ public static void main(String[] args) throws Exception {
enableBigQueryMetrics();
}

if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) {
KafkaSinkMetrics.setSupportKafkaMetrics(true);
}

JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
worker.start();
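For context, a pipeline author opts in to these metrics with the enable_kafka_metrics experiment checked above. A minimal sketch of enabling it from user code, assuming the usual Dataflow options classes (the wrapper class below is illustrative, not part of this commit):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableKafkaMetricsSketch {
  public static void main(String[] args) {
    // Equivalent to passing --experiments=enable_kafka_metrics on the command line.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    ExperimentalOptions.addExperiment(options, "enable_kafka_metrics");
    // ... build and run the pipeline with these options; the worker harness above
    // then calls KafkaSinkMetrics.setSupportKafkaMetrics(true).
  }
}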
@@ -35,6 +35,7 @@
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
@@ -118,7 +119,9 @@ public List<PerStepNamespaceMetrics> extractPerWorkerMetricValues() {
private void translateKnownPerWorkerCounters(List<PerStepNamespaceMetrics> metrics) {
for (PerStepNamespaceMetrics perStepnamespaceMetrics : metrics) {
if (!BigQuerySinkMetrics.METRICS_NAMESPACE.equals(
-     perStepnamespaceMetrics.getMetricsNamespace())) {
+         perStepnamespaceMetrics.getMetricsNamespace())
+     && !KafkaSinkMetrics.METRICS_NAMESPACE.equals(
+         perStepnamespaceMetrics.getMetricsNamespace())) {
continue;
}
for (MetricValue metric : perStepnamespaceMetrics.getMetricValues()) {
@@ -366,7 +366,6 @@ public void testExtractPerWorkerMetricUpdates_populatedMetrics() {
.setMetricsNamespace("BigQuerySink")
.setMetricValues(Collections.singletonList(expectedCounter));

- // Expected histogram metric
List<Long> bucketCounts = Collections.singletonList(1L);

Linear linearOptions = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0);
@@ -393,6 +392,44 @@
assertThat(updates, containsInAnyOrder(histograms, counters));
}

@Test
public void testExtractPerWorkerMetricUpdatesKafka_populatedMetrics() {
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);

MetricName histogramMetricName = MetricName.named("KafkaSink", "histogram");
HistogramData.LinearBuckets linearBuckets = HistogramData.LinearBuckets.of(0, 10, 10);
c2.getPerWorkerHistogram(histogramMetricName, linearBuckets).update(5.0);

Iterable<PerStepNamespaceMetrics> updates =
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(registry);

// Expected histogram metric
List<Long> bucketCounts = Collections.singletonList(1L);

Linear linearOptions = new Linear().setNumberOfBuckets(10).setWidth(10.0).setStart(0.0);
BucketOptions bucketOptions = new BucketOptions().setLinear(linearOptions);

DataflowHistogramValue linearHistogram =
new DataflowHistogramValue()
.setCount(1L)
.setBucketOptions(bucketOptions)
.setBucketCounts(bucketCounts);

MetricValue expectedHistogram =
new MetricValue()
.setMetric("histogram")
.setMetricLabels(new HashMap<>())
.setValueHistogram(linearHistogram);

PerStepNamespaceMetrics histograms =
new PerStepNamespaceMetrics()
.setOriginalStep("s2")
.setMetricsNamespace("KafkaSink")
.setMetricValues(Collections.singletonList(expectedHistogram));

assertThat(updates, containsInAnyOrder(histograms));
}

@Test
public void testExtractPerWorkerMetricUpdates_emptyMetrics() {
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(true);
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Stores and exports metrics for a batch of Kafka Client RPCs. */
public interface KafkaMetrics {

void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);

void updateKafkaMetrics();

/** No-op implementation of {@code KafkaMetrics}. */
class NoOpKafkaMetrics implements KafkaMetrics {
private NoOpKafkaMetrics() {}

@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}

@Override
public void updateKafkaMetrics() {}

private static final NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();

static NoOpKafkaMetrics getInstance() {
return singleton;
}
}

/**
* Metrics for a batch of RPCs. Member variables are thread safe; however, this class does not
* have atomicity across member variables.
*
* <p>Expected usage: a number of threads record metrics in an instance of this class with the
* member methods. Afterwards, a single thread should call {@code updateKafkaMetrics}, which
* exports all counter metrics and RPC latency distribution metrics to the underlying {@code
* perWorkerMetrics} container. After that, metrics should not be written to or read from this
* object.
*/
@AutoValue
abstract class KafkaMetricsImpl implements KafkaMetrics {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);

static HashMap<String, Histogram> latencyHistograms = new HashMap<String, Histogram>();

abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();

abstract AtomicBoolean isWritable();

public static KafkaMetricsImpl create() {
return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
new HashMap<String, ConcurrentLinkedQueue<Duration>>(), new AtomicBoolean(true));
}

/** Records the latency of a successful Kafka poll RPC call for the given topic. */
@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (isWritable().get()) {
ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies().get(topic);
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(elapsedTime);
perTopicRpcLatencies().put(topic, latencies);
} else {
latencies.add(elapsedTime);
}
}
}

/** Record rpc latency histogram metrics for all recorded topics. */
private void recordRpcLatencyMetrics() {
for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies :
perTopicRpcLatencies().entrySet()) {
Histogram topicHistogram;
if (latencyHistograms.containsKey(topicLatencies.getKey())) {
topicHistogram = latencyHistograms.get(topicLatencies.getKey());
} else {
topicHistogram =
KafkaSinkMetrics.createRPCLatencyHistogram(
KafkaSinkMetrics.RpcMethod.POLL, topicLatencies.getKey());
latencyHistograms.put(topicLatencies.getKey(), topicHistogram);
}
// Update the histogram with every latency recorded for this topic.
Preconditions.checkArgumentNotNull(topicHistogram);
for (Duration d : topicLatencies.getValue()) {
topicHistogram.update(d.toMillis());
}
}
}

/**
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
* containers. This function will only report metrics once per instance. Subsequent calls to
* this function will no-op.
*/
@Override
public void updateKafkaMetrics() {
if (!isWritable().compareAndSet(true, false)) {
LOG.warn("Updating stale Kafka metrics container");
return;
}
recordRpcLatencyMetrics();
}
}
}
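To make the intended lifecycle concrete, a minimal usage sketch; the topic name and latencies are illustrative, and the class sits in the same package because kafkaMetrics() (defined in KafkaSinkMetrics below) is package-private:

package org.apache.beam.sdk.io.kafka;

import java.time.Duration;

// Illustrative only: the record-then-flush lifecycle described in the javadoc above.
class KafkaMetricsUsageSketch {
  void pollAndReport() {
    // Returns the no-op implementation unless Kafka metrics are enabled.
    KafkaMetrics metrics = KafkaSinkMetrics.kafkaMetrics();

    // One or more reader threads record successful poll() latencies per topic.
    metrics.updateSuccessfulRpcMetrics("my-topic", Duration.ofMillis(12));
    metrics.updateSuccessfulRpcMetrics("my-topic", Duration.ofMillis(7));

    // A single thread then flushes exactly once; later calls log a warning and no-op.
    metrics.updateKafkaMetrics();
  }
}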
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

/**
* Helper class to create per worker metrics for Kafka Sink stages.
*
* <p>Metrics will be in the namespace 'KafkaSink' and have their name formatted as:
*
* <p>'{baseName}*{metricLabelKey1}:{metricLabelVal1};...{metricLabelKeyN}:{metricLabelValN};'
*/

// TODO: refactor out the parts shared with the BigQuery sink so they can be reused by other sinks, e.g. GCS.
public class KafkaSinkMetrics {
private static boolean supportKafkaMetrics = false;

public static final String METRICS_NAMESPACE = "KafkaSink";

// Base Metric names
private static final String RPC_LATENCY = "RpcLatency";

// Kafka Consumer Method names
enum RpcMethod {
POLL,
}

// Metric labels
private static final String TOPIC_LABEL = "topic_name";
private static final String RPC_METHOD = "rpc_method";

/**
* Creates a Histogram metric to record RPC latency. The metric will have the name:
*
* <p>'RpcLatency*rpc_method:{method};topic_name:{topic};'
*
* @param method Kafka method associated with this metric.
* @param topic Kafka topic associated with this metric.
* @return Histogram with exponential buckets with a sqrt(2) growth factor.
*/
public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic) {
LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
nameBuilder.addLabel(RPC_METHOD, method.toString());
nameBuilder.addLabel(TOPIC_LABEL, topic);

MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);

return new DelegatingHistogram(metricName, buckets, false, true);
}

/**
* Returns a container to store metrics for Kafka RPCs made from the unbounded reader. If these metrics
* are disabled, then we return a no-op container.
*/
static KafkaMetrics kafkaMetrics() {
if (supportKafkaMetrics) {
return KafkaMetrics.KafkaMetricsImpl.create();
} else {
return KafkaMetrics.NoOpKafkaMetrics.getInstance();
}
}

public static void setSupportKafkaMetrics(boolean supportKafkaMetrics) {
KafkaSinkMetrics.supportKafkaMetrics = supportKafkaMetrics;
}
}
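As a worked example of the name format, createRPCLatencyHistogram(RpcMethod.POLL, "my-topic") yields a metric named 'RpcLatency*rpc_method:POLL;topic_name:my-topic;' in the 'KafkaSink' namespace (topic illustrative). A minimal sketch, again assuming same-package access since RpcMethod is package-private:

package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.metrics.Histogram;

// Illustrative only: create and update a poll-latency histogram directly.
class KafkaSinkMetricsSketch {
  void example() {
    Histogram histogram =
        KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, "my-topic");
    histogram.update(42); // latency in milliseconds, matching d.toMillis() above
  }
}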
