diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java index c818e422f5e5..128e04ae0001 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java @@ -264,4 +264,90 @@ public static HistogramData decodeInt64Histogram(ByteString payload) { throw new RuntimeException(e); } } + + /** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static ByteString encodeInt64Histogram(HistogramData inputHistogram) { + try { + int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets(); + + DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue(); + + if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) { + HistogramData.LinearBuckets buckets = + (HistogramData.LinearBuckets) inputHistogram.getBucketType(); + Linear linear = new Linear(); + linear.setNumberOfBuckets(numberOfBuckets); + linear.setWidth(buckets.getWidth()); + linear.setStart(buckets.getStart()); + outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear)); + } else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) { + HistogramData.ExponentialBuckets buckets = + (HistogramData.ExponentialBuckets) inputHistogram.getBucketType(); + Base2Exponent base2Exp = new Base2Exponent(); + base2Exp.setNumberOfBuckets(numberOfBuckets); + base2Exp.setScale(buckets.getScale()); + outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp)); + } else { + throw new RuntimeException("Unable to parse histogram, bucket is not recognized"); + } + + outputHistogram2.setCount(inputHistogram.getTotalCount()); + + List bucketCounts = new ArrayList<>(); + + Arrays.stream(inputHistogram.getBucketCount()) + .forEach( + val -> { + bucketCounts.add(val); + }); + + outputHistogram2.setBucketCounts(bucketCounts); + + ObjectMapper objectMapper = new ObjectMapper(); + String jsonString = objectMapper.writeValueAsString(outputHistogram2); + + return ByteString.copyFromUtf8(jsonString); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */ + public static HistogramData decodeInt64Histogram(ByteString payload) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // parse afterwards + DataflowHistogramValue newHist = new DataflowHistogramValue(); + newHist.setCount(jsonNode.get("count").asLong()); + + List bucketCounts = new ArrayList<>(); + Iterator itr = jsonNode.get("bucketCounts").iterator(); + while (itr.hasNext()) { + Long item = itr.next().asLong(); + bucketCounts.add(item); + } + newHist.setBucketCounts(bucketCounts); + + if (jsonNode.get("bucketOptions").has("linear")) { + Linear linear = new Linear(); + JsonNode linearNode = jsonNode.get("bucketOptions").get("linear"); + linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt()); + linear.setWidth(linearNode.get("width").asDouble()); + linear.setStart(linearNode.get("start").asDouble()); + newHist.setBucketOptions(new BucketOptions().setLinear(linear)); + } else if (jsonNode.get("bucketOptions").has("exponential")) { + Base2Exponent base2Exp = new Base2Exponent(); + JsonNode expNode = jsonNode.get("bucketOptions").get("exponential"); + base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt()); + base2Exp.setScale(expNode.get("scale").asInt()); + newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp)); + } else { + throw new RuntimeException("Unable to parse histogram, bucket is not recognized"); + } + return new HistogramData(newHist); + } catch (IOException e) { + throw new RuntimeException(e); + } + } }