Skip to content

Commit

Permalink
prevent silent failures in the dispatch loop from stalling the pipeline (#32922)
Browse files Browse the repository at this point in the history
* use ExecutorService instead of ScheduledExecutorService, which captures exceptions thrown by tasks in their futures; those futures were never examined, so the failures went unnoticed

Co-authored-by: Arun Pandian <[email protected]>
  • Loading branch information
m-trieu and arunpandianp authored Oct 30, 2024
1 parent 8b1ca21 commit 88ada9d
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public static <T extends DataflowWorkerHarnessOptions> T initializeGlobalStateAn

@SuppressWarnings("Slf4jIllegalPassedClass")
public static void initializeLogging(Class<?> workerHarnessClass) {
/* Set up exception handling tied to the workerHarnessClass. */
// Set up exception handling for raw Threads tied to the workerHarnessClass.
// Does NOT handle exceptions thrown by threads created by
// ScheduledExecutors/ScheduledExecutorServices.
Thread.setDefaultUncaughtExceptionHandler(
new WorkerUncaughtExceptionHandler(LoggerFactory.getLogger(workerHarnessClass)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private StreamingDataflowWorker(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ScheduledExecutorService activeWorkRefreshExecutorFn,
ConcurrentMap<String, StageInfo> stageInfoMap) {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
Expand Down Expand Up @@ -285,7 +285,7 @@ private StreamingDataflowWorker(
stuckCommitDurationMillis,
computationStateCache::getAllPresentComputations,
sampler,
executorSupplier.apply("RefreshWork"),
activeWorkRefreshExecutorFn,
getDataMetricTracker::trackHeartbeats);

this.statusPages =
Expand Down Expand Up @@ -347,10 +347,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
.setSizeMb(options.getWorkerCacheMb())
.setSupportMapViaMultimap(options.isEnableStreamingEngine())
.build();
Function<String, ScheduledExecutorService> executorSupplier =
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build());

GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createGrpcwindmillStreamFactoryBuilder(options, clientId);

Expand Down Expand Up @@ -417,7 +414,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
streamingCounters,
memoryMonitor,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
stageInfo);
}

Expand Down Expand Up @@ -595,7 +593,7 @@ static StreamingDataflowWorker forTesting(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build()
: windmillStreamFactory.build(),
executorSupplier,
executorSupplier.apply("RefreshWork"),
stageInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
* This uncaught exception handler logs the {@link Throwable} to the logger, {@link System#err} and
* exits the application with status code 1.
*/
class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
public final class WorkerUncaughtExceptionHandler implements UncaughtExceptionHandler {
@VisibleForTesting public static final int JVM_TERMINATED_STATUS_CODE = 1;
private final JvmRuntime runtime;
private final Logger logger;

WorkerUncaughtExceptionHandler(Logger logger) {
public WorkerUncaughtExceptionHandler(Logger logger) {
this(JvmRuntime.INSTANCE, logger);
}

@VisibleForTesting
WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
public WorkerUncaughtExceptionHandler(JvmRuntime runtime, Logger logger) {
this.runtime = runtime;
this.logger = logger;
}
Expand All @@ -59,7 +59,7 @@ public void uncaughtException(Thread thread, Throwable e) {
t.printStackTrace(originalStdErr);
}
} finally {
runtime.halt(1);
runtime.halt(JVM_TERMINATED_STATUS_CODE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private FanOutStreamingEngineWorkerHarness(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
this.workerMetadataConsumer =
Executors.newSingleThreadScheduledExecutor(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
this.totalGetWorkBudget = totalGetWorkBudget;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
this.waitForResources = waitForResources;
this.computationStateFetcher = computationStateFetcher;
this.workProviderExecutor =
Executors.newSingleThreadScheduledExecutor(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private StreamingApplianceWorkCommitter(
WeightedBoundedQueue.create(
MAX_COMMIT_QUEUE_BYTES, commit -> Math.min(MAX_COMMIT_QUEUE_BYTES, commit.getSize()));
this.commitWorkers =
Executors.newSingleThreadScheduledExecutor(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MAX_PRIORITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public GetWorkBudgetRefresher(
Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
this.budgetRefreshTrigger = new AdvancingPhaser(1);
this.budgetRefreshExecutor =
Executors.newSingleThreadScheduledExecutor(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat(BUDGET_REFRESH_THREAD)
.setUncaughtExceptionHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
*/
@Internal
@ThreadSafe
public final class StreamingWorkScheduler {
public class StreamingWorkScheduler {
private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);

private final DataflowWorkerHarnessOptions options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

@RunWith(JUnit4.class)
public class StreamingEngineComputationConfigFetcherTest {

private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class, new Returns(Optional.empty()));
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming.harness;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that an unexpected failure while requesting work from a {@link
 * SingleSourceWorkerHarness} reaches the {@link WorkerUncaughtExceptionHandler} and halts the
 * (fake) JVM, rather than being dropped silently.
 */
@RunWith(JUnit4.class)
public class SingleSourceWorkerHarnessTest {
  private static final Logger LOG = LoggerFactory.getLogger(SingleSourceWorkerHarnessTest.class);

  /** Upper bound, in seconds, on how long a test waits for the fake JVM to be halted. */
  private static final long RUNTIME_DEATH_TIMEOUT_SECONDS = 5;

  private final WorkCommitter workCommitter = mock(WorkCommitter.class);
  private final GetDataClient getDataClient = mock(GetDataClient.class);
  private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
  private final Runnable waitForResources = () -> {};
  private final Function<String, Optional<ComputationState>> computationStateFetcher =
      ignored -> Optional.empty();
  private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class);

  /**
   * Builds a harness wired to the mocked collaborators and installs a {@link
   * WorkerUncaughtExceptionHandler} backed by {@code runtime} as the JVM-wide default handler.
   *
   * <p>The default handler is process-global state; callers are responsible for restoring the
   * previous handler once they are done (see {@link #assertUnexpectedFailureKillsJvm}).
   *
   * @param getWorkSender the work source under test
   * @param runtime the runtime the uncaught exception handler halts
   * @return a built, not yet started, harness
   */
  private SingleSourceWorkerHarness createWorkerHarness(
      SingleSourceWorkerHarness.GetWorkSender getWorkSender, JvmRuntime runtime) {
    // In non-test scenario this is set in DataflowWorkerHarnessHelper.initializeLogging(...).
    Thread.setDefaultUncaughtExceptionHandler(new WorkerUncaughtExceptionHandler(runtime, LOG));
    return SingleSourceWorkerHarness.builder()
        .setWorkCommitter(workCommitter)
        .setGetDataClient(getDataClient)
        .setHeartbeatSender(heartbeatSender)
        .setWaitForResources(waitForResources)
        .setStreamingWorkScheduler(streamingWorkScheduler)
        .setComputationStateFetcher(computationStateFetcher)
        .setGetWorkSender(getWorkSender)
        .build();
  }

  /**
   * Starts a harness using {@code getWorkSender} and asserts that the fake JVM is halted with
   * {@link WorkerUncaughtExceptionHandler#JVM_TERMINATED_STATUS_CODE} within the timeout.
   *
   * <p>The previously installed default uncaught exception handler is restored afterwards so the
   * handler bound to this test's {@link FakeJvmRuntime} does not leak into other tests running in
   * the same JVM.
   */
  private void assertUnexpectedFailureKillsJvm(
      SingleSourceWorkerHarness.GetWorkSender getWorkSender) {
    Thread.UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
    FakeJvmRuntime fakeJvmRuntime = new FakeJvmRuntime();
    try {
      createWorkerHarness(getWorkSender, fakeJvmRuntime).start();
      assertTrue(
          fakeJvmRuntime.waitForRuntimeDeath(RUNTIME_DEATH_TIMEOUT_SECONDS, TimeUnit.SECONDS));
      fakeJvmRuntime.assertJvmTerminated();
    } finally {
      Thread.setDefaultUncaughtExceptionHandler(previousHandler);
    }
  }

  @Test
  public void testDispatchLoop_unexpectedFailureKillsJvm_appliance() {
    assertUnexpectedFailureKillsJvm(
        SingleSourceWorkerHarness.GetWorkSender.forAppliance(
            () -> {
              throw new RuntimeException("something bad happened");
            }));
  }

  @Test
  public void testDispatchLoop_unexpectedFailureKillsJvm_streamingEngine() {
    assertUnexpectedFailureKillsJvm(
        SingleSourceWorkerHarness.GetWorkSender.forStreamingEngine(
            workItemReceiver -> {
              throw new RuntimeException("something bad happened");
            }));
  }

  /**
   * {@link JvmRuntime} that records the requested exit status instead of halting the process, and
   * lets the test thread block until {@link #halt(int)} has been called.
   */
  private static class FakeJvmRuntime implements JvmRuntime {
    private final CountDownLatch haltedLatch = new CountDownLatch(1);
    private volatile int exitStatus = 0;

    @Override
    public void halt(int status) {
      // Record the status before releasing waiters so they observe it once the latch opens.
      exitStatus = status;
      haltedLatch.countDown();
    }

    /**
     * Blocks until {@link #halt(int)} is called or the timeout elapses.
     *
     * @return {@code true} if halt was called in time; {@code false} on timeout or if the waiting
     *     thread was interrupted (in which case the interrupt flag is restored)
     */
    public boolean waitForRuntimeDeath(long timeout, TimeUnit unit) {
      try {
        return haltedLatch.await(timeout, unit);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller/test runner instead of swallowing it.
        Thread.currentThread().interrupt();
        return false;
      }
    }

    /** Asserts that halt was requested with the status used by the uncaught exception handler. */
    private void assertJvmTerminated() {
      assertThat(exitStatus).isEqualTo(WorkerUncaughtExceptionHandler.JVM_TERMINATED_STATUS_CODE);
    }
  }
}

0 comments on commit 88ada9d

Please sign in to comment.