From cb91f37fcd77d2111922b3e104e6ae19442237e8 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 22 Jan 2024 22:50:39 -0500 Subject: [PATCH] feat: implement reduce stream sdk (#91) * Define the user interface as two methods - processMessage and handleEndOfStream. * Continue using akka to handle concurrency. Introduce an OutputActor to make sure we are sending responses back to the gRPC output stream synchronously. Signed-off-by: Keran Yang --- examples/pom.xml | 17 ++ .../reducestreamer/sum/SumFactory.java | 23 ++ .../reducestreamer/sum/SumFunction.java | 43 +++ .../numaflow/reducestreamer/ActorRequest.java | 30 ++ .../reducestreamer/ActorResponse.java | 32 +++ .../numaflow/reducestreamer/Constants.java | 13 + .../numaflow/reducestreamer/GRPCConfig.java | 26 ++ .../numaflow/reducestreamer/HandlerDatum.java | 28 ++ .../reducestreamer/IntervalWindowImpl.java | 22 ++ .../numaflow/reducestreamer/MetadataImpl.java | 15 + .../numaflow/reducestreamer/OutputActor.java | 48 ++++ .../OutputStreamObserverImpl.java | 49 ++++ .../reducestreamer/ReduceStreamerActor.java | 78 ++++++ .../numaflow/reducestreamer/Server.java | 112 ++++++++ .../numaflow/reducestreamer/Service.java | 131 +++++++++ .../reducestreamer/ShutdownActor.java | 65 +++++ .../reducestreamer/SupervisorActor.java | 182 ++++++++++++ .../numaflow/reducestreamer/model/Datum.java | 29 ++ .../reducestreamer/model/IntervalWindow.java | 22 ++ .../reducestreamer/model/Message.java | 56 ++++ .../reducestreamer/model/Metadata.java | 14 + .../model/OutputStreamObserver.java | 13 + .../reducestreamer/model/ReduceStreamer.java | 36 +++ .../model/ReduceStreamerFactory.java | 13 + .../numaflow/reducer/ShutDownActorTest.java | 10 +- ...ctorTest.java => SupervisorActorTest.java} | 14 +- .../reducestreamer/GRPCConfigTest.java | 39 +++ .../ReduceOutputStreamObserver.java | 38 +++ .../reducestreamer/ServerErrTest.java | 179 ++++++++++++ .../numaflow/reducestreamer/ServerTest.java | 261 ++++++++++++++++++ 
.../reducestreamer/ShutdownActorTest.java | 141 ++++++++++ .../reducestreamer/SupervisorActorTest.java | 170 ++++++++++++ 32 files changed, 1937 insertions(+), 12 deletions(-) create mode 100644 examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java create mode 100644 examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/Server.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/Service.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/IntervalWindow.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/Message.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/Metadata.java create mode 100644 
src/main/java/io/numaproj/numaflow/reducestreamer/model/OutputStreamObserver.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamer.java create mode 100644 src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamerFactory.java rename src/test/java/io/numaproj/numaflow/reducer/{ReduceSupervisorActorTest.java => SupervisorActorTest.java} (93%) create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/ReduceOutputStreamObserver.java create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java create mode 100644 src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java diff --git a/examples/pom.xml b/examples/pom.xml index 8696e5d6..63df86f2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -134,6 +134,23 @@ + + reduce-stream-sum + package + + dockerBuild + + + + + io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory + + + + numaflow-java-examples/reduce-stream-sum + + + diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java new file mode 100644 index 00000000..5faa39d5 --- /dev/null +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java @@ -0,0 +1,23 @@ +package io.numaproj.numaflow.examples.reducestreamer.sum; + +import io.numaproj.numaflow.reducestreamer.Server; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import lombok.extern.slf4j.Slf4j; + +/** + * SumFactory extends ReduceStreamerFactory to support creating instances of SumFunction. 
+ * It also provides a main function to start a server for handling the reduce stream. + */ +@Slf4j +public class SumFactory extends ReduceStreamerFactory { + + public static void main(String[] args) throws Exception { + log.info("sum udf was invoked"); + new Server(new SumFactory()).start(); + } + + @Override + public SumFunction createReduceStreamer() { + return new SumFunction(); + } +} diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java new file mode 100644 index 00000000..77bfcba7 --- /dev/null +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java @@ -0,0 +1,43 @@ +package io.numaproj.numaflow.examples.reducestreamer.sum; + +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import lombok.extern.slf4j.Slf4j; + +/** + * SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys + * and outputs the sum, then resets it to zero, whenever the sum reaches 100 or more. + * When the input stream closes, the function outputs the sum no matter what value it holds. 
+ */ +@Slf4j +public class SumFunction extends ReduceStreamer { + + private int sum = 0; + + @Override + public void processMessage( + String[] keys, + io.numaproj.numaflow.reducestreamer.model.Datum datum, + OutputStreamObserver outputStreamObserver, + io.numaproj.numaflow.reducestreamer.model.Metadata md) { + try { + sum += Integer.parseInt(new String(datum.getValue(), java.nio.charset.StandardCharsets.UTF_8)); // decode as UTF-8 rather than the platform default charset + } catch (NumberFormatException e) { + log.info("error while parsing integer - {}", e.getMessage()); + } + if (sum >= 100) { + outputStreamObserver.send(new Message(String.valueOf(sum).getBytes(java.nio.charset.StandardCharsets.UTF_8))); + sum = 0; + } + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(sum).getBytes(java.nio.charset.StandardCharsets.UTF_8))); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java new file mode 100644 index 00000000..831bf667 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java @@ -0,0 +1,30 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * ActorRequest is a wrapper of the gRPC input request. + * It is constructed by the service when service receives an input request and then sent to + * the supervisor actor, to be distributed to reduce streamer actors. + */ +@Getter +@AllArgsConstructor +class ActorRequest { + ReduceOuterClass.ReduceRequest request; + + // TODO - do we need to include window information in the id? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + // we will revisit this one later. 
+ public String getUniqueIdentifier() { + return String.join( + Constants.DELIMITER, + this.getRequest().getPayload().getKeysList().toArray(new String[0])); + } + + public String[] getKeySet() { + return this.getRequest().getPayload().getKeysList().toArray(new String[0]); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java new file mode 100644 index 00000000..3a73509c --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java @@ -0,0 +1,32 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +/** + * The actor response holds the final EOF response for a particular key set. + *

+ * The isLast attribute indicates whether the response is globally the last one to be sent to + * the output gRPC stream, if set to true, it means the response is the very last response among + * all key sets. When output stream actor receives an isLast response, it sends the response and immediately + * closes the output stream. + */ +@Getter +@Setter +@AllArgsConstructor +class ActorResponse { + ReduceOuterClass.ReduceResponse response; + boolean isLast; + + // TODO - do we need to include window information in the id? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + // we will revisit this one later. + public String getActorUniqueIdentifier() { + return String.join( + Constants.DELIMITER, + this.getResponse().getResult().getKeysList().toArray(new String[0])); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java new file mode 100644 index 00000000..16746dab --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java @@ -0,0 +1,13 @@ +package io.numaproj.numaflow.reducestreamer; + +class Constants { + public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + + public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock"; + + public static final String EOF = "EOF"; + + public static final String SUCCESS = "SUCCESS"; + + public static final String DELIMITER = ":"; +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java new file mode 100644 index 00000000..5f5a1137 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java @@ -0,0 +1,26 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.info.ServerInfoAccessor; +import lombok.Builder; +import lombok.Getter; + +/** + * 
GRPCConfig is used to provide configurations for gRPC server. + */ +@Getter +@Builder(builderMethodName = "newBuilder") +public class GRPCConfig { + private String socketPath; + private int maxMessageSize; + private String infoFilePath; + + /** + * Static method to create default GRPCConfig. + */ + static GRPCConfig defaultGrpcConfig() { + return GRPCConfig.newBuilder() + .infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH) + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java new file mode 100644 index 00000000..a36df0ce --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java @@ -0,0 +1,28 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.Datum; +import lombok.AllArgsConstructor; + +import java.time.Instant; + +@AllArgsConstructor +class HandlerDatum implements Datum { + private byte[] value; + private Instant watermark; + private Instant eventTime; + + @Override + public Instant getWatermark() { + return this.watermark; + } + + @Override + public byte[] getValue() { + return this.value; + } + + @Override + public Instant getEventTime() { + return this.eventTime; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java new file mode 100644 index 00000000..a4f75369 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java @@ -0,0 +1,22 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; +import lombok.AllArgsConstructor; + +import java.time.Instant; + +@AllArgsConstructor +class IntervalWindowImpl implements IntervalWindow { + private final Instant startTime; 
+ private final Instant endTime; + + @Override + public Instant getStartTime() { + return this.startTime; + } + + @Override + public Instant getEndTime() { + return this.endTime; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java new file mode 100644 index 00000000..2e8f7da9 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java @@ -0,0 +1,15 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +class MetadataImpl implements Metadata { + private final IntervalWindow intervalWindow; + + @Override + public IntervalWindow getIntervalWindow() { + return intervalWindow; + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java new file mode 100644 index 00000000..afa4e724 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java @@ -0,0 +1,48 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.AbstractActor; +import akka.actor.Props; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Output actor is a wrapper around the gRPC output stream. + * It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream. + * ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream. + *

+ * More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html + */ +@Slf4j +@AllArgsConstructor +class OutputActor extends AbstractActor { + StreamObserver responseObserver; + + public static Props props( + StreamObserver responseObserver) { + return Props.create(OutputActor.class, responseObserver); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(ActorResponse.class, this::handleResponse) + .build(); + } + + private void handleResponse(ActorResponse actorResponse) { + if (actorResponse.isLast()) { + // send the very last response. + responseObserver.onNext(actorResponse.getResponse()); + // close the output stream. + responseObserver.onCompleted(); + // stop the sender of the last response (presumably the supervisor actor, not the whole AKKA system - confirm) right after we close the output stream. + // note: could make more sense if the supervisor actor stops the system, + // but it requires an extra tell. + getContext().getSystem().stop(getSender()); + } else { + responseObserver.onNext(actorResponse.getResponse()); + } + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java new file mode 100644 index 00000000..9a37726d --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java @@ -0,0 +1,49 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import lombok.AllArgsConstructor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@AllArgsConstructor +class OutputStreamObserverImpl implements 
OutputStreamObserver { + private final Metadata md; + private final ActorRef responseStreamActor; + + @Override + public void send(Message message) { + this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender()); + } + + private ActorResponse buildResponse(Message message) { + ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); + // set the window using the actor metadata. + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + responseBuilder.setEOF(false); + // set the result. + responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result + .newBuilder() + .setValue(ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() + == null ? new ArrayList<>():Arrays.asList(message.getKeys())) + .addAllTags( + message.getTags() == null ? 
new ArrayList<>():List.of(message.getTags())) + .build()); + return new ActorResponse(responseBuilder.build(), false); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java new file mode 100644 index 00000000..27bc71f8 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java @@ -0,0 +1,78 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import com.google.protobuf.Timestamp; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import lombok.AllArgsConstructor; + +import java.util.List; + +/** + * Reduce streamer actor invokes user-defined functions to handle reduce requests. + * When receiving an input request, it invokes the processMessage to handle the datum. + * When receiving an EOF signal from the supervisor, it invokes the handleEndOfStream to execute + * the user-defined end of stream processing logic. 
+ */ +@AllArgsConstructor +class ReduceStreamerActor extends AbstractActor { + private String[] keys; + private Metadata md; + private ReduceStreamer groupBy; + private OutputStreamObserver outputStream; + + public static Props props( + String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef responseStreamActor) { + return Props.create( + ReduceStreamerActor.class, + keys, + md, + groupBy, + new OutputStreamObserverImpl(md, responseStreamActor)); + } + + @Override + public Receive createReceive() { + return ReceiveBuilder + .create() + .match(HandlerDatum.class, this::invokeHandler) + .match(String.class, this::sendEOF) + .build(); + } + + private void invokeHandler(HandlerDatum handlerDatum) { + this.groupBy.processMessage(keys, handlerDatum, outputStream, md); + } + + private void sendEOF(String EOF) { + // invoke handleEndOfStream to materialize the messages received so far. + this.groupBy.handleEndOfStream(keys, outputStream, md); + // construct an actor response and send it back to the supervisor actor, indicating the actor + // has finished processing all the messages for the corresponding key set. + getSender().tell(buildEOFResponse(), getSelf()); + } + + private ActorResponse buildEOFResponse() { + ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + responseBuilder.setEOF(true); + // set a dummy result with the keys. 
+ responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result + .newBuilder() + .addAllKeys(List.of(this.keys)) + .build()); + return new ActorResponse(responseBuilder.build(), false); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java new file mode 100644 index 00000000..cf21236f --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -0,0 +1,112 @@ +package io.numaproj.numaflow.reducestreamer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ServerInfoAccessor; +import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import io.numaproj.numaflow.shared.GrpcServerUtils; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * Server is the gRPC server for executing reduce stream operation. + */ +@Slf4j +public class Server { + private final GRPCConfig grpcConfig; + private final Service service; + private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); + private io.grpc.Server server; + + /** + * constructor to create gRPC server. + * + * @param reduceStreamerFactory to process the message + */ + public Server(ReduceStreamerFactory reduceStreamerFactory) { + this(reduceStreamerFactory, GRPCConfig.defaultGrpcConfig()); + } + + /** + * constructor to create gRPC server with gRPC config. 
+ * + * @param reduceStreamerFactory to process the message + * @param grpcConfig to configure the gRPC server (socket path, max message size, server info file path) + */ + public Server( + ReduceStreamerFactory reduceStreamerFactory, + GRPCConfig grpcConfig) { + this.service = new Service(reduceStreamerFactory); + this.grpcConfig = grpcConfig; + } + + /** + * Start serving requests. + * + * @throws Exception if server fails to start + */ + public void start() throws Exception { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + + if (this.server == null) { + // create server builder + ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + + // build server + this.server = serverBuilder + .addService(this.service) + .build(); + } + + // start server + server.start(); + + log.info( + "Server started, listening on socket path: " + + grpcConfig.getSocketPath()); + + // register shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + Server.this.stop(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the interrupt flag; Thread.interrupted() would clear it + e.printStackTrace(System.err); + } + })); + } + + /** + * Stop serving requests and shutdown resources. Await termination on the main thread since the + * grpc library uses daemon threads. + * + * @throws InterruptedException if shutdown is interrupted + */ + public void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Set server builder for testing. 
+ * + * @param serverBuilder + */ + @VisibleForTesting + void setServerBuilder(ServerBuilder serverBuilder) { + this.server = serverBuilder + .addService(this.service) + .build(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java new file mode 100644 index 00000000..c8029469 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java @@ -0,0 +1,131 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AllDeadLetters; +import com.google.protobuf.Empty; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.reduce.v1.ReduceGrpc; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import io.numaproj.numaflow.shared.GrpcServerUtils; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; + +import static io.numaproj.numaflow.reduce.v1.ReduceGrpc.getReduceFnMethod; + +@Slf4j +class Service extends ReduceGrpc.ReduceImplBase { + public static final ActorSystem reduceActorSystem = ActorSystem.create("reducestream"); + + private ReduceStreamerFactory reduceStreamerFactory; + + public Service(ReduceStreamerFactory reduceStreamerFactory) { + this.reduceStreamerFactory = reduceStreamerFactory; + } + + static void handleFailure( + CompletableFuture failureFuture, + StreamObserver responseObserver) { + new Thread(() -> { + try { + failureFuture.get(); + } catch (Exception e) { + e.printStackTrace(); + var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); + responseObserver.onError(status.asException()); + } + 
}).start(); + } + + /** + * Streams input data to reduceFn and returns the result. + */ + @Override + public StreamObserver reduceFn(final StreamObserver responseObserver) { + if (this.reduceStreamerFactory == null) { + return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall( + getReduceFnMethod(), + responseObserver); + } + + // get window start and end time from gRPC metadata + String winSt = GrpcServerUtils.WINDOW_START_TIME.get(); + String winEt = GrpcServerUtils.WINDOW_END_TIME.get(); + + // convert the start and end time (epoch milliseconds) to Instant + Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt)); + Instant endTime = Instant.ofEpochMilli(Long.parseLong(winEt)); + + // create metadata + IntervalWindow iw = new IntervalWindowImpl(startTime, endTime); + Metadata md = new MetadataImpl(iw); + + CompletableFuture failureFuture = new CompletableFuture<>(); + + // create a shutdown actor that listens to exceptions. + ActorRef shutdownActorRef = reduceActorSystem. + actorOf(ShutdownActor.props(failureFuture)); + + // subscribe for dead letters + reduceActorSystem.getEventStream().subscribe(shutdownActorRef, AllDeadLetters.class); + + handleFailure(failureFuture, responseObserver); + + // create an output actor that ensures synchronized delivery of reduce responses. + ActorRef outputActor = reduceActorSystem. + actorOf(OutputActor.props(responseObserver)); + /* + create a supervisor actor which assigns the tasks to child actors. + we create a child actor for every unique set of keys in a window. + */ + ActorRef supervisorActor = reduceActorSystem + .actorOf(SupervisorActor.props( + reduceStreamerFactory, + md, + shutdownActorRef, + outputActor)); + + + return new StreamObserver<>() { + @Override + public void onNext(ReduceOuterClass.ReduceRequest datum) { + // send the message to parent actor, which takes care of distribution. 
+ if (!supervisorActor.isTerminated()) { + supervisorActor.tell(new ActorRequest(datum), ActorRef.noSender()); + } else { + responseObserver.onError(new Throwable("Supervisor actor was terminated")); + } + } + + @Override + public void onError(Throwable throwable) { + log.error("Error from the client - {}", throwable.getMessage()); + responseObserver.onError(throwable); + } + + @Override + public void onCompleted() { + // indicate the end of input to the supervisor + supervisorActor.tell(Constants.EOF, ActorRef.noSender()); + } + }; + } + + /** + * IsReady is the heartbeat endpoint for gRPC. + */ + @Override + public void isReady( + Empty request, + StreamObserver responseObserver) { + responseObserver.onNext(ReduceOuterClass.ReadyResponse.newBuilder().setReady(true).build()); + responseObserver.onCompleted(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java new file mode 100644 index 00000000..6c690fcf --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java @@ -0,0 +1,65 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.AbstractActor; +import akka.actor.AllDeadLetters; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * Shutdown actor, listens to exceptions and handles shutdown. 
+ */ +@Slf4j +@AllArgsConstructor +class ShutdownActor extends AbstractActor { + private final CompletableFuture failureFuture; + + public static Props props( + CompletableFuture failureFuture) { + return Props.create(ShutdownActor.class, failureFuture); + } + + @Override + public void preRestart(Throwable reason, Optional message) { + failureFuture.completeExceptionally(reason); + } + + @Override + public Receive createReceive() { + return ReceiveBuilder + .create() + .match(Throwable.class, this::shutdown) + .match(String.class, this::completedSuccessfully) + .match(AllDeadLetters.class, this::handleDeadLetters) + .build(); + } + + /* + complete the future exceptionally so that whoever is waiting on it + sees the failure and can report it to the response observer. + */ + private void shutdown(Throwable throwable) { + log.debug("got a shut down exception"); + failureFuture.completeExceptionally(throwable); + } + + // if there are no exceptions, complete the future without exception. + private void completedSuccessfully(String eof) { + log.debug("completed successfully of shutdown executed"); + failureFuture.complete(null); + // if all the actors completed successfully, we can stop the shutdown actor. 
+ getContext().getSystem().stop(getSelf()); + } + + // if we see dead letters, we need to stop the execution and exit + // to make sure no messages are lost + private void handleDeadLetters(AllDeadLetters deadLetter) { + log.debug("got a dead letter, stopping the execution"); + failureFuture.completeExceptionally(new Throwable("dead letters")); + getContext().getSystem().stop(getSelf()); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java new file mode 100644 index 00000000..a4d782b4 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java @@ -0,0 +1,182 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ChildRestartStats; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import akka.japi.pf.DeciderBuilder; +import akka.japi.pf.ReceiveBuilder; +import com.google.common.base.Preconditions; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import lombok.extern.slf4j.Slf4j; +import scala.PartialFunction; +import scala.collection.Iterable; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Supervisor actor distributes the messages to other actors and handles failures. 
+ */
+@Slf4j
+class SupervisorActor extends AbstractActor {
+    private final ReduceStreamerFactory reduceStreamerFactory;
+    private final Metadata md;
+    private final ActorRef shutdownActor;
+    private final ActorRef responseStreamActor;
+    private final Map<String, ActorRef> actorsMap = new HashMap<>();
+
+    public SupervisorActor(
+            ReduceStreamerFactory reduceStreamerFactory,
+            Metadata md,
+            ActorRef shutdownActor,
+            ActorRef responseStreamActor) {
+        this.reduceStreamerFactory = reduceStreamerFactory;
+        this.md = md;
+        this.shutdownActor = shutdownActor;
+        this.responseStreamActor = responseStreamActor;
+    }
+
+    public static Props props(
+            ReduceStreamerFactory reduceStreamerFactory,
+            Metadata md,
+            ActorRef shutdownActor,
+            ActorRef responseStreamActor) {
+        return Props.create(
+                SupervisorActor.class,
+                reduceStreamerFactory,
+                md,
+                shutdownActor,
+                responseStreamActor);
+    }
+
+    // if an uncaught exception reaches the supervisor actor, notify the shutdown actor and stop this actor
+    @Override
+    public void preRestart(Throwable reason, Optional<Object> message) {
+        log.debug("supervisor pre restart was executed");
+        shutdownActor.tell(reason, ActorRef.noSender());
+        Service.reduceActorSystem.stop(getSelf());
+    }
+
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        return new ReduceSupervisorStrategy();
+    }
+
+
+    @Override
+    public void postStop() {
+        log.debug("post stop of supervisor executed - {}", getSelf().toString());
+        shutdownActor.tell(Constants.SUCCESS, ActorRef.noSender());
+    }
+
+    @Override
+    public Receive createReceive() {
+        return ReceiveBuilder
+                .create()
+                .match(ActorRequest.class, this::invokeActor)
+                .match(String.class, this::sendEOF)
+                .match(ActorResponse.class, this::handleActorResponse)
+                .build();
+    }
+
+    /*
+        route the request to the reduce streamer actor registered under the request's unique identifier.
+        if no actor is registered for that identifier yet, create a new reduce streamer actor for its keys.
+        all child actors are tracked in the actors map.
+     */
+    private void invokeActor(ActorRequest actorRequest) {
+        String[] keys = actorRequest.getKeySet();
+        String uniqueId = actorRequest.getUniqueIdentifier();
+        if (!actorsMap.containsKey(uniqueId)) {
+            ReduceStreamer reduceStreamerHandler = reduceStreamerFactory.createReduceStreamer();
+            ActorRef actorRef = getContext()
+                    .actorOf(ReduceStreamerActor.props(
+                            keys,
+                            this.md,
+                            reduceStreamerHandler,
+                            this.responseStreamActor));
+            actorsMap.put(uniqueId, actorRef);
+        }
+        HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
+        actorsMap.get(uniqueId).tell(handlerDatum, getSelf());
+    }
+
+    private void sendEOF(String eof) {
+        for (ActorRef child : actorsMap.values()) {
+            child.tell(eof, getSelf());
+        }
+    }
+
+    private void handleActorResponse(ActorResponse actorResponse) {
+        // when the supervisor receives an actor response, it means the corresponding
+        // reduce streamer actor has finished its job.
+        // we remove the entry from the actors map.
+        actorsMap.remove(actorResponse.getActorUniqueIdentifier());
+        if (actorsMap.isEmpty()) {
+            // since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
+            actorResponse.setLast(true);
+            this.responseStreamActor.tell(actorResponse, getSelf());
+        } else {
+            this.responseStreamActor.tell(actorResponse, getSelf());
+        }
+    }
+
+    private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
+        return new HandlerDatum(
+                payload.getValue().toByteArray(),
+                Instant.ofEpochSecond(
+                        payload.getWatermark().getSeconds(),
+                        payload.getWatermark().getNanos()),
+                Instant.ofEpochSecond(
+                        payload.getEventTime().getSeconds(),
+                        payload.getEventTime().getNanos())
+        );
+    }
+
+    /*
+        We need supervisor to handle failures, by default if there are any failures
+        actors will be restarted, but we want to escalate the exception and terminate
+        the system.
+     */
+    private final class ReduceSupervisorStrategy extends SupervisorStrategy {
+
+        @Override
+        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
+            return DeciderBuilder.match(Exception.class, e -> SupervisorStrategy.stop()).build();
+        }
+
+        @Override
+        public void handleChildTerminated(
+                akka.actor.ActorContext context,
+                ActorRef child,
+                Iterable<ActorRef> children) {
+
+        }
+
+        @Override
+        public void processFailure(
+                akka.actor.ActorContext context,
+                boolean restart,
+                ActorRef child,
+                Throwable cause,
+                ChildRestartStats stats,
+                Iterable<ChildRestartStats> children) {
+
+            Preconditions.checkArgument(
+                    !restart,
+                    "on failures, we will never restart our actors, we escalate");
+            /*
+                forward the failure cause to the shutdown actor.
+             */
+            log.debug("process failure of supervisor strategy executed - {}", getSelf().toString());
+            shutdownActor.tell(cause, context.parent());
+        }
+    }
+}
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java
new file mode 100644
index 00000000..a590a1ce
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java
@@ -0,0 +1,29 @@
+package io.numaproj.numaflow.reducestreamer.model;
+
+import java.time.Instant;
+
+/**
+ * Datum contains methods to get the payload information.
+ */ +public interface Datum { + /** + * method to get the payload value + * + * @return returns the payload value in byte array + */ + byte[] getValue(); + + /** + * method to get the event time of the payload + * + * @return returns the event time of the payload + */ + Instant getEventTime(); + + /** + * method to get the watermark information + * + * @return returns the watermark + */ + Instant getWatermark(); +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/IntervalWindow.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/IntervalWindow.java new file mode 100644 index 00000000..4068d8a6 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/IntervalWindow.java @@ -0,0 +1,22 @@ +package io.numaproj.numaflow.reducestreamer.model; + +import java.time.Instant; + +/** + * IntervalWindow contains methods to get the information for a given interval window. + */ +public interface IntervalWindow { + /** + * method to get the start time of the interval window + * + * @return start time of the window + */ + Instant getStartTime(); + + /** + * method to get the end time of the interval window + * + * @return end time of the window + */ + Instant getEndTime(); +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Message.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Message.java new file mode 100644 index 00000000..05e3e7b8 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Message.java @@ -0,0 +1,56 @@ +package io.numaproj.numaflow.reducestreamer.model; + +import lombok.Getter; + +/** + * Message is used to wrap the data returned by Reduce Streamer functions. 
+ */ +@Getter +public class Message { + public static final String DROP = "U+005C__DROP__"; + private final String[] keys; + private final byte[] value; + private final String[] tags; + + + /** + * used to create Message with value, keys and tags(used for conditional forwarding) + * + * @param value message value + * @param keys message keys + * @param tags message tags which will be used for conditional forwarding + */ + public Message(byte[] value, String[] keys, String[] tags) { + this.keys = keys; + this.value = value; + this.tags = tags; + } + + /** + * used to create Message with value. + * + * @param value message value + */ + public Message(byte[] value) { + this(value, null, null); + } + + /** + * used to create Message with value and keys. + * + * @param value message value + * @param keys message keys + */ + public Message(byte[] value, String[] keys) { + this(value, keys, null); + } + + /** + * creates a Message which will be dropped + * + * @return returns the Message which will be dropped + */ + public static Message toDrop() { + return new Message(new byte[0], null, new String[]{DROP}); + } +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Metadata.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Metadata.java new file mode 100644 index 00000000..50e285c4 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Metadata.java @@ -0,0 +1,14 @@ +package io.numaproj.numaflow.reducestreamer.model; + +/** + * Metadata contains methods to get the metadata for the reduce stream operation. + */ +public interface Metadata { + /** + * method to get the interval window. 
+ * + * @return IntervalWindow which has the window information + */ + IntervalWindow getIntervalWindow(); +} + diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/OutputStreamObserver.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/OutputStreamObserver.java new file mode 100644 index 00000000..a2eed211 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/OutputStreamObserver.java @@ -0,0 +1,13 @@ +package io.numaproj.numaflow.reducestreamer.model; + +/** + * OutputStreamObserver sends to the output stream, the messages generate by the reduce streamer. + */ +public interface OutputStreamObserver { + /** + * method will be used for sending messages to the output stream. + * + * @param message the message to be sent + */ + void send(Message message); +} diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamer.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamer.java new file mode 100644 index 00000000..c4032ad0 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamer.java @@ -0,0 +1,36 @@ +package io.numaproj.numaflow.reducestreamer.model; + +/** + * ReduceStreamer exposes methods for performing reduce streaming operations. + */ +public abstract class ReduceStreamer { + /** + * processMessage is invoked for each reduce input message. + * It reads the input data from the datum and performs reduce operations for the given keys. + * An output stream is provided for sending back the result to the reduce output stream. 
+     *
+     * @param keys message keys
+     * @param datum current message to be processed
+     * @param outputStream observer of the reduce result, which is used to send back reduce responses
+     * @param md metadata associated with the window
+     */
+    public abstract void processMessage(
+            String[] keys,
+            Datum datum,
+            OutputStreamObserver outputStream,
+            Metadata md);
+
+    /**
+     * handleEndOfStream handles the closure of the reduce input stream.
+     * This method is invoked when the input reduce stream is closed.
+     * It provides the capability of constructing final responses based on the messages processed so far.
+     *
+     * @param keys message keys
+     * @param outputStreamObserver observer of the reduce result, which is used to send back reduce responses
+     * @param md metadata associated with the window
+     */
+    public abstract void handleEndOfStream(
+            String[] keys,
+            OutputStreamObserver outputStreamObserver,
+            Metadata md);
+}
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamerFactory.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamerFactory.java
new file mode 100644
index 00000000..b90ef7d9
--- /dev/null
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/ReduceStreamerFactory.java
@@ -0,0 +1,13 @@
+package io.numaproj.numaflow.reducestreamer.model;
+
+/**
+ * ReduceStreamerFactory is the factory for ReduceStreamer; ReduceStreamerT is the concrete streamer type it creates.
+ */
+public abstract class ReduceStreamerFactory<ReduceStreamerT extends ReduceStreamer> {
+    /**
+     * Helper function to create a reduce streamer.
+ * + * @return a concrete reduce streamer instance + */ + public abstract ReduceStreamerT createReduceStreamer(); +} diff --git a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java index b3bc7e7d..897872b8 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java @@ -36,7 +36,7 @@ public void testFailure() { Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), @@ -50,7 +50,7 @@ public void testFailure() { .setValue(ByteString.copyFromUtf8(String.valueOf(1))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); try { completableFuture.get(); @@ -74,7 +74,7 @@ public void testDeadLetterHandling() { Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), @@ -82,8 +82,8 @@ public void testDeadLetterHandling() { shutdownActor, new ReduceOutputStreamObserver())); - DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor); - supervisor.tell(deadLetter, ActorRef.noSender()); + DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor); + supervisorActor.tell(deadLetter, ActorRef.noSender()); try { completableFuture.get(); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java similarity index 93% rename from src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java rename to 
src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java index e33cbd21..d98760ff 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java @@ -17,7 +17,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class ReduceSupervisorActorTest { +public class SupervisorActorTest { @Test public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { @@ -33,7 +33,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver)); @@ -47,9 +47,9 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(Constants.EOF, ActorRef.noSender()); + supervisorActor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); @@ -83,7 +83,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas new IntervalWindowImpl(Instant.now(), Instant.now())); ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestReducerFactory(), @@ -102,10 +102,10 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, 
ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(Constants.EOF, ActorRef.noSender()); + supervisorActor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF. diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java new file mode 100644 index 00000000..a84a1539 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java @@ -0,0 +1,39 @@ +package io.numaproj.numaflow.reducestreamer; + +import io.numaproj.numaflow.info.ServerInfoAccessor; +import org.junit.Assert; +import org.junit.Test; + +public class GRPCConfigTest { + + @Test + public void testDefaultGrpcConfig() { + GRPCConfig grpcConfig = GRPCConfig.defaultGrpcConfig(); + Assert.assertNotNull(grpcConfig); + Assert.assertEquals( + ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH, + grpcConfig.getInfoFilePath()); + Assert.assertEquals( + io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_MESSAGE_SIZE, + grpcConfig.getMaxMessageSize()); + Assert.assertEquals( + io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_SOCKET_PATH, + grpcConfig.getSocketPath()); + } + + @Test + public void testNewBuilder() { + String socketPath = "test-socket-path"; + int maxMessageSize = 2000; + String infoFilePath = "test-info-file-path"; + GRPCConfig grpcConfig = GRPCConfig.newBuilder() + .socketPath(socketPath) + .maxMessageSize(maxMessageSize) + .infoFilePath(infoFilePath) + .build(); + Assert.assertNotNull(grpcConfig); + Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); + Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); + Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + } +} diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceOutputStreamObserver.java 
b/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceOutputStreamObserver.java
new file mode 100644
index 00000000..07b74bd4
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceOutputStreamObserver.java
@@ -0,0 +1,38 @@
+package io.numaproj.numaflow.reducestreamer;
+
+import io.grpc.stub.StreamObserver;
+import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test-only reduce output stream observer: records every response, the last error and the completion flag.
+ */
+@Slf4j
+public class ReduceOutputStreamObserver implements StreamObserver<ReduceOuterClass.ReduceResponse> {
+    public AtomicReference<Boolean> completed = new AtomicReference<>(false);
+    public AtomicReference<List<ReduceOuterClass.ReduceResponse>> resultDatum = new AtomicReference<>(
+            new ArrayList<>());
+    public volatile Throwable t;
+
+    @Override
+    public void onNext(ReduceOuterClass.ReduceResponse response) {
+        List<ReduceOuterClass.ReduceResponse> receivedResponses = resultDatum.get();
+        receivedResponses.add(response);
+        resultDatum.set(receivedResponses);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        t = throwable;
+    }
+
+    @Override
+    public void onCompleted() {
+        log.info("on completed executed");
+        this.completed.set(true);
+    }
+}
diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java
new file mode 100644
index 00000000..cbd13c48
--- /dev/null
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java
@@ -0,0 +1,179 @@
+package io.numaproj.numaflow.reducestreamer;
+
+import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import io.grpc.Contexts;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import
io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.reduce.v1.ReduceGrpc; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import io.numaproj.numaflow.shared.GrpcServerUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ServerErrTest { + public static final Metadata.Key DATUM_METADATA_WIN_START = Metadata.Key.of( + WIN_START_KEY, + Metadata.ASCII_STRING_MARSHALLER); + public static final Metadata.Key DATUM_METADATA_WIN_END = Metadata.Key.of( + WIN_END_KEY, + Metadata.ASCII_STRING_MARSHALLER); + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + ServerInterceptor interceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + final var context = + Context.current().withValues( + GrpcServerUtils.WINDOW_START_TIME, + headers.get(DATUM_METADATA_WIN_START), + GrpcServerUtils.WINDOW_END_TIME, + headers.get(DATUM_METADATA_WIN_END)); + return Contexts.interceptCall(context, call, headers, next); + } + }; + + String serverName = InProcessServerBuilder.generateName(); + + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + 
.maxMessageSize(io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + + server = new Server( + new ReduceStreamerErrTestFactory(), + grpcServerConfig); + + server.setServerBuilder(InProcessServerBuilder.forName(serverName) + .intercept(interceptor) + .directExecutor()); + + server.start(); + + inProcessChannel = grpcCleanup.register(InProcessChannelBuilder + .forName(serverName) + .directExecutor() + .build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowable() { + Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); + metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); + + // create an output stream observer + ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); + // we need to maintain a reference to any exceptions thrown inside the thread, otherwise even if the assertion failed in the thread, + // the test can still succeed. 
+        AtomicReference<Throwable> exceptionInThread = new AtomicReference<>();
+
+        Thread t = new Thread(() -> {
+            while (outputStreamObserver.t == null) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    exceptionInThread.set(e);
+                }
+            }
+            try {
+                assertEquals(
+                        "UNKNOWN: java.lang.RuntimeException: unknown exception",
+                        outputStreamObserver.t.getMessage());
+            } catch (Throwable e) {
+                exceptionInThread.set(e);
+            }
+        });
+        t.start();
+
+        StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
+                .newStub(inProcessChannel)
+                .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
+                .reduceFn(outputStreamObserver);
+
+        for (int i = 1; i <= 10; i++) {
+            ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
+                    .newBuilder()
+                    .setPayload(ReduceOuterClass.ReduceRequest.Payload
+                            .newBuilder()
+                            .addKeys("reduce-key")
+                            .setValue(ByteString.copyFromUtf8(String.valueOf(i)))
+                            .build())
+                    .build();
+            inputStreamObserver.onNext(reduceRequest);
+        }
+
+        inputStreamObserver.onCompleted();
+
+        try {
+            t.join();
+        } catch (InterruptedException e) {
+            fail("Thread got interrupted before test assertion.");
+        }
+        // Fail the test if the polling thread recorded any exception (failed assertion or interruption)
+        if (exceptionInThread.get() != null) {
+            fail("Assertion failed in the thread: " + exceptionInThread.get().getMessage());
+        }
+    }
+
+    public static class ReduceStreamerErrTestFactory extends ReduceStreamerFactory {
+        @Override
+        public TestReduceStreamHandler createReduceStreamer() {
+            return new ServerErrTest.ReduceStreamerErrTestFactory.TestReduceStreamHandler();
+        }
+
+        public static class TestReduceStreamHandler extends ReduceStreamer {
+            @Override
+            public void processMessage(
+                    String[] keys,
+                    Datum datum,
+                    OutputStreamObserver outputStream,
+                    io.numaproj.numaflow.reducestreamer.model.Metadata md) {
+                throw new RuntimeException("unknown exception");
+            }
+
+            @Override
+            public void handleEndOfStream(
+                    String[] keys,
+                    OutputStreamObserver outputStreamObserver,
+
io.numaproj.numaflow.reducestreamer.model.Metadata md) { + + } + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java new file mode 100644 index 00000000..f64830c0 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java @@ -0,0 +1,261 @@ +package io.numaproj.numaflow.reducestreamer; + +import com.google.protobuf.ByteString; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.reduce.v1.ReduceGrpc; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import io.numaproj.numaflow.shared.GrpcServerUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ServerTest { + public static final Metadata.Key DATUM_METADATA_WIN_START = Metadata.Key.of( + WIN_START_KEY, + Metadata.ASCII_STRING_MARSHALLER); + public static final Metadata.Key DATUM_METADATA_WIN_END = Metadata.Key.of( + 
WIN_END_KEY, + Metadata.ASCII_STRING_MARSHALLER); + private final static String REDUCE_PROCESSED_KEY_SUFFIX = "-processed"; + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private io.numaproj.numaflow.reducestreamer.Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + + ServerInterceptor interceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + final var context = + Context.current().withValues( + GrpcServerUtils.WINDOW_START_TIME, + headers.get(DATUM_METADATA_WIN_START), + GrpcServerUtils.WINDOW_END_TIME, + headers.get(DATUM_METADATA_WIN_END)); + return Contexts.interceptCall(context, call, headers, next); + } + }; + + String serverName = InProcessServerBuilder.generateName(); + + io.numaproj.numaflow.reducestreamer.GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + + server = new Server( + new ReduceStreamerTestFactory(), + grpcServerConfig); + + server.setServerBuilder(InProcessServerBuilder.forName(serverName) + .intercept(interceptor) + .directExecutor()); + + server.start(); + + inProcessChannel = grpcCleanup.register(InProcessChannelBuilder + .forName(serverName) + .directExecutor() + .build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequestsGetAggregatedToOneResponse() { + String reduceKey = "reduce-key"; + + Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); + metadata.put(Metadata.Key.of(WIN_END_KEY, 
Metadata.ASCII_STRING_MARSHALLER), "120000");
+
+        // create an output stream observer
+        io.numaproj.numaflow.reducer.ReduceOutputStreamObserver outputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver();
+
+        StreamObserver<ReduceOuterClass.ReduceRequest> inputStreamObserver = ReduceGrpc
+                .newStub(inProcessChannel)
+                .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
+                .reduceFn(outputStreamObserver);
+
+        for (int i = 1; i <= 11; i++) {
+            ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest.newBuilder()
+                    .setPayload(ReduceOuterClass.ReduceRequest.Payload
+                            .newBuilder()
+                            .setValue(ByteString.copyFromUtf8(String.valueOf(i)))
+                            .addAllKeys(Arrays.asList(reduceKey))
+                            .build())
+                    .build();
+            inputStreamObserver.onNext(request);
+        }
+
+        inputStreamObserver.onCompleted();
+
+        String[] expectedKeys = new String[]{reduceKey + REDUCE_PROCESSED_KEY_SUFFIX};
+        // sum of first 10 numbers 1 to 10 -> 55
+        ByteString expectedFirstResponse = ByteString.copyFromUtf8(String.valueOf(55));
+        // after the sum reaches 55, the test reducer resets the sum, hence when EOF is sent from input stream, the sum is 11 and gets sent to output stream.
+        ByteString expectedSecondResponse = ByteString.copyFromUtf8(String.valueOf(11));
+        while (!outputStreamObserver.completed.get()) ;
+
+        // Expect 3 responses: two containing aggregated data (55, then 11) followed by one indicating EOF.
+ assertEquals(3, outputStreamObserver.resultDatum.get().size()); + assertEquals( + expectedKeys, + outputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getKeysList() + .toArray(new String[0])); + assertEquals( + expectedFirstResponse, + outputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getValue()); + assertEquals( + expectedKeys, + outputStreamObserver.resultDatum + .get() + .get(1) + .getResult() + .getKeysList() + .toArray(new String[0])); + assertEquals( + expectedSecondResponse, + outputStreamObserver.resultDatum + .get() + .get(1) + .getResult() + .getValue()); + assertTrue(outputStreamObserver.resultDatum.get().get(2).getEOF()); + } + + @Test + public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() { + String reduceKey = "reduce-key"; + int keyCount = 3; + + Metadata metadata = new Metadata(); + metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000"); + metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000"); + + // create an output stream observer + io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); + + StreamObserver inputStreamObserver = ReduceGrpc + .newStub(inProcessChannel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) + .reduceFn(outputStreamObserver); + + // send messages with keyCount different keys + for (int j = 0; j < keyCount; j++) { + for (int i = 1; i <= 11; i++) { + ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder() + .addAllKeys(Arrays.asList(reduceKey + j)) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build(); + inputStreamObserver.onNext(request); + } + } + + inputStreamObserver.onCompleted(); + + // sum of first 10 numbers 1 to 10 -> 55 + ByteString 
expectedFirstResponse = ByteString.copyFromUtf8(String.valueOf(55)); + // after the sum reaches 55, the test reducer reset the sum, hence when EOF is sent from input stream, the sum is 11 and gets sent to output stream. + ByteString expectedSecondResponse = ByteString.copyFromUtf8(String.valueOf(11)); + + while (!outputStreamObserver.completed.get()) ; + List result = outputStreamObserver.resultDatum.get(); + // the outputStreamObserver should have observed 3*keyCount responses, 2 with real output sum data, one as EOF. + assertEquals(keyCount * 3, result.size()); + result.forEach(response -> { + assertTrue(response.getResult().getValue().equals(expectedFirstResponse) || + response.getResult().getValue().equals(expectedSecondResponse) + || response.getEOF()); + + }); + } + + public static class ReduceStreamerTestFactory extends ReduceStreamerFactory { + @Override + public ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler createReduceStreamer() { + return new ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler(); + } + + public static class TestReduceStreamHandler extends ReduceStreamer { + private int sum = 0; + + @Override + public void processMessage( + String[] keys, + Datum datum, + OutputStreamObserver outputStreamObserver, + io.numaproj.numaflow.reducestreamer.model.Metadata md) { + sum += Integer.parseInt(new String(datum.getValue())); + if (sum > 50) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + "-processed") + .toArray(String[]::new); + Message message = new Message(String.valueOf(sum).getBytes(), updatedKeys); + outputStreamObserver.send(message); + // reset sum + sum = 0; + } + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + io.numaproj.numaflow.reducestreamer.model.Metadata md) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + "-processed") + .toArray(String[]::new); + Message message = new Message(String.valueOf(sum).getBytes(), 
updatedKeys); + outputStreamObserver.send(message); + } + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java new file mode 100644 index 00000000..9d8bfa6e --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java @@ -0,0 +1,141 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AllDeadLetters; +import akka.actor.DeadLetter; +import com.google.protobuf.ByteString; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import org.junit.Test; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +public class ShutdownActorTest { + @Test + public void testFailure() { + final ActorSystem actorSystem = ActorSystem.create("test-system-1"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + String reduceKey = "reduce-key"; + ReduceOuterClass.ReduceRequest.Payload.Builder payloadBuilder = ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + .addKeys(reduceKey); + + ActorRef shutdownActor = actorSystem + .actorOf(ShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver(); + + ActorRef 
outputActor = actorSystem.actorOf(OutputActor + .props(reduceOutputStreamObserver)); + + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor + .props( + new TestExceptionFactory(), + md, + shutdownActor, + outputActor)); + + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest.newBuilder() + .setPayload(payloadBuilder + .addKeys("reduce-test") + .setValue(ByteString.copyFromUtf8(String.valueOf(1))) + .build()) + .build()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); + + try { + completableFuture.get(); + fail("Expected the future to complete with exception"); + } catch (Exception e) { + assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure"); + } + } + + @Test + public void testDeadLetterHandling() { + final ActorSystem actorSystem = ActorSystem.create("test-system-2"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + ActorRef shutdownActor = actorSystem + .actorOf(ShutdownActor + .props(completableFuture)); + + actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver(); + + ActorRef outputActor = actorSystem.actorOf(OutputActor + .props(reduceOutputStreamObserver)); + + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor + .props( + new TestExceptionFactory(), + md, + shutdownActor, + outputActor)); + + DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor); + supervisorActor.tell(deadLetter, ActorRef.noSender()); + + try { + completableFuture.get(); + fail("Expected the future to complete with exception"); + } catch (Exception e) { + assertEquals(e.getMessage(), "java.lang.Throwable: 
dead letters"); + } + } + + + public static class TestExceptionFactory extends ReduceStreamerFactory { + + @Override + public TestException createReduceStreamer() { + return new TestException(); + } + + public static class TestException extends ReduceStreamer { + + int count = 0; + + @Override + public void processMessage( + String[] keys, + Datum datum, + OutputStreamObserver outputStream, + Metadata md) { + count += 1; + throw new RuntimeException("UDF Failure"); + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(count).getBytes())); + } + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java new file mode 100644 index 00000000..8e989db6 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java @@ -0,0 +1,170 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.protobuf.ByteString; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import org.junit.Test; + +import java.time.Instant; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SupervisorActorTest { + @Test + public void 
given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { + final ActorSystem actorSystem = ActorSystem.create("test-system-1"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + ActorRef shutdownActor = actorSystem + .actorOf(ShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver(); + + ActorRef outputActor = actorSystem.actorOf(OutputActor + .props(reduceOutputStreamObserver)); + + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor + .props( + new TestReduceStreamerFactory(), + md, + shutdownActor, + outputActor)); + + for (int i = 1; i <= 10; i++) { + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // all reduce requests share same set of keys. + .addAllKeys(Arrays.asList("key-1", "key-2")) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); + } + supervisorActor.tell( + io.numaproj.numaflow.reducestreamer.Constants.EOF, + ActorRef.noSender()); + + try { + completableFuture.get(); + // the observer should receive 2 messages, one is the aggregated result, the other is the EOF response. 
+ assertEquals(2, reduceOutputStreamObserver.resultDatum.get().size()); + assertEquals("10", reduceOutputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getValue() + .toStringUtf8()); + assertTrue(reduceOutputStreamObserver.resultDatum + .get() + .get(1) + .getEOF()); + } catch (InterruptedException | ExecutionException e) { + fail("Expected the future to complete without exception"); + } + } + + @Test + public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException { + final ActorSystem actorSystem = ActorSystem.create("test-system-2"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + ActorRef shutdownActor = actorSystem + .actorOf(ShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new ReduceOutputStreamObserver(); + ActorRef outputActor = actorSystem.actorOf(OutputActor + .props(reduceOutputStreamObserver)); + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor + .props( + new TestReduceStreamerFactory(), + md, + shutdownActor, + outputActor) + ); + + for (int i = 1; i <= 10; i++) { + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // each request contain a unique set of keys. 
+ .addAllKeys(Arrays.asList("shared-key", "unique-key-" + i)) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); + } + + supervisorActor.tell( + io.numaproj.numaflow.reducestreamer.Constants.EOF, + ActorRef.noSender()); + try { + completableFuture.get(); + // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF. + assertEquals(20, reduceOutputStreamObserver.resultDatum.get().size()); + for (int i = 0; i < 20; i++) { + ReduceOuterClass.ReduceResponse response = reduceOutputStreamObserver.resultDatum + .get() + .get(i); + assertTrue(response.getResult().getValue().toStringUtf8().equals("1") + || response.getEOF()); + } + } catch (InterruptedException | ExecutionException e) { + fail("Expected the future to complete without exception"); + } + } + + public static class TestReduceStreamerFactory extends ReduceStreamerFactory { + @Override + public TestReduceStreamerHandler createReduceStreamer() { + return new TestReduceStreamerHandler(); + } + + public static class TestReduceStreamerHandler extends ReduceStreamer { + int count = 0; + + @Override + public void processMessage( + String[] keys, + Datum datum, + OutputStreamObserver outputStream, + Metadata md) { + count += 1; + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(count).getBytes())); + } + } + } +}