From babc8e1bc542679309375ef4bba2b27fc521317a Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 25 Sep 2024 23:41:18 -0400 Subject: [PATCH] feat: container-type level version compatibility check (#140) Signed-off-by: Keran Yang --- .../numaproj/numaflow/batchmapper/Server.java | 2 ++ .../numaproj/numaflow/info/ContainerType.java | 26 +++++++++++++++++++ .../io/numaproj/numaflow/info/ServerInfo.java | 14 +++++++++- .../io/numaproj/numaflow/mapper/Server.java | 6 +++-- .../numaproj/numaflow/mapstreamer/Server.java | 2 ++ .../io/numaproj/numaflow/reducer/Server.java | 6 +++-- .../numaflow/reducestreamer/Server.java | 6 +++-- .../numaflow/sessionreducer/Server.java | 6 +++-- .../numaflow/shared/GrpcServerUtils.java | 16 +++++++++--- .../numaproj/numaflow/sideinput/Server.java | 6 +++-- .../io/numaproj/numaflow/sinker/Server.java | 6 +++-- .../io/numaproj/numaflow/sourcer/Server.java | 6 +++-- .../numaflow/sourcetransformer/Server.java | 6 +++-- .../info/ServerInfoAccessorImplTest.java | 2 +- .../numaflow/shared/GrpcServerUtilsTest.java | 3 ++- 15 files changed, 91 insertions(+), 22 deletions(-) create mode 100644 src/main/java/io/numaproj/numaflow/info/ContainerType.java diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java index 8aa8c43..83c276f 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -52,6 +53,7 @@ public void start() throws Exception { serverInfoAccessor, grpcConfig.getSocketPath(), grpcConfig.getInfoFilePath(), + ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); if (this.server == null) { diff --git a/src/main/java/io/numaproj/numaflow/info/ContainerType.java b/src/main/java/io/numaproj/numaflow/info/ContainerType.java new file mode 100644 index 0000000..2469a52 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/info/ContainerType.java @@ -0,0 +1,26 @@ +package io.numaproj.numaflow.info; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum ContainerType { + SOURCER("sourcer"), + SOURCE_TRANSFORMER("sourcetransformer"), + SINKER("sinker"), + MAPPER("mapper"), + REDUCER("reducer"), + REDUCE_STREAMER("reducestreamer"), + SESSION_REDUCER("sessionreducer"), + SIDEINPUT("sideinput"), + FBSINKER("fb-sinker"); + + private final String name; + + ContainerType(String name) { + this.name = name; + } + + @JsonValue + public String getName() { + return name; + } +} diff --git a/src/main/java/io/numaproj/numaflow/info/ServerInfo.java b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java index 625dc8b..cfae273 100644 --- a/src/main/java/io/numaproj/numaflow/info/ServerInfo.java +++ b/src/main/java/io/numaproj/numaflow/info/ServerInfo.java @@ -8,6 +8,8 @@ import java.util.Map; +import static java.util.Map.entry; + /** * Server Information to be used by client to determine: * - protocol: what is right protocol to use (UDS or TCP) @@ -24,7 +26,17 @@ public class ServerInfo { // Specify the minimum Numaflow version required by the current SDK version // To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in // https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs - public static final String MINIMUM_NUMAFLOW_VERSION = "1.3.1-z"; + public static final Map MINIMUM_NUMAFLOW_VERSION = Map.ofEntries( + entry(ContainerType.SOURCER, "1.3.1-z"), + entry(ContainerType.SOURCE_TRANSFORMER, "1.3.1-z"), + entry(ContainerType.SINKER, "1.3.1-z"), + entry(ContainerType.MAPPER, "1.3.1-z"), + entry(ContainerType.REDUCER, "1.3.1-z"), + entry(ContainerType.REDUCE_STREAMER, "1.3.1-z"), + entry(ContainerType.SESSION_REDUCER, "1.3.1-z"), + entry(ContainerType.SIDEINPUT, "1.3.1-z"), + entry(ContainerType.FBSINKER, "1.3.1-z") + ); @JsonProperty("protocol") private Protocol protocol; @JsonProperty("language") diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index c4fc8f0..be82cc7 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -55,11 +56,12 @@ public void start() throws Exception { serverInfoAccessor, grpcConfig.getSocketPath(), grpcConfig.getInfoFilePath(), + ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); } if (this.server == null) { - ServerBuilder serverBuilder = null; + ServerBuilder serverBuilder; // create server builder for domain socket server serverBuilder = GrpcServerUtils.createServerBuilder( grpcConfig.getSocketPath(), @@ -79,7 +81,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index ecd8da7..d3d576c 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -52,6 +53,7 @@ public void start() throws Exception { serverInfoAccessor, grpcConfig.getSocketPath(), grpcConfig.getInfoFilePath(), + ContainerType.MAPPER, Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); if (this.server == null) { diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index ee1667a..2fe9095 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -51,7 +52,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.REDUCER); } if (this.server == null) { @@ -74,7 +76,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index 9cea9f3..f97d425 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; @@ -54,7 +55,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.REDUCE_STREAMER); } if (this.server == null) { @@ -77,7 +79,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index c17e07d..5fbdb6a 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.sessionreducer.model.SessionReducer; @@ -54,7 +55,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.SESSION_REDUCER); } if (this.server == null) { @@ -77,7 +79,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java index 03507ba..0ba9c9c 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java @@ -18,6 +18,7 @@ import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueServerDomainSocketChannel; import io.netty.channel.unix.DomainSocketAddress; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.Language; import io.numaproj.numaflow.info.Protocol; import io.numaproj.numaflow.info.ServerInfo; @@ -30,6 +31,8 @@ import java.util.HashMap; import java.util.Map; +import static io.numaproj.numaflow.info.ServerInfo.MINIMUM_NUMAFLOW_VERSION; + /** * GrpcServerUtils is the utility class for netty server channel. */ @@ -80,14 +83,21 @@ public static EventLoopGroup createEventLoopGroup(int threads, String name) { public static void writeServerInfo( ServerInfoAccessor serverInfoAccessor, String socketPath, - String infoFilePath) throws Exception { - writeServerInfo(serverInfoAccessor, socketPath, infoFilePath, new HashMap<>()); + String infoFilePath, + ContainerType containerType) throws Exception { + writeServerInfo( + serverInfoAccessor, + socketPath, + infoFilePath, + containerType, + new HashMap<>()); } public static void writeServerInfo( ServerInfoAccessor serverInfoAccessor, String socketPath, String infoFilePath, + ContainerType containerType, Map metaData) throws Exception { // cleanup socket path if it exists (unit test builder doesn't use one) if (socketPath != null) { @@ -111,7 +121,7 @@ public static void writeServerInfo( ServerInfo serverInfo = new ServerInfo( Protocol.UDS_PROTOCOL, Language.JAVA, - ServerInfo.MINIMUM_NUMAFLOW_VERSION, + MINIMUM_NUMAFLOW_VERSION.get(containerType), serverInfoAccessor.getSDKVersion(), metaData); log.info("Writing server info {} to {}", serverInfo, infoFilePath); diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index 30d2c75..580855c 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -51,7 +52,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.SIDEINPUT); } if (this.server == null) { @@ -74,7 +76,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 1ab880f..d1c23ea 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -50,7 +51,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.SINKER); } if (this.server == null) { @@ -73,7 +75,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index 1150a67..83961dc 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -51,7 +52,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.SOURCER); } if (this.server == null) { @@ -74,7 +76,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 5a7bf13..3dd29dc 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.shared.GrpcServerUtils; @@ -51,7 +52,8 @@ public void start() throws Exception { GrpcServerUtils.writeServerInfo( serverInfoAccessor, grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + grpcConfig.getInfoFilePath(), + ContainerType.SOURCE_TRANSFORMER); } if (this.server == null) { @@ -74,7 +76,7 @@ public void start() throws Exception { log.info( "Server started, listening on {}", grpcConfig.isLocal() ? - "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); + "localhost:" + grpcConfig.getPort():grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java index 89e8176..cc4d04d 100644 --- a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java +++ b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java @@ -26,7 +26,7 @@ public void given_writeServerInfo_when_read_then_returnExactSame() { ServerInfo testServerInfo = new ServerInfo( Protocol.TCP_PROTOCOL, Language.JAVA, - ServerInfo.MINIMUM_NUMAFLOW_VERSION, + "1.3.1-z", "0.4.3", new HashMap<>() {{ put("key1", "value1"); diff --git a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java index 32bb013..a4ac662 100644 --- a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java +++ b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java @@ -4,6 +4,7 @@ import io.grpc.ServerBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -29,7 +30,7 @@ public void testCreateEventLoopGroup() { public void testWriteServerInfo() throws Exception { ServerInfoAccessor mockAccessor = Mockito.mock(ServerInfoAccessor.class); Mockito.when(mockAccessor.getSDKVersion()).thenReturn("1.0.0"); - GrpcServerUtils.writeServerInfo(mockAccessor, null, "infoFilePath"); + GrpcServerUtils.writeServerInfo(mockAccessor, null, "infoFilePath", ContainerType.MAPPER); Mockito .verify(mockAccessor, Mockito.times(1)) .write(Mockito.any(), Mockito.eq("infoFilePath"));