Skip to content

Commit

Permalink
use different proto
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Oct 28, 2024
1 parent 6887687 commit 08e31c4
Show file tree
Hide file tree
Showing 30 changed files with 1,994 additions and 2,245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ message MonitoringInfoSpecs {
]
}];

//import com.google.api.services.dataflow.model.PerWorkerMetrics;
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
Expand Down Expand Up @@ -587,9 +586,7 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
// Represents histograms
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];

Expand Down
10 changes: 1 addition & 9 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,21 @@ test {
}
}

// def google_api_services_dataflow = library.java.google_api_services_dataflow

dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
provided library.java.google_api_services_dataflow
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
// implementation library.java.proto_google_common_protos
implementation library.java.google_cloud_dataflow_java_proto_library_all

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_api
testRuntimeOnly library.java.slf4j_simple
provided(library.java.google_api_services_dataflow)
provided library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
Expand All @@ -40,7 +38,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DefaultMetricResults extends MetricResults {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricResults.class);

private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
Expand All @@ -54,7 +51,6 @@ public DefaultMetricResults(
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public void update(double value) {
dirty.afterModification();
}

/** Update it by another Histogram Data. */
@Override
public void update(HistogramData data) {
this.value.update(data);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public static MetricUpdates create(
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
// System.out.println("xxx metric create");
return new AutoValue_MetricUpdates(
counterUpdates,
distributionUpdates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

// Should it be a cell Instead?
// Can this be a regular histogram instead of a cell'? see
// dirty state acts as being lock free, commits only non dirty metrics.
// also of type DISTRIBUTION_INT64_TYPE
// refactor to use Lock free histograms? later?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);

private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

// assume the same bucket type?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

private MetricsContainerImpl(@Nullable String stepName, boolean isProcessWide) {
LOG.info("xxx create metric container {}: isProcessWide {}", stepName, isProcessWide);
this.stepName = stepName;
this.isProcessWide = isProcessWide;
}

// private static boolean enablePerWorkerMetrics = true; // default should be false

/**
* Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. If
* stepName is null, this MetricsContainer is not bound to a step.
Expand All @@ -127,7 +118,6 @@ public MetricsContainerImpl(@Nullable String stepName) {
* collecting processWide metrics for HarnessMonitoringInfoRequest/Response.
*/
public static MetricsContainerImpl createProcessWideContainer() {
LOG.info("xxx create createProcessWideContainer");
return new MetricsContainerImpl(null, true);
}

Expand Down Expand Up @@ -182,16 +172,8 @@ public DistributionCell getDistribution(MetricName metricName) {
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
// LOG.info("xxx stepName {}, getPerWorkerHistogram metric {}", stepName, metricName.getName());
// if not enabled, return a no op container from parent class
// if (!enablePerWorkerMetrics) {
// // will be a no op
// return null;
// // return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
// }
// return no op histogram instead
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
return val; // no null chceks for the others
return val;
}

/**
Expand Down Expand Up @@ -285,13 +267,11 @@ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT
return updates.build();
}

// map is structured a little differently, use a different update method
private <UpdateT, CellT extends MetricCell<UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractHistogramUpdates(
MetricsMap<KV<MetricName, HistogramData.BucketType>, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
cells.forEach(
// metric namd and bucket type pair, then cell
(key, value) -> {
if (value.getDirty().beforeCommit()) {
updates.add(
Expand Down Expand Up @@ -596,7 +576,6 @@ private String getShortId(
* committed.
*/
public void commitUpdates() {
// LOG.info("xxx does is commitUpdates?"); // add per worker metrics here?
counters.forEachValue(counter -> counter.getDirty().afterCommit());
distributions.forEachValue(distribution -> distribution.getDirty().afterCommit());
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
Expand Down Expand Up @@ -897,7 +876,6 @@ public static MetricsContainerImpl deltaContainer(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}

// treat per worker histograms differently
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
Expand Down
Loading

0 comments on commit 08e31c4

Please sign in to comment.