Commit

use different proto
Naireen committed Oct 23, 2024
1 parent 6887687 commit 1855c5e
Showing 5 changed files with 132 additions and 56 deletions.
4 changes: 3 additions & 1 deletion runners/core-java/build.gradle
Expand Up @@ -44,6 +44,9 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
// implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill") // need histogram proto
provided library.java.google_api_services_dataflow

implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
Expand All @@ -62,5 +65,4 @@ dependencies {
provided library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
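A note on the swap above: the metrics encoding below moves off the Windmill protobuf messages and onto the Dataflow REST model classes, so the windmill shadow project is dropped here in favor of the provided Dataflow API client. As a sanity check, a minimal sketch (assuming these generated model classes ship with google_api_services_dataflow and expose fluent setters, as the encoder below uses them):

// Sketch: verifies the Dataflow REST model classes used by the new encoding
// resolve once `provided library.java.google_api_services_dataflow` is on the
// compile classpath.
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;

class DataflowModelClasspathCheck {
  static DataflowHistogramValue emptyLinearHistogram() {
    // Fluent setters, as used in MonitoringInfoEncodings below.
    return new DataflowHistogramValue()
        .setBucketOptions(new BucketOptions().setLinear(new Linear()));
  }
}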
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
Expand Up @@ -17,15 +17,20 @@
*/
package org.apache.beam.runners.core.metrics;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
// import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
// import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
Expand All @@ -38,8 +43,7 @@
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO Refactor out DataflowHistogramValue to be runner agnostic.

Expand Down Expand Up @@ -71,60 +75,108 @@ public static ByteString encodeInt64Distribution(DistributionData data) {
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
LOG.info("Xxx: data {}", inputHistogram.getPercentileString("poll latency", "seconds"));
try {
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();

// try with the new proto, and add outlier stats as well.
DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
LOG.info(" before buckets are inserted {}", outputHistogram2.toPrettyString());
// refactor out different bucket types?
if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
LOG.info("xxx linear buckets");
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
com.google.api.services.dataflow.model.Linear linear =
new com.google.api.services.dataflow.model.Linear();
linear.setNumberOfBuckets(numberOfBuckets);
linear.setWidth(buckets.getWidth());
linear.setStart(buckets.getStart());
// set null value to help with parsing in decoding step
outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
LOG.info("xxx exp buckets");
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
com.google.api.services.dataflow.model.Base2Exponent base2Exp =
new com.google.api.services.dataflow.model.Base2Exponent();
base2Exp.setNumberOfBuckets(numberOfBuckets);
base2Exp.setScale(buckets.getScale());
outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else { // unsupported type
// should an error be thrown here?
LOG.info("xxx error, bucket type not recognized");
}
// LOG.info(" after buckets are inserted {}", outputHistogram2.toPrettyString());

outputHistogram2.setCount(inputHistogram.getTotalCount());

List<Long> bucketCounts = new ArrayList<>();

Arrays.stream(inputHistogram.getBucketCount())
.forEach(
val -> {
bucketCounts.add(val);
});

outputHistogram2.setBucketCounts(bucketCounts);

// try with new proto:
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(outputHistogram2);

// Method[] methods = inputHistogram.getClass().getMethods();
// for (Method method : methods) {
// System.out.println(method.toString());
// }
return ByteString.copyFromUtf8(jsonString);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
// decode to DataflowHistogramValue, then create HistogramData from it, and pass that along.
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
LOG.info("xxx josn NOde pretty print {}", jsonNode.toPrettyString());
DataflowHistogramValue newHist = new DataflowHistogramValue();
newHist.setCount(jsonNode.get("count").asLong());

List<Long> bucketCounts = new ArrayList<>();
Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
while (itr.hasNext()) {
Long item = itr.next().asLong();
// accumulate the per-bucket counts
bucketCounts.add(item);
}
newHist.setBucketCounts(bucketCounts);

// only one will be set
LOG.info("xxx bucketOptions {}", jsonNode.get("bucketOptions").toString());
if (jsonNode.get("bucketOptions").has("linear")) {
com.google.api.services.dataflow.model.Linear linear =
new com.google.api.services.dataflow.model.Linear();
JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
linear.setWidth(linearNode.get("width").asDouble());
linear.setStart(linearNode.get("start").asDouble());
LOG.info("xxx linear bucket: {}", linear);
newHist.setBucketOptions(new BucketOptions().setLinear(linear));
} else {
// assume exponential buckets for now
com.google.api.services.dataflow.model.Base2Exponent base2Exp =
new com.google.api.services.dataflow.model.Base2Exponent();
JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");

base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
base2Exp.setScale(expNode.get("scale").asInt());
newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
}

LOG.info("xxx jsonNode to proto {}", newHist.toString());
LOG.info("Xxx: data {} ", payload);
return new HistogramData(newHist); // update
} catch (IOException e) {
throw new RuntimeException(e);
}
}
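Taken together, the new path serializes a DataflowHistogramValue to JSON with Jackson and parses it back field by field. A minimal round-trip sketch, assuming Beam's vendored gRPC 1.60.1 ByteString and the public signatures above:

import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;

public class HistogramEncodingRoundTrip {
  public static void main(String[] args) {
    // Five linear buckets of width 5 starting at 0, as in the tests below.
    HistogramData histogram = new HistogramData(HistogramData.LinearBuckets.of(0, 5, 5));
    histogram.record(5, 10, 15, 20);

    // Encode to the JSON form of DataflowHistogramValue...
    ByteString payload = MonitoringInfoEncodings.encodeInt64Histogram(histogram);

    // ...and decode back; the tests below assert the result equals the input.
    HistogramData decoded = MonitoringInfoEncodings.decodeInt64Histogram(payload);
    System.out.println(histogram.equals(decoded));
  }
}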
runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
Expand Up @@ -48,7 +48,6 @@ public class MonitoringInfoEncodingsTest {
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
ByteString payload = encodeInt64Distribution(data);
System.out.println("xxxx " + payload);
assertEquals(data, decodeInt64Distribution(payload));
}

Expand All @@ -64,18 +63,33 @@ public void testDoubleDistributionEncoding() {
}

@Test
public void testHistogramInt64EncodingLinearHist() {
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(5, 10, 15, 20);
// LOG.info("Xxx: inputHistogram {}, {} ", inputHistogram.getBoun, payload);
ByteString payload = encodeInt64Histogram(inputHistogram);
System.out.println("xxx payload: " + payload);

// com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto= new
// com.google.api.services.dataflow.model.DataflowHistogramValue();
// HistogramData data = inputHistogram.extractResult();
// System.out.println("xxx data {}" + data);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testHistogramInt64EncodingExpHist() {
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(2, 4, 8, 16, 32);
ByteString payload = encodeInt64Histogram(inputHistogram);
System.out.println("xxx payload: " + payload);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testInt64GaugeEncoding() {
GaugeData data = GaugeData.create(1L, new Instant(2L));
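For intuition, the payload asserted on above is plain UTF-8 JSON rather than a protobuf wire format. A sketch of inspecting it directly (field names are assumed to follow the DataflowHistogramValue bean getters, which is what the decoder reads):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;

public class InspectHistogramPayload {
  public static void main(String[] args) throws IOException {
    HistogramData histogram = new HistogramData(HistogramData.LinearBuckets.of(0, 5, 5));
    histogram.record(5, 10, 15, 20);
    ByteString payload = MonitoringInfoEncodings.encodeInt64Histogram(histogram);

    // decodeInt64Histogram reads exactly these fields back out.
    JsonNode root = new ObjectMapper().readTree(payload.toStringUtf8());
    System.out.println(root.get("count"));         // total record count
    System.out.println(root.get("bucketCounts"));  // per-bucket counts
    System.out.println(root.get("bucketOptions")); // linear or exponential spec
  }
}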
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
Expand Up @@ -72,6 +72,7 @@ test {
dependencies {
antlr library.java.antlr
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
provided library.java.google_api_services_dataflow
// antlr is used to generate code from sdks/java/core/src/main/antlr/
permitUnusedDeclared library.java.antlr
// Required to load constants from the model, e.g. max timestamp for global window
sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
Expand Up @@ -29,15 +29,17 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions;
// import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
// import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import java.lang.reflect.Method;

/**
* A histogram that supports estimated percentile with linear interpolation.
*
* <p>We may consider using Apache Commons or HdrHistogram library in the future for advanced
 * features such as sparsely populated histograms.
*/
public class HistogramData implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(HistogramData.class);
Expand Down Expand Up @@ -77,10 +79,10 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}

public HistogramData(com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) {

int numBuckets;
if(histogramProto.getBucketOptions().getLinear() != null){
double start = histogramProto.getBucketOptions().getLinear().getStart();
double width = histogramProto.getBucketOptions().getLinear().getWidth();
numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
Expand All @@ -89,10 +91,10 @@ public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.H

this.buckets = new long[bucketType.getNumBuckets()];
// populate with bucket counts with mean type for now, not used to determine equality
for (long val: histogramProto.getBucketCounts()){
this.buckets[idx] = val; // is this valid?
if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
LOG.info("xxx {} {}", val, idx);
// LOG.info("xxx val, idx {} {}", val, idx);
this.numBoundedBucketRecords+= val;
}
idx++;
Expand All @@ -107,18 +109,16 @@ public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.H
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
this.buckets = new long[bucketType.getNumBuckets()];
// populate with bucket counts with mean type for now, not used to determine equality
for (long val: histogramProto.getBucketCounts()){
this.buckets[idx] = val; // is this valid?
if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
this.numBoundedBucketRecords+= val;
}
idx++;
}
}
LOG.info("xxx numBoundedBucketRecords when creating from proto {}", numBoundedBucketRecords);
}


public BucketType getBucketType() {
return this.bucketType;
}
Expand Down Expand Up @@ -224,6 +224,7 @@ public synchronized void incBucketCount(int bucketIndex, long count) {
}

public synchronized void incTopBucketCount(long count) {
LOG.info("xxx increment top bucket {}", count);
this.numTopRecords += count;
}

Expand Down Expand Up @@ -258,6 +259,7 @@ public synchronized void record(double value) {
double rangeTo = bucketType.getRangeTo();
double rangeFrom = bucketType.getRangeFrom();
if (value >= rangeTo) {
// LOG.info("xxx value, rangeTo {}, {}", value, rangeTo);
recordTopRecordsValue(value);
} else if (value < rangeFrom) {
recordBottomRecordsValue(value);
Expand Down Expand Up @@ -296,6 +298,7 @@ private synchronized void updateStatistics(double value) {
*
* @param value
*/
// out of bounds values
private synchronized void recordTopRecordsValue(double value) {
numTopRecords++;
topRecordsSum += value;
Expand Down Expand Up @@ -351,6 +354,10 @@ public synchronized long getCount(int bucketIndex) {
return buckets[bucketIndex];
}

public synchronized long[] getBucketCount() {
return buckets;
}

public synchronized long getTopBucketCount() {
return numTopRecords;
}
Expand Down Expand Up @@ -468,6 +475,7 @@ public double getInvLog2GrowthFactor() {
@Memoized
@Override
public double getRangeTo() {
// LOG.info("xxx {}, {}, range {}, {}", getBase(), getNumBuckets(), Math.pow(getBase(), getNumBuckets()), getScale());
return Math.pow(getBase(), getNumBuckets());
}

Expand Down Expand Up @@ -631,18 +639,17 @@ public double getRangeFrom() {
public double getRangeTo() {
return getStart() + getNumBuckets() * getWidth();
}

// Note: equals() and hashCode() are implemented by the AutoValue.
}

@Override
public synchronized boolean equals(@Nullable Object object) {
if (object instanceof HistogramData) {
HistogramData other = (HistogramData) object;
LOG.info("xxx {}, {}, {}", numBoundedBucketRecords == other.numBoundedBucketRecords, numBoundedBucketRecords, other.numBoundedBucketRecords);
LOG.info("xxx {}", numTopRecords == other.numTopRecords);
LOG.info("xxx {}, {}, {}", numTopRecords == other.numTopRecords, numTopRecords, other.numTopRecords);
LOG.info("xxx {}", numBottomRecords == other.numBottomRecords);
LOG.info("xxx {}", Arrays.equals(buckets, other.buckets));
LOG.info("xxx {}, {}", buckets, other.buckets);

synchronized (other) {
return Objects.equals(bucketType, other.bucketType)
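The new constructor can also be driven directly with a hand-built value. A sketch under the same assumptions (HistogramData living in org.apache.beam.sdk.util, fluent setters on the generated model classes):

import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import java.util.Arrays;
import org.apache.beam.sdk.util.HistogramData;

public class HistogramFromDataflowValue {
  public static void main(String[] args) {
    Linear linear = new Linear()
        .setNumberOfBuckets(5)
        .setWidth(5.0)
        .setStart(0.0);

    DataflowHistogramValue value = new DataflowHistogramValue()
        .setBucketOptions(new BucketOptions().setLinear(linear))
        .setCount(4L)
        // One entry per bucket; the constructor treats the first and last
        // positions as underflow/overflow when tallying bounded records.
        .setBucketCounts(Arrays.asList(0L, 1L, 1L, 1L, 1L));

    HistogramData data = new HistogramData(value);
    System.out.println(data.getBucketType().getNumBuckets()); // 5
  }
}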
