From 88ada9dfee3c4602ff62b0f6ebdded29518760c9 Mon Sep 17 00:00:00 2001
From: martin trieu
Date: Wed, 30 Oct 2024 04:19:36 -0600
Subject: [PATCH] fix silent failures in dispatch loop from stalling the
 pipeline (#32922)

* use ExecutorService instead of ScheduledExecutorService which swallows
  exceptions into futures that were not examined

Co-authored-by: Arun Pandian
---
 .../worker/DataflowWorkerHarnessHelper.java   |   4 +-
 .../worker/StreamingDataflowWorker.java       |  14 +--
 .../WorkerUncaughtExceptionHandler.java       |  10 +-
 .../FanOutStreamingEngineWorkerHarness.java   |   2 +-
 .../harness/SingleSourceWorkerHarness.java    |   2 +-
 .../StreamingApplianceWorkCommitter.java      |   2 +-
 .../work/budget/GetWorkBudgetRefresher.java   |   2 +-
 .../processing/StreamingWorkScheduler.java    |   2 +-
 ...ingEngineComputationConfigFetcherTest.java |   1 -
 .../SingleSourceWorkerHarnessTest.java        | 117 ++++++++++++++++++
 10 files changed, 136 insertions(+), 20 deletions(-)
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index 94c894608a47..a28a5e989c88 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -82,7 +82,9 @@ public static T initializeGlobalStateAn
   @SuppressWarnings("Slf4jIllegalPassedClass")
   public static void initializeLogging(Class<?> workerHarnessClass) {
-    /* Set up exception handling tied to the workerHarnessClass. */
+    // Set up exception handling for raw Threads tied to the workerHarnessClass.
+    // Does NOT handle exceptions thrown by threads created by
+    // ScheduledExecutors/ScheduledExecutorServices.
     Thread.setDefaultUncaughtExceptionHandler(
         new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c478341c1c39..ff72add83e4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -175,7 +175,7 @@ private StreamingDataflowWorker(
       StreamingCounters streamingCounters,
       MemoryMonitor memoryMonitor,
       GrpcWindmillStreamFactory windmillStreamFactory,
-      Function<String, ScheduledExecutorService> executorSupplier,
+      ScheduledExecutorService activeWorkRefreshExecutorFn,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(options);
@@ -285,7 +285,7 @@ private StreamingDataflowWorker(
             stuckCommitDurationMillis,
             computationStateCache::getAllPresentComputations,
             sampler,
-            executorSupplier.apply("RefreshWork"),
+            activeWorkRefreshExecutorFn,
             getDataMetricTracker::trackHeartbeats);
     this.statusPages =
@@ -347,10 +347,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
             .setSizeMb(options.getWorkerCacheMb())
             .setSupportMapViaMultimap(options.isEnableStreamingEngine())
             .build();
-    Function<String, ScheduledExecutorService> executorSupplier =
-        threadName ->
-            Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat(threadName).build());
+
     GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
         createGrpcwindmillStreamFactoryBuilder(options, clientId);
@@ -417,7 +414,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
         streamingCounters,
         memoryMonitor,
         configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
-        executorSupplier,
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
         stageInfo);
   }
@@ -595,7 +593,7 @@ static StreamingDataflowWorker forTesting(
                     options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
                 .build()
             : windmillStreamFactory.build(),
-        executorSupplier,
+        executorSupplier.apply("RefreshWork"),
         stageInfo);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
index 5a8e87d23ab9..b4ec170099d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerUncaughtExceptionHandler.java
@@ -28,16 +28,16 @@
  * This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and
  * exits the application with status code 1.
  */
-class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
+public final class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
+  @VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1;
   private final JvmRuntime runtime;
   private final Logger logger;

-  WorkerUncaughtExceptionHandler(Logger logger) {
+  public WorkerUncaughtExceptionHandler(Logger logger) {
     this(JvmRuntime.INSTANCE, logger);
   }

-  @VisibleForTesting
-  WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
+  public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
     this.runtime = runtime;
     this.logger = logger;
   }
@@ -59,7 +59,7 @@ public void uncaughtException(Thread thread, Throwable e) {
         t.printStackTrace(originalStdErr);
       }
     } finally {
-      runtime.halt(1);
+      runtime.halt(JVM_TERMINATED_STATUS_CODE);
     }
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
index 458cf57ca8e7..3eed4ee6d835 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
@@ -137,7 +137,7 @@ private FanOutStreamingEngineWorkerHarness(
         Executors.newCachedThreadPool(
             new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
     this.workerMetadataConsumer =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
     this.getWorkBudgetDistributor = getWorkBudgetDistributor;
     this.totalGetWorkBudget = totalGetWorkBudget;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index bc93e6d89c41..06598b61c458 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -82,7 +82,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
     this.waitForResources = waitForResources;
     this.computationStateFetcher = computationStateFetcher;
     this.workProviderExecutor =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setPriority(Thread.MIN_PRIORITY)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
index d092ebf53fc1..6889764afe69 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitter.java
@@ -57,7 +57,7 @@ private StreamingApplianceWorkCommitter(
         WeightedBoundedQueue.create(
             MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
     this.commitWorkers =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setDaemon(true)
                .setPriority(Thread.MAX_PRIORITY)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
index e39aa8dbc8a5..d81c7d0593f3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
@@ -51,7 +51,7 @@ public GetWorkBudgetRefresher(
       Supplier<Boolean> isBudgetRefreshPaused,
       Runnable redistributeBudget) {
     this.budgetRefreshTrigger = new AdvancingPhaser(1);
     this.budgetRefreshExecutor =
-        Executors.newSingleThreadScheduledExecutor(
+        Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat(BUDGET_REFRESH_THREAD)
                .setUncaughtExceptionHandler(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 9a3e6eb6b099..c74874c465a6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -70,7 +70,7 @@
  */
 @Internal
 @ThreadSafe
-public final class StreamingWorkScheduler {
+public class StreamingWorkScheduler {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);

   private final DataflowWorkerHarnessOptions options;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 9fa17588c94d..3a0ae7bb2084 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -47,7 +47,6 @@
 @RunWith(JUnit4.class)
 public class StreamingEngineComputationConfigFetcherTest {
-
   private final WorkUnitClient mockDataflowServiceClient =
       mock(WorkUnitClient.class, new Returns(Optional.empty()));
   private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
new file mode 100644
index 000000000000..5a2df4baae61
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class SingleSourceWorkerHarnessTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);
+  private final WorkCommitter workCommitter = mock(WorkCommitter.class);
+  private final GetDataClient getDataClient = mock(GetDataClient.class);
+  private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
+  private final Runnable waitForResources = () -> {};
+  private final Function<String, Optional<ComputationState>> computationStateFetcher =
+      ignored -> Optional.empty();
+  private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class);
+
+  private SingleSourceWorkerHarness createWorkerHarness(
+      SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
+    // In non-test scenario this is set in DataflowWorkerHarnessHelper.initializeLogging(...).
+    Thread.setDefaultUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(runtime, LOG));
+    return SingleSourceWorkerHarness.builder()
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setHeartbeatSender(heartbeatSender)
+        .setWaitForResources(waitForResources)
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setComputationStateFetcher(computationStateFetcher)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
+    SingleSourceWorkerHarness.GetWorkSender getWorkSender =
+        SingleSourceWorkerHarness.GetWorkSender.forAppliance(
+            () -> {
+              throw new RuntimeException("something bad happened");
+            });
+
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
+  }
+
+  @Test
+  public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
+    SingleSourceWorkerHarness.GetWorkSender getWorkSender =
+        SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
+            workItemReceiver -> {
+              throw new RuntimeException("something bad happened");
+            });
+
+    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
+    createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
+    assertTrue(fakeJvmRuntime.waitForRuntimeDeath(5, TimeUnit.SECONDS));
+    fakeJvmRuntime.assertJvmTerminated();
+  }
+
+  private static class FakeJvmRuntime implements JvmRuntime {
+    private final CountDownLatch haltedLatch = new CountDownLatch(1);
+    private volatile int exitStatus = 0;
+
+    @Override
+    public void halt(int status) {
+      exitStatus = status;
+      haltedLatch.countDown();
+    }
+
+    public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
+      try {
+        return haltedLatch.await(timeout, unit);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    private void assertJvmTerminated() {
+      assertThat(exitStatus).isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
+    }
+  }
+}
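
Background on the executor swap described in the commit message: executors that wrap tasks in futures capture anything the task throws inside the Future, so unless someone calls get() the throwable is dropped silently and the thread's uncaught exception handler never runs. ScheduledThreadPoolExecutor does this even for execute(), because its execute(command) is specified as schedule(command, 0, unit); a plain ThreadPoolExecutor, by contrast, lets a throwable escape from execute(), which kills the worker thread and invokes the default uncaught exception handler, in this worker the WorkerUncaughtExceptionHandler installed above. The sketch below is a minimal standalone illustration of that difference, not part of the patch; the class name and messages are made up.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of the Beam codebase. */
public final class ExecutorExceptionDemo {
  public static void main(String[] args) throws InterruptedException {
    // Stand-in for WorkerUncaughtExceptionHandler: just report what reaches the handler.
    Thread.setDefaultUncaughtExceptionHandler(
        (thread, throwable) -> System.err.println("uncaught: " + throwable));

    // ScheduledThreadPoolExecutor.execute(...) is equivalent to schedule(..., 0, unit),
    // so the RuntimeException is stored in a ScheduledFuture that nobody reads:
    // nothing is printed and the handler above never runs.
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    scheduled.execute(
        () -> {
          throw new RuntimeException("swallowed");
        });

    // ThreadPoolExecutor.execute(...) runs the raw Runnable; the RuntimeException
    // escapes the task, the worker thread dies, and the default handler fires.
    ExecutorService plain = Executors.newSingleThreadExecutor();
    plain.execute(
        () -> {
          throw new RuntimeException("visible");
        });

    Thread.sleep(500); // give both tasks time to run before shutting down
    scheduled.shutdown();
    plain.shutdown();
    scheduled.awaitTermination(1, TimeUnit.SECONDS);
    plain.awaitTermination(1, TimeUnit.SECONDS);
  }
}

Running the sketch reports only the "visible" exception; the "swallowed" one disappears into a future that is never examined, which is the silent dispatch-loop stall this patch removes by switching the dispatch, commit, metadata, and budget-refresh threads to Executors.newSingleThreadExecutor, and keeping a ScheduledExecutorService only for the RefreshWork executor, presumably because active work refresh still needs periodic scheduling.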