Skip to content

Commit

Permalink
Validate commits in StreamingDataflowWorker (#31822)
Browse files Browse the repository at this point in the history
* Grabs operational limits from streaming config and plumbs them to WindmillSink, which can then throw an exception or log a warning if outputs are too large.
  • Loading branch information
acrites authored Jul 30, 2024
1 parent 17ef888 commit 89d5e2f
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoBuilder;

/** Keep track of any operational limits required by the backend. */
public class OperationalLimits {
// Maximum size of a commit from a single work item.
public final long maxWorkItemCommitBytes;
// Maximum size of a single output element's serialized key.
public final long maxOutputKeyBytes;
// Maximum size of a single output element's serialized value.
public final long maxOutputValueBytes;
// Whether to throw an exception when processing output that violates any of the given limits.
public final boolean throwExceptionOnLargeOutput;

OperationalLimits(
long maxWorkItemCommitBytes,
long maxOutputKeyBytes,
long maxOutputValueBytes,
boolean throwExceptionOnLargeOutput) {
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
this.maxOutputKeyBytes = maxOutputKeyBytes;
this.maxOutputValueBytes = maxOutputValueBytes;
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
}

@AutoBuilder(ofClass = OperationalLimits.class)
public interface Builder {
Builder setMaxWorkItemCommitBytes(long bytes);

Builder setMaxOutputKeyBytes(long bytes);

Builder setMaxOutputValueBytes(long bytes);

Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);

OperationalLimits build();
}

public static Builder builder() {
return new AutoBuilder_OperationalLimits_Builder()
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
.setMaxOutputKeyBytes(Long.MAX_VALUE)
.setMaxOutputValueBytes(Long.MAX_VALUE)
.setThrowExceptionOnLargeOutput(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import org.checkerframework.checker.nullness.qual.Nullable;

/** Indicates that an output element was too large. */
public class OutputTooLargeException extends RuntimeException {
public OutputTooLargeException(String reason) {
super(reason);
}

/** Returns whether an exception was caused by a {@link OutputTooLargeException}. */
public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) {
while (t != null) {
if (t instanceof OutputTooLargeException) {
return true;
}
t = t.getCause();
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -177,7 +177,7 @@ private StreamingDataflowWorker(
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
AtomicInteger maxWorkItemCommitBytes,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
Expand Down Expand Up @@ -296,15 +296,14 @@ private StreamingDataflowWorker(
streamingCounters,
hotKeyLogger,
sampler,
maxWorkItemCommitBytes,
operationalLimits,
ID_GENERATOR,
stageInfoMap);

LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
}

public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
Expand All @@ -314,7 +313,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE);
AtomicReference<OperationalLimits> operationalLimits =
new AtomicReference<>(OperationalLimits.builder().build());
WindmillStateCache windmillStateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
Expand All @@ -332,7 +332,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
maxWorkItemCommitBytes,
operationalLimits,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
Expand Down Expand Up @@ -390,7 +390,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
workFailureProcessor,
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
Expand All @@ -406,7 +406,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
AtomicInteger maxWorkItemCommitBytes,
AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
Expand All @@ -422,8 +422,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
config ->
onPipelineConfig(
config,
options,
dispatcherClient::consumeWindmillDispatcherEndpoints,
maxWorkItemCommitBytes));
operationalLimits::set));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
Expand Down Expand Up @@ -469,9 +470,9 @@ static StreamingDataflowWorker forTesting(
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs,
int maxWorkItemCommitBytesOverrides) {
OperationalLimits limits) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(maxWorkItemCommitBytesOverrides);
AtomicReference<OperationalLimits> operationalLimits = new AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache =
WindmillStateCache.builder()
Expand All @@ -488,8 +489,9 @@ static StreamingDataflowWorker forTesting(
config ->
onPipelineConfig(
config,
options,
windmillServer::setWindmillServiceEndpoints,
maxWorkItemCommitBytes))
operationalLimits::set))
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
Expand Down Expand Up @@ -557,7 +559,7 @@ static StreamingDataflowWorker forTesting(
workFailureProcessor,
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
operationalLimits,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
Expand All @@ -570,12 +572,18 @@ static StreamingDataflowWorker forTesting(

private static void onPipelineConfig(
StreamingEnginePipelineConfig config,
DataflowWorkerHarnessOptions options,
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
AtomicInteger maxWorkItemCommitBytes) {
if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
}
Consumer<OperationalLimits> operationalLimits) {

operationalLimits.accept(
OperationalLimits.builder()
.setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
.setMaxOutputKeyBytes(config.maxOutputKeyBytes())
.setMaxOutputValueBytes(config.maxOutputValueBytes())
.setThrowExceptionOnLargeOutput(
DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"))
.build());

if (!config.windmillServiceEndpoints().isEmpty()) {
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private Work work;
private WindmillComputationKey computationKey;
private SideInputStateFetcher sideInputStateFetcher;
// OperationalLimits is updated in start() because a StreamingModeExecutionContext can
// be used for processing many work items and these values can change during the context's
// lifetime. start() is called for each work item.
private OperationalLimits operationalLimits;
private Windmill.WorkItemCommitRequest.Builder outputBuilder;

/**
Expand Down Expand Up @@ -168,6 +172,18 @@ public final long getBacklogBytes() {
return backlogBytes;
}

public long getMaxOutputKeyBytes() {
return operationalLimits.maxOutputKeyBytes;
}

public long getMaxOutputValueBytes() {
return operationalLimits.maxOutputValueBytes;
}

public boolean throwExceptionsForLargeOutput() {
return operationalLimits.throwExceptionOnLargeOutput;
}

public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
}
Expand All @@ -177,11 +193,13 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
this.key = key;
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
this.operationalLimits = operationalLimits;
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
Expand All @@ -54,6 +56,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
private StreamingModeExecutionContext context;
private static final Logger LOG = LoggerFactory.getLogger(WindmillSink.class);

WindmillSink(
String destinationName,
Expand Down Expand Up @@ -172,6 +175,28 @@ public long add(WindowedValue<T> data) throws IOException {
key = context.getSerializedKey();
value = encode(valueCoder, data.getValue());
}
if (key.size() > context.getMaxOutputKeyBytes()) {
if (context.throwExceptionsForLargeOutput()) {
throw new OutputTooLargeException("Key too large: " + key.size());
} else {
LOG.error(
"Trying to output too large key with size "
+ key.size()
+ ". Limit is "
+ context.getMaxOutputKeyBytes());
}
}
if (value.size() > context.getMaxOutputValueBytes()) {
if (context.throwExceptionsForLargeOutput()) {
throw new OutputTooLargeException("Value too large: " + value.size());
} else {
LOG.error(
"Trying to output too large value with size "
+ value.size()
+ ". Limit is "
+ context.getMaxOutputValueBytes());
}
}

Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key);
if (keyedOutput == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
Expand All @@ -45,7 +46,7 @@
* @implNote Once closed, it cannot be reused.
*/
// TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the
// seperation of responsibilities are unclear.
// separation of responsibilities are unclear.
@AutoValue
@Internal
@NotThreadSafe
Expand All @@ -72,9 +73,11 @@ public final void executeWork(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
context()
.start(key, work, stateReader, sideInputStateFetcher, operationalLimits, outputBuilder);
workExecutor().execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
}
}

private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) {
private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) {
StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder();
if (config.getUserStepToStateFamilyNameMap() != null) {
pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap());
Expand Down Expand Up @@ -187,6 +187,18 @@ private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfi
pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
}

if (config.getOperationalLimits() != null) {
if (config.getOperationalLimits().getMaxKeyBytes() > 0
&& config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) {
pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
}
if (config.getOperationalLimits().getMaxProductionOutputBytes() > 0
&& config.getOperationalLimits().getMaxProductionOutputBytes() <= Integer.MAX_VALUE) {
pipelineConfig.setMaxOutputValueBytes(
config.getOperationalLimits().getMaxProductionOutputBytes());
}
}

return pipelineConfig.build();
}

Expand Down Expand Up @@ -273,7 +285,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() {

private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
.map(StreamingEngineComputationConfigFetcher::createPipelineConfig);
.map(config -> createPipelineConfig(config));
}

@FunctionalInterface
Expand Down
Loading

0 comments on commit 89d5e2f

Please sign in to comment.