From 7ba28695e4025148e2fa40c30f5d9294e23f94cc Mon Sep 17 00:00:00 2001 From: Naireen Date: Thu, 7 Nov 2024 19:58:09 +0000 Subject: [PATCH] Add per worker histogram to portable runner --- .../beam/model/pipeline/v1/metrics.proto | 34 +++++ .../core/metrics/DefaultMetricResults.java | 11 +- .../runners/core/metrics/HistogramCell.java | 6 + .../runners/core/metrics/MetricUpdates.java | 17 ++- .../core/metrics/MetricsContainerImpl.java | 134 +++++++++++++++++- .../core/metrics/MetricsContainerStepMap.java | 9 ++ .../core/metrics/MonitoringInfoConstants.java | 11 +- .../metrics/SimpleMonitoringInfoBuilder.java | 13 ++ .../core/metrics/HistogramCellTest.java | 9 ++ .../metrics/MetricsContainerImplTest.java | 23 +++ .../runners/direct/DirectMetricsTest.java | 13 +- .../apache/beam/sdk/metrics/Histogram.java | 5 + .../beam/sdk/metrics/MetricQueryResults.java | 18 ++- .../apache/beam/sdk/util/HistogramData.java | 11 ++ 14 files changed, 296 insertions(+), 18 deletions(-) diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto index 4ec189e4637f..cc3144e5be22 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto @@ -367,6 +367,36 @@ message MonitoringInfoSpecs { } ] }]; + + + USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = { + urn: "beam:metric:user:per_worker_histogram_int64:v1", + type: "beam:metrics:per_worker_histogram_int64:v1", + required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"], + annotations: [{ + key: "description", + value: "URN utilized to report user metric." 
+ }] + }]; + + // Represents per worker metrics + PER_WORKER_LATENCY_METRIC= 23 [(monitoring_info_spec) = { + urn: "beam:metrics:per_worker_metric:v1", + type: "beam:metrics:per_worker_histogram_int64:v1", + required_labels: [ + "PTRANSFORM" + ], + annotations: [ + { + key: "description", + value: "Histogram counts for request latencies made to IO service APIs to batch read or write elements." + }, + { + key: "units", + value: "Milliseconds" + } + ] + }]; } } @@ -576,6 +606,10 @@ message MonitoringInfoTypeUrns { SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:metrics:set_string:v1"]; + PER_WORKER_HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:metrics:per_worker_histogram_int64:v1"]; + + // General monitored state information which contains structured information // which does not fit into a typical metric format. See MonitoringTableData // for more details. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java index ea8a333d397b..f45dd154eb9e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.checkerframework.checker.nullness.qual.Nullable; @@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults { private final Iterable> distributions; private final Iterable> gauges; private final Iterable> stringSets; + private final Iterable> perWorkerHistograms; public DefaultMetricResults( 
Iterable> counters, Iterable> distributions, Iterable> gauges, - Iterable> stringSets) { + Iterable> stringSets, + Iterable> perWorkerHistograms) { this.counters = counters; this.distributions = distributions; this.gauges = gauges; this.stringSets = stringSets; + this.perWorkerHistograms = perWorkerHistograms; } @Override @@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) { distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())), Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())), Iterables.filter( - stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey()))); + stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())), + Iterables.filter( + perWorkerHistograms, + perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey()))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java index 2a594401754c..63a9633997a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java @@ -70,6 +70,12 @@ public void update(HistogramCell other) { dirty.afterModification(); } + @Override + public void update(HistogramData data) { + this.value.update(data); + dirty.afterModification(); + } + // TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing // the infinite buckets as well. // and remove the incTopBucketCount and incBotBucketCount methods. 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java index ada5bda4df4a..7e876810c63a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; /** Representation of multiple metric updates. */ @@ -34,6 +35,7 @@ public abstract class MetricUpdates { Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList()); /** @@ -66,14 +68,22 @@ public static MetricUpdate create(MetricKey key, T update) { /** All the sets updates. */ public abstract Iterable> stringSetUpdates(); + /** All the histogram updates. */ + public abstract Iterable> perWorkerHistogramsUpdates(); + /** Create a new {@link MetricUpdates} bundle. */ public static MetricUpdates create( Iterable> counterUpdates, Iterable> distributionUpdates, Iterable> gaugeUpdates, - Iterable> stringSetUpdates) { + Iterable> stringSetUpdates, + Iterable> perWorkerHistogramsUpdates) { return new AutoValue_MetricUpdates( - counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates); + counterUpdates, + distributionUpdates, + gaugeUpdates, + stringSetUpdates, + perWorkerHistogramsUpdates); } /** Returns true if there are no updates in this MetricUpdates object. 
*/ @@ -81,6 +91,7 @@ public boolean isEmpty() { return Iterables.isEmpty(counterUpdates()) && Iterables.isEmpty(distributionUpdates()) && Iterables.isEmpty(gaugeUpdates()) - && Iterables.isEmpty(stringSetUpdates()); + && Iterables.isEmpty(stringSetUpdates()) + && Iterables.isEmpty(perWorkerHistogramsUpdates()); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 99cf98508505..551550012bf5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -19,15 +19,18 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE; +import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static 
org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -44,6 +47,7 @@ import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.Metric; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -90,6 +94,9 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); + private MetricsMap, HistogramCell> perWorkerHistograms = + new MetricsMap<>(HistogramCell::new); + private MetricsMap, HistogramCell> histograms = new MetricsMap<>(HistogramCell::new); @@ -216,6 +223,22 @@ public StringSetCell getStringSet(MetricName metricName) { return stringSets.tryGet(metricName); } + /** + * Return the {@link Histogram} that should be used for implementing the given per-worker {@code + * metricName} in this container. 
+ */ + @Override + public HistogramCell getPerWorkerHistogram( + MetricName metricName, HistogramData.BucketType bucketType) { + HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType)); + return val; + } + + public MetricsMap, HistogramCell> + getPerWorkerHistogram() { + return perWorkerHistograms; + } + private > ImmutableList> extractUpdates(MetricsMap cells) { ImmutableList.Builder> updates = ImmutableList.builder(); @@ -229,6 +252,22 @@ ImmutableList> extractUpdates(MetricsMap> + ImmutableList> extractHistogramUpdates( + MetricsMap, CellT> cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + cells.forEach( + (key, value) -> { + if (value.getDirty().beforeCommit()) { + updates.add( + MetricUpdate.create( + MetricKey.create(stepName, key.getKey()), value.getCumulative())); + } + }); + return updates.build(); + } + /** * Return the cumulative values for any metrics that have changed since the last time updates were * committed. @@ -238,7 +277,8 @@ public MetricUpdates getUpdates() { extractUpdates(counters), extractUpdates(distributions), extractUpdates(gauges), - extractUpdates(stringSets)); + extractUpdates(stringSets), + extractHistogramUpdates(perWorkerHistograms)); } /** @return The MonitoringInfo metadata from the metric. */ @@ -271,6 +311,20 @@ public MetricUpdates getUpdates() { return builder; } + /** + * @param metricUpdate + * @return The MonitoringInfo generated from the histogram metricUpdate. + */ + private @Nullable MonitoringInfo histogramUpdateToMonitoringInfo( + MetricUpdate metricUpdate) { + SimpleMonitoringInfoBuilder builder = histogramToMonitoringMetadata(metricUpdate.getKey()); + if (builder == null) { + return null; + } + builder.setInt64HistogramValue(metricUpdate.getUpdate()); + return builder.build(); + } + /** @return The MonitoringInfo metadata from the counter metric. 
*/ private @Nullable SimpleMonitoringInfoBuilder counterToMonitoringMetadata(MetricKey metricKey) { return metricToMonitoringMetadata( @@ -342,6 +396,14 @@ public MetricUpdates getUpdates() { MonitoringInfoConstants.Urns.USER_SET_STRING); } + /** @return The MonitoringInfo metadata from the histogram metric. */ + private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) { + return metricToMonitoringMetadata( + metricKey, + MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE, + MonitoringInfoConstants.Urns.PER_WORKER_LATENCY_METRIC); + } + /** * @param metricUpdate * @return The MonitoringInfo generated from the string set metricUpdate. @@ -390,6 +452,14 @@ public Iterable getMonitoringInfos() { monitoringInfos.add(mi); } } + + for (MetricUpdate metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) { + MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate); + if (mi != null) { + monitoringInfos.add(mi); + } + } + return monitoringInfos; } @@ -432,6 +502,16 @@ public Map getMonitoringData(ShortIdMap shortIds) { } } }); + perWorkerHistograms.forEach( + (metricName, histogramCell) -> { + if (histogramCell.getDirty().beforeCommit()) { + String shortId = + getShortId(metricName.getKey(), this::histogramToMonitoringMetadata, shortIds); + if (shortId != null) { + builder.put(shortId, encodeInt64Histogram(histogramCell.getCumulative())); + } + } + }); return builder.build(); } @@ -467,6 +547,10 @@ public void commitUpdates() { distributions.forEachValue(distribution -> distribution.getDirty().afterCommit()); gauges.forEachValue(gauge -> gauge.getDirty().afterCommit()); stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit()); + perWorkerHistograms.forEachValue( + histogram -> { + histogram.getDirty().afterCommit(); + }); } private > @@ -480,6 +564,18 @@ ImmutableList> extractCumulatives(MetricsMap> + ImmutableList> extractHistogramCumulatives( + MetricsMap, CellT> cells) { + ImmutableList.Builder> updates 
= ImmutableList.builder(); + cells.forEach( + (key, value) -> { + UpdateT update = checkNotNull(value.getCumulative()); + updates.add(MetricUpdate.create(MetricKey.create(stepName, key.getKey()), update)); + }); + return updates.build(); + } + /** * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this * container. @@ -489,7 +585,8 @@ public MetricUpdates getCumulative() { extractCumulatives(counters), extractCumulatives(distributions), extractCumulatives(gauges), - extractCumulatives(stringSets)); + extractCumulatives(stringSets), + extractHistogramCumulatives(perWorkerHistograms)); } /** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */ @@ -510,7 +607,6 @@ private void updateForSumInt64Type(MonitoringInfo monitoringInfo) { private void updateForDistributionInt64Type(MonitoringInfo monitoringInfo) { MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); Distribution distribution = getDistribution(metricName); - DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload()); distribution.update(data.sum(), data.count(), data.min(), data.max()); } @@ -527,6 +623,14 @@ private void updateForStringSetType(MonitoringInfo monitoringInfo) { stringSet.update(decodeStringSet(monitoringInfo.getPayload())); } + private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) { + MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo); + HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17); + Histogram histogram = getPerWorkerHistogram(metricName, buckets); + HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload()); + histogram.update(data); + } + /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. 
*/ public void update(Iterable monitoringInfos) { for (MonitoringInfo monitoringInfo : monitoringInfos) { @@ -551,6 +655,9 @@ public void update(Iterable monitoringInfos) { updateForStringSetType(monitoringInfo); break; + case PER_WORKER_HISTOGRAM_TYPE: + updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info + break; default: LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); } @@ -593,14 +700,15 @@ public boolean equals(@Nullable Object object) { && Objects.equals(counters, metricsContainerImpl.counters) && Objects.equals(distributions, metricsContainerImpl.distributions) && Objects.equals(gauges, metricsContainerImpl.gauges) - && Objects.equals(stringSets, metricsContainerImpl.stringSets); + && Objects.equals(stringSets, metricsContainerImpl.stringSets) + && Objects.equals(perWorkerHistograms, metricsContainerImpl.perWorkerHistograms); } return false; } @Override public int hashCode() { - return Objects.hash(stepName, counters, distributions, gauges, stringSets); + return Objects.hash(stepName, counters, distributions, gauges, stringSets, perWorkerHistograms); } /** @@ -722,6 +830,22 @@ public static MetricsContainerImpl deltaContainer( deltaValueCell.incTopBucketCount( currValue.getTopBucketCount() - prevValue.getTopBucketCount()); } + for (Map.Entry, HistogramCell> cell : + curr.perWorkerHistograms.entries()) { + HistogramData.BucketType bt = cell.getKey().getValue(); + HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative(); + HistogramData currValue = cell.getValue().getCumulative(); + HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey()); + deltaValueCell.incBottomBucketCount( + currValue.getBottomBucketCount() - prevValue.getBottomBucketCount()); + for (int i = 0; i < bt.getNumBuckets(); i++) { + Long bucketCountDelta = currValue.getCount(i) - prevValue.getCount(i); + deltaValueCell.incBucketCount(i, bucketCountDelta); + } + 
deltaValueCell.incTopBucketCount( + currValue.getTopBucketCount() - prevValue.getTopBucketCount()); + } + for (Map.Entry cell : curr.stringSets.entries()) { // Simply take the most recent value for stringSets, no need to count deltas. deltaContainer.stringSets.get(cell.getKey()).update(cell.getValue().getCumulative()); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java index 688491184e67..1ece1bb901d7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java @@ -39,6 +39,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.util.JsonFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; +import org.apache.beam.sdk.util.HistogramData; /** * Metrics containers by step. 
@@ -137,6 +138,7 @@ public static MetricResults asMetricResults( Map> distributions = new HashMap<>(); Map> gauges = new HashMap<>(); Map> sets = new HashMap<>(); + Map> perWorkerHistograms = new HashMap<>(); attemptedMetricsContainers.forEachMetricContainer( container -> { @@ -146,6 +148,8 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); + mergeAttemptedResults( + perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); }); committedMetricsContainers.forEachMetricContainer( container -> { @@ -155,6 +159,8 @@ public static MetricResults asMetricResults( distributions, cumulative.distributionUpdates(), DistributionData::combine); mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine); mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine); + mergeCommittedResults( + perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine); }); return new DefaultMetricResults( @@ -167,6 +173,9 @@ public static MetricResults asMetricResults( .collect(toList()), sets.values().stream() .map(result -> result.transform(StringSetData::extractResult)) + .collect(toList()), + perWorkerHistograms.values().stream() + .map(result -> result.transform(HistogramData::extractResult)) .collect(toList())); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 2bb935111d38..208984543eae 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java 
@@ -54,6 +54,8 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE); public static final String USER_SET_STRING = extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING); + public static final String USER_PER_WORKER_HISTOGRAM = + extractUrn(MonitoringInfoSpecs.Enum.USER_PER_WORKER_HISTOGRAM); public static final String SAMPLED_BYTE_SIZE = extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE); public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED); @@ -64,7 +66,8 @@ public static final class Urns { extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_COUNT); public static final String API_REQUEST_LATENCIES = extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_LATENCIES); - + public static final String PER_WORKER_LATENCY_METRIC = + extractUrn(MonitoringInfoSpecs.Enum.PER_WORKER_LATENCY_METRIC); static { // Validate that compile time constants match the values stored in the protos. // Defining these as constants allows for usage in switch case statements and also @@ -165,7 +168,8 @@ public static final class TypeUrns { public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1"; public static final String PROGRESS_TYPE = "beam:metrics:progress:v1"; public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1"; - + public static final String PER_WORKER_HISTOGRAM_TYPE = + "beam:metrics:per_worker_histogram_int64:v1"; static { // Validate that compile time constants match the values stored in the protos. 
// Defining these as constants allows for usage in switch case statements and also @@ -191,6 +195,9 @@ public static final class TypeUrns { BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE))); checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE))); checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE))); + checkArgument( + PER_WORKER_HISTOGRAM_TYPE.equals( + getUrn(MonitoringInfoTypeUrns.Enum.PER_WORKER_HISTOGRAM))); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java index e0f5092e6b1f..fe7edc018cae 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/SimpleMonitoringInfoBuilder.java @@ -24,6 +24,7 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpec; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfoSpecs; import org.checkerframework.checker.nullness.qual.Nullable; +import org.apache.beam.sdk.util.HistogramData; /** * Simplified building of MonitoringInfo fields, allows setting one field at a time with simpler @@ -159,6 +161,17 @@ public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData 
value) { return this; } + /** + * Encodes the value and sets the type to {@link + * MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM_TYPE}. + */ + public SimpleMonitoringInfoBuilder setInt64HistogramValue(HistogramData data) { + this.builder.setPayload(encodeInt64Histogram(data)); + this.builder.setType(MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE); + return this; + } + + /** Sets the MonitoringInfo label to the given name and value. */ public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) { this.builder.putLabels(labelName, labelValue); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java index 3f25d6810217..a5a4accfba75 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java @@ -106,4 +106,13 @@ public void testReset() { assertThat(cell.getCumulative(), equalTo(HistogramData.linear(0, 10, 100))); assertThat(cell.getDirty(), equalTo(new DirtyState())); } + + @Test + public void testUpdateWithHistogramData() { + HistogramCell cell = new HistogramCell(KV.of(MetricName.named("hello", "world"), bucketType)); + HistogramData data = HistogramData.linear(0, 10, 100); + data.record(5, 7, 9); + cell.update(data); + assertThat(cell.getCumulative(), equalTo(data)); + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index 5b3d71f4873e..a74f42467837 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -372,6 +372,7 
@@ public void testDeltaCounters() { HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5); MetricName hName = MetricName.named("namespace", "histogram"); MetricName stringSetName = MetricName.named("namespace", "stringset"); + MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram"); MetricsContainerImpl prevContainer = new MetricsContainerImpl(null); prevContainer.getCounter(cName).inc(2L); @@ -383,6 +384,10 @@ public void testDeltaCounters() { prevContainer.getHistogram(hName, bucketType).update(3); prevContainer.getHistogram(hName, bucketType).update(20); + // Set PerWorkerBucketCounts to [0,1,1,0,0,0,0] + prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1); + prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3); + MetricsContainerImpl nextContainer = new MetricsContainerImpl(null); nextContainer.getCounter(cName).inc(9L); nextContainer.getGauge(gName).set(8L); @@ -401,6 +406,10 @@ public void testDeltaCounters() { nextContainer.getHistogram(hName, bucketType).update(20); nextContainer.getHistogram(hName, bucketType).update(20); + // Set PerWorkerBucketCounts to [1,0,0,0,0,0,1] + nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1); + nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20); + MetricsContainerImpl deltaContainer = MetricsContainerImpl.deltaContainer(prevContainer, nextContainer); // Expect counter value: 7 = 9 - 2 @@ -426,6 +435,20 @@ public void testDeltaCounters() { } assertEquals( 2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount()); + + // Expect per worker bucket counts: [1,0,0,0,0,0,1] + assertEquals( + 1, + deltaContainer + .getPerWorkerHistogram(pwhName, bucketType) + .getCumulative() + .getBottomBucketCount()); + assertEquals( + 1, + deltaContainer + .getPerWorkerHistogram(pwhName, bucketType) + .getCumulative() + .getTopBucketCount()); } @Test diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 00df20c4ac39..2044850ce277 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -92,8 +92,8 @@ public void testApplyCommittedNoFilter() { ImmutableList.of( MetricUpdate.create( MetricKey.create("step1", NAME4), - StringSetData.create(ImmutableSet.of("ab")))))); - metrics.commitLogical( + MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("ab")))), + ImmutableList.of())); metrics.commitLogical( bundle1, MetricUpdates.create( ImmutableList.of( @@ -106,9 +106,8 @@ public void testApplyCommittedNoFilter() { MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))), ImmutableList.of( MetricUpdate.create( - MetricKey.create("step1", NAME4), - StringSetData.create(ImmutableSet.of("cd")))))); - + MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("cd")))), + ImmutableList.of())); MetricQueryResults results = metrics.allMetrics(); assertThat( results.getCounters(), @@ -157,6 +156,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() { MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); metrics.updatePhysical( bundle1, @@ -166,6 +166,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() { MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); MetricQueryResults results = @@ -195,6 +196,7 @@ public void testApplyAttemptedQueryCompositeScope() { MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); 
metrics.updatePhysical( bundle1, @@ -204,6 +206,7 @@ public void testApplyAttemptedQueryCompositeScope() { MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), ImmutableList.of(), ImmutableList.of(), + ImmutableList.of(), ImmutableList.of())); MetricQueryResults results = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java index a37625ed05d6..943e00dc11da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.metrics; +import org.apache.beam.sdk.util.HistogramData; + /** A metric that reports information about the histogram of reported values. */ public interface Histogram extends Metric { /** Add an observation to this histogram. */ @@ -28,4 +30,7 @@ default void update(double... values) { update(value); } } + + /** Merges the given histogram data into this histogram. The default implementation is a no-op; implementations that support merging must override it. */ + default void update(HistogramData data) {} } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 9f60ce3d6c07..5e2605b2dc70 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.metrics; import com.google.auto.value.AutoValue; +import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** The results of a query for metrics. Allows accessing all the metrics that matched the filter.
*/ @@ -36,6 +38,9 @@ public abstract class MetricQueryResults { /** Return the metric results for the sets that matched the filter. */ public abstract Iterable> getStringSets(); + /** Return the metric results for the per-worker histograms that matched the filter. */ + public abstract Iterable> getPerWorkerHistograms(); + static void printMetrics(String type, Iterable> metrics, StringBuilder sb) { List> metricsList = ImmutableList.copyOf(metrics); if (!metricsList.isEmpty()) { @@ -74,6 +79,17 @@ public static MetricQueryResults create( Iterable> distributions, Iterable> gauges, Iterable> stringSets) { - return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets); + return new AutoValue_MetricQueryResults( + counters, distributions, gauges, stringSets, Collections.emptyList()); + } + + public static MetricQueryResults create( + Iterable> counters, + Iterable> distributions, + Iterable> gauges, + Iterable> stringSets, + Iterable> histogramData) { + return new AutoValue_MetricQueryResults( + counters, distributions, gauges, stringSets, histogramData); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index 516425108e68..27bb05cd2fa1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -304,6 +304,17 @@ public synchronized long getTotalCount() { return numBoundedBucketRecords + numTopRecords + numBottomRecords; } + public HistogramData extractResult() { + HistogramData other = new HistogramData(this.getBucketType()); + other.update(this); + return other; + } + + public HistogramData combine(HistogramData value) { + this.update(value); + return this; + } + public synchronized String getPercentileString(String elemType, String unit) { return String.format( "Total number of %s: %s, P99: %.0f %s, P90: %.0f %s, P50: %.0f %s",