Skip to content

Commit

Permalink
feat: Join Vertex (#875)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelmani <[email protected]>
  • Loading branch information
juliev0 committed Jul 27, 2023
1 parent 85360f6 commit 7e86306
Show file tree
Hide file tree
Showing 38 changed files with 1,026 additions and 181 deletions.
12 changes: 7 additions & 5 deletions pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
// let's track only the first element's watermark. This is important because we reassign the watermark we fetch
// to all the elements in the batch. If we were to assign the last element's watermark, we would wrongly mark on-time data as late.
// we fetch the watermark for the partition from which we read the message.
processorWM := isdf.wmFetcher.GetWatermark(readMessages[0].ReadOffset, isdf.fromBufferPartition.GetPartitionIdx())
processorWM := isdf.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, isdf.fromBufferPartition.GetPartitionIdx())

var writeOffsets map[string][][]isb.Offset
if !isdf.opts.enableMapUdfStream {
Expand Down Expand Up @@ -458,8 +458,9 @@ func (isdf *InterStepDataForward) streamMessage(
// to send the result to. Then update the toBuffer(s) with writeMessage.
msgIndex := 0
for writeMessage := range writeMessageCh {
// add partition to the ID, this is to make sure that the ID is unique across partitions
writeMessage.ID = fmt.Sprintf("%s-%d-%d", dataMessages[0].ReadOffset.String(), isdf.fromBufferPartition.GetPartitionIdx(), msgIndex)
// add partition to the ID, to make sure that the ID is unique across partitions
// also add vertex name to the ID, since multiple vertices can publish to the same vertex and we need uniqueness across them
writeMessage.ID = fmt.Sprintf("%s-%s-%d-%d", dataMessages[0].ReadOffset.String(), isdf.vertexName, isdf.fromBufferPartition.GetPartitionIdx(), msgIndex)
msgIndex += 1
udfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(1))

Expand Down Expand Up @@ -680,8 +681,9 @@ func (isdf *InterStepDataForward) applyUDF(ctx context.Context, readMessage *isb
} else {
// if we do not get a time from UDF, we set it to the time from (N-1)th vertex
for index, m := range writeMessages {
// add partition to the ID, this is to make sure that the ID is unique across partitions
m.ID = fmt.Sprintf("%s-%d-%d", readMessage.ReadOffset.String(), isdf.fromBufferPartition.GetPartitionIdx(), index)
// add partition to the ID, to make sure that the ID is unique across partitions
// also add vertex name to the ID, since multiple vertices can publish to the same vertex and we need uniqueness across them
m.ID = fmt.Sprintf("%s-%s-%d-%d", readMessage.ReadOffset.String(), isdf.vertexName, isdf.fromBufferPartition.GetPartitionIdx(), index)
if m.EventTime.IsZero() {
m.EventTime = readMessage.EventTime
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

// ComputeWatermark ignores the offset and partition arguments and returns
// whatever getWatermark reports for this test fetcher.
func (t *testForwardFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark {
return t.getWatermark()
}

// getWatermark uses current time as the watermark because we want to make sure
// the test publisher is publishing watermark
func (t *testForwardFetcher) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark {
func (t *testForwardFetcher) getWatermark() wmb.Watermark {
return wmb.Watermark(testSourceWatermark)
}

Expand Down Expand Up @@ -777,9 +781,13 @@ func (t *testWMBFetcher) Close() error {
return nil
}

// ComputeWatermark ignores the offset and partition arguments and returns
// whatever getWatermark reports for this test fetcher.
func (t *testWMBFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark {
return t.getWatermark()
}

// getWatermark uses current time as the watermark because we want to make sure
// the test publisher is publishing watermark
func (t *testWMBFetcher) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark {
func (t *testWMBFetcher) getWatermark() wmb.Watermark {
return wmb.Watermark(testWMBWatermark)
}

Expand Down Expand Up @@ -1215,7 +1223,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) {
vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
AbstractVertex: dfv1.AbstractVertex{
Name: "testVertex",
Name: "receivingVertex",
},
}}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand All @@ -1242,7 +1250,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) {
assert.NoError(t, err, "expected no error")
assert.Len(t, readMessages, int(count))
assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[1].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys})
assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[1].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})
assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "1-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})

f.Stop()
time.Sleep(1 * time.Millisecond)
Expand All @@ -1261,7 +1269,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) {
vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{
PipelineName: "testPipeline",
AbstractVertex: dfv1.AbstractVertex{
Name: "testVertex",
Name: "receivingVertex",
},
}}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -1292,15 +1300,15 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) {
assert.NoError(t, err, "expected no error")
assert.Len(t, readMessages, 2)
assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[2].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys})
assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[2].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})
assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "2-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})

time.Sleep(time.Second)

readMessages, err = to12.Read(ctx, 2)
assert.NoError(t, err, "expected no error")
assert.Len(t, readMessages, 2)
assert.Equal(t, []interface{}{writeMessages[1].Header.Keys, writeMessages[3].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys})
assert.Equal(t, []interface{}{writeMessages[1].Header.ID + "-0-0", writeMessages[3].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})
assert.Equal(t, []interface{}{"1-receivingVertex-0-0", "3-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID})

f.Stop()
time.Sleep(1 * time.Millisecond)
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewJetStreamBufferWriter(ctx context.Context, client *jsclient.NATSClient,
js: js,
opts: o,
isFull: atomic.NewBool(true),
log: logging.FromContext(ctx).With("bufferWriter", name).With("stream", stream).With("subject", subject),
log: logging.FromContext(ctx).With("bufferWriter", name).With("stream", stream).With("subject", subject).With("partitionIdx", partitionIdx),
}

go result.runStatusChecker(ctx)
Expand Down Expand Up @@ -265,7 +265,7 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain))
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
}(msg, index)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/isb/stores/redis/write_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (bw *BufferWrite) setError(errMsg string, err error) {
// Today we care only about the timestamp and compare just that.
func splitId(id string) (int64, error) {
splitId := strings.Split(id, "-")

idValue, err := strconv.ParseInt(splitId[0], 10, 64)
if err != nil {
return 0, fmt.Errorf("ParseFloat err: %w", err)
Expand Down
12 changes: 8 additions & 4 deletions pkg/isb/testutils/rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -50,7 +51,7 @@ func BuildTestWriteMessages(count int64, startTime time.Time) []isb.Message {
MessageInfo: isb.MessageInfo{
EventTime: tmpTime,
},
ID: fmt.Sprintf("%d", i),
ID: fmt.Sprintf("%d-testVertex-0-0", i), // TODO: hard coded ID suffix ATM, make configurable if needed
Keys: []string{},
},
Body: isb.Body{Payload: result},
Expand Down Expand Up @@ -82,10 +83,13 @@ func BuildTestReadMessagesIntOffset(count int64, startTime time.Time) []isb.Read
var readMessages = make([]isb.ReadMessage, count)

for idx, writeMessage := range writeMessages {
offset, _ := strconv.Atoi(writeMessage.Header.ID)
splitStr := strings.Split(writeMessage.Header.ID, "-")
offset, _ := strconv.Atoi(splitStr[0])
readMessages[idx] = isb.ReadMessage{
Message: writeMessage,
ReadOffset: isb.SimpleIntOffset(func() int64 { return int64(offset) }),
Message: writeMessage,
ReadOffset: isb.SimpleIntOffset(func() int64 {
return int64(offset)
}),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,9 @@ func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
Expand Down
4 changes: 2 additions & 2 deletions pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName st
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
var pm *processor.ProcessorManager
if isReduce {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce))
} else {
pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount))
pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount))
}
watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount)
watermarkFetchers = append(watermarkFetchers, watermarkFetcher)
Expand Down
9 changes: 1 addition & 8 deletions pkg/reconciler/pipeline/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error {
return fmt.Errorf("not all the vertex names are defined in edges")
}

// Do not support N FROM -> 1 TO for now.
toInEdges := make(map[string]bool)
for _, e := range pl.Spec.Edges {
if _, existing := toInEdges[e.To]; existing {
return fmt.Errorf("vertex %q has multiple 'from', which is not supported yet", e.To)
}
toInEdges[e.To] = true
}
// TODO(Join): prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it

for _, v := range pl.Spec.Vertices {
if err := validateVertex(v); err != nil {
Expand Down
8 changes: 1 addition & 7 deletions pkg/reconciler/pipeline/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,7 @@ func TestValidatePipeline(t *testing.T) {
assert.Contains(t, err.Error(), "same from and to")
})

t.Run("N from -> 1 to", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "output"})
err := ValidatePipeline(testObj)
assert.Error(t, err)
assert.Contains(t, err.Error(), "not supported")
})
// TODO(Join): we can test for certain types of invalid cycles here instead

t.Run("or conditional forwarding", func(t *testing.T) {
testObj := testPipeline.DeepCopy()
Expand Down
4 changes: 3 additions & 1 deletion pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewDataForward(ctx context.Context,

// Start starts reading messages from ISB
func (df *DataForward) Start() {

for {
select {
case <-df.ctx.Done():
Expand Down Expand Up @@ -250,7 +251,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
// fetch watermark using the first element's watermark, because we assign the watermark to all other
// elements in the batch based on the watermark we fetch from 0th offset.
// get the watermark for the partition from which we read the messages
processorWM := df.wmFetcher.GetWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx())
processorWM := df.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx())

for _, m := range readMessages {
if !df.keyed {
m.Keys = []string{dfv1.DefaultKeyForNonKeyedData}
Expand Down
17 changes: 12 additions & 5 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var nonKeyedVertex = &dfv1.VertexInstance{

type EventTypeWMProgressor struct {
watermarks map[string]wmb.Watermark
lastOffset isb.Offset
m sync.Mutex
}

Expand All @@ -95,10 +96,15 @@ func (e *EventTypeWMProgressor) Close() error {
return nil
}

func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark {
// ComputeWatermark records offset as the most recently requested offset and
// returns the watermark that getWatermark looks up for it. The partition
// argument is not used.
// NOTE(review): lastOffset is assigned here without holding e.m, whereas
// getWatermark reads it under the lock — confirm this is never called concurrently.
func (e *EventTypeWMProgressor) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark {
e.lastOffset = offset
return e.getWatermark()
}

func (e *EventTypeWMProgressor) getWatermark() wmb.Watermark {
e.m.Lock()
defer e.m.Unlock()
return e.watermarks[offset.String()]
return e.watermarks[e.lastOffset.String()]
}

func (e *EventTypeWMProgressor) GetHeadWatermark(int32) wmb.Watermark {
Expand Down Expand Up @@ -1232,13 +1238,14 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB
hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh)
otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh)
storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true))
pm := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1, processor.WithIsReduce(true))
for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) {
// wait until the test processor has been added to the processor list
time.Sleep(time.Millisecond * 100)
}
f := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1)
return f, sourcePublisher
edgeFetcher := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1)
edgeFetcherSet := fetch.NewEdgeFetcherSet(ctx, map[string]fetch.Fetcher{"fromVertex": edgeFetcher})
return edgeFetcherSet, sourcePublisher
}

func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string][]isb.BufferWriter, pipelineName string) (map[string]publish.Publisher, map[string]wmstore.WatermarkKVStorer) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/reduce/pbq/store/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func Test_batchSyncWithMaxBatchSize(t *testing.T) {
assert.NoError(t, err)
err = wal.Write(&message)
assert.NoError(t, err)
assert.Equal(t, int64(222), tempWAL.prevSyncedWOffset)
assert.Equal(t, int64(252), tempWAL.prevSyncedWOffset)

err = wal.Close()
assert.NoError(t, err)
Expand Down Expand Up @@ -318,7 +318,7 @@ func Test_batchSyncWithSyncDuration(t *testing.T) {
message := writeMessages[0]
storePrevSyncedTime := tempWAL.prevSyncedTime
err = wal.Write(&message)
assert.Equal(t, int64(130), tempWAL.prevSyncedWOffset)
assert.Equal(t, int64(145), tempWAL.prevSyncedWOffset)
assert.NotEqual(t, storePrevSyncedTime, tempWAL.prevSyncedTime)
assert.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
isdf.srcWMPublisher.PublishSourceWatermarks(transformedReadMessages)
// fetch the source watermark again, we might not get the latest watermark because of publishing delay,
// but ideally we should use the latest to determine the IsLate attribute.
processorWM = isdf.wmFetcher.GetWatermark(readMessages[0].ReadOffset, isdf.reader.GetPartitionIdx())
processorWM = isdf.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, isdf.reader.GetPartitionIdx())
// assign isLate
for _, m := range writeMessages {
if processorWM.After(m.EventTime) { // Set late data at source level
Expand Down Expand Up @@ -533,7 +533,7 @@ func (isdf *DataForward) applyTransformer(ctx context.Context, readMessage *isb.
// if we do not get a time from Transformer, we set it to the time from (N-1)th vertex
for index, m := range writeMessages {
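// add partition to the ID, to make sure that the ID is unique across partitions
// also add vertex name to the ID, since multiple vertices can publish to the same vertex and we need uniqueness across them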
m.ID = fmt.Sprintf("%s-%d-%d", readMessage.ReadOffset.String(), isdf.reader.GetPartitionIdx(), index)
m.ID = fmt.Sprintf("%s-%s-%d-%d", readMessage.ReadOffset.String(), isdf.vertexName, isdf.reader.GetPartitionIdx(), index)
if m.EventTime.IsZero() {
m.EventTime = readMessage.EventTime
}
Expand Down
Loading

0 comments on commit 7e86306

Please sign in to comment.