Skip to content

Commit

Permalink
feat: Unify MapStream and Unary Map Operations Using a Shared gRPC Pr…
Browse files Browse the repository at this point in the history
…otocol (#146)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 14, 2024
1 parent a5cdc00 commit 707c5bf
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 137 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ private void buildAndStreamResponse(
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true).build()).build();
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true).build())
.build();
responseObserver.onNext(eotResponse);
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.mapstream.v1.Mapstream;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
Expand All @@ -14,17 +14,17 @@
*/
@AllArgsConstructor
class OutputObserverImpl implements OutputObserver {
StreamObserver<Mapstream.MapStreamResponse> responseObserver;
StreamObserver<MapOuterClass.MapResponse> responseObserver;

@Override
public void send(Message message) {
Mapstream.MapStreamResponse response = buildResponse(message);
MapOuterClass.MapResponse response = buildResponse(message);
responseObserver.onNext(response);
}

private Mapstream.MapStreamResponse buildResponse(Message message) {
return Mapstream.MapStreamResponse.newBuilder()
.setResult(Mapstream.MapStreamResponse.Result.newBuilder()
private MapOuterClass.MapResponse buildResponse(Message message) {
return MapOuterClass.MapResponse.newBuilder()
.addResults(MapOuterClass.MapResponse.Result.newBuilder()
.setValue(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
Expand All @@ -33,6 +33,5 @@ private Mapstream.MapStreamResponse buildResponse(Message message) {
.addAllTags(message.getTags()
== null ? new ArrayList<>() : List.of(message.getTags()))
.build()).build();

}
}
117 changes: 79 additions & 38 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
@@ -1,64 +1,105 @@
package io.numaproj.numaflow.mapstreamer;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.mapstream.v1.MapStreamGrpc;
import io.numaproj.numaflow.mapstream.v1.Mapstream;
import io.numaproj.numaflow.map.v1.MapGrpc;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;

import static io.numaproj.numaflow.mapstream.v1.MapStreamGrpc.getMapStreamFnMethod;

import static io.numaproj.numaflow.map.v1.MapGrpc.getMapFnMethod;

@Slf4j
@AllArgsConstructor
class Service extends MapStreamGrpc.MapStreamImplBase {
class Service extends MapGrpc.MapImplBase {

private final MapStreamer mapStreamer;

/**
* Applies a map stream function to each request.
*/
@Override
public void mapStreamFn(
Mapstream.MapStreamRequest request,
StreamObserver<Mapstream.MapStreamResponse> responseObserver) {
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {

if (this.mapStreamer == null) {
io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(
getMapStreamFnMethod(),
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getMapFnMethod(),
responseObserver);
return;
}

HandlerDatum handlerDatum = new HandlerDatum(
request.getValue().toByteArray(),
Instant.ofEpochSecond(
request.getWatermark().getSeconds(),
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos()),
request.getHeadersMap()
);
return new StreamObserver<>() {
private boolean handshakeDone = false;

// process Datum
this.mapStreamer.processMessage(request
.getKeysList()
.toArray(new String[0]), handlerDatum, new OutputObserverImpl(responseObserver));
@Override
public void onNext(MapOuterClass.MapRequest request) {
// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(MapOuterClass.MapResponse.newBuilder()
.setHandshake(request.getHandshake())
.build());
handshakeDone = true;
return;
}

responseObserver.onCompleted();
}
try {
// process the message
mapStreamer.processMessage(
request
.getRequest()
.getKeysList()
.toArray(new String[0]),
constructHandlerDatum(request),
new OutputObserverImpl(responseObserver));
} catch (Exception e) {
log.error("Error processing message", e);
responseObserver.onError(Status.UNKNOWN
.withDescription(e.getMessage())
.asException());
return;
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
@Override
public void isReady(Empty request, StreamObserver<Mapstream.ReadyResponse> responseObserver) {
responseObserver.onNext(Mapstream.ReadyResponse.newBuilder().setReady(true).build());
responseObserver.onCompleted();
// Send an EOT message to indicate the end of the transmission for the batch.
MapOuterClass.MapResponse eotResponse = MapOuterClass.MapResponse
.newBuilder()
.setStatus(MapOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build()).build();
responseObserver.onNext(eotResponse);
}

@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in mapStream Stream", throwable);
var status = Status.UNKNOWN
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}

// Construct a HandlerDatum from a MapRequest
private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) {
return new HandlerDatum(
d.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
d.getRequest().getWatermark().getSeconds(),
d.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
d.getRequest().getEventTime().getSeconds(),
d.getRequest().getEventTime().getNanos()),
d.getRequest().getHeadersMap()
);
}
}
10 changes: 10 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public void onNext(SinkOuterClass.SinkRequest request) {
responseObserver.onNext(sinkResponse);
});

// send eot response to indicate end of transmission for the batch
SinkOuterClass.SinkResponse eotResponse = SinkOuterClass.SinkResponse
.newBuilder()
.setStatus(SinkOuterClass.TransmissionStatus
.newBuilder()
.setEot(true)
.build())
.build();
responseObserver.onNext(eotResponse);

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void onCompleted() {
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());
SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build());

requestObserver.onCompleted();

Expand All @@ -197,6 +197,9 @@ public void onCompleted() {
if (result.getHandshake().getSot()) {
continue;
}
if (result.hasStatus() && result.getStatus().getEot()) {
continue;
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
Expand Down
6 changes: 3 additions & 3 deletions src/main/proto/map/v1/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/*
Expand All @@ -44,7 +44,7 @@ message Handshake {
/*
* Status message to indicate the status of the message.
*/
message Status {
message TransmissionStatus {
bool eot = 1;
}

Expand All @@ -61,7 +61,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/**
Expand Down
46 changes: 0 additions & 46 deletions src/main/proto/mapstream/v1/mapstream.proto

This file was deleted.

13 changes: 9 additions & 4 deletions src/main/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ message SinkRequest {
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}
Expand All @@ -54,6 +51,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -77,4 +81,5 @@ message SinkResponse {
}
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testErrorFromUDF() {
inputStreamObserver.onNext(request);
inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testBatchMapHappyPath() {

inputStreamObserver.onNext(MapOuterClass.MapRequest
.newBuilder()
.setStatus(MapOuterClass.Status.newBuilder().setEot(true))
.setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true))
.build());
inputStreamObserver.onCompleted();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.numaproj.numaflow.mapstreamer;

import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class MapStreamOutputStreamObserver implements StreamObserver<MapOuterClass.MapResponse> {
List<MapOuterClass.MapResponse> mapResponses = new ArrayList<>();
CompletableFuture<Void> done = new CompletableFuture<>();
Integer responseCount;

public MapStreamOutputStreamObserver(Integer responseCount) {
this.responseCount = responseCount;
}

@Override
public void onNext(MapOuterClass.MapResponse mapResponse) {
System.out.println("Received response: " + mapResponse);
mapResponses.add(mapResponse);
if (mapResponses.size() == responseCount) {
done.complete(null);
}
}

@Override
public void onError(Throwable throwable) {
done.completeExceptionally(throwable);
}

@Override
public void onCompleted() {
done.complete(null);
}

public List<MapOuterClass.MapResponse> getMapResponses() {
return mapResponses;
}
}
Loading

0 comments on commit 707c5bf

Please sign in to comment.