Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: send one EOF response only for reduce stream #102

Merged
merged 7 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ on:

jobs:
docker_publish:
# run it only on numaproj/numaflow-java repository
# forked repositories normally don't have the proper permission setup.
if: ${{ github.repository }} == "numaproj/numaflow-java"
name: Build, Tag, and Push Image
runs-on: ubuntu-latest

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Java SDK for Numaflow

[![Build](https://github.com/numaproj/numaflow-java/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/numaproj/numaflow-java/actions/workflows/ci.yaml)
[![Build](https://github.com/numaproj/numaflow-java/actions/workflows/run-tests.yaml/badge.svg?branch=main)](https://github.com/numaproj/numaflow-java/actions/workflows/run-tests.yaml)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Release Version](https://img.shields.io/github/v/release/numaproj/numaflow-java?label=numaflow-java)](https://github.com/numaproj/numaflow-java/releases/latest)
[![Maven Central](https://img.shields.io/maven-central/v/io.numaproj.numaflow/numaflow-java.svg?label=Maven%20Central)](https://central.sonatype.com/search?q=numaflow+java&smo=true)
Expand Down Expand Up @@ -57,6 +57,8 @@ mvn clean install
* [MapStream](examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream)
* [Map](examples/src/main/java/io/numaproj/numaflow/examples/map)
* [Reduce](examples/src/main/java/io/numaproj/numaflow/examples/reduce)
* [ReduceStream](examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer)
* [SessionReduce](examples/src/main/java/io/numaproj/numaflow/examples/reducesession)

* **User Defined Sink(UDSink)**
* [Sink](examples/src/main/java/io/numaproj/numaflow/examples/sink/simple)
Expand Down
6 changes: 3 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ you can take the desired image and test it in a pipeline

If you want to build and push all the example images at once, you can run:
```shell
./hack/update_examples -bp -t <tag>
./hack/update_examples.sh -bp -t <tag>
```
The default tag is `stable`, but it is recommended you specify your own for testing purposes, as the Github Actions CI uses the `stable` tag.
This consistent tag name is used so that the tags in the [E2E test pipelines](https://github.com/numaproj/numaflow/tree/main/test) do not need to be
updated each time an SDK change is made.
updated each time an SDK change is made.

You can alternatively build and push a specific example image by running the following:
```shell
./hack/update_examples -bpe <example-execution-id> -t <tag>
./hack/update_examples.sh -bpe <example-execution-id> -t <tag>
```
Both `-bpe` and `-bp` first build a local image with the naming convention
`numaflow-java-examples/<example-execution-id>:<tag>`, which then gets pushed as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,17 @@ private void responseListener(ActorResponse actorResponse) {
if there are no entries in the map, that means processing is
done we can close the stream.
*/
responseObserver.onNext(actorResponse.getResponse());
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(actorResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
// only send the last EOF to the response gRPC output stream.
responseObserver.onNext(actorResponse.getResponse());
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
} else {
// send non-EOF responses to the output stream.
responseObserver.onNext(actorResponse.getResponse());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@

/**
* The actor response holds the final EOF response for a particular key set.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
boolean isLast;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Receive createReceive() {
}

private void handleResponse(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
if (actorResponse.getResponse().getEOF()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ private ActorResponse buildResponse(Message message) {
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ private ActorResponse buildEOFResponse() {
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Receive createReceive() {
.create()
.match(ActorRequest.class, this::invokeActor)
.match(String.class, this::sendEOF)
.match(ActorResponse.class, this::handleActorResponse)
.match(ActorResponse.class, this::handleActorEOFResponse)
.build();
}

Expand Down Expand Up @@ -114,16 +114,15 @@ private void sendEOF(String EOF) {
}
}

private void handleActorResponse(ActorResponse actorResponse) {
private void handleActorEOFResponse(ActorResponse actorResponse) {
// when the supervisor receives an actor response, it means the corresponding
// reduce streamer actor has finished its job.
// we remove the entry from the actors map.
actorsMap.remove(actorResponse.getActorUniqueIdentifier());
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.outputActor.tell(actorResponse, getSelf());
} else {
// for reduce streamer, we only send to output stream one single EOF response, which is the last one.
// we don't care about per-key-set EOFs.
this.outputActor.tell(actorResponse, getSelf());
}
}
Expand Down
34 changes: 17 additions & 17 deletions src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
Expand Down Expand Up @@ -111,7 +110,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addAllKeys(Arrays.asList(reduceKey))
.addAllKeys(List.of(reduceKey))
.build())
.build();
inputStreamObserver.onNext(request);
Expand All @@ -123,31 +122,28 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
// sum of first 10 numbers 1 to 10 -> 55
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();

// Expect 2 responses, one containing the aggregated data and the other indicating EOF.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals(2, result.size());
assertEquals(
expectedKeys,
outputStreamObserver.resultDatum
.get()
expectedKeys, result
.get(0)
.getResult()
.getKeysList()
.toArray(new String[0]));
assertEquals(
expectedValue,
outputStreamObserver.resultDatum
.get()
expectedValue, result
.get(0)
.getResult()
.getValue());
assertTrue(outputStreamObserver.resultDatum.get().get(1).getEOF());
assertTrue(result.get(1).getEOF());
}

@Test
public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() {
String reduceKey = "reduce-key";
int keyCount = 3;
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
Expand All @@ -167,7 +163,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder()
.addAllKeys(Arrays.asList(reduceKey + j))
.addAllKeys(List.of(reduceKey + j))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
Expand All @@ -182,10 +178,14 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed 2*keyCount responses, because for each key set, one response for the aggregated result, the other for EOF.
assertEquals(keyCount * 2, result.size());
result.forEach(response -> {
assertTrue(response.getResult().getValue().equals(expectedValue) || response.getEOF());
});

// the outputStreamObserver should have observed keyCount+ 1 responses, one with real output sum data per key, one as the final single EOF response.
assertEquals(keyCount + 1, result.size());
for (int i = 0; i < keyCount; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertEquals(response.getResult().getValue(), expectedValue);
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount).getEOF());
}
}
33 changes: 17 additions & 16 deletions src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Expand All @@ -22,7 +23,7 @@ public class SupervisorActorTest {
@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
Expand Down Expand Up @@ -53,16 +54,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then

try {
completableFuture.get();
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
assertEquals(2, result.size());
assertEquals("10", result
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertEquals(true, outputStreamObserver.resultDatum
.get()
assertTrue(result
.get(1)
.getEOF());
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -73,7 +73,8 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
@Test
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
int keyCount = 10;

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
Expand All @@ -92,7 +93,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
outputStreamObserver)
);

for (int i = 1; i <= 10; i++) {
for (int i = 1; i <= keyCount; i++) {
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
Expand All @@ -108,15 +109,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
supervisorActor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
assertEquals(20, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 20; i++) {
ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum
.get()
.get(i);
assertTrue(response.getResult().getValue().toStringUtf8().equals("1")
|| response.getEOF());
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// expect keyCount number of responses with data, plus one final EOF response.
assertEquals(keyCount + 1, result.size());
for (int i = 0; i < keyCount; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertEquals("1", response.getResult().getValue().toStringUtf8());
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount).getEOF());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down
21 changes: 11 additions & 10 deletions src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addAllKeys(Arrays.asList(reduceKey))
.addAllKeys(List.of(reduceKey))
.build())
.build();
inputStreamObserver.onNext(request);
Expand Down Expand Up @@ -169,7 +169,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
@Test
public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() {
String reduceKey = "reduce-key";
int keyCount = 3;
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
Expand All @@ -189,7 +189,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder()
.addAllKeys(Arrays.asList(reduceKey + j))
.addAllKeys(List.of(reduceKey + j))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
Expand All @@ -206,14 +206,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed 3*keyCount responses, 2 with real output sum data, one as EOF.
assertEquals(keyCount * 3, result.size());
result.forEach(response -> {
// the outputStreamObserver should have observed (keyCount * 2 + 1) responses, 2 with real output sum data per key, 1 as the final single EOF response.
assertEquals(keyCount * 2 + 1, result.size());
for (int i = 0; i < keyCount * 2; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertTrue(response.getResult().getValue().equals(expectedFirstResponse) ||
response.getResult().getValue().equals(expectedSecondResponse)
|| response.getEOF());

});
response.getResult().getValue().equals(expectedSecondResponse));
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount * 2).getEOF());
}

public static class ReduceStreamerTestFactory extends ReduceStreamerFactory<ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler> {
Expand Down
Loading
Loading