From 7930a1fc88b0c55883c9c26945a49ddb2082f1a5 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 26 Jul 2024 16:37:18 -0400 Subject: [PATCH] Revert "Avoid publishing string set metrics on the Dataflow legacy runner." (#32002) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Revert "Avoid publishing string set metrics on the Dataflow legacy runner. (#…" This reverts commit a22609494318e3125255105b6d2568bbba30a54f. * unskip stingset * Fix ClassCastException --- .../google-cloud-dataflow-java/build.gradle | 1 - .../runners/dataflow/DataflowMetrics.java | 4 ++-- .../worker/BatchModeExecutionContext.java | 19 ++++--------------- .../worker/StreamingStepMetricsContainer.java | 7 +------ .../StreamingStepMetricsContainerTest.java | 2 -- 5 files changed, 7 insertions(+), 26 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 177f8acf8042..4ddbb6ea41f4 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -183,7 +183,6 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesExternalService', 'org.apache.beam.sdk.testing.UsesDistributionMetrics', 'org.apache.beam.sdk.testing.UsesGaugeMetrics', - 'org.apache.beam.sdk.testing.UsesStringSetMetrics', 'org.apache.beam.sdk.testing.UsesMultimapState', 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesParDoLifecycle', diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 1fad140717f6..46fdce507c3d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -24,11 +24,11 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; @@ -191,7 +191,7 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { if (metricUpdate.getSet() == null) { return StringSetResult.empty(); } - return StringSetResult.create(ImmutableSet.copyOf(((Set) metricUpdate.getSet()))); + return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet()))); } private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 41bbae7cfdb3..aeef7784c2c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -19,7 +19,6 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.SideInputInfo; -import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -78,9 +77,6 @@ public class BatchModeExecutionContext protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; - // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this. - private final boolean populateStringSetMetrics; - private BatchModeExecutionContext( CounterFactory counterFactory, Cache> dataCache, @@ -88,8 +84,7 @@ private BatchModeExecutionContext( ReaderFactory readerFactory, PipelineOptions options, DataflowExecutionStateTracker executionStateTracker, - DataflowExecutionStateRegistry executionStateRegistry, - boolean populateStringSetMetrics) { + DataflowExecutionStateRegistry executionStateRegistry) { super( counterFactory, createMetricsContainerRegistry(), @@ -102,7 +97,6 @@ private BatchModeExecutionContext( this.dataCache = dataCache; this.containerRegistry = (MetricsContainerRegistry) getMetricsContainerRegistry(); - this.populateStringSetMetrics = populateStringSetMetrics; } private static MetricsContainerRegistry createMetricsContainerRegistry() { @@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting( counterFactory, options, "test-work-item-id"), - stateRegistry, - true); + stateRegistry); } public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) { @@ -252,8 +245,7 @@ public static BatchModeExecutionContext create( counterFactory, options, workItemId), - executionStateRegistry, - false); + executionStateRegistry); } /** Create a new {@link StepContext}. */ @@ -523,10 +515,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { update -> MetricsToCounterUpdateConverter.fromDistribution( update.getKey(), true, update.getUpdate())), - FluentIterable.from( - populateStringSetMetrics - ? updates.stringSetUpdates() - : Collections.emptyList()) + FluentIterable.from(updates.stringSetUpdates()) .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 04f983c4fa7f..7cc0dc68f7e7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -23,7 +23,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -89,9 +88,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final Clock clock; - // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this. - @VisibleForTesting boolean populateStringSetUpdates = false; - private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>(); @@ -191,8 +187,7 @@ public Histogram getPerWorkerHistogram( public Iterable extractUpdates() { return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates()) - .append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList()); + .append(gaugeUpdates().append(stringSetUpdates())); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index d128255cd237..2d5a8d8266ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -292,7 +292,6 @@ public void testStringSetUpdateExtraction() { .setCumulative(false) .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh"))); - ((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true; Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -315,7 +314,6 @@ public void testStringSetUpdateExtraction() { .setCumulative(false) .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); - ((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update, name2Update));