Skip to content

Commit

Permalink
feat: implement reduce stream sdk (#91)
Browse files Browse the repository at this point in the history
* Define the user interface as two methods - processMessage and handleEndOfStream.
* Continue using akka to handle concurrency. Introduce an OutputActor to make sure we are sending responses back to the gRPC output stream synchronously.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 23, 2024
1 parent d4e6d35 commit cb91f37
Show file tree
Hide file tree
Showing 32 changed files with 1,937 additions and 12 deletions.
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@
</to>
</configuration>
</execution>
<execution>
<id>reduce-stream-sum</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/reduce-stream-sum</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.Server;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* SumFactory extends ReduceStreamerFactory to support creating instances of SumFunction.
* It also provides a main function to start a server for handling the reduce stream.
*/
@Slf4j
public class SumFactory extends ReduceStreamerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
}

@Override
public SumFunction createReduceStreamer() {
return new SumFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import lombok.extern.slf4j.Slf4j;

/**
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys
* and outputs the sum when the sum is greater than 100.
* When the input stream closes, the function outputs the sum no matter what value it holds.
*/
@Slf4j
public class SumFunction extends ReduceStreamer {

private int sum = 0;

@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
sum = 0;
}
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor, to be distributed to reduce streamer actors.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* The actor response holds the final EOF response for a particular key set.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
boolean isLast;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getActorUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.numaproj.numaflow.reducestreamer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
26 changes: 26 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.info.ServerInfoAccessor;
import lombok.Builder;
import lombok.Getter;

/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

/**
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;

@Override
public Instant getWatermark() {
return this.watermark;
}

@Override
public byte[] getValue() {
return this.value;
}

@Override
public Instant getEventTime() {
return this.eventTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class IntervalWindowImpl implements IntervalWindow {
private final Instant startTime;
private final Instant endTime;

@Override
public Instant getStartTime() {
return this.startTime;
}

@Override
public Instant getEndTime() {
return this.endTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import lombok.AllArgsConstructor;

@AllArgsConstructor
class MetadataImpl implements Metadata {
private final IntervalWindow intervalWindow;

@Override
public IntervalWindow getIntervalWindow() {
return intervalWindow;
}
}
48 changes: 48 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
@Slf4j
@AllArgsConstructor
class OutputActor extends AbstractActor {
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;

public static Props props(
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
return Props.create(OutputActor.class, responseObserver);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorResponse.class, this::handleResponse)
.build();
}

private void handleResponse(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
responseObserver.onCompleted();
// stop the AKKA system right after we close the output stream.
// note: could make more sense if the supervisor actor stops the system,
// but it requires an extra tell.
getContext().getSystem().stop(getSender());
} else {
responseObserver.onNext(actorResponse.getResponse());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.ActorRef;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final Metadata md;
private final ActorRef responseStreamActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender());
}

private ActorResponse buildResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build(), false);
}
}
Loading

0 comments on commit cb91f37

Please sign in to comment.