Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: udsink bidirectional streaming #141

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void start() throws Exception {
log.info(
"Server started, listening on {}",
grpcConfig.isLocal() ?
"localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath());
"localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down
139 changes: 77 additions & 62 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package io.numaproj.numaflow.sinker;

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.sink.v1.SinkGrpc;
import io.numaproj.numaflow.sink.v1.SinkOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.numaproj.numaflow.sink.v1.SinkGrpc.getSinkFnMethod;

@Slf4j
class Service extends SinkGrpc.SinkImplBase {
// sinkTaskExecutor is the executor for the sinker. It is a fixed size thread pool
Expand All @@ -24,12 +22,6 @@ class Service extends SinkGrpc.SinkImplBase {
private final ExecutorService sinkTaskExecutor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
private final long SHUTDOWN_TIME = 30;


private final Sinker sinker;

public Service(Sinker sinker) {
Expand All @@ -41,25 +33,60 @@ public Service(Sinker sinker) {
*/
@Override
public StreamObserver<SinkOuterClass.SinkRequest> sinkFn(StreamObserver<SinkOuterClass.SinkResponse> responseObserver) {
if (this.sinker == null) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getSinkFnMethod(),
responseObserver);
}
return new StreamObserver<>() {
private boolean startOfStream = true;
private CompletableFuture<ResponseList> result;
private DatumIteratorImpl datumStream;
private boolean handshakeDone = false;

DatumIteratorImpl datumStream = new DatumIteratorImpl();
@Override
public void onNext(SinkOuterClass.SinkRequest request) {
// make sure the handshake is done before processing the messages
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SinkOuterClass.SinkResponse.newBuilder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the Go SDK, we also set the result status to success.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to; I will change that in the Go SDK.

.setHandshake(request.getHandshake())
.build());
handshakeDone = true;
return;
}

Future<ResponseList> result = sinkTaskExecutor.submit(() -> this.sinker.processMessages(
datumStream));
// Create a DatumIterator to write the messages to the sinker
// and start the sinker if it is the start of the stream
if (startOfStream) {
datumStream = new DatumIteratorImpl();
result = CompletableFuture.supplyAsync(
() -> sinker.processMessages(datumStream),
RohanAshar marked this conversation as resolved.
Show resolved Hide resolved
sinkTaskExecutor);
startOfStream = false;
}

return new StreamObserver<SinkOuterClass.SinkRequest>() {
@Override
public void onNext(SinkOuterClass.SinkRequest d) {
try {
datumStream.writeMessage(constructHandlerDatum(d));
} catch (InterruptedException e) {
Thread.interrupted();
onError(e);
if (request.getStatus().getEot()) {
// End of transmission, write EOF datum to the stream
// Wait for the result and send the response back to the client
datumStream.writeMessage(HandlerDatum.EOF_DATUM);

ResponseList responses = result.join();
responses.getResponses().forEach(response -> {
SinkOuterClass.SinkResponse sinkResponse = buildResponse(response);
responseObserver.onNext(sinkResponse);
});

// reset the startOfStream flag, since the stream has ended
// so that the next request will be treated as the start of the stream
startOfStream = true;
} else {
datumStream.writeMessage(constructHandlerDatum(request));
}
} catch (Exception e) {
log.error("Encountered error in sinkFn - {}", e.getMessage());
responseObserver.onError(e);
}
}

Expand All @@ -71,26 +98,23 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
SinkOuterClass.SinkResponse response = SinkOuterClass.SinkResponse
.newBuilder()
.build();
try {
datumStream.writeMessage(HandlerDatum.EOF_DATUM);
// wait until the sink handler returns, result.get() is a blocking call
ResponseList responses = result.get();
// construct responseList from responses
response = buildResponseList(responses);

} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
onError(e);
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}

/**
 * Converts one sinker {@code Response} into a gRPC {@code SinkResponse} carrying a single result.
 *
 * <p>Status is chosen in priority order: fallback first, then success, otherwise failure.
 * A null id or error message is written as an empty string.
 *
 * @param response the per-message response returned by the sinker
 * @return a {@code SinkResponse} whose result holds the id, error message and status
 */
private SinkOuterClass.SinkResponse buildResponse(Response response) {
    // Fallback wins over success; anything that is neither is reported as a failure.
    final SinkOuterClass.Status resultStatus;
    if (response.getFallback()) {
        resultStatus = SinkOuterClass.Status.FALLBACK;
    } else if (response.getSuccess()) {
        resultStatus = SinkOuterClass.Status.SUCCESS;
    } else {
        resultStatus = SinkOuterClass.Status.FAILURE;
    }

    // Substitute empty strings for missing values before handing them to the builder.
    String id = response.getId();
    String errMsg = response.getErr();

    SinkOuterClass.SinkResponse.Result result = SinkOuterClass.SinkResponse.Result.newBuilder()
            .setId(id == null ? "" : id)
            .setErrMsg(errMsg == null ? "" : errMsg)
            .setStatus(resultStatus)
            .build();

    return SinkOuterClass.SinkResponse.newBuilder()
            .setResult(result)
            .build();
}

/**
* IsReady is the heartbeat endpoint for gRPC.
*/
Expand All @@ -104,37 +128,28 @@ public void isReady(

private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
return new HandlerDatum(
d.getKeysList().toArray(new String[0]),
d.getValue().toByteArray(),
d.getRequest().getKeysList().toArray(new String[0]),
d.getRequest().getValue().toByteArray(),
Instant.ofEpochSecond(
d.getWatermark().getSeconds(),
d.getWatermark().getNanos()),
d.getRequest().getWatermark().getSeconds(),
d.getRequest().getWatermark().getNanos()),
Instant.ofEpochSecond(
d.getEventTime().getSeconds(),
d.getEventTime().getNanos()),
d.getId(),
d.getHeadersMap()
d.getRequest().getEventTime().getSeconds(),
d.getRequest().getEventTime().getNanos()),
d.getRequest().getId(),
d.getRequest().getHeadersMap()
);
}

/**
 * Packs every entry of a {@code ResponseList} into one {@code SinkResponse},
 * adding one result per entry in iteration order.
 *
 * <p>For each entry the status is fallback if flagged, else success if flagged,
 * else failure. A null id or error message is written as an empty string.
 *
 * @param responses the responses returned by the sinker
 * @return a single {@code SinkResponse} containing one result per response
 */
public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
    SinkOuterClass.SinkResponse.Builder aggregate = SinkOuterClass.SinkResponse.newBuilder();

    responses.getResponses().forEach(item -> {
        // Fallback wins over success; anything else is reported as a failure.
        final SinkOuterClass.Status resultStatus;
        if (item.getFallback()) {
            resultStatus = SinkOuterClass.Status.FALLBACK;
        } else if (item.getSuccess()) {
            resultStatus = SinkOuterClass.Status.SUCCESS;
        } else {
            resultStatus = SinkOuterClass.Status.FAILURE;
        }

        String id = item.getId();
        String errMsg = item.getErr();

        SinkOuterClass.SinkResponse.Result converted = SinkOuterClass.SinkResponse.Result
                .newBuilder()
                .setId(id == null ? "" : id)
                .setErrMsg(errMsg == null ? "" : errMsg)
                .setStatus(resultStatus)
                .build();
        aggregate.addResults(converted);
    });

    return aggregate.build();
}

// shuts down the executor service which is used for the sinker
public void shutDown() {
this.sinkTaskExecutor.shutdown();
try {
// SHUTDOWN_TIME is the time to wait for the sinker to shut down, in seconds.
// We use 30 seconds as the default value because it provides a balance between giving tasks enough time to complete
// and not delaying program termination unduly.
long SHUTDOWN_TIME = 30;

if (!sinkTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("Sink executor did not terminate in the specified time.");
List<Runnable> droppedTasks = sinkTaskExecutor.shutdownNow();
Expand Down
49 changes: 33 additions & 16 deletions src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ public Client(String host, int port) {
* @return response from the server as a ResponseList
*/
public ResponseList sendRequest(DatumIterator datumIterator) {
CompletableFuture<SinkOuterClass.SinkResponse> future = new CompletableFuture<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: why does SinkerTestKit live in this package?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to enable users to do "component testing".

List<SinkOuterClass.SinkResponse> responses = new ArrayList<>();
CompletableFuture<List<SinkOuterClass.SinkResponse>> future = new CompletableFuture<>();

StreamObserver<SinkOuterClass.SinkResponse> responseObserver = new StreamObserver<>() {
@Override
public void onNext(SinkOuterClass.SinkResponse response) {
future.complete(response);
responses.add(response);
}

@Override
Expand All @@ -127,16 +129,19 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
if (!future.isDone()) {
future.completeExceptionally(new RuntimeException(
"Server completed without a response"));
}
future.complete(responses);
}
};

StreamObserver<SinkOuterClass.SinkRequest> requestObserver = sinkStub.sinkFn(
responseObserver);

// send handshake request
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder()
.setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build())
.build());

// send actual requests
while (true) {
Datum datum = null;
try {
Expand All @@ -148,7 +153,8 @@ public void onCompleted() {
if (datum == null) {
break;
}
SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder()
SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request
.newBuilder()
.addAllKeys(
datum.getKeys()
== null ? new ArrayList<>() : List.of(datum.getKeys()))
Expand All @@ -168,28 +174,39 @@ public void onCompleted() {
.putAllHeaders(
datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders())
.build();
requestObserver.onNext(request);
requestObserver.onNext(SinkOuterClass.SinkRequest
.newBuilder()
.setRequest(request)
.build());
}
// send end of transmission message
requestObserver.onNext(SinkOuterClass.SinkRequest.newBuilder().setStatus(
SinkOuterClass.SinkRequest.Status.newBuilder().setEot(true)).build());

requestObserver.onCompleted();

SinkOuterClass.SinkResponse response;
List<SinkOuterClass.SinkResponse> outputResponses;
try {
response = future.get();
outputResponses = future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}

ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) {
if (result.getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result.getId()));
} else if (result.getStatus() == SinkOuterClass.Status.FALLBACK) {
for (SinkOuterClass.SinkResponse result : outputResponses) {
if (result.getHandshake().getSot()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does getSot do?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"sot" means start of transmission. When set to true, it means the handshake is successful.

continue;
}
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
responseListBuilder.addResponse(Response.responseOK(result
.getResult()
.getId()));
} else if (result.getResult().getStatus() == SinkOuterClass.Status.FALLBACK) {
responseListBuilder.addResponse(Response.responseFallback(
result.getId()));
result.getResult().getId()));
} else {
responseListBuilder.addResponse(Response.responseFailure(
result.getId(), result.getErrMsg()));
result.getResult().getId(), result.getResult().getErrMsg()));
}
}

Expand Down
33 changes: 23 additions & 10 deletions src/main/java/io/numaproj/numaflow/sourcer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@ public Service(Sourcer sourcer) {
@Override
public StreamObserver<SourceOuterClass.ReadRequest> readFn(final StreamObserver<SourceOuterClass.ReadResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

@Override
public void onNext(SourceOuterClass.ReadRequest request) {
// if the request is a handshake, send handshake response.
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the read requests
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SourceOuterClass.ReadResponse.newBuilder()
.setHandshake(request.getHandshake())
.setStatus(SourceOuterClass.ReadResponse.Status.newBuilder()
.setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS)
.build())
.build());
handshakeDone = true;
return;
}

ReadRequestImpl readRequest = new ReadRequestImpl(
request.getRequest().getNumRecords(),
Duration.ofMillis(request.getRequest().getTimeoutInMs()));
Expand Down Expand Up @@ -89,16 +96,22 @@ public void onCompleted() {
@Override
public StreamObserver<SourceOuterClass.AckRequest> ackFn(final StreamObserver<SourceOuterClass.AckResponse> responseObserver) {
return new StreamObserver<>() {
private boolean handshakeDone = false;

@Override
public void onNext(SourceOuterClass.AckRequest request) {
// if the request is a handshake, send a handshake response
if (request.hasHandshake() && request.getHandshake().getSot()) {
// make sure that the handshake is done before processing the ack requests
if (!handshakeDone) {
if (!request.hasHandshake() || !request.getHandshake().getSot()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("Handshake request not received")
.asException());
return;
}
responseObserver.onNext(SourceOuterClass.AckResponse.newBuilder()
.setHandshake(request.getHandshake())
.setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess(
Empty.newBuilder().build()))
.build());
return;
handshakeDone = true;
}

SourceOuterClass.Offset offset = request.getRequest().getOffset();
Expand Down
Loading
Loading