Skip to content

Commit

Permalink
feat: Add Map batch (#132)
Browse files Browse the repository at this point in the history
Signed-off-by: rashar <[email protected]>
Signed-off-by: RohanAshar <[email protected]>
Co-authored-by: rashar <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
  • Loading branch information
3 people committed Aug 1, 2024
1 parent 3c2ec6f commit 7997724
Show file tree
Hide file tree
Showing 29 changed files with 1,403 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
execution_ids: [
"mapt-event-time-filter-function", "flat-map-stream", "map-flatmap",
"batch-map-example", "mapt-event-time-filter-function", "flat-map-stream", "map-flatmap",
"even-odd", "simple-sink", "reduce-sum", "reduce-stream-sum",
"map-forward-message", "reduce-counter", "sideinput-example",
"udf-sideinput-example", "source-simple-source", "session-reduce-count"
Expand Down
19 changes: 19 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,25 @@
<artifactId>jib-maven-plugin</artifactId>
<version>3.3.0</version>
<executions>
<!-- Builds the Docker image for the batch map flatmap example (BatchFlatMap) during the package phase, using this plugin's dockerBuild goal. -->
<execution>
<id>batch-map-example</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.batchmap.flatmap.BatchFlatMap
</mainClass>
</container>
<to>
<image>
numaflow-java-examples/batch-map-flatmap:${docker.tag}
</image>
</to>
</configuration>
</execution>
<execution>
<id>mapt-event-time-filter-function</id>
<phase>package</phase>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.numaproj.numaflow.examples.batchmap.flatmap;

import io.numaproj.numaflow.batchmapper.BatchMapper;
import io.numaproj.numaflow.batchmapper.BatchResponse;
import io.numaproj.numaflow.batchmapper.BatchResponses;
import io.numaproj.numaflow.batchmapper.Datum;
import io.numaproj.numaflow.batchmapper.DatumIterator;
import io.numaproj.numaflow.batchmapper.Message;
import io.numaproj.numaflow.batchmapper.Server;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BatchFlatMap extends BatchMapper {
@Override
public BatchResponses processMessage(DatumIterator datumStream) {
BatchResponses batchResponses = new BatchResponses();
while (true) {
Datum datum = null;
try {
datum = datumStream.next();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
// null means the iterator is closed so we are good to break the loop.
if (datum == null) {
break;
}
try {
String msg = new String(datum.getValue());
String[] strs = msg.split(",");
BatchResponse batchResponse = new BatchResponse(datum.getId());
for (String str : strs) {
batchResponse.append(new Message(str.getBytes()));
}
batchResponses.append(batchResponse);
} catch (Exception e) {
batchResponses.append(new BatchResponse(datum.getId()));
}
}
return batchResponses;
}

public static void main(String[] args) throws Exception {
Server server = new Server(new BatchFlatMap());

// Start the server
server.start();

// wait for the server to shutdown
server.awaitTermination();
}
}
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@
</rules>
<excludes>
<exclude>io/numaproj/numaflow/examples/*</exclude>
<exclude>io/numaproj/numaflow/batchmap/v1/*</exclude>
<exclude>io/numaproj/numaflow/mapstream/v1/*</exclude>
<exclude>io/numaproj/numaflow/reduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/sessionreduce/v1/*</exclude>
Expand Down Expand Up @@ -340,6 +341,7 @@
<excludePackageNames>
io.numaproj.numaflow.info,
io.numaproj.numaflow.map.v1,
io.numaproj.numaflow.batchmap.v1,
io.numaproj.numaflow.reduce.v1,
io.numaproj.numaflow.mapstream.v1,
io.numaproj.numaflow.sourcetransformer.v1,
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/BatchMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.numaproj.numaflow.batchmapper;

/**
 * BatchMapper exposes the method for performing a batch map operation.
 * Implementations should override the {@code processMessage} method,
 * which will be used for processing the input messages.
 */

public abstract class BatchMapper {
    /**
     * Processes a batch of input messages. Implementations must ensure that each
     * input message produces a corresponding {@link BatchResponse} object with a
     * matching ID.
     *
     * @param datumStream iterator over the input messages to be processed
     *
     * @return BatchResponses which contains the output from the batch map
     */
    public abstract BatchResponses processMessage(DatumIterator datumStream);
}
45 changes: 45 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/BatchResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.numaproj.numaflow.batchmapper;

import lombok.Getter;

import java.util.ArrayList;
import java.util.List;

/**
 * BatchResponse gathers the {@link Message} objects that belong to a single
 * response, together with the ID that response is tagged with.
 */
public class BatchResponse {
    private final List<Message> messages = new ArrayList<>();

    @Getter
    private final String id;

    /**
     * Creates an empty BatchResponse tagged with the given ID.
     *
     * @param id the unique identifier for this batch response
     */
    public BatchResponse(String id) {
        this.id = id;
    }

    /**
     * Adds one Message to this response.
     *
     * @param msg the Message to add
     * @return this BatchResponse, so calls can be chained
     */
    public BatchResponse append(Message msg) {
        messages.add(msg);
        return this;
    }

    /**
     * Returns the Messages appended so far.
     *
     * @return the backing list of Messages (not a copy)
     */
    public List<Message> getItems() {
        return this.messages;
    }
}
39 changes: 39 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/BatchResponses.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.numaproj.numaflow.batchmapper;

import java.util.ArrayList;
import java.util.List;

/**
 * BatchResponses is the value returned from a batch map function.
 * It holds the list of {@link BatchResponse} objects that were appended to it.
 */
public class BatchResponses {
    private final List<BatchResponse> batchResponses = new ArrayList<>();

    /**
     * Creates a BatchResponses object with no entries.
     */
    public BatchResponses() {
    }

    /**
     * Adds one BatchResponse to this collection.
     *
     * @param batchResponse the BatchResponse to add
     * @return this BatchResponses object, so calls can be chained
     */
    public BatchResponses append(BatchResponse batchResponse) {
        batchResponses.add(batchResponse);
        return this;
    }

    /**
     * Returns the BatchResponse objects appended so far.
     *
     * @return the backing list of BatchResponse objects (not a copy)
     */
    public List<BatchResponse> getItems() {
        return this.batchResponses;
    }
}
20 changes: 20 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.numaproj.numaflow.batchmapper;

/**
 * Default values and fixed string constants used by the batch mapper package.
 * This class only holds constants and cannot be instantiated or extended.
 */
final class Constants {
    // 64 * 1024 * 1024; presumably a size in bytes (64 MiB) - confirm against the gRPC server setup.
    public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

    // Default path of the socket file for the batch map server.
    public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/batchmap.sock";

    // Default path of the server info file.
    public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info";

    // Default port number.
    public static final int DEFAULT_PORT = 50051;

    // Default host name.
    public static final String DEFAULT_HOST = "localhost";

    public static final String SUCCESS = "SUCCESS";

    // Key/value pair identifying the map mode as batch map.
    public static final String MAP_MODE_KEY = "MAP_MODE";

    public static final String MAP_MODE = "batch-map";

    // Constants holder: prevent instantiation.
    private Constants() {
    }
}

51 changes: 51 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Datum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.numaproj.numaflow.batchmapper;

import java.time.Instant;
import java.util.Map;

/**
 * Datum contains methods to get the payload information of a single input message.
 */
public interface Datum {
    /**
     * Returns the keys of the payload.
     *
     * @return the datum keys
     */
    String[] getKeys();

    /**
     * Returns the value of the payload.
     *
     * @return the payload value as a byte array
     */
    byte[] getValue();

    /**
     * Returns the event time of the payload.
     *
     * @return the event time of the payload
     */
    Instant getEventTime();

    /**
     * Returns the watermark information.
     *
     * @return the watermark
     */
    Instant getWatermark();

    /**
     * Returns the ID of the payload.
     *
     * @return the ID
     */
    String getId();

    /**
     * Returns the headers of the payload.
     *
     * @return the headers as key-value pairs
     */
    Map<String, String> getHeaders();
}
20 changes: 20 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/DatumIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.numaproj.numaflow.batchmapper;


/**
 * An iterator over a collection of {@link Datum} elements.
 * Passed to the {@link BatchMapper#processMessage(DatumIterator)} method.
 */
public interface DatumIterator {

    /**
     * Returns the next element in the iterator.
     * This method blocks until an element becomes available in the queue.
     * When EOF_DATUM is received, this method returns null and the iterator is closed,
     * so callers should treat a null return as the end of the input.
     *
     * @return the next element in the iterator, or null if EOF_DATUM is received or the iterator is already closed
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the next element
     */
    Datum next() throws InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.numaproj.numaflow.batchmapper;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link DatumIterator} implementation backed by a blocking queue.
 * Elements are added through {@link #writeMessage(Datum)} and consumed through {@link #next()}.
 */
@Slf4j
class DatumIteratorImpl implements DatumIterator {
    private final BlockingQueue<Datum> queue = new LinkedBlockingDeque<>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // number of datums written so far
    private final AtomicInteger written = new AtomicInteger(0);

    /**
     * Takes the next datum from the queue, blocking until one is available.
     *
     * @return the next datum, or null once the EOF datum has been seen
     *
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    @Override
    public Datum next() throws InterruptedException {
        if (!isClosed.get()) {
            Datum head = queue.take();
            if (head != HandlerDatum.EOF_DATUM) {
                return head;
            }
            // EOF marks the end of the stream; every later call returns null right away
            isClosed.set(true);
        }
        return null;
    }

    /**
     * Puts a datum on the queue, waiting until the write succeeds, then bumps the counter.
     *
     * @param datum the datum to enqueue
     *
     * @throws InterruptedException if interrupted while waiting to enqueue
     */
    public void writeMessage(Datum datum) throws InterruptedException {
        queue.put(datum);
        written.incrementAndGet();
    }

    /**
     * Returns how many datums have been written through {@link #writeMessage(Datum)}.
     *
     * @return the current write count
     */
    public int getCount() {
        return written.get();
    }

}
37 changes: 37 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.numaproj.numaflow.batchmapper;

import lombok.Builder;
import lombok.Getter;

/**
 * GRPCConfig carries the configuration values for the batch map gRPC server.
 * Instances are created through the Lombok-generated builder ({@code GRPCConfig.newBuilder()}).
 */
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
    @Builder.Default
    private String socketPath = Constants.DEFAULT_SOCKET_PATH;

    @Builder.Default
    private int maxMessageSize = Constants.DEFAULT_MESSAGE_SIZE;

    @Builder.Default
    private String infoFilePath = Constants.DEFAULT_SERVER_INFO_FILE_PATH;

    @Builder.Default
    private int port = Constants.DEFAULT_PORT;

    private boolean isLocal;

    /**
     * Builds a GRPCConfig populated with the default socket path, message size
     * and info file path, and with {@code isLocal} derived from the environment.
     *
     * @return the default configuration
     */
    static GRPCConfig defaultGrpcConfig() {
        // if NUMAFLOW_POD is not set, then we are not running using numaflow
        final boolean runningLocally = System.getenv("NUMAFLOW_POD") == null;
        return GRPCConfig.newBuilder()
                .socketPath(Constants.DEFAULT_SOCKET_PATH)
                .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
                .infoFilePath(Constants.DEFAULT_SERVER_INFO_FILE_PATH)
                .isLocal(runningLocally)
                .build();
    }
}
Loading

0 comments on commit 7997724

Please sign in to comment.