diff --git a/README.md b/README.md index 1bb4572d..77513b2c 100644 --- a/README.md +++ b/README.md @@ -38,8 +38,6 @@ Add this dependency to your project's build file: compile "io.numaproj.numaflow:numaflow-java:0.7.0" ``` -``` - ### Build ```bash diff --git a/examples/pom.xml b/examples/pom.xml index 81fc0cd6..ebb7fc30 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -25,16 +25,18 @@ slf4j-simple 2.0.7 + org.mockito mockito-core 4.8.1 test + org.junit.jupiter junit-jupiter - RELEASE + 5.10.2 test @@ -59,7 +61,9 @@ - numaflow-java-examples/mapt-event-time-filter-function:${docker.tag} + + numaflow-java-examples/mapt-event-time-filter-function:${docker.tag} + @@ -160,7 +164,8 @@ - numaflow-java-examples/reduce-stream-sum:${docker.tag} + numaflow-java-examples/reduce-stream-sum:${docker.tag} + @@ -177,7 +182,8 @@ - numaflow-java-examples/map-forward-message:${docker.tag} + numaflow-java-examples/map-forward-message:${docker.tag} + @@ -211,7 +217,8 @@ - numaflow-java-examples/sideinput-example:${docker.tag} + numaflow-java-examples/sideinput-example:${docker.tag} + @@ -228,7 +235,8 @@ - numaflow-java-examples/udf-sideinput-example:${docker.tag} + numaflow-java-examples/udf-sideinput-example:${docker.tag} + @@ -245,7 +253,8 @@ - numaflow-java-examples/source-simple-source:${docker.tag} + numaflow-java-examples/source-simple-source:${docker.tag} + @@ -262,7 +271,8 @@ - numaflow-java-examples/session-reduce-count:${docker.tag} + numaflow-java-examples/session-reduce-count:${docker.tag} + @@ -271,6 +281,10 @@ + + maven-surefire-plugin + 2.22.2 + maven-compiler-plugin 3.10.1 diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunction.java index ade96758..bb0b7396 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunction.java @@ -16,7 +16,13 @@ public class FlatMapFunction extends Mapper { public static void main(String[] args) throws Exception { - new Server(new FlatMapFunction()).start(); + Server server = new Server(new FlatMapFunction()); + + // Start the server + server.start(); + + // Wait for the server to shut down + server.awaitTermination(); } public MessageList processMessage(String[] keys, Datum data) { diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/map/forward/ForwardFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/map/forward/ForwardFunction.java index 45c46b95..bfbbaa3e 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/map/forward/ForwardFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/map/forward/ForwardFunction.java @@ -12,7 +12,13 @@ public class ForwardFunction extends Mapper { public static void main(String[] args) throws Exception { - new Server(new ForwardFunction()).start(); + Server server = new Server(new ForwardFunction()); + + // Start the server + server.start(); + + // Wait for the server to shut down + server.awaitTermination(); } public MessageList processMessage(String[] keys, Datum data) { diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/FlatMapStreamFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/FlatMapStreamFunction.java index 4f901d41..c911b33b 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/FlatMapStreamFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream/FlatMapStreamFunction.java @@ -1,10 +1,6 @@ package io.numaproj.numaflow.examples.mapstream.flatmapstream; -import io.numaproj.numaflow.mapstreamer.Datum; -import io.numaproj.numaflow.mapstreamer.MapStreamer; -import io.numaproj.numaflow.mapstreamer.Message; -import io.numaproj.numaflow.mapstreamer.OutputObserver; -import io.numaproj.numaflow.mapstreamer.Server; +import io.numaproj.numaflow.mapstreamer.*; /** @@ -17,7 +13,13 @@ public class FlatMapStreamFunction extends MapStreamer { public static void main(String[] args) throws Exception { - new Server(new FlatMapStreamFunction()).start(); + Server server = new Server(new FlatMapStreamFunction()); + + // Start the server + server.start(); + + // wait for the server to shutdown + server.awaitTermination(); } public void processMessage(String[] keys, Datum data, OutputObserver outputObserver) { diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/count/CounterFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/count/CounterFactory.java index ee47490a..561a71fe 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/count/CounterFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/count/CounterFactory.java @@ -23,7 +23,13 @@ public class CounterFactory extends ReducerFactory { public static void main(String[] args) throws Exception { log.info("counter udf was invoked"); Config config = new Config(1); - new Server(new CounterFactory(config)).start(); + Server server = new Server(new CounterFactory(config)); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override @@ -42,8 +48,8 @@ public Counter(Config config) { @Override public void addMessage(String[] keys, Datum datum, Metadata md) { - // increment based on the value specified in the config - count += config.getIncrementBy(); + // increment based on the value specified in the config + count += config.getIncrementBy(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java index 12135e09..d6c74386 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reduce/sum/SumFactory.java @@ -9,7 +9,13 @@ public class SumFactory extends ReducerFactory { public static void main(String[] args) throws Exception { log.info("sum udf was invoked"); - new Server(new SumFactory()).start(); + Server server = new Server(new SumFactory()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFactory.java index 21c3822f..9c2d6701 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter/CountFactory.java @@ -13,7 +13,13 @@ public class CountFactory extends SessionReducerFactory { public static void main(String[] args) throws Exception { log.info("count udf was invoked"); - new Server(new CountFactory()).start(); + Server server = new Server(new CountFactory()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java index 5faa39d5..e209fc24 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java @@ -13,7 +13,13 @@ public class SumFactory extends ReduceStreamerFactory { public static void main(String[] args) throws Exception { log.info("sum udf was invoked"); - new Server(new SumFactory()).start(); + Server server = new Server(new SumFactory()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/simple/SimpleSideInput.java b/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/simple/SimpleSideInput.java index 49c36b02..7ca7ba2c 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/simple/SimpleSideInput.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/simple/SimpleSideInput.java @@ -28,7 +28,13 @@ public SimpleSideInput(Config config) { } public static void main(String[] args) throws Exception { - new Server(new SimpleSideInput(new Config("sampling", 0.5F))).start(); + Server server = new Server(new SimpleSideInput(new Config("sampling", 0.5F))); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/udf/SimpleMapWithSideInput.java b/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/udf/SimpleMapWithSideInput.java index 0ba05a1f..10dbddb8 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/udf/SimpleMapWithSideInput.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sideinput/udf/SimpleMapWithSideInput.java @@ -38,7 +38,13 @@ public static void main(String[] args) throws Exception { sideInputWatcher.startWatching(); // start the server - new Server(new SimpleMapWithSideInput(sideInputWatcher)).start(); + Server server = new Server(new SimpleMapWithSideInput(sideInputWatcher)); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); // Stop watching for side input sideInputWatcher.stopWatching(); diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java index 224ef7be..479037d2 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java @@ -17,7 +17,13 @@ public class SimpleSink extends Sinker { public static void main(String[] args) throws Exception { - new Server(new SimpleSink()).start(); + Server server = new Server(new SimpleSink()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java index 5025d59d..1df5871d 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.examples.source.simple; -import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import io.numaproj.numaflow.sourcer.AckRequest; import io.numaproj.numaflow.sourcer.Message; @@ -29,7 +28,13 @@ public class SimpleSource extends Sourcer { private long readIndex = 0; public static void main(String[] args) throws Exception { - new Server(new SimpleSource()).start(); + Server server = new Server(new SimpleSource()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } @Override diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunction.java index 8e4d0b0a..e1f37d2d 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunction.java @@ -23,8 +23,13 @@ public class EventTimeFilterFunction extends SourceTransformer { private static final Instant januaryFirst2023 = Instant.ofEpochMilli(1672531200000L); public static void main(String[] args) throws Exception { - new Server(new EventTimeFilterFunction()) - .start(); + Server server = new Server(new EventTimeFilterFunction()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); } public MessageList processMessage(String[] keys, Datum data) { diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunctionTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunctionTest.java index 35dc08eb..c3d5482a 100644 --- a/examples/src/test/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunctionTest.java +++ b/examples/src/test/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunctionTest.java @@ -1,60 +1,68 @@ package io.numaproj.numaflow.examples.map.evenodd; - -import io.numaproj.numaflow.examples.utils.TestDatum; +import io.numaproj.numaflow.mapper.MapperTestKit; import io.numaproj.numaflow.mapper.Message; import io.numaproj.numaflow.mapper.MessageList; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; @Slf4j public class EvenOddFunctionTest { @Test public void testEvenNumber() { - TestDatum datum = TestDatum.builder().value("2".getBytes()).build(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("2".getBytes()) + .build(); EvenOddFunction evenOddFunction = new EvenOddFunction(); MessageList result = evenOddFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); // The message should have the key "even" and tag "even-tag" - assertEquals("even", messages.get(0).getKeys()[0]); - assertEquals("even-tag", messages.get(0).getTags()[0]); + Assertions.assertEquals("even", messages.get(0).getKeys()[0]); + Assertions.assertEquals("even-tag", messages.get(0).getTags()[0]); } @Test public void testOddNumber() { - TestDatum datum = TestDatum.builder().value("3".getBytes()).build(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("3".getBytes()) + .build(); EvenOddFunction evenOddFunction = new EvenOddFunction(); MessageList result = evenOddFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); // The message should have the key "odd" and tag "odd-tag" - assertEquals("odd", messages.get(0).getKeys()[0]); - assertEquals("odd-tag", messages.get(0).getTags()[0]); + Assertions.assertEquals("odd", messages.get(0).getKeys()[0]); + Assertions.assertEquals("odd-tag", messages.get(0).getTags()[0]); } @Test public void testNonNumeric() { - TestDatum datum = TestDatum.builder().value("abc".getBytes()).build(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("abc".getBytes()) + .build(); EvenOddFunction evenOddFunction = new EvenOddFunction(); MessageList result = evenOddFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); // The message should be dropped - assertEquals(Message.toDrop().getTags()[0], messages.get(0).getTags()[0]); + Assertions.assertEquals(Message.toDrop().getTags()[0], messages.get(0).getTags()[0]); } } diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunctionTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunctionTest.java index 882fd670..f4f8d728 100644 --- a/examples/src/test/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunctionTest.java +++ b/examples/src/test/java/io/numaproj/numaflow/examples/map/flatmap/FlatMapFunctionTest.java @@ -1,57 +1,48 @@ package io.numaproj.numaflow.examples.map.flatmap; -import io.numaproj.numaflow.examples.utils.TestDatum; +import io.numaproj.numaflow.mapper.MapperTestKit; import io.numaproj.numaflow.mapper.Message; import io.numaproj.numaflow.mapper.MessageList; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; @Slf4j public class FlatMapFunctionTest { - @Test - public void testCommaSeparatedString() { - TestDatum datum = TestDatum.builder().value("apple,banana,carrot".getBytes()).build(); - - FlatMapFunction flatMapFunction = new FlatMapFunction(); - MessageList result = flatMapFunction.processMessage(new String[]{}, datum); - - List messages = result.getMessages(); - assertEquals(3, messages.size()); - - assertEquals("apple", new String(messages.get(0).getValue())); - assertEquals("banana", new String(messages.get(1).getValue())); - assertEquals("carrot", new String(messages.get(2).getValue())); - } - @Test public void testSingleString() { - TestDatum datum = TestDatum.builder().value("apple".getBytes()).build(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("apple".getBytes()) + .build(); FlatMapFunction flatMapFunction = new FlatMapFunction(); MessageList result = flatMapFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); - assertEquals("apple", new String(messages.get(0).getValue())); + Assertions.assertEquals("apple", new String(messages.get(0).getValue())); } @Test public void testEmptyString() { - TestDatum datum = TestDatum.builder().value("".getBytes()).build(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("".getBytes()) + .build(); FlatMapFunction flatMapFunction = new FlatMapFunction(); MessageList result = flatMapFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); - assertEquals("", new String(messages.get(0).getValue())); + Assertions.assertEquals("", new String(messages.get(0).getValue())); } } diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java new file mode 100644 index 00000000..78e86de6 --- /dev/null +++ b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java @@ -0,0 +1,273 @@ +package io.numaproj.numaflow.examples.server; + +import io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction; +import io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction; +import io.numaproj.numaflow.examples.reduce.sum.SumFactory; +import io.numaproj.numaflow.examples.sink.simple.SimpleSink; +import io.numaproj.numaflow.examples.source.simple.SimpleSource; +import io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction; +import io.numaproj.numaflow.mapper.MapperTestKit; +import io.numaproj.numaflow.mapper.Message; +import io.numaproj.numaflow.mapper.MessageList; +import io.numaproj.numaflow.reducer.Datum; +import io.numaproj.numaflow.reducer.ReducerTestKit; +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.SinkerTestKit; +import io.numaproj.numaflow.sourcer.SourcerTestKit; +import io.numaproj.numaflow.sourcetransformer.SourceTransformerTestKit; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Slf4j +public class ServerTest { + + @Test + @Order(1) + public void testMapServerInvocation() { + MapperTestKit mapperTestKit = new MapperTestKit(new EvenOddFunction()); + try { + mapperTestKit.startServer(); + } catch (Exception e) { + log.error("Failed to start server", e); + Assertions.fail("Failed to start server"); + } + + // Create a client which can send requests to the server + MapperTestKit.Client client = new MapperTestKit.Client(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("2".getBytes()) + .build(); + MessageList result = client.sendRequest(new String[]{}, datum); + + List messages = result.getMessages(); + Assertions.assertEquals(1, messages.size()); + Assertions.assertEquals("even", messages.get(0).getKeys()[0]); + + try { + client.close(); + mapperTestKit.stopServer(); + } catch (Exception e) { + log.error("Failed to stop server", e); + Assertions.fail("Failed to stop server"); + } + } + + @Test + @Order(2) + public void testFlatMapServerInvocation() { + MapperTestKit mapperTestKit = new MapperTestKit(new FlatMapFunction()); + try { + mapperTestKit.startServer(); + } catch (Exception e) { + log.error("Failed to start server", e); + } + + MapperTestKit.Client client = new MapperTestKit.Client(); + MapperTestKit.TestDatum datum = MapperTestKit.TestDatum + .builder() + .value("apple,banana,carrot".getBytes()) + .build(); + + MessageList result = client.sendRequest(new String[]{}, datum); + + List messages = result.getMessages(); + Assertions.assertEquals(3, messages.size()); + + Assertions.assertEquals("apple", new String(messages.get(0).getValue())); + Assertions.assertEquals("banana", new String(messages.get(1).getValue())); + Assertions.assertEquals("carrot", new String(messages.get(2).getValue())); + + try { + client.close(); + mapperTestKit.stopServer(); + } catch (Exception e) { + log.error("Failed to stop server", e); + } + } + + @Test + @Order(3) + public void testReduceServerInvocation() { + SumFactory sumFactory = new SumFactory(); + + ReducerTestKit reducerTestKit = new ReducerTestKit(sumFactory); + + // Start the server + try { + reducerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // List of datum to be sent to the server + // create 10 datum with values 1 to 10 + List datumList = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + datumList.add(ReducerTestKit.TestDatum + .builder() + .value(Integer.toString(i).getBytes()) + .build()); + } + + // create a client and send requests to the server + ReducerTestKit.Client client = new ReducerTestKit.Client(); + + ReducerTestKit.TestReduceRequest testReduceRequest = ReducerTestKit.TestReduceRequest + .builder() + .datumList(datumList) + .keys(new String[]{"test-key"}) + .startTime(Instant.ofEpochSecond(60000)) + .endTime(Instant.ofEpochSecond(60010)) + .build(); + + try { + io.numaproj.numaflow.reducer.MessageList messageList = client.sendReduceRequest( + testReduceRequest); + // check if the response is correct + if (messageList.getMessages().size() != 1) { + Assertions.fail("Expected 1 message in the response"); + } + Assertions.assertEquals("55", new String(messageList.getMessages().get(0).getValue())); + + } catch (Exception e) { + e.printStackTrace(); + Assertions.fail("Failed to send request to server - "); + } + + // Stop the server + try { + client.close(); + reducerTestKit.stopServer(); + } catch (InterruptedException e) { + Assertions.fail("Failed to stop server"); + } + } + + @Test + @Order(4) + public void testSinkServerInvocation() { + int datumCount = 10; + SinkerTestKit sinkerTestKit = new SinkerTestKit(new SimpleSink()); + + // Start the server + try { + sinkerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // Create a test datum iterator with 10 messages + SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator(); + for (int i = 0; i < datumCount; i++) { + testListIterator.addDatum(SinkerTestKit.TestDatum + .builder() + .id("id-" + i) + .value(("value-" + i).getBytes()) + .build()); + } + + SinkerTestKit.Client client = new SinkerTestKit.Client(); + try { + ResponseList responseList = client.sendRequest(testListIterator); + Assertions.assertEquals(datumCount, responseList.getResponses().size()); + for (Response response : responseList.getResponses()) { + Assertions.assertEquals(true, response.getSuccess()); + } + } catch (Exception e) { + Assertions.fail("Failed to send requests"); + } + + // Stop the server + try { + client.close(); + sinkerTestKit.stopServer(); + } catch (InterruptedException e) { + Assertions.fail("Failed to stop server"); + } + + // we can add the logic to verify if the messages were + // successfully written to the sink(could be a file, database, etc.) + } + + @Test + @Order(5) + public void testSourceServerInvocation() { + SimpleSource simpleSource = new SimpleSource(); + + SourcerTestKit sourcerTestKit = new SourcerTestKit(simpleSource); + try { + sourcerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // create a client to send requests to the server + SourcerTestKit.Client sourcerClient = new SourcerTestKit.Client(); + // create a test observer to receive messages from the server + SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver(); + // create a read request with count 10 and timeout 1 second + SourcerTestKit.TestReadRequest testReadRequest = SourcerTestKit.TestReadRequest.builder() + .count(10).timeout(Duration.ofSeconds(1)).build(); + + try { + sourcerClient.sendReadRequest(testReadRequest, testObserver); + Assertions.assertEquals(10, testObserver.getMessages().size()); + } catch (Exception e) { + Assertions.fail("Failed to send request to server"); + } + + try { + sourcerClient.close(); + sourcerTestKit.stopServer(); + } catch (InterruptedException e) { + Assertions.fail("Failed to stop server"); + } + } + + @Test + @Order(6) + public void testSourceTransformerServerInvocation() { + SourceTransformerTestKit sourceTransformerTestKit = new SourceTransformerTestKit(new EventTimeFilterFunction()); + try { + sourceTransformerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // Create a client which can send requests to the server + SourceTransformerTestKit.Client client = new SourceTransformerTestKit.Client(); + + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum.builder() + .eventTime(Instant.ofEpochMilli(1640995200000L)) + .value("test".getBytes()) + .build(); + io.numaproj.numaflow.sourcetransformer.MessageList result = client.sendRequest( + new String[]{}, + datum); + + List messages = result.getMessages(); + Assertions.assertEquals(1, messages.size()); + + Assertions.assertEquals("test", new String(messages.get(0).getValue())); + Assertions.assertEquals("within_year_2022", messages.get(0).getTags()[0]); + + try { + client.close(); + sourceTransformerTestKit.stopServer(); + } catch (Exception e) { + Assertions.fail("Failed to stop server"); + } + } +} diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java new file mode 100644 index 00000000..3c8d34aa --- /dev/null +++ b/examples/src/test/java/io/numaproj/numaflow/examples/sink/simple/SimpleSinkTest.java @@ -0,0 +1,37 @@ +package io.numaproj.numaflow.examples.sink.simple; + +import io.numaproj.numaflow.sinker.Response; +import io.numaproj.numaflow.sinker.ResponseList; +import io.numaproj.numaflow.sinker.SinkerTestKit; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + + +@Slf4j +public class SimpleSinkTest { + + @Test + public void testSimpleSink() { + int datumCount = 10; + SimpleSink simpleSink = new SimpleSink(); + + // Create a test datum iterator with 10 messages + SinkerTestKit.TestListIterator testListIterator = new SinkerTestKit.TestListIterator(); + for (int i = 0; i < datumCount; i++) { + testListIterator.addDatum(SinkerTestKit.TestDatum + .builder() + .id("id-" + i) + .value(("value-" + i).getBytes()) + .build()); + } + + ResponseList responseList = simpleSink.processMessages(testListIterator); + Assertions.assertEquals(datumCount, responseList.getResponses().size()); + for (Response response : responseList.getResponses()) { + Assertions.assertEquals(true, response.getSuccess()); + } + // we can add the logic to verify if the messages were + // successfully written to the sink(could be a file, database, etc.) + } +} diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/source/simple/SimpleSourceTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/source/simple/SimpleSourceTest.java index 1bbc2ff5..cbd086f0 100644 --- a/examples/src/test/java/io/numaproj/numaflow/examples/source/simple/SimpleSourceTest.java +++ b/examples/src/test/java/io/numaproj/numaflow/examples/source/simple/SimpleSourceTest.java @@ -1,78 +1,62 @@ package io.numaproj.numaflow.examples.source.simple; -import io.numaproj.numaflow.sourcer.AckRequest; import io.numaproj.numaflow.sourcer.Message; import io.numaproj.numaflow.sourcer.Offset; -import io.numaproj.numaflow.sourcer.OutputObserver; -import io.numaproj.numaflow.sourcer.ReadRequest; -import lombok.Data; +import io.numaproj.numaflow.sourcer.SourcerTestKit; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.time.Duration; import java.util.ArrayList; -import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; public class SimpleSourceTest { - @Data - static class TestObserver implements OutputObserver { - List messages = new ArrayList<>(); - - @Override - public void send(Message message) { - messages.add(message); - } - } - @Test public void test_ReadAndAck() { SimpleSource simpleSource = new SimpleSource(); - TestObserver testObserver = new TestObserver(); + SourcerTestKit.TestListBasedObserver testObserver = new SourcerTestKit.TestListBasedObserver(); // Read 2 messages - ReadRequest readRequest = Mockito.mock(ReadRequest.class); - Mockito.when(readRequest.getCount()).thenReturn(2L); - Mockito.when(readRequest.getTimeout()).thenReturn(Duration.ofMillis(1000)); + SourcerTestKit.TestReadRequest readRequest = SourcerTestKit.TestReadRequest.builder() + .count(2).timeout(Duration.ofMillis(1000)).build(); simpleSource.read(readRequest, testObserver); - assertEquals(2, testObserver.messages.size()); + Assertions.assertEquals(2, testObserver.getMessages().size()); // Try reading 4 more messages // Since the previous batch didn't get acked, the data source shouldn't allow us to read more messages // We should get 0 messages, meaning the observer only holds the previous 2 messages - ReadRequest readRequest2 = Mockito.mock(ReadRequest.class); - Mockito.when(readRequest2.getCount()).thenReturn(2L); - Mockito.when(readRequest2.getTimeout()).thenReturn(Duration.ofMillis(1000)); + SourcerTestKit.TestReadRequest readRequest2 = SourcerTestKit.TestReadRequest.builder() + .count(2).timeout(Duration.ofMillis(1000)).build(); simpleSource.read(readRequest2, testObserver); - assertEquals(2, testObserver.messages.size()); + Assertions.assertEquals(2, testObserver.getMessages().size()); // Ack the first batch - AckRequest ackRequest = Mockito.mock(AckRequest.class); + ArrayList offsets = new ArrayList<>(); // iterate over the testObserver messages and get the offset - for (Message message : testObserver.messages) { + for (Message message : testObserver.getMessages()) { offsets.add(message.getOffset()); } - Mockito.when(ackRequest.getOffsets()).thenReturn(offsets); + SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder() + .offsets(offsets).build(); simpleSource.ack(ackRequest); // Try reading 6 more messages // Since the previous batch got acked, the data source should allow us to read more messages // We should get 6 more messages - total of 2+6=8 - ReadRequest readRequest3 = Mockito.mock(ReadRequest.class); - Mockito.when(readRequest3.getCount()).thenReturn(6L); - Mockito.when(readRequest3.getTimeout()).thenReturn(Duration.ofMillis(1000)); + SourcerTestKit.TestReadRequest readRequest3 = SourcerTestKit.TestReadRequest.builder() + .count(6).timeout(Duration.ofMillis(1000)).build(); simpleSource.read(readRequest3, testObserver); - assertEquals(8, testObserver.messages.size()); + Assertions.assertEquals(8, testObserver.getMessages().size()); } @Test public void testPending() { SimpleSource simpleSource = new SimpleSource(); // simple source getPending always returns 0. - assertEquals(0, simpleSource.getPending()); + Assertions.assertEquals(0, simpleSource.getPending()); } + } diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunctionTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunctionTest.java index 768ae521..e202dcc6 100644 --- a/examples/src/test/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunctionTest.java +++ b/examples/src/test/java/io/numaproj/numaflow/examples/sourcetransformer/eventtimefilter/EventTimeFilterFunctionTest.java @@ -1,20 +1,51 @@ package io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter; -import io.numaproj.numaflow.examples.utils.TestDatum; import io.numaproj.numaflow.sourcetransformer.Message; import io.numaproj.numaflow.sourcetransformer.MessageList; +import io.numaproj.numaflow.sourcetransformer.SourceTransformerTestKit; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.time.Instant; import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; - public class EventTimeFilterFunctionTest { + @Test + public void testServerInvocation() { + SourceTransformerTestKit sourceTransformerTestKit = new SourceTransformerTestKit(new EventTimeFilterFunction()); + try { + sourceTransformerTestKit.startServer(); + } catch (Exception e) { + Assertions.fail("Failed to start server"); + } + + // Create a client which can send requests to the server + SourceTransformerTestKit.Client client = new SourceTransformerTestKit.Client(); + + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum.builder() + .eventTime(Instant.ofEpochMilli(1640995200000L)) + .value("test".getBytes()) + .build(); + MessageList result = client.sendRequest(new String[]{}, datum); + + List messages = result.getMessages(); + Assertions.assertEquals(1, messages.size()); + + Assertions.assertEquals("test", new String(messages.get(0).getValue())); + Assertions.assertEquals("within_year_2022", messages.get(0).getTags()[0]); + + try { + client.close(); + sourceTransformerTestKit.stopServer(); + } catch (Exception e) { + Assertions.fail("Failed to stop server"); + } + } + @Test public void testBefore2022() { - TestDatum datum = TestDatum + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum .builder() .eventTime(Instant.ofEpochMilli(1640995199999L)) .build(); @@ -23,20 +54,20 @@ public void testBefore2022() { MessageList result = eventTimeFilterFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); - assertEquals( + Assertions.assertEquals( Message.toDrop(datum.getEventTime()).getEventTime(), messages.get(0).getEventTime()); - assertEquals(0, messages.get(0).getValue().length); - assertEquals( + Assertions.assertEquals(0, messages.get(0).getValue().length); + Assertions.assertEquals( Message.toDrop(datum.getEventTime()).getTags()[0], messages.get(0).getTags()[0]); } @Test public void testWithin2022() { - TestDatum datum = TestDatum.builder() + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum.builder() .eventTime(Instant.ofEpochMilli(1640995200000L)) .value("test".getBytes()) .build(); @@ -45,15 +76,15 @@ public void testWithin2022() { MessageList result = eventTimeFilterFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); - assertEquals("test", new String(messages.get(0).getValue())); - assertEquals("within_year_2022", messages.get(0).getTags()[0]); + Assertions.assertEquals("test", new String(messages.get(0).getValue())); + Assertions.assertEquals("within_year_2022", messages.get(0).getTags()[0]); } @Test public void testAfter2022() { - TestDatum datum = TestDatum.builder() + SourceTransformerTestKit.TestDatum datum = SourceTransformerTestKit.TestDatum.builder() .eventTime(Instant.ofEpochMilli(1672531200000L)) .value("test".getBytes()) .build(); @@ -62,10 +93,10 @@ public void testAfter2022() { MessageList result = eventTimeFilterFunction.processMessage(new String[]{}, datum); List messages = result.getMessages(); - assertEquals(1, messages.size()); + Assertions.assertEquals(1, messages.size()); - assertEquals("test", new String(messages.get(0).getValue())); - assertEquals("after_year_2022", messages.get(0).getTags()[0]); + Assertions.assertEquals("test", new String(messages.get(0).getValue())); + Assertions.assertEquals("after_year_2022", messages.get(0).getTags()[0]); } } diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/utils/TestDatum.java b/examples/src/test/java/io/numaproj/numaflow/examples/utils/TestDatum.java deleted file mode 100644 index c4a0b365..00000000 --- a/examples/src/test/java/io/numaproj/numaflow/examples/utils/TestDatum.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.numaproj.numaflow.examples.utils; - -import io.numaproj.numaflow.mapper.Datum; -import lombok.Builder; - -import java.time.Instant; -import java.util.Map; - -@Builder -public class TestDatum implements Datum, io.numaproj.numaflow.sourcetransformer.Datum { - private byte[] value; - private Instant eventTime; - private Instant watermark; - private Map headers; - - @Override - public byte[] getValue() { - return value; - } - - @Override - public Instant getEventTime() { - return eventTime; - } - - @Override - public Instant getWatermark() { - return watermark; - } - - @Override - public Map getHeaders() { - return headers; - } -} diff --git a/pom.xml b/pom.xml index 4ff07067..a314e074 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ 3.21.8 1.59.0 4.1.108.Final - 2.8.1 + 2.8.5 2.0.7 @@ -170,7 +170,6 @@ io.grpc grpc-testing - test @@ -187,9 +186,10 @@ test + com.typesafe.akka - akka-actor-typed_2.13 + akka-actor_3 ${akka.version} @@ -244,26 +244,6 @@ - - maven-shade-plugin - 2.4.1 - - - package - - shade - - - - - reference.conf - - - - - - org.xolstice.maven.plugins protobuf-maven-plugin @@ -290,19 +270,6 @@ org.jacoco jacoco-maven-plugin 0.8.7 - - - io.numaproj.numaflow.examples.* - io.numaproj.numaflow.mapstream.v1 - io.numaproj.numaflow.map.v1 - io.numaproj.numaflow.reduce.v1 - io.numaproj.numaflow.sessionreduce.v1 - io.numaproj.numaflow.sourcetransformer.v1 - io.numaproj.numaflow.sink.v1 - io.numaproj.numaflow.sideinput.v1 - io.numaproj.numaflow.source.v1 - - @@ -345,6 +312,7 @@ io/numaproj/numaflow/shared/* io/numaproj/numaflow/sideinput/v1/* io/numaproj/numaflow/source/v1/* + **/*TestKit* @@ -370,7 +338,17 @@ false - io.numaproj.numaflow.info:io.numaproj.numaflow.map.v1:io.numaproj.numaflow.reduce.v1:io.numaproj.numaflow.mapstream.v1:io.numaproj.numaflow.sourcetransformer.v1:io.numaproj.numaflow.sink.v1:io.numaproj.numaflow.reduce.metadata:io.numaproj.numaflow.shared:io.numaproj.numaflow.sideinput.v1:io.numaproj.numaflow.source.v1 + io.numaproj.numaflow.info, + io.numaproj.numaflow.map.v1, + io.numaproj.numaflow.reduce.v1, + io.numaproj.numaflow.mapstream.v1, + io.numaproj.numaflow.sourcetransformer.v1, + io.numaproj.numaflow.sink.v1, + io.numaproj.numaflow.reduce.metadata, + io.numaproj.numaflow.shared, + io.numaproj.numaflow.sideinput.v1, + io.numaproj.numaflow.source.v1, + io.numaproj.numaflow.sessionreduce.v1 diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunction.java b/src/main/java/EvenOddFunction.java similarity index 88% rename from examples/src/main/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunction.java rename to src/main/java/EvenOddFunction.java index 45ef7219..a49f69d5 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/map/evenodd/EvenOddFunction.java +++ b/src/main/java/EvenOddFunction.java @@ -18,7 +18,13 @@ public class EvenOddFunction extends Mapper { public static void main(String[] args) throws Exception { - new Server(new EvenOddFunction()).start(); + Server server = new Server(new EvenOddFunction()); + + // Start the server + server.start(); + + // Wait for the server to shutdown + server.awaitTermination(); } public MessageList processMessage(String[] keys, Datum data) { diff --git a/src/main/java/io/numaproj/numaflow/mapper/Constants.java b/src/main/java/io/numaproj/numaflow/mapper/Constants.java index 220d2d20..7140118e 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Constants.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Constants.java @@ -6,4 +6,8 @@ class Constants { public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/map.sock"; public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"; + + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/mapper/Datum.java b/src/main/java/io/numaproj/numaflow/mapper/Datum.java index 028dd264..0ef1a55f 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Datum.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Datum.java @@ -14,26 +14,26 @@ public interface Datum { * * @return returns the payload value in byte array */ - public byte[] getValue(); + byte[] getValue(); /** * method to get the event time of the payload * * @return returns the event time of the payload */ - public Instant getEventTime(); + Instant getEventTime(); /** * method to get the watermark information * * @return returns the watermark */ - public Instant getWatermark(); + Instant getWatermark(); /** * method to get the headers information of the payload * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java index 6a574cbe..99774a17 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.mapper; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java new file mode 100644 index 00000000..1225979d --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java @@ -0,0 +1,190 @@ +package io.numaproj.numaflow.mapper; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * MapperTestKit is a test kit for testing Mapper implementations. + * It provides methods to start and stop the server and send requests to the server. + */ +@Slf4j +public class MapperTestKit { + private final Mapper mapper; + private final GRPCConfig grpcConfig; + private Server server; + + /** + * Create a new MapperTestKit with the given Mapper. + * + * @param mapper the mapper to test + */ + public MapperTestKit(Mapper mapper) { + this(mapper, GRPCConfig.defaultGrpcConfig()); + } + + /** + * Create a new MapperTestKit with the given Mapper and GRPCConfig. + * + * @param mapper the mapper to test + * @param grpcConfig the grpc configuration to use. + */ + public MapperTestKit(Mapper mapper, GRPCConfig grpcConfig) { + this.mapper = mapper; + this.grpcConfig = grpcConfig; + } + + /** + * Start the server. + * + * @throws Exception if server fails to start + */ + public void startServer() throws Exception { + server = new Server(this.mapper, this.grpcConfig); + server.start(); + } + + /** + * Stops the server. + * + * @throws Exception if server fails to stop + */ + public void stopServer() throws Exception { + if (server != null) { + server.stop(); + } + } + + /** + * Client is a client for sending requests to the map server. + */ + public static class Client { + private final ManagedChannel channel; + private final MapGrpc.MapStub mapStub; + + /** + * empty constructor for Client. + * default host is localhost and port is 50051. + */ + public Client() { + this(Constants.DEFAULT_HOST, Constants.DEFAULT_PORT); + } + + /** + * constructor for Client with host and port. + * + * @param host the host to connect to + * @param port the port to connect to + */ + public Client(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + this.mapStub = MapGrpc.newStub(channel); + } + + private CompletableFuture sendGrpcRequest(MapOuterClass.MapRequest request) { + CompletableFuture future = new CompletableFuture<>(); + StreamObserver responseObserver = new StreamObserver<>() { + @Override + public void onNext(MapOuterClass.MapResponse response) { + future.complete(response); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }; + + mapStub.mapFn( + request, responseObserver); + + return future; + } + + /** + * Send a request to the server. + * + * @param keys keys to send in the request + * @param data data to send in the request + * + * @return response from the server as a MessageList + */ + public MessageList sendRequest(String[] keys, Datum data) { + MapOuterClass.MapRequest request = MapOuterClass.MapRequest.newBuilder() + .addAllKeys(keys == null ? new ArrayList<>() : List.of(keys)) + .setValue(data.getValue() + == null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue())) + .setEventTime( + data.getEventTime() == null ? Timestamp.newBuilder().build() : Timestamp + .newBuilder() + .setSeconds(data.getEventTime().getEpochSecond()) + .setNanos(data.getEventTime().getNano()) + .build()) + .setWatermark( + data.getWatermark() == null ? Timestamp.newBuilder().build() : Timestamp + .newBuilder() + .setSeconds(data.getWatermark().getEpochSecond()) + .setNanos(data.getWatermark().getNano()) + .build()) + .build(); + + try { + MapOuterClass.MapResponse response = this.sendGrpcRequest(request).get(); + List messages = response.getResultsList().stream() + .map(result -> new Message( + result.getValue().toByteArray(), + result.getKeysList().toArray(new String[0]), + result.getTagsList().toArray(new String[0]))) + .collect(Collectors.toList()); + + return new MessageList(messages); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Close the client. + * + * @throws InterruptedException if the client fails to close + */ + public void close() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + /** + * TestDatum is a Datum for testing. + */ + @Getter + @Builder + public static class TestDatum implements Datum { + private final byte[] value; + private final Instant eventTime; + private final Instant watermark; + private final Map headers; + } +} diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index 3ca6c381..042bf9a2 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -1,7 +1,6 @@ package io.numaproj.numaflow.mapper; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; @@ -42,20 +41,29 @@ public Server(Mapper mapper, GRPCConfig grpcConfig) { } /** - * Start serving requests. + * Starts the gRPC server and begins listening for requests. If the server is configured to be non-local, + * it writes server information to a specified path. A shutdown hook is registered to ensure the server + * is properly shut down when the JVM is shutting down. * - * @throws Exception if server fails to start + * @throws Exception if the server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + ServerBuilder serverBuilder = null; + // create server builder for domain socket server + serverBuilder = GrpcServerUtils.createServerBuilder( + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,7 +75,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -82,6 +91,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. @@ -95,11 +115,11 @@ public void stop() throws InterruptedException { } /** - * Set server builder for testing. + * Sets the server builder. This method can be used for testing purposes to provide a different + * grpc server builder. * - * @param serverBuilder in process server builder can be used for testing + * @param serverBuilder the server builder to be used */ - @VisibleForTesting public void setServerBuilder(ServerBuilder serverBuilder) { this.server = serverBuilder .addService(this.service) diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java index 67cea812..46472913 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Constants.java @@ -6,4 +6,8 @@ class Constants { public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/mapstream.sock"; public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapstreamer-server-info"; + + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java index 379ec499..509d25db 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java @@ -13,26 +13,26 @@ public interface Datum { * * @return returns the payload value in byte array */ - public byte[] getValue(); + byte[] getValue(); /** * method to get the event time of the payload * * @return returns the event time of the payload */ - public Instant getEventTime(); + Instant getEventTime(); /** * method to get the watermark information * * @return returns the watermark */ - public Instant getWatermark(); + Instant getWatermark(); /** * method to get the headers information of the payload * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java index e0ae5671..07d5f219 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.mapstreamer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index 1f5ab78d..29fcdc96 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -55,7 +55,10 @@ public void start() throws Exception { if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder .addService(this.service) @@ -81,6 +84,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java b/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java index 6c1f7786..9977c7b9 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java @@ -21,7 +21,7 @@ public String getUniqueIdentifier() { Constants.DELIMITER, this.getRequest().getPayload().getKeysList().toArray(new String[0])); } - + public String[] getKeySet() { return this.getRequest().getPayload().getKeysList().toArray(new String[0]); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/Constants.java b/src/main/java/io/numaproj/numaflow/reducer/Constants.java index bf50bc08..4f55ce62 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Constants.java @@ -7,6 +7,10 @@ class Constants { public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/reducer-server-info"; + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; + public static final String EOF = "EOF"; public static final String SUCCESS = "SUCCESS"; diff --git a/src/main/java/io/numaproj/numaflow/reducer/Datum.java b/src/main/java/io/numaproj/numaflow/reducer/Datum.java index 3b7165fc..63ffb05a 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Datum.java @@ -13,26 +13,26 @@ public interface Datum { * * @return returns the payload value in byte array */ - public byte[] getValue(); + byte[] getValue(); /** * method to get the event time of the payload * * @return returns the event time of the payload */ - public Instant getEventTime(); + Instant getEventTime(); /** * method to get the watermark information * * @return returns the watermark */ - public Instant getWatermark(); + Instant getWatermark(); /** * method to get the headers information of the payload * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java index aa4d0a89..a1b90a2a 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/reducer/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.reducer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/MessageList.java b/src/main/java/io/numaproj/numaflow/reducer/MessageList.java index d22176a1..d85f52f3 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/MessageList.java +++ b/src/main/java/io/numaproj/numaflow/reducer/MessageList.java @@ -28,7 +28,7 @@ public MessageListBuilder addMessages(Iterable messages) { this.messages = new ArrayList<>(); } - for (Message message: messages) { + for (Message message : messages) { this.messages.add(message); } return this; diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index aeb2253e..327354a0 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -68,9 +68,9 @@ private ActorResponse buildActorResponse(Message message) { .newBuilder() .setValue(ByteString.copyFrom(message.getValue())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>():Arrays.asList(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>():List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) .build()); return new ActorResponse(responseBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReducerTestKit.java b/src/main/java/io/numaproj/numaflow/reducer/ReducerTestKit.java new file mode 100644 index 00000000..b70317a0 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/reducer/ReducerTestKit.java @@ -0,0 +1,241 @@ +package io.numaproj.numaflow.reducer; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.reduce.v1.ReduceGrpc; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY; +import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY; + +/** + * ReducerTestKit is a test kit for testing Reducer implementations. + * It provides methods to start and stop the server and send requests to the server. + * It also provides a simple implementation of Datum for testing. + * It also provides a simple client to send requests to the server. + */ +public class ReducerTestKit { + private final ReducerFactory reducer; + private final GRPCConfig grpcConfig; + private Server server; + + public ReducerTestKit(ReducerFactory reducer) { + this(reducer, GRPCConfig.defaultGrpcConfig()); + } + + public ReducerTestKit(ReducerFactory reducer, GRPCConfig grpcConfig) { + this.reducer = reducer; + this.grpcConfig = grpcConfig; + } + + /** + * startServer starts the server. + * + * @throws Exception if server fails to start + */ + public void startServer() throws Exception { + server = new Server(reducer, grpcConfig); + server.start(); + } + + /** + * stopServer stops the server. + * + * @throws InterruptedException if server fails to stop + */ + public void stopServer() throws InterruptedException { + if (server != null) { + server.stop(); + } + } + + /** + * Client is a client to send requests to the server. + * It provides a method to send a reduce request to the server. + */ + public static class Client { + private final ManagedChannel channel; + private final ReduceGrpc.ReduceStub reduceStub; + + /** + * Create a new Client with the default host and port. + * The default host is localhost and the default port is 50051. + */ + public Client() { + this(Constants.DEFAULT_HOST, Constants.DEFAULT_PORT); + } + + /** + * Create a new Client with the given host and port. + * + * @param host the host + * @param port the port + */ + public Client(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .build(); + this.reduceStub = ReduceGrpc.newStub(channel); + } + + /** + * sendReduceRequest sends a reduce request to the server. + * + * @param testReduceRequest the request to send + * + * @return the response from the server + * + * @throws Exception if the request fails + */ + public MessageList sendReduceRequest(TestReduceRequest testReduceRequest) throws Exception { + List payloadList = new ArrayList<>(); + Metadata metadata = new io.grpc.Metadata(); + metadata.put( + Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), + String.valueOf(testReduceRequest.getStartTime().toEpochMilli())); + metadata.put( + Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), + String.valueOf(testReduceRequest.getEndTime().toEpochMilli())); + + for (Datum datum : testReduceRequest.getDatumList()) { + ReduceOuterClass.ReduceRequest.Payload.Builder payloadBuilder = + ReduceOuterClass.ReduceRequest.Payload.newBuilder() + .setValue(datum.getValue() + != null ? ByteString.copyFrom(datum.getValue()) : ByteString.EMPTY) + .setEventTime(datum.getEventTime() == null ? Timestamp + .newBuilder() + .build() : Timestamp + .newBuilder() + .setSeconds(datum.getEventTime().getEpochSecond()) + .setNanos(datum.getEventTime().getNano()) + .build()) + .setWatermark(datum.getWatermark() == null ? Timestamp + .newBuilder() + .build() : Timestamp + .newBuilder() + .setSeconds(datum.getWatermark().getEpochSecond()) + .setNanos(datum.getWatermark().getNano()) + .build()) + .addAllKeys(Arrays.asList(testReduceRequest.getKeys())) + .putAllHeaders(datum.getHeaders() + == null ? new HashMap<>() : datum.getHeaders()); + payloadList.add(payloadBuilder.build()); + } + + List responseList = new ArrayList<>(); + CompletableFuture responseFuture = new CompletableFuture<>(); + StreamObserver requestObserver = reduceStub + .withInterceptors( + MetadataUtils.newAttachHeadersInterceptor(metadata)) + .reduceFn(new StreamObserver<>() { + @Override + public void onNext(ReduceOuterClass.ReduceResponse value) { + responseList.add(value); + } + + @Override + public void onError(Throwable t) { + responseFuture.completeExceptionally(t); + } + + @Override + public void onCompleted() { + responseFuture.complete(true); + } + }); + + for (ReduceOuterClass.ReduceRequest.Payload payload : payloadList) { + // create a window from the start and end time + ReduceOuterClass.Window window = ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp + .newBuilder() + .setSeconds(testReduceRequest.getStartTime().getEpochSecond()) + .setNanos(testReduceRequest.getStartTime().getNano()) + .build()) + .setEnd(Timestamp + .newBuilder() + .setSeconds(testReduceRequest.getEndTime().getEpochSecond()) + .setNanos(testReduceRequest.getEndTime().getNano()) + .build()) + .build(); + + // create a request with the payload and window + ReduceOuterClass.ReduceRequest grpcRequest = ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(payload) + .setOperation(ReduceOuterClass.ReduceRequest.WindowOperation + .newBuilder() + .setEvent(ReduceOuterClass.ReduceRequest.WindowOperation.Event.APPEND) + .addWindows(window) + .build()) + .build(); + requestObserver.onNext(grpcRequest); + } + requestObserver.onCompleted(); + responseFuture.get(); + + MessageList.MessageListBuilder messageListBuilder = MessageList.newBuilder(); + for (ReduceOuterClass.ReduceResponse response : responseList) { + if (response.getEOF()) { + break; + } + messageListBuilder.addMessage(new Message( + response.getResult().getValue().toByteArray(), + response.getResult().getKeysList().toArray(new String[0]), + response.getResult().getTagsList().toArray(new String[0]) + )); + } + return messageListBuilder.build(); + } + + /** + * close the client. + */ + public void close() { + channel.shutdown(); + } + } + + /** + * TestReduceRequest is the request to send to the server for testing. + * It contains a list of Datum, keys, start time and end time. + */ + @Getter + @Builder + @Setter + public static class TestReduceRequest { + private List datumList; + private String[] keys; + private Instant startTime; + private Instant endTime; + } + + /** + * TestDatum is a simple implementation of Datum for testing. + */ + @Builder + @Getter + public static class TestDatum implements Datum { + private byte[] value; + private Instant eventTime; + private Instant watermark; + private Map headers; + } + +} diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 769901bf..78d9f63a 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -47,15 +47,20 @@ public Server(ReducerFactory reducerFactory, GRPCConfig grpcC * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,8 +72,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " - + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -83,6 +88,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/reducer/Service.java b/src/main/java/io/numaproj/numaflow/reducer/Service.java index c60a23e7..1a0c0c53 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Service.java @@ -23,7 +23,7 @@ class Service extends ReduceGrpc.ReduceImplBase { public static final ActorSystem reduceActorSystem = ActorSystem.create("reduce"); - private ReducerFactory reducerFactory; + private final ReducerFactory reducerFactory; public Service(ReducerFactory reducerFactory) { this.reducerFactory = reducerFactory; diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java index 0f05d241..e7984bc1 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java @@ -7,6 +7,10 @@ class Constants { public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/reducestreamer-server-info"; + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; + public static final String EOF = "EOF"; public static final String SUCCESS = "SUCCESS"; diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java index 6812e389..4d4c4382 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.reducestreamer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,18 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +29,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java index 0222c661..688fecaf 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java @@ -40,9 +40,9 @@ private ActorResponse buildResponse(Message message) { .newBuilder() .setValue(ByteString.copyFrom(message.getValue())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>():Arrays.asList(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>():List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) .build()); return new ActorResponse(responseBuilder.build()); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index cf21236f..f48efec1 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -50,15 +50,20 @@ public Server( * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -70,8 +75,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " - + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -86,6 +91,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java index c8029469..ae5a83ba 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java @@ -24,7 +24,7 @@ class Service extends ReduceGrpc.ReduceImplBase { public static final ActorSystem reduceActorSystem = ActorSystem.create("reducestream"); - private ReduceStreamerFactory reduceStreamerFactory; + private final ReduceStreamerFactory reduceStreamerFactory; public Service(ReduceStreamerFactory reduceStreamerFactory) { this.reduceStreamerFactory = reduceStreamerFactory; diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java index c0d28a08..43a6a2c8 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java @@ -33,5 +33,5 @@ public interface Datum { * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/sessionreducer/ActorResponse.java index f5192ca6..fb5a345c 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/ActorResponse.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/ActorResponse.java @@ -35,6 +35,10 @@ private ActorResponse( this.mergeTaskId = mergeTaskId; } + boolean isEOFResponse() { + return this.accumulator == null && this.mergeTaskId == null; + } + static class ActorResponseBuilder { ActorResponse build() { if ((accumulator != null && mergeTaskId == null) || (accumulator == null @@ -45,8 +49,4 @@ ActorResponse build() { return new ActorResponse(response, isLast, accumulator, mergeTaskId); } } - - boolean isEOFResponse() { - return this.accumulator == null && this.mergeTaskId == null; - } } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java index b4b9c425..cbc9e166 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java @@ -7,9 +7,13 @@ class Constants { public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sessionreducer-server-info"; + public static final int DEFAULT_PORT = 50051; + public static final String EOF = "EOF"; public static final String SUCCESS = "SUCCESS"; + public static final String DEFAULT_HOST = "localhost"; + public static final String DELIMITER = ":"; } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java index e990f5e9..94f27599 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/GRPCConfig.java @@ -9,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -20,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java b/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java index b95605db..95f2307a 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/OutputStreamObserverImpl.java @@ -40,9 +40,9 @@ private ActorResponse buildResponse(Message message, Sessionreduce.KeyedWindow k .newBuilder() .setValue(ByteString.copyFrom(message.getValue())) .addAllKeys(message.getKeys() - == null ? new ArrayList<>():Arrays.asList(message.getKeys())) + == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) .addAllTags( - message.getTags() == null ? new ArrayList<>():List.of(message.getTags())) + message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) .build()); return ActorResponse.builder() .response(responseBuilder.build()) diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index 72047178..b66d8d21 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -50,15 +50,20 @@ public Server( * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -70,8 +75,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " - + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -86,6 +91,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/SessionReducerActor.java b/src/main/java/io/numaproj/numaflow/sessionreducer/SessionReducerActor.java index b59eea85..3a636bad 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/SessionReducerActor.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/SessionReducerActor.java @@ -14,10 +14,10 @@ * a session reducer actor only works on its assigned single session window. */ class SessionReducerActor extends AbstractActor { + private final SessionReducer sessionReducer; // the session window the actor is working on private Sessionreduce.KeyedWindow keyedWindow; - private final SessionReducer sessionReducer; - private OutputStreamObserver outputStream; + private final OutputStreamObserver outputStream; // when set to true, it means this session is already closed. private boolean isClosed = false; diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java b/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java index 499cebe3..484d8005 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java @@ -120,7 +120,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) { ActorRequest createRequest = ActorRequest.builder() .type(ActorRequestType.OPEN) .keyedWindow(windowOperation.getKeyedWindows(0)) - .payload(request.hasPayload() ? request.getPayload():null) + .payload(request.hasPayload() ? request.getPayload() : null) .build(); this.invokeActor(createRequest); break; @@ -133,7 +133,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) { ActorRequest appendRequest = ActorRequest.builder() .type(ActorRequestType.APPEND) .keyedWindow(windowOperation.getKeyedWindows(0)) - .payload(request.hasPayload() ? request.getPayload():null) + .payload(request.hasPayload() ? request.getPayload() : null) .build(); this.invokeActor(appendRequest); break; @@ -182,7 +182,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) { ActorRequest appendRequest = ActorRequest.builder() .type(ActorRequestType.APPEND) .keyedWindow(windowOperation.getKeyedWindows(1)) - .payload(request.hasPayload() ? request.getPayload():null) + .payload(request.hasPayload() ? request.getPayload() : null) .build(); this.invokeActor(appendRequest); break; diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java b/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java index c6d370e3..9cfe4cb6 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java @@ -33,5 +33,5 @@ public interface Datum { * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java index 183a290c..8aa7fa20 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java @@ -89,7 +89,12 @@ public static void writeServerInfo( } } - // write server info to file + // server info file can be null if the Grpc server is used for local component testing + // write server info to file if file path is not null + if (infoFilePath == null) { + return; + } + ServerInfo serverInfo = new ServerInfo( Protocol.UDS_PROTOCOL, Language.JAVA, @@ -100,7 +105,11 @@ public static void writeServerInfo( serverInfoAccessor.write(serverInfo, infoFilePath); } - public static ServerBuilder createServerBuilder(String socketPath, int maxMessageSize) { + public static ServerBuilder createServerBuilder( + String socketPath, + int maxMessageSize, + boolean isLocal, + int port) { ServerInterceptor interceptor = new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( @@ -144,6 +153,13 @@ private void handleException( }; } }; + + if (isLocal) { + return ServerBuilder.forPort(port) + .maxInboundMessageSize(maxMessageSize) + .intercept(interceptor); + } + return NettyServerBuilder .forAddress(new DomainSocketAddress(socketPath)) .channelType(GrpcServerUtils.getChannelTypeClass()) diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Constants.java b/src/main/java/io/numaproj/numaflow/sideinput/Constants.java index 849db530..b34cd73b 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Constants.java @@ -6,4 +6,8 @@ public class Constants { static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"; static int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + + static int DEFAULT_PORT = 50051; + + static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java index f7066777..b5f24cf4 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sideinput; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index 1ffadd72..78ac6f65 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -3,9 +3,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; -import io.numaproj.numaflow.shared.GrpcServerUtils; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; @@ -47,15 +47,20 @@ public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,7 +72,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -82,6 +88,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/sinker/Constants.java b/src/main/java/io/numaproj/numaflow/sinker/Constants.java index d52b86b4..87a8bb4e 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Constants.java @@ -7,4 +7,7 @@ class Constants { public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Datum.java b/src/main/java/io/numaproj/numaflow/sinker/Datum.java index bceb2f62..68cc91a4 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Datum.java @@ -47,5 +47,5 @@ public interface Datum { * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java index 9e7f5df1..4c7ec956 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sinker/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sinker; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/ResponseList.java b/src/main/java/io/numaproj/numaflow/sinker/ResponseList.java index a80299c7..77146777 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/ResponseList.java +++ b/src/main/java/io/numaproj/numaflow/sinker/ResponseList.java @@ -32,7 +32,7 @@ public ResponseListBuilder addResponses(Iterable responses) { this.responses = new ArrayList<>(); } - for (Response response: responses) { + for (Response response : responses) { this.responses.add(response); } return this; diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index bfa8b2d0..58eae576 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -1,7 +1,6 @@ package io.numaproj.numaflow.sinker; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import io.grpc.ServerBuilder; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; @@ -47,15 +46,20 @@ public Server(Sinker sinker, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,7 +71,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -82,6 +87,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. @@ -96,11 +112,11 @@ public void stop() throws InterruptedException { } /** - * Set server builder for testing. + * Sets the server builder. This method can be used for testing purposes to provide a different + * grpc server builder. * - * @param serverBuilder in process server builder can be used for testing + * @param serverBuilder the server builder to be used */ - @VisibleForTesting public void setServerBuilder(ServerBuilder serverBuilder) { this.server = serverBuilder .addService(this.service) diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 243b393f..53e82112 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -139,7 +139,7 @@ public void shutDown() { log.error("Sink executor was abruptly shut down. " + droppedTasks.size() + " tasks will not be executed."); } else { - log.error("Sink executor was terminated."); + log.info("Sink executor was terminated."); } } catch (InterruptedException e) { Thread.interrupted(); diff --git a/src/main/java/io/numaproj/numaflow/sinker/Sinker.java b/src/main/java/io/numaproj/numaflow/sinker/Sinker.java index 075cdec3..0fa9dc2c 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Sinker.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Sinker.java @@ -14,6 +14,7 @@ public abstract class Sinker { * response list should be returned. * * @param datumStream stream of messages to be processed + * * @return response list */ public abstract ResponseList processMessages(DatumIterator datumStream); diff --git a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java new file mode 100644 index 00000000..0dabf910 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java @@ -0,0 +1,277 @@ +package io.numaproj.numaflow.sinker; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.sink.v1.SinkGrpc; +import io.numaproj.numaflow.sink.v1.SinkOuterClass; +import jdk.jfr.Experimental; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * SinkerTestKit is a test kit for testing Sinker implementations. + * It provides methods to start and stop the server and send requests to the server. + * It also provides a simple implementation of Datum for testing. + * It also provides a simple client to send requests to the server. + */ +@Experimental +@Slf4j +public class SinkerTestKit { + + private final Sinker sinker; + private final GRPCConfig grpcConfig; + private Server server; + + /** + * Create a new SinkerTestKit. + * + * @param sinker the sinker to test + */ + public SinkerTestKit(Sinker sinker) { + this(sinker, GRPCConfig.defaultGrpcConfig()); + } + + /** + * Create a new SinkerTestKit with the given Sinker and GRPCConfig. + * + * @param sinker the sinker to test + * @param grpcConfig the grpc configuration to use + */ + public SinkerTestKit(Sinker sinker, GRPCConfig grpcConfig) { + this.sinker = sinker; + this.grpcConfig = grpcConfig; + } + + /** + * Start the server. + * + * @throws IOException if server fails to start + */ + public void startServer() throws Exception { + server = new Server(sinker, grpcConfig); + server.start(); + } + + /** + * Stop the server. + * + * @throws InterruptedException if server fails to stop + */ + public void stopServer() throws InterruptedException { + if (server != null) { + server.stop(); + } + } + + /** + * Client is a client for sending requests to the server. + */ + public static class Client { + private final ManagedChannel channel; + private final SinkGrpc.SinkStub sinkStub; + + /** + * Create a new Client with default host and port. + * Default host is localhost and default port is 50051. + */ + public Client() { + this(Constants.DEFAULT_HOST, Constants.DEFAULT_PORT); + } + + /** + * Create a new Client with the given host and port. + * + * @param host the host to connect to + * @param port the port to connect to + */ + public Client(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + this.sinkStub = SinkGrpc.newStub(channel); + } + + + /** + * Send request to the server. + * + * @param datumIterator iterator of Datum objects to send to the server + * + * @return response from the server as a ResponseList + */ + public ResponseList sendRequest(DatumIterator datumIterator) { + CompletableFuture future = new CompletableFuture<>(); + StreamObserver responseObserver = new StreamObserver<>() { + @Override + public void onNext(SinkOuterClass.SinkResponse response) { + future.complete(response); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }; + + StreamObserver requestObserver = sinkStub.sinkFn( + responseObserver); + + while (true) { + Datum datum = null; + try { + datum = datumIterator.next(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + if (datum == null) { + break; + } + SinkOuterClass.SinkRequest request = SinkOuterClass.SinkRequest.newBuilder() + .addAllKeys( + datum.getKeys() + == null ? new ArrayList<>() : List.of(datum.getKeys())) + .setValue(datum.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( + datum.getValue())) + .setId(datum.getId()) + .setEventTime(datum.getEventTime() == null ? Timestamp + .newBuilder() + .build() : Timestamp.newBuilder() + .setSeconds(datum.getEventTime().getEpochSecond()) + .setNanos(datum.getEventTime().getNano()).build()) + .setWatermark(datum.getWatermark() == null ? Timestamp + .newBuilder() + .build() : Timestamp.newBuilder() + .setSeconds(datum.getWatermark().getEpochSecond()) + .setNanos(datum.getWatermark().getNano()).build()) + .build(); + requestObserver.onNext(request); + } + + requestObserver.onCompleted(); + + SinkOuterClass.SinkResponse response; + try { + response = future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder(); + for (SinkOuterClass.SinkResponse.Result result : response.getResultsList()) { + if (result.getSuccess()) { + responseListBuilder.addResponse(Response.responseOK(result.getId())); + } else { + responseListBuilder.addResponse(Response.responseFailure( + result.getId(), + result.getErrMsg())); + } + } + + return responseListBuilder.build(); + } + + /** + * close the client. + * + * @throws InterruptedException if client fails to close + */ + public void close() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + /** + * TestListIterator is a list based DatumIterator for testing. + */ + @Getter + @Setter + public static class TestListIterator implements DatumIterator { + private final List data; + private int index; + + public TestListIterator() { + this.data = new ArrayList<>(); + this.index = 0; + } + + @Override + public Datum next() throws InterruptedException { + if (index < data.size()) { + return data.get(index++); + } + return null; + } + + public void addDatum(Datum datum) { + data.add(datum); + } + } + + /** + * TestBlockingIterator is a blocking queue based DatumIterator for testing. + * It has a queue size of 1. Users can use this to stream data to the server. + * If the queue is full, the iterator will block until the queue has space. + * users should invoke close() to indicate the end of the stream to the server. + */ + public static class TestBlockingIterator implements DatumIterator { + private final LinkedBlockingQueue queue; + private volatile boolean closed = false; + + public TestBlockingIterator() { + this.queue = new LinkedBlockingQueue<>(1); // set the queue size to 10 + } + + @Override + public Datum next() throws InterruptedException { + if (!closed) { + return queue.take(); + } + return null; + } + + public void addDatum(Datum datum) throws InterruptedException { + if (!closed) { + queue.put(datum); + } + } + + public void close() { + closed = true; + } + } + + /** + * TestDatum is a Datum for testing. + */ + @Getter + @Builder + public static class TestDatum implements Datum { + private final String id; + private final byte[] value; + private final String[] keys; + private final Instant eventTime; + private final Instant watermark; + private final Map headers; + } +} diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Constants.java b/src/main/java/io/numaproj/numaflow/sourcer/Constants.java index eaf6f3af..9bf1f3f5 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Constants.java @@ -7,4 +7,7 @@ class Constants { public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + public static final int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java index a1ba43ce..cc9d1f98 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sourcer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Message.java b/src/main/java/io/numaproj/numaflow/sourcer/Message.java index 104f9606..7389d51b 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Message.java @@ -62,7 +62,12 @@ public Message(byte[] value, Offset offset, Instant eventTime, Map headers) { + public Message( + byte[] value, + Offset offset, + Instant eventTime, + String[] keys, + Map headers) { this.value = value; this.offset = offset; this.eventTime = eventTime; diff --git a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java index fb06c4ea..dbc004a0 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java @@ -44,7 +44,8 @@ private SourceOuterClass.ReadResponse buildResponse(Message message) { .getOffset() .getValue())) .setPartitionId(message.getOffset().getPartitionId())) - .putAllHeaders(message.getHeaders() != null ? message.getHeaders() : new HashMap<>()) + .putAllHeaders(message.getHeaders() + != null ? message.getHeaders() : new HashMap<>()) .build()); return builder.build(); diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index 3a13b957..5fbdbbff 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -47,15 +47,20 @@ public Server(Sourcer sourcer, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,7 +72,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -82,6 +88,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java index 26b262c2..88dc8c57 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Sourcer.java @@ -11,6 +11,19 @@ * which will be used for getting the number of pending messages in the */ public abstract class Sourcer { + /** + * method returns default partitions for the source. + * It can be used in the getPartitions() function of the Sourcer interface only + * if the source doesn't have partitions. DefaultPartition will be the pod replica + * index of the source. + * + * @return list of partitions + */ + public static List defaultPartitions() { + String partition = System.getenv().getOrDefault("NUMAFLOW_REPLICA", "0"); + return Collections.singletonList(Integer.parseInt(partition)); + } + /** * method will be used for reading messages from source. * @@ -44,17 +57,4 @@ public abstract class Sourcer { * @return list of partitions */ public abstract List getPartitions(); - - /** - * method returns default partitions for the source. - * It can be used in the getPartitions() function of the Sourcer interface only - * if the source doesn't have partitions. DefaultPartition will be the pod replica - * index of the source. - * - * @return list of partitions - */ - public static List defaultPartitions() { - String partition = System.getenv().getOrDefault("NUMAFLOW_REPLICA", "0"); - return Collections.singletonList(Integer.parseInt(partition)); - } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java new file mode 100644 index 00000000..6ae903d6 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java @@ -0,0 +1,305 @@ +package io.numaproj.numaflow.sourcer; + +import com.google.protobuf.Empty; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.source.v1.SourceGrpc; +import io.numaproj.numaflow.source.v1.SourceOuterClass; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * SourcerTestKit is a test kit for testing Sourcer implementations. + * It provides methods to start and stop the server and send requests to the server. + * It also provides simple implementations of ReadRequest, AckRequest and OutputObserver for testing. + * It also provides a simple client to send requests to the server. + */ +public class SourcerTestKit { + private final Sourcer sourcer; + private final GRPCConfig grpcConfig; + private Server server; + + /** + * Create a new SourcerTestKit with the given Sourcer. + * + * @param sourcer the sourcer to test + */ + public SourcerTestKit(Sourcer sourcer) { + this(sourcer, GRPCConfig.defaultGrpcConfig()); + } + + /** + * Create a new SourcerTestKit with the given Sourcer and GRPCConfig. + * + * @param sourcer the sourcer to test + * @param grpcConfig the grpc configuration to use + */ + public SourcerTestKit(Sourcer sourcer, GRPCConfig grpcConfig) { + this.sourcer = sourcer; + this.grpcConfig = grpcConfig; + } + + /** + * startServer starts the server. + * + * @throws Exception if server fails to start + */ + public void startServer() throws Exception { + server = new Server(sourcer, grpcConfig); + server.start(); + } + + /** + * stopServer stops the server. + * + * @throws InterruptedException if server fails to stop + */ + public void stopServer() throws InterruptedException { + if (server != null) { + server.stop(); + } + } + + /** + * SourcerClient is a client to send requests to the server. + * It provides methods to send read, ack and pending requests to the server. + */ + public static class Client { + private final ManagedChannel channel; + private final SourceGrpc.SourceStub sourceStub; + + /** + * empty constructor for Client. + * default host is localhost and port is 50051. + */ + public Client() { + this(Constants.DEFAULT_HOST, Constants.DEFAULT_PORT); + } + + /** + * Create a new SourcerClient with the given host and port. + * + * @param host the host + * @param port the port + */ + public Client(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .build(); + this.sourceStub = SourceGrpc.newStub(channel); + } + + /** + * close closes the client. + * + * @throws InterruptedException if the client fails to close + */ + public void close() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + /** + * sendReadRequest sends a read request to the server. + * + * @param request the read request + * @param observer the output observer to receive the messages + * + * @throws Exception if the request fails + */ + public void sendReadRequest(ReadRequest request, OutputObserver observer) throws Exception { + SourceOuterClass.ReadRequest grpcRequest = SourceOuterClass.ReadRequest.newBuilder() + .setRequest(SourceOuterClass.ReadRequest.Request.newBuilder() + .setNumRecords(request.getCount()) + .setTimeoutInMs((int) request.getTimeout().toMillis()) + .build()) + .build(); + + CompletableFuture future = new CompletableFuture<>(); + sourceStub.readFn(grpcRequest, new StreamObserver<>() { + @Override + public void onNext(SourceOuterClass.ReadResponse value) { + Message message = new Message( + value.getResult().getPayload().toByteArray(), + new Offset( + value.getResult().getOffset().getOffset().toByteArray(), + value.getResult().getOffset().getPartitionId()), + Instant.ofEpochSecond( + value.getResult().getEventTime().getSeconds(), + value.getResult().getEventTime().getNanos() + ), + value.getResult().getKeysList().toArray(new String[0]), + value.getResult().getHeadersMap()); + observer.send(message); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(true); + } + }); + future.get(); + } + + /** + * sendAckRequest sends an ack request to the server. + * + * @param request the ack request + * + * @throws Exception if the request fails + */ + public void sendAckRequest(AckRequest request) throws Exception { + CompletableFuture future = new CompletableFuture<>(); + SourceOuterClass.AckRequest.Request.Builder builder = SourceOuterClass.AckRequest.Request.newBuilder(); + for (Offset offset : request.getOffsets()) { + builder.addOffsets(SourceOuterClass.Offset.newBuilder() + .setOffset(com.google.protobuf.ByteString.copyFrom(offset.getValue())) + .setPartitionId(offset.getPartitionId()) + .build()); + } + + SourceOuterClass.AckRequest grpcRequest = SourceOuterClass.AckRequest.newBuilder() + .setRequest(builder.build()) + .build(); + + sourceStub.ackFn(grpcRequest, new StreamObserver<>() { + @Override + public void onNext(SourceOuterClass.AckResponse value) { + future.complete(value); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }); + future.get(); + } + + /** + * sendPendingRequest sends a pending request to the server. + * + * @return the number of pending messages + * + * @throws Exception if the request fails + */ + public long sendPendingRequest() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + StreamObserver observer = new StreamObserver<>() { + + @Override + public void onNext(SourceOuterClass.PendingResponse value) { + future.complete(value); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }; + sourceStub.pendingFn(Empty.newBuilder().build(), observer); + return future.get().getResult().getCount(); + } + + /** + * sendGetPartitionsRequest sends a getPartitions request to the server. + * + * @return the list of source partitions + * + * @throws Exception if the request fails + */ + public List sendGetPartitionsRequest() throws Exception { + CompletableFuture future = new CompletableFuture<>(); + StreamObserver observer = new StreamObserver<>() { + + @Override + public void onNext(SourceOuterClass.PartitionsResponse value) { + future.complete(value); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }; + sourceStub.partitionsFn(Empty.newBuilder().build(), observer); + return future.get().getResult().getPartitionsList(); + } + } + + + /** + * TestReadRequest is a simple implementation of ReadRequest for testing. + */ + @Getter + @Setter + @Builder + public static class TestReadRequest implements ReadRequest { + private long count; + private Duration timeout; + } + + /** + * TestAckRequest is a simple implementation of AckRequest for testing. + */ + @Getter + @Setter + @Builder + public static class TestAckRequest implements AckRequest { + List offsets; + } + + /** + * TestListBasedObserver is a simple list based implementation of OutputObserver for testing. + */ + @Getter + @Setter + public static class TestListBasedObserver implements OutputObserver { + private List messages = new ArrayList<>(); + + + @Override + public void send(Message message) { + messages.add(message); + } + + } + +} diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Constants.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Constants.java index 4d7788db..63f4e015 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Constants.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Constants.java @@ -6,4 +6,8 @@ class Constants { public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcetransformer-server-info"; public static int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64; + + public static int DEFAULT_PORT = 50051; + + public static final String DEFAULT_HOST = "localhost"; } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java index 91f87917..0f2b3953 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java @@ -14,26 +14,26 @@ public interface Datum { * * @return returns the payload value in byte array */ - public byte[] getValue(); + byte[] getValue(); /** * method to get the event time of the payload * * @return returns the event time of the payload */ - public Instant getEventTime(); + Instant getEventTime(); /** * method to get the watermark information * * @return returns the watermark */ - public Instant getWatermark(); + Instant getWatermark(); /** * method to get the headers information of the payload * * @return returns the headers in the form of key value pair */ - public Map getHeaders(); + Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java index 7eddaabf..c338d702 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/GRPCConfig.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sourcetransformer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import lombok.Builder; import lombok.Getter; @@ -10,9 +9,19 @@ @Getter @Builder(builderMethodName = "newBuilder") public class GRPCConfig { - private String socketPath; - private int maxMessageSize; - private String infoFilePath; + @Builder.Default + private String socketPath = Constants.DEFAULT_SOCKET_PATH; + + @Builder.Default + private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE; + + @Builder.Default + private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH; + + @Builder.Default + private int port = Constants.DEFAULT_PORT; + + private boolean isLocal; /** * Static method to create default GRPCConfig. @@ -21,6 +30,7 @@ static GRPCConfig defaultGrpcConfig() { return GRPCConfig.newBuilder() .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH) .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .isLocal(System.getenv("NUMAFLOW_POD") == null) // if NUMAFLOW_POD is not set, then we are not running using numaflow .socketPath(Constants.DEFAULT_SOCKET_PATH).build(); } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 72e2586c..93f84b23 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -47,15 +47,20 @@ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { * @throws Exception if server fails to start */ public void start() throws Exception { - GrpcServerUtils.writeServerInfo( - serverInfoAccessor, - grpcConfig.getSocketPath(), - grpcConfig.getInfoFilePath()); + if (!grpcConfig.isLocal()) { + GrpcServerUtils.writeServerInfo( + serverInfoAccessor, + grpcConfig.getSocketPath(), + grpcConfig.getInfoFilePath()); + } if (this.server == null) { // create server builder ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize()); + grpcConfig.getSocketPath(), + grpcConfig.getMaxMessageSize(), + grpcConfig.isLocal(), + grpcConfig.getPort()); // build server this.server = serverBuilder @@ -67,7 +72,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "Server started, listening on {}", + grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -82,6 +88,17 @@ public void start() throws Exception { })); } + /** + * Blocks until the server has terminated. If the server is already terminated, this method + * will return immediately. If the server is not yet terminated, this method will block the + * calling thread until the server has terminated. + * + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public void awaitTermination() throws InterruptedException { + server.awaitTermination(); + } + /** * Stop serving requests and shutdown resources. Await termination on the main thread since the * grpc library uses daemon threads. diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java new file mode 100644 index 00000000..c9dda363 --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java @@ -0,0 +1,204 @@ +package io.numaproj.numaflow.sourcetransformer; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; +import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * SourceTransformerTestKit is a test kit for testing SourceTransformer implementations. + * It provides methods to start and stop the server and send requests to the server. + */ +@Slf4j +public class SourceTransformerTestKit { + private final SourceTransformer sourceTransformer; + private final GRPCConfig grpcConfig; + private Server server; + + /** + * Create a new SourceTransformerTestKit with the given SourceTransformer. + * + * @param sourceTransformer the source transformer to test + */ + public SourceTransformerTestKit(SourceTransformer sourceTransformer) { + this(sourceTransformer, GRPCConfig.defaultGrpcConfig()); + } + + /** + * Create a new SourceTransformerTestKit with the given SourceTransformer and GRPCConfig. + * + * @param sourceTransformer the sourceTransformer to test + * @param grpcConfig the grpc configuration to use. + */ + public SourceTransformerTestKit(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { + this.sourceTransformer = sourceTransformer; + this.grpcConfig = grpcConfig; + } + + /** + * Start the server. + * + * @throws Exception if server fails to start + */ + public void startServer() throws Exception { + server = new Server(this.sourceTransformer, this.grpcConfig); + server.start(); + } + + /** + * Stops the server. + * + * @throws Exception if server fails to stop + */ + public void stopServer() throws Exception { + if (server != null) { + server.stop(); + } + } + + /** + * SourceTransformerClient is a client for sending requests to the source transformer server. + */ + public static class Client { + private final ManagedChannel channel; + private final SourceTransformGrpc.SourceTransformStub sourceTransformStub; + + /** + * empty constructor for Client. + * default host is localhost and port is 50051. + */ + public Client() { + this(Constants.DEFAULT_HOST, Constants.DEFAULT_PORT); + } + + /** + * constructor for Client with host and port. + * + * @param host the host to connect to + * @param port the port to connect to + */ + public Client(String host, int port) { + this.channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); + this.sourceTransformStub = SourceTransformGrpc.newStub(channel); + } + + /** + * Send a gRPC request to the server. + * + * @param request the request to send + * + * @return a CompletableFuture that will be completed when the response is received + */ + private CompletableFuture sendGrpcRequest( + Sourcetransformer.SourceTransformRequest request) { + CompletableFuture future = new CompletableFuture<>(); + StreamObserver responseObserver = new StreamObserver<>() { + @Override + public void onNext(Sourcetransformer.SourceTransformResponse response) { + future.complete(response); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException( + "Server completed without a response")); + } + } + }; + + sourceTransformStub.sourceTransformFn( + request, responseObserver); + + return future; + } + + /** + * Send a request to the server. + * + * @param keys keys to send in the request + * @param data data to send in the request + * + * @return response from the server as a MessageList + */ + public MessageList sendRequest(String[] keys, Datum data) { + Sourcetransformer.SourceTransformRequest request = Sourcetransformer.SourceTransformRequest + .newBuilder() + .addAllKeys(keys == null ? new ArrayList<>() : List.of(keys)) + .setValue(data.getValue() + == null ? ByteString.EMPTY : ByteString.copyFrom(data.getValue())) + .setEventTime( + data.getEventTime() == null ? Timestamp.newBuilder().build() : Timestamp + .newBuilder() + .setSeconds(data.getEventTime().getEpochSecond()) + .setNanos(data.getEventTime().getNano()) + .build()) + .setWatermark( + data.getWatermark() == null ? Timestamp.newBuilder().build() : Timestamp + .newBuilder() + .setSeconds(data.getWatermark().getEpochSecond()) + .setNanos(data.getWatermark().getNano()) + .build()) + .build(); + + try { + Sourcetransformer.SourceTransformResponse response = this + .sendGrpcRequest(request) + .get(); + List messages = response.getResultsList().stream() + .map(result -> new Message( + result.getValue().toByteArray(), + Instant.ofEpochSecond( + result.getEventTime().getSeconds(), + result.getEventTime().getNanos()), + result.getKeysList().toArray(new String[0]), + result.getTagsList().toArray(new String[0]))) + .collect(Collectors.toList()); + + return new MessageList(messages); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Close the client. + * + * @throws InterruptedException if the client fails to close + */ + public void close() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + /** + * TestDatum is a Datum for testing. + */ + @Getter + @Builder + public static class TestDatum implements Datum { + private final byte[] value; + private final Instant eventTime; + private final Instant watermark; + private final Map headers; + } +} diff --git a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java index 6223258e..89e81766 100644 --- a/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java +++ b/src/test/java/io/numaproj/numaflow/info/ServerInfoAccessorImplTest.java @@ -13,7 +13,7 @@ @RunWith(JUnit4.class) public class ServerInfoAccessorImplTest { - private ServerInfoAccessor underTest = new ServerInfoAccessorImpl(new ObjectMapper()); + private final ServerInfoAccessor underTest = new ServerInfoAccessorImpl(new ObjectMapper()); @Test public void given_localEnvironment_when_getNumaflowJavaSDKVersion_then_returnAValidVersion() { @@ -39,7 +39,9 @@ public void given_writeServerInfo_when_read_then_returnExactSame() { ServerInfo got = this.underTest.read(testFilePath); assertEquals(testServerInfo.getLanguage(), got.getLanguage()); assertEquals(testServerInfo.getProtocol(), got.getProtocol()); - assertEquals(testServerInfo.getMinimum_numaflow_version(), got.getMinimum_numaflow_version()); + assertEquals( + testServerInfo.getMinimum_numaflow_version(), + got.getMinimum_numaflow_version()); assertEquals(testServerInfo.getVersion(), got.getVersion()); assertEquals(testServerInfo.getMetadata(), got.getMetadata()); } catch (Exception e) { diff --git a/src/test/java/io/numaproj/numaflow/mapper/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/mapper/GRPCConfigTest.java index fef83f10..aae5b7b4 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.mapper; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,10 +14,13 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test public void testNewBuilder() { + int port = 8001; String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; @@ -26,11 +28,15 @@ public void testNewBuilder() { .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/GRPCConfigTest.java index 6ed2d840..86435bd4 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.mapstreamer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,6 +14,8 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -22,15 +23,20 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/reducer/GRPCConfigTest.java index 4633469d..9c3558f4 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.reducer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,6 +14,8 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -22,14 +23,19 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceErrTestFactory.java b/src/test/java/io/numaproj/numaflow/reducer/ReduceErrTestFactory.java index 5950b612..00b939f0 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceErrTestFactory.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ReduceErrTestFactory.java @@ -10,7 +10,7 @@ public ReduceTestFn createReducer() { } public static class ReduceTestFn extends Reducer { - private int sum = 0; + private final int sum = 0; @Override public void addMessage(String[] keys, Datum datum, Metadata md) { diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java index ae908b69..9065abb8 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.reducestreamer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -13,12 +12,10 @@ public void testDefaultGrpcConfig() { Assert.assertEquals( Constants.DEFAULT_SERVER_INFO_FILE_PATH, grpcConfig.getInfoFilePath()); - Assert.assertEquals( - io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_MESSAGE_SIZE, - grpcConfig.getMaxMessageSize()); - Assert.assertEquals( - io.numaproj.numaflow.reducestreamer.Constants.DEFAULT_SOCKET_PATH, - grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); + Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -26,14 +23,19 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/sessionreducer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sessionreducer/GRPCConfigTest.java index 9cf2da48..8433083a 100644 --- a/src/test/java/io/numaproj/numaflow/sessionreducer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sessionreducer/GRPCConfigTest.java @@ -25,14 +25,19 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java index 55179598..32bb0139 100644 --- a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java +++ b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java @@ -37,7 +37,11 @@ public void testWriteServerInfo() throws Exception { @Test public void testCreateServerBuilder() { - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder("socketPath", 1000); + ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + "socketPath", + 1000, + false, + 50051); Assert.assertNotNull(serverBuilder); } diff --git a/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java index 06f9c71c..560e5897 100644 --- a/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sideinput/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sideinput; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,18 +14,28 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test public void testNewBuilder() { + String serverInfoFile = "test-server-info-path"; String socketPath = "test-socket-path"; int maxMessageSize = 2000; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) + .infoFilePath(serverInfoFile) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); + Assert.assertEquals(serverInfoFile, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/sinker/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sinker/GRPCConfigTest.java index d31f3ebb..e8f30c84 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sinker; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,6 +14,8 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -22,14 +23,19 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/sourcer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sourcer/GRPCConfigTest.java index cf20efaa..5b2ff1a8 100644 --- a/src/test/java/io/numaproj/numaflow/sourcer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcer/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sourcer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,6 +14,8 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -22,15 +23,20 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } } diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/GRPCConfigTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/GRPCConfigTest.java index 55fa83c2..d999bae2 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/GRPCConfigTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/GRPCConfigTest.java @@ -1,6 +1,5 @@ package io.numaproj.numaflow.sourcetransformer; -import io.numaproj.numaflow.info.ServerInfoAccessor; import org.junit.Assert; import org.junit.Test; @@ -15,6 +14,8 @@ public void testDefaultGrpcConfig() { grpcConfig.getInfoFilePath()); Assert.assertEquals(Constants.DEFAULT_MESSAGE_SIZE, grpcConfig.getMaxMessageSize()); Assert.assertEquals(Constants.DEFAULT_SOCKET_PATH, grpcConfig.getSocketPath()); + Assert.assertEquals(Constants.DEFAULT_PORT, grpcConfig.getPort()); + Assert.assertTrue(grpcConfig.isLocal()); } @Test @@ -22,14 +23,19 @@ public void testNewBuilder() { String socketPath = "test-socket-path"; int maxMessageSize = 2000; String infoFilePath = "test-info-file-path"; + int port = 8001; GRPCConfig grpcConfig = GRPCConfig.newBuilder() .socketPath(socketPath) .maxMessageSize(maxMessageSize) .infoFilePath(infoFilePath) + .port(port) + .isLocal(false) .build(); Assert.assertNotNull(grpcConfig); Assert.assertEquals(socketPath, grpcConfig.getSocketPath()); Assert.assertEquals(maxMessageSize, grpcConfig.getMaxMessageSize()); Assert.assertEquals(infoFilePath, grpcConfig.getInfoFilePath()); + Assert.assertEquals(port, grpcConfig.getPort()); + Assert.assertFalse(grpcConfig.isLocal()); } }