Skip to content

Commit

Permalink
Add kafka poll latency metrics
Browse files Browse the repository at this point in the history
Address Sam's comments

add UW support
  • Loading branch information
Naireen committed Oct 30, 2024
1 parent 24a0447 commit 210b7f0
Show file tree
Hide file tree
Showing 36 changed files with 2,696 additions and 1,832 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ message MonitoringInfoSpecs {
}]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down Expand Up @@ -342,6 +352,7 @@ message MonitoringInfoSpecs {
]
}];

//import com.google.api.services.dataflow.model.PerWorkerMetrics;
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
Expand Down Expand Up @@ -576,6 +587,12 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
10 changes: 10 additions & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ test {
}
}

// def google_api_services_dataflow = library.java.google_api_services_dataflow

dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
Expand All @@ -48,9 +50,17 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
// implementation library.java.proto_google_common_protos
implementation library.java.google_cloud_dataflow_java_proto_library_all

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_api
testRuntimeOnly library.java.slf4j_simple
provided(library.java.google_api_services_dataflow)
provided library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
Expand All @@ -37,21 +40,26 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DefaultMetricResults extends MetricResults {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricResults.class);

private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
}

@Override
Expand All @@ -62,6 +70,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public void update(double value) {
dirty.afterModification();
}

/** Update it by another Histogram Data. */
@Override
public void update(HistogramData data) {
this.value.update(data);
dirty.afterModification();
}

/**
* Increment all of the bucket counts in this histogram, by the bucket counts specified in other.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,13 @@ public static Histogram histogram(
boolean processWideContainer) {
return new DelegatingHistogram(metricName, bucketType, processWideContainer);
}

public static Histogram histogram(
MonitoringInfoMetricName metricName,
HistogramData.BucketType bucketType,
boolean processWideContainer,
boolean perWorkerHistogram) {
return new DelegatingHistogram(
metricName, bucketType, processWideContainer, perWorkerHistogram);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;

/**
* A lock free implementation of {@link org.apache.beam.sdk.metrics.Histogram}. This class supports
* extracting delta updates with the {@link #getSnapshotAndReset} method.
*/
@ThreadSafe
@Internal
public final class LockFreeHistogram implements Histogram {
private final HistogramData.BucketType bucketType;
private final AtomicLongArray buckets;
private final MetricName name;
private final AtomicReference<OutlierStatistic> underflowStatistic;
private final AtomicReference<OutlierStatistic> overflowStatistic;

/**
* Whether this histogram has updates that have not been extracted by {@code getSnapshotAndReset}.
* This values should be flipped to true AFTER recording a value, and flipped to false BEFORE
* extracting a snapshot. This ensures that recorded values will always be seen by a future {@code
* getSnapshotAndReset} call.
*/
private final AtomicBoolean dirty;

/** Create a histogram. */
public LockFreeHistogram(MetricName name, HistogramData.BucketType bucketType) {
this.name = name;
this.bucketType = bucketType;
this.buckets = new AtomicLongArray(bucketType.getNumBuckets());
this.underflowStatistic =
new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY);
this.overflowStatistic =
new AtomicReference<LockFreeHistogram.OutlierStatistic>(OutlierStatistic.EMPTY);
this.dirty = new AtomicBoolean(false);
}

/**
* Represents the sum and mean of a collection of numbers. Used to represent the
* underflow/overflow statistics of a histogram.
*/
@AutoValue
public abstract static class OutlierStatistic implements Serializable {
abstract double sum();

public abstract long count();

public static final OutlierStatistic EMPTY = create(0, 0);

public static OutlierStatistic create(double sum, long count) {
return new AutoValue_LockFreeHistogram_OutlierStatistic(sum, count);
}

public OutlierStatistic combine(double value) {
return create(sum() + value, count() + 1);
}

public double mean() {
if (count() == 0) {
return 0;
}
return sum() / count();
}
}

/**
* The snapshot of a histogram. The snapshot contains the overflow/underflow statistic, number of
* values recorded in each bucket, and the BucketType of the underlying histogram.
*/
@AutoValue
public abstract static class Snapshot {
public abstract OutlierStatistic underflowStatistic();

public abstract OutlierStatistic overflowStatistic();

public abstract ImmutableLongArray buckets();

public abstract HistogramData.BucketType bucketType();

public static Snapshot create(
OutlierStatistic underflowStatistic,
OutlierStatistic overflowStatistic,
ImmutableLongArray buckets,
HistogramData.BucketType bucketType) {
return new AutoValue_LockFreeHistogram_Snapshot(
underflowStatistic, overflowStatistic, buckets, bucketType);
}

@Memoized
public long totalCount() {
long count = 0;
count += underflowStatistic().count();
count += overflowStatistic().count();
count += buckets().stream().sum();

return count;
}
}

/**
* Extract a delta update of this histogram. Update represents values that have been recorded in
* this histogram since the last time this method was called.
*
* <p>If this histogram is being updated concurrent to this method, then the returned snapshot is
* not guarenteed to contain those updates. However, those updates are not dropped and will be
* represented in a future call to this method.
*
* <p>If this histogram has not been updated since the last call to this method, an empty optional
* is returned.
*/
public Optional<Snapshot> getSnapshotAndReset() {
if (!dirty.getAndSet(false)) {
return Optional.empty();
}

ImmutableLongArray.Builder bucketsSnapshotBuilder =
ImmutableLongArray.builder(buckets.length());
for (int i = 0; i < buckets.length(); i++) {
bucketsSnapshotBuilder.add(buckets.getAndSet(i, 0));
}
OutlierStatistic overflowSnapshot = overflowStatistic.getAndSet(OutlierStatistic.EMPTY);
OutlierStatistic underflowSnapshot = underflowStatistic.getAndSet(OutlierStatistic.EMPTY);

return Optional.of(
Snapshot.create(
underflowSnapshot, overflowSnapshot, bucketsSnapshotBuilder.build(), bucketType));
}

@Override
public MetricName getName() {
return name;
}

private void updateInternal(double value) {
double rangeTo = bucketType.getRangeTo();
double rangeFrom = bucketType.getRangeFrom();
if (value >= rangeTo) {
recordTopRecordsValue(value);
} else if (value < rangeFrom) {
recordBottomRecordsValue(value);
} else {
recordInBoundsValue(value);
}
}

@Override
public void update(double value) {
updateInternal(value);
dirty.set(true);
}

@Override
public void update(double... values) {
for (double value : values) {
updateInternal(value);
}
dirty.set(true);
}

/** Record a inbounds value to the appropriate bucket. */
private void recordInBoundsValue(double value) {
int index = bucketType.getBucketIndex(value);
if (index < 0 || index >= bucketType.getNumBuckets()) {
return;
}

buckets.getAndIncrement(index);
}

/**
* Record a new value in {@code overflowStatistic}. This method should only be called when a
* Histogram is recording a value greater than the upper bound of it's largest bucket.
*
* @param value
*/
private void recordTopRecordsValue(double value) {
OutlierStatistic original;
do {
original = overflowStatistic.get();
} while (!overflowStatistic.compareAndSet(original, original.combine(value)));
}

/**
* Record a new value in {@code underflowStatistic}. This method should only be called when a
* Histogram is recording a value smaller than the lowerbound bound of it's smallest bucket.
*/
private void recordBottomRecordsValue(double value) {
OutlierStatistic original;
do {
original = underflowStatistic.get();
} while (!underflowStatistic.compareAndSet(original, original.combine(value)));
}
}
Loading

0 comments on commit 210b7f0

Please sign in to comment.