From 79ced9b9d7203f778cf53ed29f30a281e37dcbe9 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 18 Apr 2024 05:20:44 +0530 Subject: [PATCH] fix: pass headers to test kit server (#118) Signed-off-by: Yashash H L --- .../io/numaproj/numaflow/examples/sink/simple/SimpleSink.java | 2 +- .../java/io/numaproj/numaflow/examples/server/ServerTest.java | 2 ++ src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java | 2 ++ src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java | 2 ++ .../numaflow/sourcetransformer/SourceTransformerTestKit.java | 2 ++ 5 files changed, 9 insertions(+), 1 deletion(-) diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java index 479037d..db3ba1a 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple/SimpleSink.java @@ -43,7 +43,7 @@ public ResponseList processMessages(DatumIterator datumIterator) { } try { String msg = new String(datum.getValue()); - log.info("Received message: {}", msg); + log.info("Received message: {}, headers - {}", msg, datum.getHeaders()); responseListBuilder.addResponse(Response.responseOK(datum.getId())); } catch (Exception e) { responseListBuilder.addResponse(Response.responseFailure( diff --git a/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java index 78e86de..76be723 100644 --- a/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java +++ b/examples/src/test/java/io/numaproj/numaflow/examples/server/ServerTest.java @@ -27,6 +27,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @Slf4j @@ -175,6 +176,7 @@ public void testSinkServerInvocation() { .builder() .id("id-" + i) .value(("value-" + i).getBytes()) + .headers(Map.of("test-key", "test-value")) .build()); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java index 1225979..51e8803 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperTestKit.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -149,6 +150,7 @@ public MessageList sendRequest(String[] keys, Datum data) { .setSeconds(data.getWatermark().getEpochSecond()) .setNanos(data.getWatermark().getNano()) .build()) + .putAllHeaders(data.getHeaders() == null ? new HashMap<>() : data.getHeaders()) .build(); try { diff --git a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java index 1c3b7f2..081c1d3 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -164,6 +165,7 @@ public void onCompleted() { .build() : Timestamp.newBuilder() .setSeconds(datum.getWatermark().getEpochSecond()) .setNanos(datum.getWatermark().getNano()).build()) + .putAllHeaders(datum.getHeaders() == null ? new HashMap<>() : datum.getHeaders()) .build(); requestObserver.onNext(request); } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java index c9dda36..4ceb39a 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/SourceTransformerTestKit.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -158,6 +159,7 @@ public MessageList sendRequest(String[] keys, Datum data) { .setSeconds(data.getWatermark().getEpochSecond()) .setNanos(data.getWatermark().getNano()) .build()) + .putAllHeaders(data.getHeaders() == null ? new HashMap<>() : data.getHeaders()) .build(); try {