diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 53dbb1a..2339054 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -8,6 +8,7 @@ import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -18,6 +19,7 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + public final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; @@ -37,7 +39,8 @@ public Server(Sinker sinker) { * @param sinker sink to process the message */ public Server(Sinker sinker, GRPCConfig grpcConfig) { - this.service = new Service(sinker); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(sinker, this.shutdownSignal); this.grpcConfig = grpcConfig; } @@ -80,7 +83,11 @@ public void start() throws Exception { // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); + System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); + if (server.isTerminated()) { + log.info("Server already terminated"); + return; + } try { Server.this.stop(); } catch (InterruptedException e) { @@ -88,6 +95,24 @@ public void start() throws Exception { e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage()); + try { + Server.this.stop(); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + System.exit(0); + }); } /** diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 770562e..a93a8d7 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -23,9 +23,11 @@ class Service extends SinkGrpc.SinkImplBase { .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); private final Sinker sinker; + private final CompletableFuture shutdownSignal; - public Service(Sinker sinker) { + public Service(Sinker sinker, CompletableFuture shutdownSignal) { this.sinker = sinker; + this.shutdownSignal = shutdownSignal; } /** @@ -97,7 +99,9 @@ public void onNext(SinkOuterClass.SinkRequest request) { } } catch (Exception e) { log.error("Encountered error in sinkFn - {}", e.getMessage()); - responseObserver.onError(e); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); + // responseObserver.onError(e); } } diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java deleted file mode 100644 index da88c8f..0000000 --- a/src/test/java/io/numaproj/numaflow/sinker/ServerErrTest.java +++ /dev/null @@ -1,160 +0,0 @@ -package io.numaproj.numaflow.sinker; - -import com.google.protobuf.ByteString; -import io.grpc.ManagedChannel; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.stub.StreamObserver; -import io.grpc.testing.GrpcCleanupRule; -import io.numaproj.numaflow.sink.v1.SinkGrpc; -import io.numaproj.numaflow.sink.v1.SinkOuterClass; -import lombok.extern.slf4j.Slf4j; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; - -@Slf4j -@RunWith(JUnit4.class) -public class ServerErrTest { - @Rule - public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); - private Server server; - private ManagedChannel inProcessChannel; - - @Before - public void setUp() throws Exception { - String serverName = InProcessServerBuilder.generateName(); - - GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() - .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) - .socketPath(Constants.DEFAULT_SOCKET_PATH) - .infoFilePath("/tmp/numaflow-test-server-info)") - .build(); - - server = new Server( - new TestSinkFnErr(), - grpcServerConfig); - - server.setServerBuilder(InProcessServerBuilder.forName(serverName) - .directExecutor()); - - server.start(); - - inProcessChannel = grpcCleanup.register(InProcessChannelBuilder - .forName(serverName) - .directExecutor() - .build()); - } - - @After - public void tearDown() throws Exception { - server.stop(); - } - - @Test - public void sinkerException() { - //create an output stream observer - SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); - - Thread t = new Thread(() -> { - while (outputStreamObserver.t == null) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - assertEquals( - "UNKNOWN: java.lang.RuntimeException: unknown exception", - outputStreamObserver.t.getMessage()); - }); - t.start(); - - StreamObserver inputStreamObserver = SinkGrpc - .newStub(inProcessChannel) - .sinkFn(outputStreamObserver); - String actualId = "sink_test_id"; - - // send handshake request - inputStreamObserver.onNext(SinkOuterClass.SinkRequest.newBuilder() - .setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build()) - .build()); - - for (int i = 1; i <= 100; i++) { - String[] keys; - if (i < 100) { - keys = new String[]{"valid-key"}; - } else { - keys = new String[]{"invalid-key"}; - } - SinkOuterClass.SinkRequest.Request request = SinkOuterClass.SinkRequest.Request - .newBuilder() - .setValue(ByteString.copyFromUtf8(String.valueOf(i))) - .setId(actualId) - .addAllKeys(List.of(keys)) - .build(); - inputStreamObserver.onNext(SinkOuterClass.SinkRequest - .newBuilder() - .setRequest(request) - .build()); - } - - // send eot message - inputStreamObserver.onNext(SinkOuterClass.SinkRequest.newBuilder() - .setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true)).build()); - - inputStreamObserver.onCompleted(); - - try { - t.join(); - } catch (InterruptedException e) { - fail("Thread interrupted"); - } - } - - @Test - public void sinkerNoHandshake() { - // Create an output stream observer - SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); - - StreamObserver inputStreamObserver = SinkGrpc - .newStub(inProcessChannel) - .sinkFn(outputStreamObserver); - - // Send a request without sending a handshake request - SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder() - .setRequest(SinkOuterClass.SinkRequest.Request.newBuilder() - .setValue(ByteString.copyFromUtf8("test")) - .setId("test_id") - .addKeys("test_key") - .build()) - .build(); - inputStreamObserver.onNext(request); - - // Wait for the server to process the request - while (!outputStreamObserver.completed.get()) ; - - // Check if an error was received - assertNotNull(outputStreamObserver.t); - assertEquals( - "INVALID_ARGUMENT: Handshake request not received", - outputStreamObserver.t.getMessage()); - } - - @Slf4j - private static class TestSinkFnErr extends Sinker { - @Override - public ResponseList processMessages(DatumIterator datumIterator) { - throw new RuntimeException("unknown exception"); - } - } -}