Skip to content

Commit

Permalink
Add support to record multiple values at once in Histogram metric (ap…
Browse files Browse the repository at this point in the history
…ache#30717)

* Add support to record multiple values at once in Histogram metric

* Address comments
  • Loading branch information
JayajP authored Mar 28, 2024
1 parent 2f8854a commit e894d8c
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.metrics;

import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.HistogramData;

Expand Down Expand Up @@ -60,22 +61,31 @@ public DelegatingHistogram(
this.perWorkerHistogram = perWorkerHistogram;
}

@Override
public void update(double value) {
private Optional<Histogram> getHistogram() {
MetricsContainer container =
processWideContainer
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container == null) {
return;
return Optional.empty();
}
if (perWorkerHistogram) {
container.getPerWorkerHistogram(name, bucketType).update(value);
return Optional.of(container.getPerWorkerHistogram(name, bucketType));
} else {
container.getHistogram(name, bucketType).update(value);
return Optional.of(container.getHistogram(name, bucketType));
}
}

@Override
public void update(double value) {
getHistogram().ifPresent(histogram -> histogram.update(value));
}

@Override
public void update(double... values) {
getHistogram().ifPresent(histogram -> histogram.update(values));
}

@Override
public MetricName getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@
public interface Histogram extends Metric {
/** Add an observation to this histogram. */
void update(double value);

/** Add observations to this histogram. */
default void update(double... values) {
for (double value : values) {
update(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private NoOpHistogram() {}
@Override
public void update(double value) {}

@Override
public void update(double... value) {}

@Override
public MetricName getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
}
}

/** Record rpc latency histogram metrics. */
private void recordRpcLatencyMetrics() {
Histogram latencyHistogram =
BigQuerySinkMetrics.createRPCLatencyHistogram(
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
double[] rpcLatencies =
rpcLatencies().stream().mapToDouble(duration -> duration.toMillis()).toArray();
latencyHistogram.update(rpcLatencies);
}

/**
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
* containers. This function will only report metrics once per instance. Subsequent calls to
Expand Down Expand Up @@ -223,12 +233,7 @@ public void updateStreamingInsertsMetrics(@Nullable TableReference tableRef) {
.inc(successfulRowsCount().longValue());
}

Histogram latencyHistogram =
BigQuerySinkMetrics.createRPCLatencyHistogram(
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
for (Duration latency : rpcLatencies()) {
latencyHistogram.update(latency.toMillis());
}
recordRpcLatencyMetrics();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testNoOpStreamingInsertsMetrics() throws Exception {

StreamingInsertsMetrics results =
StreamingInsertsMetrics.NoOpStreamingInsertsMetrics.getInstance();
results.updateRetriedRowsWithStatus("INTERNAL", 0);
results.updateRetriedRowsWithStatus("INTERNAL", 10);
Instant t1 = Instant.now();
results.updateSuccessfulRpcMetrics(t1, t1.plus(Duration.ofMillis(10)));
TableReference ref = new TableReference().setTableId("t").setDatasetId("d");
Expand Down

0 comments on commit e894d8c

Please sign in to comment.