Skip to content

Commit

Permalink
add ability to encode and decode histogram data to portable runners
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Nov 12, 2024
1 parent 25a7cfd commit fca3295
Showing 1 changed file with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,90 @@ public static HistogramData decodeInt64Histogram(ByteString payload) {
throw new RuntimeException(e);
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
try {
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();

DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();

if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
Linear linear = new Linear();
linear.setNumberOfBuckets(numberOfBuckets);
linear.setWidth(buckets.getWidth());
linear.setStart(buckets.getStart());
outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
Base2Exponent base2Exp = new Base2Exponent();
base2Exp.setNumberOfBuckets(numberOfBuckets);
base2Exp.setScale(buckets.getScale());
outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
}

outputHistogram2.setCount(inputHistogram.getTotalCount());

List<Long> bucketCounts = new ArrayList<>();

Arrays.stream(inputHistogram.getBucketCount())
.forEach(
val -> {
bucketCounts.add(val);
});

outputHistogram2.setBucketCounts(bucketCounts);

ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(outputHistogram2);

return ByteString.copyFromUtf8(jsonString);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards
DataflowHistogramValue newHist = new DataflowHistogramValue();
newHist.setCount(jsonNode.get("count").asLong());

List<Long> bucketCounts = new ArrayList<>();
Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
while (itr.hasNext()) {
Long item = itr.next().asLong();
bucketCounts.add(item);
}
newHist.setBucketCounts(bucketCounts);

if (jsonNode.get("bucketOptions").has("linear")) {
Linear linear = new Linear();
JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
linear.setWidth(linearNode.get("width").asDouble());
linear.setStart(linearNode.get("start").asDouble());
newHist.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (jsonNode.get("bucketOptions").has("exponential")) {
Base2Exponent base2Exp = new Base2Exponent();
JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
base2Exp.setScale(expNode.get("scale").asInt());
newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
}
return new HistogramData(newHist);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit fca3295

Please sign in to comment.