Skip to content

Commit

Permalink
feat: Test Kit for Servers (#111)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
  • Loading branch information
yhl25 and KeranYang committed Apr 15, 2024
1 parent ca13ac1 commit 0ac0102
Show file tree
Hide file tree
Showing 91 changed files with 2,288 additions and 366 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ Add this dependency to your project's build file:
compile "io.numaproj.numaflow:numaflow-java:0.7.0"
```

```
### Build

```bash
Expand Down
30 changes: 22 additions & 8 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand All @@ -59,7 +61,9 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/mapt-event-time-filter-function:${docker.tag}</image>
<image>
numaflow-java-examples/mapt-event-time-filter-function:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand Down Expand Up @@ -160,7 +164,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/reduce-stream-sum:${docker.tag}</image>
<image>numaflow-java-examples/reduce-stream-sum:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand All @@ -177,7 +182,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/map-forward-message:${docker.tag}</image>
<image>numaflow-java-examples/map-forward-message:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand Down Expand Up @@ -211,7 +217,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/sideinput-example:${docker.tag}</image>
<image>numaflow-java-examples/sideinput-example:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand All @@ -228,7 +235,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/udf-sideinput-example:${docker.tag}</image>
<image>numaflow-java-examples/udf-sideinput-example:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand All @@ -245,7 +253,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/source-simple-source:${docker.tag}</image>
<image>numaflow-java-examples/source-simple-source:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand All @@ -262,7 +271,8 @@
</mainClass>
</container>
<to>
<image>numaflow-java-examples/session-reduce-count:${docker.tag}</image>
<image>numaflow-java-examples/session-reduce-count:${docker.tag}
</image>
</to>
</configuration>
</execution>
Expand All @@ -271,6 +281,10 @@
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
public class FlatMapFunction extends Mapper {

public static void main(String[] args) throws Exception {
new Server(new FlatMapFunction()).start();
Server server = new Server(new FlatMapFunction());

// Start the server
server.start();

// Wait for the server to shut down
server.awaitTermination();
}

public MessageList processMessage(String[] keys, Datum data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@

public class ForwardFunction extends Mapper {
public static void main(String[] args) throws Exception {
new Server(new ForwardFunction()).start();
Server server = new Server(new ForwardFunction());

// Start the server
server.start();

// Wait for the server to shut down
server.awaitTermination();
}

public MessageList processMessage(String[] keys, Datum data) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package io.numaproj.numaflow.examples.mapstream.flatmapstream;

import io.numaproj.numaflow.mapstreamer.Datum;
import io.numaproj.numaflow.mapstreamer.MapStreamer;
import io.numaproj.numaflow.mapstreamer.Message;
import io.numaproj.numaflow.mapstreamer.OutputObserver;
import io.numaproj.numaflow.mapstreamer.Server;
import io.numaproj.numaflow.mapstreamer.*;


/**
Expand All @@ -17,7 +13,13 @@
public class FlatMapStreamFunction extends MapStreamer {

public static void main(String[] args) throws Exception {
new Server(new FlatMapStreamFunction()).start();
Server server = new Server(new FlatMapStreamFunction());

// Start the server
server.start();

// wait for the server to shutdown
server.awaitTermination();
}

public void processMessage(String[] keys, Datum data, OutputObserver outputObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ public class CounterFactory extends ReducerFactory<CounterFactory.Counter> {
public static void main(String[] args) throws Exception {
log.info("counter udf was invoked");
Config config = new Config(1);
new Server(new CounterFactory(config)).start();
Server server = new Server(new CounterFactory(config));

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand All @@ -42,8 +48,8 @@ public Counter(Config config) {

@Override
public void addMessage(String[] keys, Datum datum, Metadata md) {
// increment based on the value specified in the config
count += config.getIncrementBy();
// increment based on the value specified in the config
count += config.getIncrementBy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ public class SumFactory extends ReducerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
Server server = new Server(new SumFactory());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ public class CountFactory extends SessionReducerFactory<CountFunction> {

public static void main(String[] args) throws Exception {
log.info("count udf was invoked");
new Server(new CountFactory()).start();
Server server = new Server(new CountFactory());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ public class SumFactory extends ReduceStreamerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
Server server = new Server(new SumFactory());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ public SimpleSideInput(Config config) {
}

public static void main(String[] args) throws Exception {
new Server(new SimpleSideInput(new Config("sampling", 0.5F))).start();
Server server = new Server(new SimpleSideInput(new Config("sampling", 0.5F)));

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ public static void main(String[] args) throws Exception {
sideInputWatcher.startWatching();

// start the server
new Server(new SimpleMapWithSideInput(sideInputWatcher)).start();
Server server = new Server(new SimpleMapWithSideInput(sideInputWatcher));

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();

// Stop watching for side input
sideInputWatcher.stopWatching();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
public class SimpleSink extends Sinker {

public static void main(String[] args) throws Exception {
new Server(new SimpleSink()).start();
Server server = new Server(new SimpleSink());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.numaproj.numaflow.examples.source.simple;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import io.numaproj.numaflow.sourcer.AckRequest;
import io.numaproj.numaflow.sourcer.Message;
Expand Down Expand Up @@ -29,7 +28,13 @@ public class SimpleSource extends Sourcer {
private long readIndex = 0;

public static void main(String[] args) throws Exception {
new Server(new SimpleSource()).start();
Server server = new Server(new SimpleSource());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ public class EventTimeFilterFunction extends SourceTransformer {
private static final Instant januaryFirst2023 = Instant.ofEpochMilli(1672531200000L);

public static void main(String[] args) throws Exception {
new Server(new EventTimeFilterFunction())
.start();
Server server = new Server(new EventTimeFilterFunction());

// Start the server
server.start();

// wait for the server to shut down
server.awaitTermination();
}

public MessageList processMessage(String[] keys, Datum data) {
Expand Down
Loading

0 comments on commit 0ac0102

Please sign in to comment.