diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java index e86b2d5b5ceb..f1e710d3ec7e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java @@ -46,7 +46,7 @@ public class OutputSampler { // Temporarily holds exceptional elements. These elements can also be duplicated in the main // buffer. This is in order to always track exceptional elements even if the number of samples in // the main buffer drops it. - private final Map> exceptions = new HashMap<>(); + private Map> exceptions = new HashMap<>(); // Maximum number of elements in buffer. private final int maxElements; @@ -198,11 +198,16 @@ public List samples() throws IOException { // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here // so as to not slow down the main processing hot path. List> bufferToSend; + Map> exceptionsToSend; int sampleIndex = 0; synchronized (this) { bufferToSend = buffer; - sampleIndex = resampleIndex; buffer = new ArrayList<>(maxElements); + + exceptionsToSend = exceptions; + exceptions = new HashMap<>(exceptions.size()); + + sampleIndex = resampleIndex; resampleIndex = 0; } @@ -210,14 +215,13 @@ public List samples() throws IOException { // to deduplicate samples. HashSet seen = new HashSet<>(); ByteStringOutputStream stream = new ByteStringOutputStream(); - for (Map.Entry> pair : exceptions.entrySet()) { + for (Map.Entry> pair : exceptionsToSend.entrySet()) { String processBundleId = pair.getKey(); ElementSample sample = pair.getValue(); seen.add(sample.id); ret.add(sampleToProto(sample, stream, processBundleId)); } - exceptions.clear(); for (int i = 0; i < bufferToSend.size(); i++) { int index = (sampleIndex + i) % bufferToSend.size(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java index 8a4ea1b99844..5ca562e1c241 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java @@ -307,7 +307,9 @@ public void testConcurrentSamples() throws IOException, InterruptedException { } for (int i = 0; i < 1000000; i++) { - outputSampler.sample(WindowedValue.valueInGlobalWindow(i)); + ElementSample sample = + outputSampler.sample(WindowedValue.valueInGlobalWindow(i)); + outputSampler.exception(sample, new RuntimeException(""), "ptransformId", "pbId"); } doneSignal.countDown(); @@ -324,7 +326,9 @@ public void testConcurrentSamples() throws IOException, InterruptedException { } for (int i = -1000000; i < 0; i++) { - outputSampler.sample(WindowedValue.valueInGlobalWindow(i)); + ElementSample sample = + outputSampler.sample(WindowedValue.valueInGlobalWindow(i)); + outputSampler.exception(sample, new RuntimeException(""), "ptransformId", "pbId"); } doneSignal.countDown();