From d7ce392d235b1f5386e78f303cf776de34f09704 Mon Sep 17 00:00:00 2001 From: Abdullah Hadi Date: Mon, 29 Jan 2024 11:43:54 -0500 Subject: [PATCH] chore: write server info for sideinput sdk (#93) Signed-off-by: a3hadi --- .../java/io/numaproj/numaflow/sideinput/GRPCConfig.java | 3 +++ src/main/java/io/numaproj/numaflow/sideinput/Server.java | 8 ++++++++ .../io/numaproj/numaflow/sideinput/GRPCConfigTest.java | 4 ++++ .../java/io/numaproj/numaflow/sideinput/ServerTest.java | 1 + 4 files changed, 16 insertions(+) diff --git a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java index 880a2189..c0a4fda2 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sideinput; +import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -11,12 +12,14 @@ public class GRPCConfig { private String socketPath; private int maxMessageSize; + private String infoFilePath; /** * Static method to create default GRPCConfig. */ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() + .infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index 862dc8de..1ffadd72 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -1,8 +1,11 @@ package io.numaproj.numaflow.sideinput; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; import io.numaproj.numaflow.shared.GrpcServerUtils; +import io.numaproj.numaflow.info.ServerInfoAccessor; +import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; @@ -15,6 +18,7 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; /** @@ -43,6 +47,10 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); if (this.server == null) { // create server builder diff --git a/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java index 71a90c60..5fc41d5b 100644 --- a/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sideinput; +import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -9,6 +10,9 @@ public class GRPCConfigTest { public void testDefaultGrpcConfig() { GRPCConfig grpcConfig = GRPCConfig.defaultGrpcConfig(); Assert.assertNotNull(grpcConfig); + Assert.assertEquals( + ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH, + grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); } diff --git a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java index f388c4ab..aaa33083 100644 --- a/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/sideinput/ServerTest.java @@ -28,6 +28,7 @@ public void setUp() throws Exception { GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") .build(); server = new Server(