Skip to content

Commit

Permalink
test - yhl's change
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Nov 5, 2024
1 parent 134414a commit dc557dc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 164 deletions.
29 changes: 27 additions & 2 deletions src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -18,6 +19,7 @@ public class Server {

private final GRPCConfig grpcConfig;
private final Service service;
public final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;

Expand All @@ -37,7 +39,8 @@ public Server(Sinker sinker) {
* @param sinker sink to process the message
*/
public Server(Sinker sinker, GRPCConfig grpcConfig) {
this.service = new Service(sinker);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(sinker, this.shutdownSignal);
this.grpcConfig = grpcConfig;
}

Expand Down Expand Up @@ -80,14 +83,36 @@ public void start() throws Exception {
// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
System.err.println("*** shutting down sink gRPC server since JVM is shutting down");
if (server.isTerminated()) {
log.info("Server already terminated");
return;
}
try {
Server.this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
return;
}

if (e != null) {
System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage());
try {
Server.this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
}
}
System.exit(0);
});
}

/**
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ class Service extends SinkGrpc.SinkImplBase {
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

private final Sinker sinker;
private final CompletableFuture<Void> shutdownSignal;

public Service(Sinker sinker) {
public Service(Sinker sinker, CompletableFuture<Void> shutdownSignal) {
this.sinker = sinker;
this.shutdownSignal = shutdownSignal;
}

/**
Expand Down Expand Up @@ -97,7 +99,9 @@ public void onNext(SinkOuterClass.SinkRequest request) {
}
} catch (Exception e) {
log.error("Encountered error in sinkFn - {}", e.getMessage());
responseObserver.onError(e);
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException());
// responseObserver.onError(e);
}
}

Expand Down
160 changes: 0 additions & 160 deletions src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java

This file was deleted.

0 comments on commit dc557dc

Please sign in to comment.