Skip to content

Commit

Permalink
Merge branch 'main' into improve-ci-1363
Browse files Browse the repository at this point in the history
  • Loading branch information
ayildirim21 committed Mar 20, 2024
2 parents cfe40be + 68ffc5a commit 23e708d
Show file tree
Hide file tree
Showing 37 changed files with 4,551 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.Server;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* CountFactory extends SessionReducerFactory to support creating instances of SumFunction.
* It also provides a main function to start a server for handling the session reduce stream.
*/
@Slf4j
public class CountFactory extends SessionReducerFactory<CountFunction> {

public static void main(String[] args) throws Exception {
log.info("count udf was invoked");
new Server(new CountFactory()).start();
}

@Override
public CountFunction createSessionReducer() {
return new CountFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.model.Datum;
import io.numaproj.numaflow.sessionreducer.model.Message;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/**
* CountFunction is a simple session reducer which counts the number of events in a session.
*/
@Slf4j
public class CountFunction extends SessionReducer {

private final AtomicInteger count = new AtomicInteger(0);

@Override
public void processMessage(
String[] keys,
Datum datum,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
this.count.incrementAndGet();
}

@Override
public void handleEndOfStream(
String[] keys,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
outputStreamObserver.send(new Message(String.valueOf(this.count.get()).getBytes()));
}

@Override
public byte[] accumulator() {
return String.valueOf(this.count.get()).getBytes();
}

@Override
public void mergeAccumulator(byte[] accumulator) {
int value = 0;
try {
value = Integer.parseInt(new String(accumulator));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
this.count.addAndGet(value);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
Expand All @@ -19,9 +20,9 @@ public class SumFunction extends ReduceStreamer {
@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@
<exclude>io.numaproj.numaflow.mapstream.v1</exclude>
<exclude>io.numaproj.numaflow.map.v1</exclude>
<exclude>io.numaproj.numaflow.reduce.v1</exclude>
<exclude>io.numaproj.numaflow.sessionreduce.v1</exclude>
<exclude>io.numaproj.numaflow.sourcetransformer.v1</exclude>
<exclude>io.numaproj.numaflow.sink.v1</exclude>
<exclude>io.numaproj.numaflow.sideinput.v1</exclude>
Expand Down Expand Up @@ -337,6 +338,7 @@
<exclude>io/numaproj/numaflow/examples/*</exclude>
<exclude>io/numaproj/numaflow/mapstream/v1/*</exclude>
<exclude>io/numaproj/numaflow/reduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/sessionreduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/map/v1/*</exclude>
<exclude>io/numaproj/numaflow/sourcetransformer/v1/*</exclude>
<exclude>io/numaproj/numaflow/sink/v1/*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* ALL reduce responses are sent to the output actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final Metadata md;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender());
this.outputActor.tell(buildResponse(message), ActorRef.noSender());
}

private ActorResponse buildResponse(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ class ReduceStreamerActor extends AbstractActor {
private OutputStreamObserver outputStream;

public static Props props(
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef responseStreamActor) {
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef outputActor) {
return Props.create(
ReduceStreamerActor.class,
keys,
md,
groupBy,
new OutputStreamObserverImpl(md, responseStreamActor));
new OutputStreamObserverImpl(md, outputActor));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,31 @@ class SupervisorActor extends AbstractActor {
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public SupervisorActor(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
this.reduceStreamerFactory = reduceStreamerFactory;
this.md = md;
this.shutdownActor = shutdownActor;
this.responseStreamActor = responseStreamActor;
this.outputActor = outputActor;
}

public static Props props(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
return Props.create(
SupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
responseStreamActor);
outputActor);
}

// if there is an uncaught exception stop in the supervisor actor, send a signal to shut down
Expand Down Expand Up @@ -101,7 +101,7 @@ private void invokeActor(ActorRequest actorRequest) {
keys,
this.md,
reduceStreamerHandler,
this.responseStreamActor));
this.outputActor));
actorsMap.put(uniqueId, actorRef);
}
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
Expand All @@ -122,9 +122,9 @@ private void handleActorResponse(ActorResponse actorResponse) {
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
} else {
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;

/**
* ActorRequest is used by the supervisor actor to distribute session reduce operations to
* individual session reducer actors. One actor request is sent to only one session reducer actor.
*/
@Getter
class ActorRequest {
private final ActorRequestType type;
// the window of the target session the actor request is sent to
private final Sessionreduce.KeyedWindow keyedWindow;
// the new keyed window the target session is to be expanded to
// it is specified only when the actor request is an EXPAND
private final Sessionreduce.KeyedWindow newKeyedWindow;
// the payload of the request
private final Sessionreduce.SessionReduceRequest.Payload payload;
// the id of the merge task this request belongs to, it's equal to the unique id of the merged window,
// it is specified only when the actor request is a GET_ACCUMULATOR
private final String mergeTaskId;

@Builder
private ActorRequest(
ActorRequestType type,
Sessionreduce.KeyedWindow keyedWindow,
Sessionreduce.KeyedWindow newKeyedWindow,
Sessionreduce.SessionReduceRequest.Payload payload,
String mergeTaskId
) {
this.type = type;
this.keyedWindow = keyedWindow;
this.newKeyedWindow = newKeyedWindow;
this.payload = payload;
this.mergeTaskId = mergeTaskId;
}

static class ActorRequestBuilder {
ActorRequest build() {
if (newKeyedWindow != null && type != ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow can only be set when the request is an EXPAND.");
}
if (newKeyedWindow == null && type == ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow must be set when the request is an EXPAND.");
}
if (mergeTaskId != null && type != ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId can only be set when the request is a GET_ACCUMULATOR.");
}
if (mergeTaskId == null && type == ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId must be set when the request is a GET_ACCUMULATOR.");
}
return new ActorRequest(type, keyedWindow, newKeyedWindow, payload, mergeTaskId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.numaproj.numaflow.sessionreducer;

/**
* ActorRequestType represents the purpose of an ActorRequest.
*/
public enum ActorRequestType {
// open a brand-new session window
OPEN,
// append a message to an existing session window
APPEND,
// close a session window
CLOSE,
// expand a session window
EXPAND,
// ask a to-be-merged session window for it's accumulator
GET_ACCUMULATOR,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* The actor response holds the session reduce response from a particular session window.
*/
@Getter
@Setter
class ActorResponse {
Sessionreduce.SessionReduceResponse response;
// The isLast attribute indicates whether the response is globally the last one to be sent to
// the output gRPC stream, if set to true, it means the response is the very last response among
// all windows. When output actor receives an isLast response, it sends the response to and immediately
// closes the output stream.
boolean isLast;
// The accumulator attribute holds the accumulator of the session.
byte[] accumulator;
// The mergeTaskId attribute holds the id of the merged window that this session is to be merged into.
String mergeTaskId;

@Builder
private ActorResponse(
Sessionreduce.SessionReduceResponse response,
boolean isLast,
byte[] accumulator,
String mergeTaskId
) {
this.response = response;
this.isLast = isLast;
this.accumulator = accumulator;
this.mergeTaskId = mergeTaskId;
}

static class ActorResponseBuilder {
ActorResponse build() {
if ((accumulator != null && mergeTaskId == null) || (accumulator == null
&& mergeTaskId != null)) {
throw new IllegalStateException(
"attributes accumulator and mergeTaskId should be either both null or both non-null.");
}
return new ActorResponse(response, isLast, accumulator, mergeTaskId);
}
}

boolean isEOFResponse() {
return this.accumulator == null && this.mergeTaskId == null;
}
}
15 changes: 15 additions & 0 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.sessionreducer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sessionreduce.sock";

public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sessionreducer-server-info";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
25 changes: 25 additions & 0 deletions src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.numaproj.numaflow.sessionreducer;

import lombok.Builder;
import lombok.Getter;

/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

/**
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.numaproj.numaflow.sessionreducer;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* GetAccumulatorRequest is sent by supervisor actor to inform a session reducer actor that
* the window is to be merged with other windows.
* <p>
* "I am working on a merge task (mergeTaskId),
* and you are one of the windows to be merged.
* Please give me your accumulator."
*/
@AllArgsConstructor
@Getter
class GetAccumulatorRequest {
private final String mergeTaskId;
}
Loading

0 comments on commit 23e708d

Please sign in to comment.