Skip to content

Commit

Permalink
Support poisioning instruction ids to prevent the FnApi data stream f…
Browse files Browse the repository at this point in the history
…rom blocking on failed instructions (#32857)
  • Loading branch information
scwhittle authored Nov 12, 2024
1 parent 43d27ed commit 785ec07
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.fn.data;

import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
Expand All @@ -30,6 +32,8 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -49,13 +53,20 @@
*/
public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
private static final Duration POISONED_INSTRUCTION_ID_CACHE_TIMEOUT = Duration.ofMinutes(20);
private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
private final StreamObserver<BeamFnApi.Elements> inboundObserver;
private final StreamObserver<BeamFnApi.Elements> outboundObserver;
private final ConcurrentMap<
private final ConcurrentHashMap<
/*instructionId=*/ String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>>
receivers;
private final ConcurrentMap<String, Boolean> erroredInstructionIds;
private final Cache</*instructionId=*/ String, /*unused=*/ Boolean> poisonedInstructionIds;

private static class PoisonedException extends RuntimeException {
public PoisonedException() {
super("Instruction poisoned");
}
};

public BeamFnDataGrpcMultiplexer(
Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
Expand All @@ -64,7 +75,8 @@ public BeamFnDataGrpcMultiplexer(
baseOutboundObserverFactory) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.receivers = new ConcurrentHashMap<>();
this.erroredInstructionIds = new ConcurrentHashMap<>();
this.poisonedInstructionIds =
CacheBuilder.newBuilder().expireAfterWrite(POISONED_INSTRUCTION_ID_CACHE_TIMEOUT).build();
this.inboundObserver = new InboundObserver();
this.outboundObserver =
outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver);
Expand All @@ -87,29 +99,70 @@ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
return outboundObserver;
}

private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(
String instructionId) {
return receivers.computeIfAbsent(instructionId, (unused) -> new CompletableFuture<>());
}

/**
* Registers a consumer for the specified instruction id.
*
* <p>The {@link BeamFnDataGrpcMultiplexer} partitions {@link BeamFnApi.Elements} with multiple
* instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
* single instruction id.
*
* <p>The caller must {@link #unregisterConsumer unregister the consumer} when they no longer wish
* to receive messages.
* <p>The caller must either {@link #unregisterConsumer unregister the consumer} when all messages
* have been processed or {@link #poisonInstructionId(String) poison the instruction} if messages
* for the instruction should be dropped.
*/
public void registerConsumer(
String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
receiverFuture(instructionId).complete(receiver);
receivers.compute(
instructionId,
(unused, existing) -> {
if (existing != null) {
if (!existing.complete(receiver)) {
throw new IllegalArgumentException("Instruction id was registered twice");
}
return existing;
}
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
throw new IllegalArgumentException("Instruction id was poisoned");
}
return CompletableFuture.completedFuture(receiver);
});
}

/** Unregisters a consumer. */
/** Unregisters a previously registered consumer. */
public void unregisterConsumer(String instructionId) {
receivers.remove(instructionId);
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receivers.remove(instructionId);
if (receiverFuture != null && !receiverFuture.isDone()) {
// The future must have been inserted by the inbound observer since registerConsumer completes
// the future.
throw new IllegalArgumentException("Unregistering consumer which was not registered.");
}
}

/**
* Poisons an instruction id.
*
* <p>Any records for the instruction on the inbound observer will be dropped for the next {@link
* #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}.
*/
public void poisonInstructionId(String instructionId) {
poisonedInstructionIds.put(instructionId, Boolean.TRUE);
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receivers.remove(instructionId);
if (receiverFuture != null) {
// Completing exceptionally has no effect if the future was already notified. In that case
// whatever registered the receiver needs to handle cancelling it.
receiverFuture.completeExceptionally(new PoisonedException());
if (!receiverFuture.isCompletedExceptionally()) {
try {
receiverFuture.get().close();
} catch (Exception e) {
LOG.warn("Unexpected error closing existing observer");
}
}
}
}

@VisibleForTesting
Expand Down Expand Up @@ -210,27 +263,42 @@ public void onNext(BeamFnApi.Elements value) {
}

private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
if (erroredInstructionIds.containsKey(instructionId)) {
LOG.debug("Ignoring inbound data for failed instruction {}", instructionId);
return;
}
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
receiverFuture(instructionId);
if (!consumerFuture.isDone()) {
LOG.debug(
"Received data for instruction {} without consumer ready. "
+ "Waiting for consumer to be registered.",
instructionId);
}
CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
try {
consumer = consumerFuture.get();

CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
receivers.computeIfAbsent(
instructionId,
(unused) -> {
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
throw new PoisonedException();
}
LOG.debug(
"Received data for instruction {} without consumer ready. "
+ "Waiting for consumer to be registered.",
instructionId);
return new CompletableFuture<>();
});
// The consumer may not be registered until the bundle processor is fully constructed so we
// conservatively set
// a high timeout. Poisoning will prevent this for occurring for consumers that will not be
// registered.
consumer = consumerFuture.get(3, TimeUnit.HOURS);
/*
* TODO: On failure we should fail any bundles that were impacted eagerly
* instead of relying on the Runner harness to do all the failure handling.
*/
} catch (ExecutionException | InterruptedException e) {
} catch (TimeoutException e) {
LOG.error(
"Timed out waiting to observe consumer data stream for instruction {}",
instructionId,
e);
outboundObserver.onError(e);
return;
} catch (ExecutionException | InterruptedException | PoisonedException e) {
if (e instanceof PoisonedException || e.getCause() instanceof PoisonedException) {
LOG.debug("Received data for poisoned instruction {}. Dropping input.", instructionId);
return;
}
LOG.error(
"Client interrupted during handling of data for instruction {}", instructionId, e);
outboundObserver.onError(e);
Expand All @@ -240,10 +308,11 @@ private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.E
outboundObserver.onError(e);
return;
}

try {
consumer.accept(value);
} catch (Exception e) {
erroredInstructionIds.put(instructionId, true);
poisonInstructionId(instructionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
final AtomicBoolean closed = new AtomicBoolean();
multiplexer.registerConsumer(
DATA_INSTRUCTION_ID,
new CloseableFnDataReceiver<BeamFnApi.Elements>() {
Expand All @@ -290,7 +291,7 @@ public void flush() throws Exception {

@Override
public void close() throws Exception {
fail("Unexpected call");
closed.set(true);
}

@Override
Expand Down Expand Up @@ -320,6 +321,7 @@ public void accept(BeamFnApi.Elements input) throws Exception {
dataInboundValues,
Matchers.contains(
BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()));
assertTrue(closed.get());
}

@Test
Expand Down
Loading

0 comments on commit 785ec07

Please sign in to comment.