Skip to content

Commit

Permalink
make MERGE a blocking call
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Mar 12, 2024
1 parent 8cc3eff commit cf4efa5
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ActorRequest {
private final Sessionreduce.KeyedWindow newKeyedWindow;
// the payload of the request
private final Sessionreduce.SessionReduceRequest.Payload payload;
// the id of the merge task this request belongs to
// the id of the merge task this request belongs to; it is equal to the unique id of the merged window.
// it is specified only when the actor request is a GET_ACCUMULATOR.
private final String mergeTaskId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public enum ActorRequestType {
CLOSE,
// expand a session window
EXPAND,
// open a brand-new session window which is created from a merge operation
MERGE_OPEN,
// ask a to-be-merged session window for its accumulator
GET_ACCUMULATOR,
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ActorResponse {
boolean isLast;
// The accumulator attribute holds the accumulator of the session.
byte[] accumulator;
// The mergeTaskId attribute holds the merge task id that this session is to be merged into.
// The mergeTaskId attribute holds the id of the merged window that this session is to be merged into.
String mergeTaskId;

@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
* containing the accumulator of the session.
* <p>
* "Hey supervisor, I am the session window fromKeyedWindow,
* I am returning my accumulator so that you can ask mergeTaskId to merge me."
* I am returning my accumulator so that you can merge me."
*/
@AllArgsConstructor
@Getter
class GetAccumulatorResponse {
private final Sessionreduce.KeyedWindow fromKeyedWindow;
private final String mergeTaskId;
private final byte[] accumulator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
* ask the actor to merge an accumulator.
* <p>
* "Hey, I received an accumulator from one of the sessions that are merging to you, please merge it with yourself."
* "Also, you may be interested to know that this one is the last one to merge,
* so that after merging it, you can mark yourself as no longer in a merging process."
*/
@AllArgsConstructor
@Getter
class MergeAccumulatorRequest {
private final boolean isLast;
private final byte[] accumulator;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.numaproj.numaflow.sessionreducer;

/**
* MergeDoneResponse indicates the current MERGE request is done.
* <p>
* It is a marker message with no fields. The supervisor actor sends it to the
* sender of the MERGE request once the number of pending accumulators for that
* request drops to zero, and the gRPC service awaits it so that handling a
* MERGE operation is a blocking call.
*/
public class MergeDoneResponse {
}
29 changes: 28 additions & 1 deletion src/main/java/io/numaproj/numaflow/sessionreducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
Expand All @@ -11,6 +13,9 @@
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import lombok.extern.slf4j.Slf4j;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -79,7 +84,29 @@ public StreamObserver<Sessionreduce.SessionReduceRequest> sessionReduceFn(final
public void onNext(Sessionreduce.SessionReduceRequest sessionReduceRequest) {
// send the message to parent actor, which takes care of distribution.
if (!supervisorActor.isTerminated()) {
supervisorActor.tell(sessionReduceRequest, ActorRef.noSender());
// if the operation is a MERGE, make it a blocking call.
if (sessionReduceRequest.getOperation().getEvent()
== Sessionreduce.SessionReduceRequest.WindowOperation.Event.MERGE) {
// set the timeout to 1 second, as we expect a MERGE operation to finish quickly.
Timeout timeout = new Timeout(Duration.create(1, "seconds"));
try {
// ask the supervisor to process a merge request.
Future<Object> future = Patterns.ask(
supervisorActor,
sessionReduceRequest,
timeout);
// wait for the merge done response.
MergeDoneResponse response = (MergeDoneResponse) Await.result(
future,
timeout.duration());
} catch (Exception e) {
responseObserver.onError(new Throwable(
"Supervisor actor failed processing a MERGE request: "
+ e.getMessage()));
}
} else {
supervisorActor.tell(sessionReduceRequest, ActorRef.noSender());
}
} else {
responseObserver.onError(new Throwable("Supervisor actor was terminated"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,28 @@ class SessionReducerActor extends AbstractActor {
private Sessionreduce.KeyedWindow keyedWindow;
private final SessionReducer sessionReducer;
private OutputStreamObserver outputStream;
// when set to true, it means the corresponding session is in the process of merging with other windows.
private boolean isMerging;
// when set to true, it means this session is pending EOF, it already received a CLOSE/EOF request, but it hasn't finished its MERGE job yet.
private boolean eofPending = false;
// when set to true, it means this session is already closed.
private boolean isClosed = false;

public SessionReducerActor(
Sessionreduce.KeyedWindow keyedWindow,
SessionReducer sessionReducer,
OutputStreamObserver outputStream,
boolean isMerging) {
OutputStreamObserver outputStream) {
this.keyedWindow = keyedWindow;
this.sessionReducer = sessionReducer;
this.outputStream = outputStream;
this.isMerging = isMerging;
}

public static Props props(
Sessionreduce.KeyedWindow keyedWindow,
SessionReducer groupBy,
ActorRef outputActor,
boolean isMerging) {
ActorRef outputActor) {
return Props.create(
SessionReducerActor.class,
keyedWindow,
groupBy,
new OutputStreamObserverImpl(outputActor, keyedWindow),
isMerging);
new OutputStreamObserverImpl(outputActor, keyedWindow)
);
}

@Override
Expand All @@ -64,15 +57,6 @@ public Receive createReceive() {
// receiving a new keyed window, update the keyed window.
// this is for EXPAND operation.
private void updateKeyedWindow(Sessionreduce.KeyedWindow newKeyedWindow) {
if (this.isClosed) {
throw new RuntimeException(
"received an expand request but the session is already closed.");
}
if (this.isMerging) {
throw new RuntimeException(
"cannot expand a session window when it's in a merging process."
+ " window info: " + this.keyedWindow.toString());
}
// update the keyed window
this.keyedWindow = newKeyedWindow;
// update the output stream to use the new keyed window
Expand All @@ -83,15 +67,6 @@ private void updateKeyedWindow(Sessionreduce.KeyedWindow newKeyedWindow) {
// when receiving a message, process it.
// this is for OPEN/APPEND operation.
private void invokeHandler(HandlerDatum handlerDatum) {
if (this.isClosed) {
throw new RuntimeException(
"received a message but the session is already closed.");
}
if (this.isMerging) {
throw new RuntimeException(
"cannot process messages when the session window is in a merging process."
+ " window info: " + this.keyedWindow.toString());
}
this.sessionReducer.processMessage(
keyedWindow.getKeysList().toArray(new String[0]),
handlerDatum,
Expand All @@ -101,30 +76,22 @@ private void invokeHandler(HandlerDatum handlerDatum) {
// receiving an EOF signal, close the window.
// this is for CLOSE operation or for the close of gRPC input stream.
private void handleEOF(String EOF) {
if (this.isMerging) {
// the session is in a merging process, wait until it finishes before processing EOF.
this.eofPending = true;
return;
} else if (this.isClosed) {
// we only process EOF once.
if (this.isClosed) {
return;
}
this.processEOF();
// invoke handleEndOfStream to materialize the messages received so far.
this.sessionReducer.handleEndOfStream(
keyedWindow.getKeysList().toArray(new String[0]),
outputStream);
// construct an actor response and send it back to the supervisor actor, indicating the actor
// has finished processing all the messages for the corresponding keyed window.
getSender().tell(buildEOFResponse(), getSelf());
this.isClosed = true;
}

// receiving a GetAccumulatorRequest, return the accumulator of the window.
// this is for MERGE operation.
private void handleGetAccumulatorRequest(GetAccumulatorRequest getAccumulatorRequest) {
if (this.isClosed) {
throw new RuntimeException(
"received a get accumulator request but the session is already closed.");
}
if (this.isMerging) {
throw new RuntimeException(
"cannot process a GetAccumulatorRequest when the session window is already in a merging process."
+ " window info: " + this.keyedWindow.toString());
}
this.isMerging = true;
getSender().tell(buildMergeResponse(
this.sessionReducer.accumulator(),
getAccumulatorRequest.getMergeTaskId())
Expand All @@ -137,36 +104,7 @@ private void handleGetAccumulatorRequest(GetAccumulatorRequest getAccumulatorReq
// receiving a MergeAccumulatorRequest, merge the accumulator.
// this is for MERGE operation.
private void handleMergeAccumulatorRequest(MergeAccumulatorRequest mergeAccumulatorRequest) {
if (this.isClosed) {
throw new RuntimeException(
"received a merge accumulator request but the session is already closed.");
}
if (!this.isMerging) {
throw new RuntimeException(
"received a merge accumulator request but the session is not in a merging process.");
}
this.sessionReducer.mergeAccumulator(mergeAccumulatorRequest.getAccumulator());
if (mergeAccumulatorRequest.isLast()) {
// I have merged the last accumulator, I am no longer in a MERGE process.
this.isMerging = false;
if (this.eofPending) {
// I was asked to close, now that I finished the MERGE operation,
// I can close myself.
this.processEOF();
}
}
}

private void processEOF() {
// invoke handleEndOfStream to materialize the messages received so far.
this.sessionReducer.handleEndOfStream(
keyedWindow.getKeysList().toArray(new String[0]),
outputStream);
// construct an actor response and send it back to the supervisor actor, indicating the actor
// has finished processing all the messages for the corresponding keyed window.
getSender().tell(buildEOFResponse(), getSelf());
this.eofPending = false;
this.isClosed = true;
}

private ActorResponse buildEOFResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ class SupervisorActor extends AbstractActor {
// actorMap maintains a map of active sessions.
// key is the unique id of a session, value is the reference to the actor working on the session.
private final Map<String, ActorRef> actorsMap = new HashMap<>();
// mergeTracker keeps track of the merge tasks that are in progress.
// key is the unique id of a merged task, value is how many accumulators are pending aggregation for this task.
private final Map<String, Integer> mergeTracker = new HashMap<>();
// number of accumulators still to be collected for the current MERGE request.
private int numberOfPendingAccumulators;
// when set to true, isInputStreamClosed means the gRPC input stream has reached EOF.
private boolean isInputStreamClosed = false;
// mergeRequestSender holds the reference to the sender of the current MERGE request.
// it is used to report back when the MERGE request is completed.
// the MERGE request is sent using ask, which creates a temporary actor as the sender behind the scenes.
private ActorRef mergeRequestSender;

public SupervisorActor(
SessionReducerFactory<? extends SessionReducer> sessionReducerFactory,
Expand Down Expand Up @@ -185,6 +188,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) {
break;
}
case MERGE: {
this.mergeRequestSender = getSender();
Timestamp mergedStartTime = windowOperation.getKeyedWindows(0).getStart();
Timestamp mergedEndTime = windowOperation.getKeyedWindows(0).getEnd();
for (Sessionreduce.KeyedWindow window : windowOperation.getKeyedWindowsList()) {
Expand Down Expand Up @@ -221,7 +225,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) {
.setSlot(windowOperation.getKeyedWindows(0).getSlot()).build();

String mergeTaskId = UniqueIdGenerator.getUniqueIdentifier(mergedWindow);
this.mergeTracker.put(mergeTaskId, windowOperation.getKeyedWindowsCount());
this.numberOfPendingAccumulators = windowOperation.getKeyedWindowsCount();
for (Sessionreduce.KeyedWindow window : windowOperation.getKeyedWindowsList()) {
// tell the session reducer actor - "hey, you are about to be merged."
ActorRequest getAccumulatorRequest = ActorRequest.builder()
Expand All @@ -237,11 +241,11 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) {
// the existing window with the new one.
// the accumulator of the old window will get merged to the new window eventually,
// when the supervisor receives the get accumulator response.
ActorRequest mergeOpenRequest = ActorRequest.builder()
.type(ActorRequestType.MERGE_OPEN)
ActorRequest openRequest = ActorRequest.builder()
.type(ActorRequestType.OPEN)
.keyedWindow(mergedWindow)
.build();
this.invokeActor(mergeOpenRequest);
this.invokeActor(openRequest);
break;
}
default:
Expand All @@ -259,8 +263,8 @@ private void invokeActor(ActorRequest actorRequest) {
.actorOf(SessionReducerActor.props(
actorRequest.getKeyedWindow(),
sessionReducer,
this.outputActor,
false));
this.outputActor
));
this.actorsMap.put(uniqueId, actorRef);
break;
}
Expand All @@ -271,8 +275,7 @@ private void invokeActor(ActorRequest actorRequest) {
.actorOf(SessionReducerActor.props(
actorRequest.getKeyedWindow(),
sessionReducer,
this.outputActor,
false));
this.outputActor));
this.actorsMap.put(uniqueId, actorRef);
}
break;
Expand All @@ -287,17 +290,6 @@ private void invokeActor(ActorRequest actorRequest) {
this.actorsMap.get(uniqueId).tell(actorRequest.getNewKeyedWindow(), getSelf());
break;
}
case MERGE_OPEN: {
SessionReducer sessionReducer = sessionReducerFactory.createSessionReducer();
ActorRef actorRef = getContext()
.actorOf(SessionReducerActor.props(
actorRequest.getKeyedWindow(),
sessionReducer,
this.outputActor,
true));
this.actorsMap.put(uniqueId, actorRef);
break;
}
case GET_ACCUMULATOR: {
this.actorsMap
.get(uniqueId)
Expand Down Expand Up @@ -335,28 +327,24 @@ private void handleActorResponse(ActorResponse actorResponse) {
} else {
// handle get accumulator response
String mergeTaskId = actorResponse.getMergeTaskId();
if (!this.mergeTracker.containsKey(mergeTaskId)) {
throw new RuntimeException(
"received an accumulator but the corresponding merge task doesn't exist.");
}
if (!this.actorsMap.containsKey(mergeTaskId)) {
throw new RuntimeException(
"received an accumulator but the corresponding parent merge session doesn't exist.");
}
this.mergeTracker.put(mergeTaskId, this.mergeTracker.get(mergeTaskId) - 1);
this.numberOfPendingAccumulators--;
if (!responseWindowId.equals(mergeTaskId)) {
// release the session that returns us the accumulator, indicating it has finished its lifecycle.
// the session is released without being explicitly closed because it has been merged and tracked by the newly merged session.
// we release a session by simply removing it from the actor map so that the corresponding actor
// is no longer referenced and can be reclaimed by the Java GC.
this.actorsMap.remove(responseWindowId);
}
this.actorsMap.get(mergeTaskId).tell(new MergeAccumulatorRequest(
this.mergeTracker.get(mergeTaskId) == 0,
actorResponse.getAccumulator()), getSelf());
if (this.mergeTracker.get(mergeTaskId) == 0) {
// remove the task from the merge tracker when there is no more pending accumulators to merge.
this.mergeTracker.remove(mergeTaskId);
this.actorsMap.get(mergeTaskId).tell(
new MergeAccumulatorRequest(
actorResponse.getAccumulator()), getSelf());
if (this.numberOfPendingAccumulators == 0) {
// tell the gRPC input stream that the merge request is completely processed.
this.mergeRequestSender.tell(new MergeDoneResponse(), getSelf());
}
}
}
Expand Down
Loading

0 comments on commit cf4efa5

Please sign in to comment.