Skip to content

Commit

Permalink
feat: support headers for message (#98)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Mar 21, 2024
1 parent 68ffc5a commit 684e113
Show file tree
Hide file tree
Showing 31 changed files with 167 additions and 9 deletions.
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Datum.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -28,4 +29,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
7 changes: 7 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;


@Override
Expand All @@ -28,4 +30,9 @@ public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}

}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void mapFn(
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos())
request.getEventTime().getNanos()),
request.getHeadersMap()
);

// process request
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.mapstreamer;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -27,4 +28,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;


@Override
Expand All @@ -28,4 +30,9 @@ public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}

}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void mapStreamFn(
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos())
request.getEventTime().getNanos()),
request.getHeadersMap()
);

// process Datum
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducer;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -27,4 +28,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
7 changes: 7 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducestreamer.model;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -26,4 +27,11 @@ public interface Datum {
* @return returns the watermark
*/
Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ private HandlerDatum constructHandlerDatum(Sessionreduce.SessionReduceRequest.Pa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sessionreducer.model;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -26,4 +27,11 @@ public interface Datum {
* @return returns the watermark
*/
Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sinker;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand Down Expand Up @@ -40,4 +41,11 @@ public interface Datum {
* @return returns the ID
*/
String getId();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
9 changes: 8 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

// EOF_DATUM is used to indicate the end of the stream.
static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null);
static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null, null);
private String[] keys;
private byte[] value;
private Instant watermark;
private Instant eventTime;
private String id;
private Map<String, String> headers;

@Override
public String[] getKeys() {
Expand All @@ -39,4 +41,9 @@ public Instant getEventTime() {
public String getId() {
return id;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
4 changes: 3 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
Instant.ofEpochSecond(
d.getEventTime().getSeconds(),
d.getEventTime().getNanos()),
d.getId());
d.getId(),
d.getHeadersMap()
);
}

public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
Expand Down
30 changes: 29 additions & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Getter;

import java.time.Instant;
import java.util.Map;

/**
* Message is used to wrap the data returned by Sourcer.
Expand All @@ -15,6 +16,7 @@ public class Message {
private final byte[] value;
private final Offset offset;
private final Instant eventTime;
private final Map<String, String> headers;

/**
* used to create Message with value, offset and eventTime.
Expand All @@ -24,7 +26,7 @@ public class Message {
* @param eventTime message eventTime
*/
public Message(byte[] value, Offset offset, Instant eventTime) {
this(value, offset, eventTime, null);
this(value, offset, eventTime, null, null);
}

/**
Expand All @@ -36,9 +38,35 @@ public Message(byte[] value, Offset offset, Instant eventTime) {
* @param keys message keys
*/
public Message(byte[] value, Offset offset, Instant eventTime, String[] keys) {
this(value, offset, eventTime, keys, null);
}

/**
* used to create Message with value, offset, eventTime and headers.
*
* @param value message value
* @param offset message offset
* @param eventTime message eventTime
* @param headers message headers
*/
public Message(byte[] value, Offset offset, Instant eventTime, Map<String, String> headers) {
this(value, offset, eventTime, null, headers);
}

/**
* used to create Message with value, offset, eventTime, keys and headers.
*
* @param value message value
* @param offset message offset
* @param eventTime message eventTime
* @param keys message keys
* @param headers message headers
*/
public Message(byte[] value, Offset offset, Instant eventTime, String[] keys, Map<String, String> headers) {
this.value = value;
this.offset = offset;
this.eventTime = eventTime;
this.keys = keys;
this.headers = headers;
}
}
Loading

0 comments on commit 684e113

Please sign in to comment.