diff --git a/pkg/forward/forward.go b/pkg/forward/forward.go index 5f7f0f42ef..3178101a34 100644 --- a/pkg/forward/forward.go +++ b/pkg/forward/forward.go @@ -260,7 +260,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // let's track only the first element's watermark. This is important because we reassign the watermark we fetch // to all the elements in the batch. If we were to assign last element's watermark, we will wrongly mark on-time data as late. // we fetch the watermark for the partition from which we read the message. - processorWM := isdf.wmFetcher.GetWatermark(readMessages[0].ReadOffset, isdf.fromBufferPartition.GetPartitionIdx()) + processorWM := isdf.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, isdf.fromBufferPartition.GetPartitionIdx()) var writeOffsets map[string][][]isb.Offset if !isdf.opts.enableMapUdfStream { @@ -458,8 +458,9 @@ func (isdf *InterStepDataForward) streamMessage( // to send the result to. Then update the toBuffer(s) with writeMessage. msgIndex := 0 for writeMessage := range writeMessageCh { - // add partition to the ID, this is to make sure that the ID is unique across partitions - writeMessage.ID = fmt.Sprintf("%s-%d-%d", dataMessages[0].ReadOffset.String(), isdf.fromBufferPartition.GetPartitionIdx(), msgIndex) + // add partition to the ID, to make sure that the ID is unique across partitions + // also add vertex name to the ID, since multiple vertices can publish to the same vertex and we need uniqueness across them + writeMessage.ID = fmt.Sprintf("%s-%s-%d-%d", dataMessages[0].ReadOffset.String(), isdf.vertexName, isdf.fromBufferPartition.GetPartitionIdx(), msgIndex) msgIndex += 1 udfWriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(1)) @@ -680,8 +681,9 @@ func (isdf *InterStepDataForward) applyUDF(ctx context.Context, readMessage *isb } else { // if we do not get a time from UDF, we set it to the time from (N-1)th vertex for index, m := range writeMessages { - // add partition to the ID, this is to make sure that the ID is unique across partitions - m.ID = fmt.Sprintf("%s-%d-%d", readMessage.ReadOffset.String(), isdf.fromBufferPartition.GetPartitionIdx(), index) + // add partition to the ID, to make sure that the ID is unique across partitions + // also add vertex name to the ID, since multiple vertices can publish to the same vertex and we need uniqueness across them + m.ID = fmt.Sprintf("%s-%s-%d-%d", readMessage.ReadOffset.String(), isdf.vertexName, isdf.fromBufferPartition.GetPartitionIdx(), index) if m.EventTime.IsZero() { m.EventTime = readMessage.EventTime } diff --git a/pkg/forward/forward_test.go b/pkg/forward/forward_test.go index 75f5b4ba98..d6a397a869 100644 --- a/pkg/forward/forward_test.go +++ b/pkg/forward/forward_test.go @@ -69,9 +69,13 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } -// GetWatermark uses current time as the watermark because we want to make sure +func (t *testForwardFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark { + return t.getWatermark() +} + +// getWatermark uses current time as the watermark because we want to make sure // the test publisher is publishing watermark -func (t *testForwardFetcher) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark { +func (t *testForwardFetcher) getWatermark() wmb.Watermark { return wmb.Watermark(testSourceWatermark) } @@ -777,9 +781,13 @@ func (t 
*testWMBFetcher) Close() error { return nil } -// GetWatermark uses current time as the watermark because we want to make sure +func (t *testWMBFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark { + return t.getWatermark() +} + +// getWatermark uses current time as the watermark because we want to make sure // the test publisher is publishing watermark -func (t *testWMBFetcher) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark { +func (t *testWMBFetcher) getWatermark() wmb.Watermark { return wmb.Watermark(testWMBWatermark) } @@ -1215,7 +1223,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) { vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ PipelineName: "testPipeline", AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", + Name: "receivingVertex", }, }} ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -1242,7 +1250,7 @@ func TestInterStepDataForwardSinglePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, int(count)) assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[1].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[1].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "1-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) f.Stop() time.Sleep(1 * time.Millisecond) @@ -1261,7 +1269,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) { vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ PipelineName: "testPipeline", AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", + Name: "receivingVertex", }, }} ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -1292,7 +1300,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, 2) assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[2].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[2].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "2-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) time.Sleep(time.Second) @@ -1300,7 +1308,7 @@ func TestInterStepDataForwardMultiplePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, 2) assert.Equal(t, []interface{}{writeMessages[1].Header.Keys, writeMessages[3].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[1].Header.ID + "-0-0", writeMessages[3].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"1-receivingVertex-0-0", "3-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) f.Stop() time.Sleep(1 * time.Millisecond) diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index a8a9f8c43d..2e77ffddb6 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -71,7 +71,7 @@ func NewJetStreamBufferWriter(ctx 
context.Context, client *jsclient.NATSClient, js: js, opts: o, isFull: atomic.NewBool(true), - log: logging.FromContext(ctx).With("bufferWriter", name).With("stream", stream).With("subject", subject), + log: logging.FromContext(ctx).With("bufferWriter", name).With("stream", stream).With("subject", subject).With("partitionIdx", partitionIdx), } go result.runStatusChecker(ctx) @@ -265,7 +265,7 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, } else { writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence} errs[idx] = nil - jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain)) + jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain)) } }(msg, index) } diff --git a/pkg/isb/stores/redis/write_info.go b/pkg/isb/stores/redis/write_info.go index 833eca351b..c1b703a043 100644 --- a/pkg/isb/stores/redis/write_info.go +++ b/pkg/isb/stores/redis/write_info.go @@ -356,6 +356,7 @@ func (bw *BufferWrite) setError(errMsg string, err error) { // Today we care only about the timestamp and compare just that. func splitId(id string) (int64, error) { splitId := strings.Split(id, "-") + idValue, err := strconv.ParseInt(splitId[0], 10, 64) if err != nil { return 0, fmt.Errorf("ParseFloat err: %w", err) diff --git a/pkg/isb/testutils/rw.go b/pkg/isb/testutils/rw.go index 25e913c874..71cbe53624 100644 --- a/pkg/isb/testutils/rw.go +++ b/pkg/isb/testutils/rw.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "time" "github.com/redis/go-redis/v9" @@ -50,7 +51,7 @@ func BuildTestWriteMessages(count int64, startTime time.Time) []isb.Message { MessageInfo: isb.MessageInfo{ EventTime: tmpTime, }, - ID: fmt.Sprintf("%d", i), + ID: fmt.Sprintf("%d-testVertex-0-0", i), // TODO: hard coded ID suffix ATM, make configurable if needed Keys: []string{}, }, Body: isb.Body{Payload: result}, @@ -82,10 +83,13 @@ func BuildTestReadMessagesIntOffset(count int64, startTime time.Time) []isb.Read var readMessages = make([]isb.ReadMessage, count) for idx, writeMessage := range writeMessages { - offset, _ := strconv.Atoi(writeMessage.Header.ID) + splitStr := strings.Split(writeMessage.Header.ID, "-") + offset, _ := strconv.Atoi(splitStr[0]) readMessages[idx] = isb.ReadMessage{ - Message: writeMessage, - ReadOffset: isb.SimpleIntOffset(func() int64 { return int64(offset) }), + Message: writeMessage, + ReadOffset: isb.SimpleIntOffset(func() int64 { + return int64(offset) + }), } } diff --git a/pkg/isbsvc/jetstream_service.go b/pkg/isbsvc/jetstream_service.go index a0a99f9d44..d2cfd971f0 100644 --- a/pkg/isbsvc/jetstream_service.go +++ b/pkg/isbsvc/jetstream_service.go @@ -336,9 +336,9 @@ func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bucketName storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) var pm *processor.ProcessorManager if isReduce { - pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) } else { - pm = processor.NewProcessorManager(ctx, 
storeWatcher, int32(fromBufferPartitionCount)) + pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount)) } watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount) watermarkFetchers = append(watermarkFetchers, watermarkFetcher) diff --git a/pkg/isbsvc/redis_service.go b/pkg/isbsvc/redis_service.go index d03f2f59e0..83a317c263 100644 --- a/pkg/isbsvc/redis_service.go +++ b/pkg/isbsvc/redis_service.go @@ -152,9 +152,9 @@ func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bucketName st storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) var pm *processor.ProcessorManager if isReduce { - pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) + pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount), processor.WithVertexReplica(int32(i)), processor.WithIsReduce(isReduce)) } else { - pm = processor.NewProcessorManager(ctx, storeWatcher, int32(fromBufferPartitionCount)) + pm = processor.NewProcessorManager(ctx, storeWatcher, bucketName, int32(fromBufferPartitionCount)) } watermarkFetcher := fetch.NewEdgeFetcher(ctx, bucketName, storeWatcher, pm, fromBufferPartitionCount) watermarkFetchers = append(watermarkFetchers, watermarkFetcher) diff --git a/pkg/reconciler/pipeline/validate.go b/pkg/reconciler/pipeline/validate.go index 62ae6ab7d1..8b3592ef2d 100644 --- a/pkg/reconciler/pipeline/validate.go +++ b/pkg/reconciler/pipeline/validate.go @@ -155,14 +155,7 @@ func ValidatePipeline(pl *dfv1.Pipeline) error { return fmt.Errorf("not all the vertex names are defined in edges") } - // Do not support N FROM -> 1 TO for now. 
- toInEdges := make(map[string]bool) - for _, e := range pl.Spec.Edges { - if _, existing := toInEdges[e.To]; existing { - return fmt.Errorf("vertex %q has multiple 'from', which is not supported yet", e.To) - } - toInEdges[e.To] = true - } + // TODO(Join): prevent pipelines with Cycles in the case that there is a Reduce Vertex at the point of the cycle or to the right of it for _, v := range pl.Spec.Vertices { if err := validateVertex(v); err != nil { diff --git a/pkg/reconciler/pipeline/validate_test.go b/pkg/reconciler/pipeline/validate_test.go index a7975b6b9c..198525d1ae 100644 --- a/pkg/reconciler/pipeline/validate_test.go +++ b/pkg/reconciler/pipeline/validate_test.go @@ -314,13 +314,7 @@ func TestValidatePipeline(t *testing.T) { assert.Contains(t, err.Error(), "same from and to") }) - t.Run("N from -> 1 to", func(t *testing.T) { - testObj := testPipeline.DeepCopy() - testObj.Spec.Edges = append(testObj.Spec.Edges, dfv1.Edge{From: "input", To: "output"}) - err := ValidatePipeline(testObj) - assert.Error(t, err) - assert.Contains(t, err.Error(), "not supported") - }) + // TODO(Join): we can test for certain types of invalid cycles here instead t.Run("or conditional forwarding", func(t *testing.T) { testObj := testPipeline.DeepCopy() diff --git a/pkg/reduce/data_forward.go b/pkg/reduce/data_forward.go index fd95486bdc..622241c79d 100644 --- a/pkg/reduce/data_forward.go +++ b/pkg/reduce/data_forward.go @@ -122,6 +122,7 @@ func NewDataForward(ctx context.Context, // Start starts reading messages from ISG func (df *DataForward) Start() { + for { select { case <-df.ctx.Done(): @@ -250,7 +251,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { // fetch watermark using the first element's watermark, because we assign the watermark to all other // elements in the batch based on the watermark we fetch from 0th offset. 
// get the watermark for the partition from which we read the messages - processorWM := df.wmFetcher.GetWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx()) + processorWM := df.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, df.fromBufferPartition.GetPartitionIdx()) + for _, m := range readMessages { if !df.keyed { m.Keys = []string{dfv1.DefaultKeyForNonKeyedData} diff --git a/pkg/reduce/data_forward_test.go b/pkg/reduce/data_forward_test.go index 19e69fa4e6..141b5304d1 100644 --- a/pkg/reduce/data_forward_test.go +++ b/pkg/reduce/data_forward_test.go @@ -74,6 +74,7 @@ var nonKeyedVertex = &dfv1.VertexInstance{ type EventTypeWMProgressor struct { watermarks map[string]wmb.Watermark + lastOffset isb.Offset m sync.Mutex } @@ -95,10 +96,15 @@ func (e *EventTypeWMProgressor) Close() error { return nil } -func (e *EventTypeWMProgressor) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark { +func (e *EventTypeWMProgressor) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark { + e.lastOffset = offset + return e.getWatermark() +} + +func (e *EventTypeWMProgressor) getWatermark() wmb.Watermark { e.m.Lock() defer e.m.Unlock() - return e.watermarks[offset.String()] + return e.watermarks[e.lastOffset.String()] } func (e *EventTypeWMProgressor) GetHeadWatermark(int32) wmb.Watermark { @@ -1232,13 +1238,14 @@ func fetcherAndPublisher(ctx context.Context, fromBuffer *simplebuffer.InMemoryB hbWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_PROCESSORS", hbWatcherCh) otWatcher, _ := inmem.NewInMemWatch(ctx, pipelineName, keyspace+"_OT", otWatcherCh) storeWatcher := wmstore.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - pm := processor.NewProcessorManager(ctx, storeWatcher, 1, processor.WithIsReduce(true)) + pm := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1, processor.WithIsReduce(true)) for waitForReadyP := pm.GetProcessor(fromBuffer.GetName()); waitForReadyP == nil; waitForReadyP = pm.GetProcessor(fromBuffer.GetName()) { // wait until the test processor has been added to the processor list time.Sleep(time.Millisecond * 100) } - f := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1) - return f, sourcePublisher + edgeFetcher := fetch.NewEdgeFetcher(ctx, fromBuffer.GetName(), storeWatcher, pm, 1) + edgeFetcherSet := fetch.NewEdgeFetcherSet(ctx, map[string]fetch.Fetcher{"fromVertex": edgeFetcher}) + return edgeFetcherSet, sourcePublisher } func buildPublisherMapAndOTStore(ctx context.Context, toBuffers map[string][]isb.BufferWriter, pipelineName string) (map[string]publish.Publisher, map[string]wmstore.WatermarkKVStorer) { diff --git a/pkg/reduce/pbq/store/wal/segment_test.go b/pkg/reduce/pbq/store/wal/segment_test.go index e358506dc3..495f99a420 100644 --- a/pkg/reduce/pbq/store/wal/segment_test.go +++ b/pkg/reduce/pbq/store/wal/segment_test.go @@ -261,7 +261,7 @@ func Test_batchSyncWithMaxBatchSize(t *testing.T) { assert.NoError(t, err) err = wal.Write(&message) assert.NoError(t, err) - assert.Equal(t, int64(222), tempWAL.prevSyncedWOffset) + assert.Equal(t, int64(252), tempWAL.prevSyncedWOffset) err = wal.Close() assert.NoError(t, err) @@ -318,7 +318,7 @@ func Test_batchSyncWithSyncDuration(t *testing.T) { message := writeMessages[0] storePrevSyncedTime := tempWAL.prevSyncedTime err = wal.Write(&message) - assert.Equal(t, int64(130), tempWAL.prevSyncedWOffset) + assert.Equal(t, int64(145), tempWAL.prevSyncedWOffset) assert.NotEqual(t, storePrevSyncedTime, tempWAL.prevSyncedTime) 
assert.NoError(t, err) diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 8be79486db..dd4c271adb 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -271,7 +271,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) { isdf.srcWMPublisher.PublishSourceWatermarks(transformedReadMessages) // fetch the source watermark again, we might not get the latest watermark because of publishing delay, // but ideally we should use the latest to determine the IsLate attribute. - processorWM = isdf.wmFetcher.GetWatermark(readMessages[0].ReadOffset, isdf.reader.GetPartitionIdx()) + processorWM = isdf.wmFetcher.ComputeWatermark(readMessages[0].ReadOffset, isdf.reader.GetPartitionIdx()) // assign isLate for _, m := range writeMessages { if processorWM.After(m.EventTime) { // Set late data at source level @@ -533,7 +533,7 @@ func (isdf *DataForward) applyTransformer(ctx context.Context, readMessage *isb. // if we do not get a time from Transformer, we set it to the time from (N-1)th vertex for index, m := range writeMessages { // add partition to the ID, this is to make sure that the ID is unique across partitions - m.ID = fmt.Sprintf("%s-%d-%d", readMessage.ReadOffset.String(), isdf.reader.GetPartitionIdx(), index) + m.ID = fmt.Sprintf("%s-%s-%d-%d", readMessage.ReadOffset.String(), isdf.vertexName, isdf.reader.GetPartitionIdx(), index) if m.EventTime.IsZero() { m.EventTime = readMessage.EventTime } diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index 1c3e14eb04..f0150c1286 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -69,9 +69,13 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } -// GetWatermark uses current time as the watermark because we want to make sure +// ComputeWatermark uses current time as the watermark because we want to make sure // the test publisher is publishing watermark -func (t *testForwardFetcher) GetWatermark(offset isb.Offset, partition int32) wmb.Watermark { +func (t *testForwardFetcher) ComputeWatermark(offset isb.Offset, partition int32) wmb.Watermark { + return t.getWatermark() +} + +func (t *testForwardFetcher) getWatermark() wmb.Watermark { return wmb.Watermark(testSourceWatermark) } @@ -852,7 +856,7 @@ func TestDataForwardSinglePartition(t *testing.T) { vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ PipelineName: "testPipeline", AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", + Name: "receivingVertex", }, }} ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -878,7 +882,7 @@ func TestDataForwardSinglePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, int(count)) assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[1].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[1].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "1-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) for _, m := range readMessages { // verify new event time gets assigned to messages. 
assert.Equal(t, testSourceNewEventTime, m.EventTime) @@ -902,7 +906,7 @@ func TestDataForwardMultiplePartition(t *testing.T) { vertex := &dfv1.Vertex{Spec: dfv1.VertexSpec{ PipelineName: "testPipeline", AbstractVertex: dfv1.AbstractVertex{ - Name: "testVertex", + Name: "receivingVertex", }, }} ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -932,7 +936,7 @@ func TestDataForwardMultiplePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, 2) assert.Equal(t, []interface{}{writeMessages[0].Header.Keys, writeMessages[2].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[0].Header.ID + "-0-0", writeMessages[2].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"0-receivingVertex-0-0", "2-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) for _, m := range readMessages { // verify new event time gets assigned to messages. assert.Equal(t, testSourceNewEventTime, m.EventTime) @@ -946,7 +950,7 @@ func TestDataForwardMultiplePartition(t *testing.T) { assert.NoError(t, err, "expected no error") assert.Len(t, readMessages, 2) assert.Equal(t, []interface{}{writeMessages[1].Header.Keys, writeMessages[3].Header.Keys}, []interface{}{readMessages[0].Header.Keys, readMessages[1].Header.Keys}) - assert.Equal(t, []interface{}{writeMessages[1].Header.ID + "-0-0", writeMessages[3].Header.ID + "-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) + assert.Equal(t, []interface{}{"1-receivingVertex-0-0", "3-receivingVertex-0-0"}, []interface{}{readMessages[0].Header.ID, readMessages[1].Header.ID}) for _, m := range readMessages { // verify new event time gets assigned to messages. 
assert.Equal(t, testSourceNewEventTime, m.EventTime) diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 7c0047c22a..60ba91475d 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -140,7 +140,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { shuffleFuncMap := make(map[string]*shuffle.Shuffle) for _, edge := range sp.VertexInstance.Vertex.Spec.ToEdges { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { - s := shuffle.NewShuffle(sp.VertexInstance.Vertex.GetName(), edge.GetToVertexPartitionCount()) + s := shuffle.NewShuffle(edge.To, edge.GetToVertexPartitionCount()) shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)] = s } toVertexPartitionMap[edge.To] = edge.GetToVertexPartitionCount() diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 577a8fc6c0..df01678aa2 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -111,7 +111,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { shuffleFuncMap := make(map[string]*shuffle.Shuffle) for _, edge := range u.VertexInstance.Vertex.Spec.ToEdges { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { - s := shuffle.NewShuffle(u.VertexInstance.Vertex.GetName(), edge.GetToVertexPartitionCount()) + s := shuffle.NewShuffle(edge.To, edge.GetToVertexPartitionCount()) shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)] = s } } diff --git a/pkg/udf/reduce_udf.go b/pkg/udf/reduce_udf.go index 9217776c2f..3ec77647eb 100644 --- a/pkg/udf/reduce_udf.go +++ b/pkg/udf/reduce_udf.go @@ -122,7 +122,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { shuffleFuncMap := make(map[string]*shuffle.Shuffle) for _, edge := range u.VertexInstance.Vertex.Spec.ToEdges { if edge.ToVertexType == dfv1.VertexTypeReduceUDF && edge.GetToVertexPartitionCount() > 1 { - s := shuffle.NewShuffle(u.VertexInstance.Vertex.GetName(), edge.GetToVertexPartitionCount()) + s := shuffle.NewShuffle(edge.To, edge.GetToVertexPartitionCount()) shuffleFuncMap[fmt.Sprintf("%s:%s", edge.From, edge.To)] = s } } diff --git a/pkg/watermark/fetch/edge_fetcher.go b/pkg/watermark/fetch/edge_fetcher.go index 529fb72f7d..2c6c52ca9c 100644 --- a/pkg/watermark/fetch/edge_fetcher.go +++ b/pkg/watermark/fetch/edge_fetcher.go @@ -47,7 +47,7 @@ type edgeFetcher struct { processorManager *processor.ProcessorManager lastProcessedWm []int64 log *zap.SugaredLogger - sync.Mutex + sync.RWMutex } // NewEdgeFetcher returns a new edge fetcher. @@ -67,19 +67,27 @@ func NewEdgeFetcher(ctx context.Context, bucketName string, storeWatcher store.W storeWatcher: storeWatcher, processorManager: manager, lastProcessedWm: lastProcessedWm, - log: log, + log: log.With("bucket", bucketName), } } -// GetWatermark gets the largest possible watermark from all the active processors of all partitions for the given offset and partition. -// We calculate the watermark for the given offset and partition, update the lastProcessedWatermark of the given partition to the watermark we just calculate. -// Then, we compare the lastProcessedWatermark from all partitions and return the minimum as the edge watermark. -// deletes the processor if it's not active. 
-func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset, fromPartitionIdx int32) wmb.Watermark { +// ComputeWatermark processes the offset on the partition indicated and returns the overall Watermark +// from all Partitions +func (e *edgeFetcher) ComputeWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark { + err := e.updateWatermark(offset, fromPartitionIdx) + if err != nil { + return wmb.InitialWatermark + } + return e.getWatermark() +} + +// updateWatermark updates state (lastProcessedWm) for the given partition based on the provided offset. +// Also deletes the processor if it's not active. +func (e *edgeFetcher) updateWatermark(inputOffset isb.Offset, fromPartitionIdx int32) error { var offset, err = inputOffset.Sequence() if err != nil { e.log.Errorw("Unable to get offset from isb.Offset.Sequence()", zap.Error(err)) - return wmb.InitialWatermark + return err } var debugString strings.Builder var epoch int64 = math.MaxInt64 @@ -119,22 +127,19 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset, fromPartitionIdx int3 } // update the last processed watermark for the partition e.Lock() - defer e.Unlock() e.lastProcessedWm[fromPartitionIdx] = epoch - // get the smallest watermark among all the partitions - // since we cannot compare the offset of different partitions, we get the smallest among the last processed watermarks of all the partitions - minEpoch := e.getMinFromLastProcessed(epoch) + e.Unlock() - e.log.Debugf("%s[%s] get watermark for offset %d: %+v", debugString.String(), e.bucketName, offset, epoch) - return wmb.Watermark(time.UnixMilli(minEpoch)) + e.log.Debugf("%s[%s] processed watermark for offset %d: %+v", debugString.String(), e.bucketName, offset, epoch) + return nil } -// GetHeadWatermark returns the latest watermark among all processors for the given partition. +// GetHeadWatermark returns the smallest head watermark among all the processors for given partition // This can be used in showing the watermark // progression for a vertex when not consuming the messages directly (eg. UX, tests) // NOTE // - We don't use this function in the regular pods in the vertex. -// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in the GetWatermark never happens. +// - UX only uses GetHeadWatermark, so the `p.IsDeleted()` check in updateWatermark() never happens. // Meaning, in the UX (daemon service) we never delete any processor. func (e *edgeFetcher) GetHeadWatermark(fromPartitionIdx int32) wmb.Watermark { var debugString strings.Builder @@ -202,14 +207,22 @@ func (e *edgeFetcher) GetHeadWMB(fromPartitionIdx int32) wmb.WMB { return wmb.WMB{} } - e.Lock() - defer e.Unlock() // update the last processed watermark for the partition + e.Lock() e.lastProcessedWm[fromPartitionIdx] = headWMB.Watermark + e.Unlock() + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TODO(join): the check below is temporarily remaining here instead of in EdgeFetcherSet::GetHeadWMB() so that this method + // can maintain its existing contract. + // Note that this means this method will return wmb.WMB{} some of the time which will cause EdgeFetcherSet::GetHeadWMB() to do the same + // even in cases where the overall Idle WMB < overall last processed watermark. 
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // we only consider idle watermark if it is smaller than or equal to min of all the last processed watermarks. - if headWMB.Watermark > e.getMinFromLastProcessed(headWMB.Watermark) { + if headWMB.Watermark > e.getWatermark().UnixMilli() { return wmb.WMB{} } + e.log.Debugf("GetHeadWMB: %s[%s] get idle head wmb for offset", debugString.String(), e.bucketName) return headWMB } @@ -224,13 +237,15 @@ func (e *edgeFetcher) Close() error { return nil } -// getMinFromLastProcessed returns the smallest watermark among all the last processed watermarks. -func (e *edgeFetcher) getMinFromLastProcessed(watermark int64) int64 { - minWm := watermark +// getWatermark returns the smallest watermark among all the last processed watermarks. +func (e *edgeFetcher) getWatermark() wmb.Watermark { + minWm := int64(math.MaxInt64) + e.RLock() for _, wm := range e.lastProcessedWm { if minWm > wm { minWm = wm } } - return minWm + e.RUnlock() + return wmb.Watermark(time.UnixMilli(minWm)) } diff --git a/pkg/watermark/fetch/edge_fetcher_set.go b/pkg/watermark/fetch/edge_fetcher_set.go new file mode 100644 index 0000000000..b7973adee7 --- /dev/null +++ b/pkg/watermark/fetch/edge_fetcher_set.go @@ -0,0 +1,135 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fetch + +import ( + "context" + "errors" + "math" + "time" + + "github.com/numaproj/numaflow/pkg/isb" + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/watermark/wmb" + "go.uber.org/zap" +) + +// a set of EdgeFetchers, incoming to a Vertex +// (In the case of a Join Vertex, there are multiple incoming Edges) +type edgeFetcherSet struct { + edgeFetchers map[string]Fetcher // key = name of From Vertex + log *zap.SugaredLogger +} + +func NewEdgeFetcherSet(ctx context.Context, edgeFetchers map[string]Fetcher) Fetcher { + return &edgeFetcherSet{ + edgeFetchers, + logging.FromContext(ctx), + } +} + +// ComputeWatermark processes the offset on the partition indicated and returns the overall Watermark +// from all Partitions +func (efs *edgeFetcherSet) ComputeWatermark(inputOffset isb.Offset, fromPartitionIdx int32) wmb.Watermark { + var wm wmb.Watermark + overallWatermark := wmb.Watermark(time.UnixMilli(math.MaxInt64)) + for fromVertex, fetcher := range efs.edgeFetchers { + wm = fetcher.ComputeWatermark(inputOffset, fromPartitionIdx) + efs.log.Debugf("Got Edge watermark from vertex=%q: %v", fromVertex, wm.UnixMilli()) + if wm.BeforeWatermark(overallWatermark) { + overallWatermark = wm + } + } + return overallWatermark +} + +// GetHeadWatermark returns the latest watermark among all processors for the given partition. +// This can be used in showing the watermark +// progression for a vertex when not consuming the messages directly (eg. 
UX, tests) +func (efs *edgeFetcherSet) GetHeadWatermark(fromPartitionIdx int32) wmb.Watermark { + // get the most conservative time (minimum watermark) across all Edges + var wm wmb.Watermark + overallWatermark := wmb.Watermark(time.UnixMilli(math.MaxInt64)) + for fromVertex, fetcher := range efs.edgeFetchers { + wm = fetcher.GetHeadWatermark(fromPartitionIdx) + if wm == wmb.InitialWatermark { // unset + continue + } + efs.log.Debugf("Got Edge Head Watermark from vertex=%q while processing partition %d: %v", fromVertex, fromPartitionIdx, wm.UnixMilli()) + if wm.BeforeWatermark(overallWatermark) { + overallWatermark = wm + } + } + return overallWatermark +} + +// GetHeadWMB returns the latest idle WMB with the smallest watermark for the given partition +// Only returns one if all Publishers are idle and if it's the smallest one of any partitions +func (efs *edgeFetcherSet) GetHeadWMB(fromPartitionIdx int32) wmb.WMB { + // if we get back one that's empty it means that there could be one that's not Idle, so we need to return empty + + // call GetHeadWMB() for all Edges and get the smallest one + var watermarkBuffer, unsetWMB wmb.WMB + var overallHeadWMB = wmb.WMB{ + // we find the head WMB based on watermark + Offset: math.MaxInt64, + Watermark: math.MaxInt64, + } + + for fromVertex, fetcher := range efs.edgeFetchers { + watermarkBuffer = fetcher.GetHeadWMB(fromPartitionIdx) + if watermarkBuffer == unsetWMB { // unset + return wmb.WMB{} + } + efs.log.Debugf("Got Edge Head WMB from vertex=%q while processing partition %d: %v", fromVertex, fromPartitionIdx, watermarkBuffer) + if watermarkBuffer.Watermark != -1 { + // find the smallest head offset of the smallest WMB.watermark (though latest) + if watermarkBuffer.Watermark < overallHeadWMB.Watermark { + overallHeadWMB = watermarkBuffer + } else if watermarkBuffer.Watermark == overallHeadWMB.Watermark && watermarkBuffer.Offset < overallHeadWMB.Offset { + overallHeadWMB = watermarkBuffer + } + } + } + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // TODO(join): the check below has been temporarily moved from here to EdgeFetcher::GetHeadWMB() so that EdgeFetcher::GetHeadWMB() + // can maintain its existing contract. + // Note that this means that method will return wmb.WMB{} some of the time which will cause this to do the same + // even in cases where the overall Idle WMB < overall last processed watermark. + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // we only consider idle watermark if it is smaller than or equal to min of all the last processed watermarks. + //if overallHeadWMB.Watermark > efs.GetWatermark().UnixMilli() { + // return wmb.WMB{} + //} + return overallHeadWMB + +} + +func (efs *edgeFetcherSet) Close() error { + aggregateErr := "" + for _, fetcher := range efs.edgeFetchers { + err := fetcher.Close() + if err != nil { + aggregateErr += err.Error() + "; " + } + } + if aggregateErr != "" { + return errors.New(aggregateErr) + } + return nil +} diff --git a/pkg/watermark/fetch/edge_fetcher_set_test.go b/pkg/watermark/fetch/edge_fetcher_set_test.go new file mode 100644 index 0000000000..36188c45b6 --- /dev/null +++ b/pkg/watermark/fetch/edge_fetcher_set_test.go @@ -0,0 +1,337 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fetch + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/numaproj/numaflow/pkg/isb" + "github.com/numaproj/numaflow/pkg/watermark/processor" + "github.com/numaproj/numaflow/pkg/watermark/store" + "github.com/numaproj/numaflow/pkg/watermark/store/noop" + "github.com/numaproj/numaflow/pkg/watermark/wmb" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" +) + +func Test_EdgeFetcherSet_ComputeWatermark(t *testing.T) { + var ctx = context.Background() + + // test fetching from 2 edges + + numIncomingVertices := 2 + partitionCount := int32(3) + processorManagers := make([]*processor.ProcessorManager, 0) + for i := 0; i < numIncomingVertices; i++ { + processorManagers = append(processorManagers, createProcessorManager(ctx, partitionCount)) + } + + testPodTimelines := [][][]wmb.WMB{ + // first vertex + { + // first Pod in first vertex + { + {Watermark: 11, Offset: 9, Partition: 0}, + {Watermark: 12, Offset: 20, Partition: 1}, + {Watermark: 13, Offset: 21, Partition: 2}, + {Watermark: 14, Offset: 22, Partition: 0}, + {Watermark: 17, Offset: 28, Partition: 1}, + {Watermark: 25, Offset: 30, Partition: 2}, + {Watermark: 26, Offset: 31, Partition: 0}, + {Watermark: 27, Offset: 32, Partition: 1}, + }, + // second Pod in first vertex + { + {Watermark: 8, Offset: 13, Partition: 0}, + {Watermark: 9, Offset: 16, Partition: 1}, + {Watermark: 10, Offset: 18, Partition: 2}, + {Watermark: 17, Offset: 26, Partition: 0}, + {Watermark: 27, Offset: 29, Partition: 1}, + {Watermark: 28, Offset: 33, Partition: 2}, + {Watermark: 29, Offset: 34, Partition: 0}, + }, + }, + // second vertex + { + // only Pod in second vertex + { + {Watermark: 10, Offset: 14, Partition: 0}, + {Watermark: 12, Offset: 17, Partition: 1}, + {Watermark: 14, Offset: 19, Partition: 2}, + {Watermark: 17, Offset: 24, Partition: 0}, + {Watermark: 25, Offset: 35, Partition: 1}, + {Watermark: 26, Offset: 36, Partition: 2}, + {Watermark: 27, Offset: 37, Partition: 0}, + }, + }, + } + + testPodsByVertex := make([][]*processor.ProcessorToFetch, numIncomingVertices) + for vertex := 0; vertex < numIncomingVertices; vertex++ { + numPods := len(testPodTimelines[vertex]) + testPodsByVertex[vertex] = make([]*processor.ProcessorToFetch, numPods) + for pod := 0; pod < numPods; pod++ { + name := fmt.Sprintf("test-pod-%d-%d", vertex, pod) + testPodsByVertex[vertex][pod] = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity(name), "test-bucket", 5, partitionCount) + for _, watermark := range testPodTimelines[vertex][pod] { + testPodsByVertex[vertex][pod].GetOffsetTimelines()[watermark.Partition].Put(watermark) + } + processorManagers[vertex].AddProcessor(name, testPodsByVertex[vertex][pod]) + } + + } + + tests := []struct { + name string + offset int64 + partitionIdx int32 + want int64 + lastProcessedWm [][]int64 // last processed watermark for each partition for each edge + }{ + + { + // test case where we end up using one of the lastProcessedWms since it's smallest + // for first EdgeFetcher: + //// offset 23 on partition 0 will produce WM 8 + //// if lastProcessedWm on other 
partitions is 7, we take 7 since 7<8 + // for second EdgeFetcher: + //// offset 23 on partition 0 will produce WM 10 + //// if lastProcessedWm on other partitions is 9, we take 9 since 9<10 + // then we compare EdgeFetchers 1 and 2 and get 7 since 7<9 + name: "useLastProcessedWm", + offset: 23, + want: 7, + partitionIdx: 0, + lastProcessedWm: [][]int64{ + {7, 7, 7}, + {9, 9, 9}, + }, + }, + { + // test case where we end up using the newly calculated watermark for one of the EdgeFetchers since it's smallest + // for first EdgeFetcher: + //// offset 23 on partition 0 will produce WM 8 + //// if lastProcessedWm on other partitions is 9, we take 8 since 8<9 + // for second EdgeFetcher: + //// offset 23 on partition 0 will produce WM 10 + //// if lastProcessedWm on other partitions is 9, we take 9 since 9<10 + // then we compare EdgeFetchers 1 and 2 and get 8 since 8<9 + name: "useLastProcessedWm", + offset: 23, + want: 8, + partitionIdx: 0, + lastProcessedWm: [][]int64{ + {6, 9, 9}, + {9, 9, 9}, + }, + }, + { + // test case in which other partitions haven't been processed yet + name: "unprocessedPartitions", + offset: 15, + want: -1, + partitionIdx: 1, + lastProcessedWm: [][]int64{ + {-1, -1, -1}, + {-1, -1, -1}, + }, + }, + } + + location, _ := time.LoadLocation("UTC") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create EdgeFetcherSet with 2 EdgeFetchers + efs := &edgeFetcherSet{ + edgeFetchers: map[string]Fetcher{}, + log: zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)).Sugar(), + } + + for vertex := 0; vertex < numIncomingVertices; vertex++ { + vertexName := fmt.Sprintf("vertex-%d", vertex) + efs.edgeFetchers[vertexName] = &edgeFetcher{ + ctx: ctx, + processorManager: processorManagers[vertex], + log: zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)).Sugar(), + lastProcessedWm: tt.lastProcessedWm[vertex], + } + } + if got := efs.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.offset, 10) }), tt.partitionIdx); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { + t.Errorf("ComputeWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want))) + } + + }) + } + +} + +func createProcessorManager(ctx context.Context, partitionCount int32) *processor.ProcessorManager { + hbWatcher := noop.NewKVOpWatch() + otWatcher := noop.NewKVOpWatch() + storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) + return processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) +} + +type TestEdgeFetcher struct { + // are all of the publishers to this EdgeFetcher Idle? + allProcessorsIdle bool + + currentWatermark wmb.Watermark + + currentHeadWatermark wmb.Watermark +} + +func (t *TestEdgeFetcher) ComputeWatermark(inputOffset isb.Offset, fromPartitionIdx int32) wmb.Watermark { + return t.getWatermark() +} + +func (t *TestEdgeFetcher) getWatermark() wmb.Watermark { + return t.currentWatermark +} +func (t *TestEdgeFetcher) GetHeadWMB(fromPartitionIdx int32) wmb.WMB { + if t.allProcessorsIdle { + return wmb.WMB{Watermark: t.GetHeadWatermark(fromPartitionIdx).UnixMilli()} + } else { + return wmb.WMB{} + } +} +func (t *TestEdgeFetcher) GetHeadWatermark(fromPartitionIdx int32) wmb.Watermark { + return t.currentHeadWatermark +} +func (t *TestEdgeFetcher) Close() error { + return nil +} + +func Test_EdgeFetcherSet_GetHeadWMB(t *testing.T) { + + // cases to test: + // (should test 1 Edge Fetcher as well as 2) + // 1. 
one of them has all publishers Idle and 1 doesn't: should return WMB{} + // 2. all publishers Idle: should not return WMB{} and should return most conservative Watermark + // 3. all publishers Idle but somehow the GetWatermark() of one of the EdgeFetchers is higher than the returned value + + tests := []struct { + name string + edgeFetchers map[string]Fetcher + expectedWMB wmb.WMB + }{ + { + "oneNonIdle", + map[string]Fetcher{ + "nonidle": &TestEdgeFetcher{ + allProcessorsIdle: false, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + }, + wmb.WMB{}, + }, + { + "oneIdle", + map[string]Fetcher{ + "idle": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 58, 0, time.UTC)), + }, + }, + wmb.WMB{Watermark: time.Date(2023, 11, 17, 20, 34, 58, 0, time.UTC).UnixMilli()}, + }, + { + "twoNonIdle", + map[string]Fetcher{ + "nonidle1": &TestEdgeFetcher{ + allProcessorsIdle: false, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + "nonidle2": &TestEdgeFetcher{ + allProcessorsIdle: false, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 58, 0, time.UTC)), + }, + }, + wmb.WMB{}, + }, + { + "oneOfTwoIdle", + map[string]Fetcher{ + "idle": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + "nonidle": &TestEdgeFetcher{ + allProcessorsIdle: false, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + }, + wmb.WMB{}, + }, + { + "twoIdle", + map[string]Fetcher{ + "idle": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + "idle2": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + }, + wmb.WMB{Watermark: time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC).UnixMilli()}, + }, + /*{ //TODO(join): this is temporarily removed since the underlying code is temporarily removed + "exceedingWM", + map[string]Fetcher{ + "idle": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + "idle2": &TestEdgeFetcher{ + allProcessorsIdle: true, + currentWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 58, 0, time.UTC)), + currentHeadWatermark: wmb.Watermark(time.Date(2023, 11, 17, 20, 34, 59, 0, time.UTC)), + }, + }, + wmb.WMB{}, + },*/ + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + efs := &edgeFetcherSet{ + edgeFetchers: tt.edgeFetchers, + log: zaptest.NewLogger(t, 
zaptest.Level(zap.DebugLevel)).Sugar(), + } + headWMB := efs.GetHeadWMB(1) + assert.Equal(t, tt.expectedWMB, headWMB) + }) + } +} diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go index 5eaead8b9c..0b3bbfda02 100644 --- a/pkg/watermark/fetch/edge_fetcher_test.go +++ b/pkg/watermark/fetch/edge_fetcher_test.go @@ -40,7 +40,7 @@ import ( "github.com/numaproj/numaflow/pkg/watermark/store" ) -func TestBuffer_GetWatermarkWithOnePartition(t *testing.T) { +func TestBuffer_ComputeWatermarkWithOnePartition(t *testing.T) { var ctx = context.Background() // We don't really need watcher because we manually call the `Put` function and the `addProcessor` function @@ -48,11 +48,11 @@ func TestBuffer_GetWatermarkWithOnePartition(t *testing.T) { hbWatcher := noop.NewKVOpWatch() otWatcher := noop.NewKVOpWatch() storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, 1) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, 1) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, 1) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, 1) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, 1) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, 1) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, 1) pod0Timeline = []wmb.WMB{ {Watermark: 11, Offset: 9, Partition: 0}, {Watermark: 12, Offset: 20, Partition: 0}, @@ -159,8 +159,8 @@ func TestBuffer_GetWatermarkWithOnePartition(t *testing.T) { log: zaptest.NewLogger(t).Sugar(), lastProcessedWm: lastProcessed, } - if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), 0); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { - t.Errorf("GetWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want))) + if got := b.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), 0); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { + t.Errorf("ComputeWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want))) } // this will always be 17 because the timeline has been populated ahead of time // GetHeadWatermark is only used in UI and test @@ -169,7 +169,7 @@ func TestBuffer_GetWatermarkWithOnePartition(t *testing.T) { } } -func TestBuffer_GetWatermarkWithMultiplePartition(t *testing.T) { +func TestBuffer_ComputeGetWatermarkWithMultiplePartition(t *testing.T) { var ctx = context.Background() // We don't really need watcher because we manually call the `Put` function and the `addProcessor` function @@ -178,11 +178,11 @@ func TestBuffer_GetWatermarkWithMultiplePartition(t *testing.T) { otWatcher := noop.NewKVOpWatch() storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) partitionCount := int32(3) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) var ( - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - 
testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ {Watermark: 11, Offset: 9, Partition: 0}, {Watermark: 12, Offset: 20, Partition: 1}, @@ -323,8 +323,8 @@ func TestBuffer_GetWatermarkWithMultiplePartition(t *testing.T) { log: zaptest.NewLogger(t).Sugar(), lastProcessedWm: tt.lastProcessedWm, } - if got := b.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), tt.partitionIdx); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { - t.Errorf("GetWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want))) + if got := b.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(tt.args.offset, 10) }), tt.partitionIdx); time.Time(got).In(location) != time.UnixMilli(tt.want).In(location) { + t.Errorf("ComputeWatermark() = %v, want %v", got, wmb.Watermark(time.UnixMilli(tt.want))) } // this will always be 27 because the timeline has been populated ahead of time // GetHeadWatermark is only used in UI and test @@ -341,8 +341,8 @@ func Test_edgeFetcher_GetHeadWatermark(t *testing.T) { hbWatcher = noop.NewKVOpWatch() otWatcher = noop.NewKVOpWatch() storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) - processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) ) getHeadWMTest1(ctx, processorManager1) @@ -381,9 +381,9 @@ func Test_edgeFetcher_GetHeadWatermark(t *testing.T) { func getHeadWMTest1(ctx context.Context, processorManager1 *processor.ProcessorManager) { var ( partitionCount = int32(2) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -445,9 +445,9 @@ func getHeadWMTest1(ctx context.Context, processorManager1 *processor.ProcessorM func getHeadWMTest2(ctx context.Context, processorManager2 *processor.ProcessorManager) { var ( partitionCount = int32(2) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, 
processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -514,10 +514,10 @@ func Test_edgeFetcher_GetHeadWMB(t *testing.T) { hbWatcher = noop.NewKVOpWatch() otWatcher = noop.NewKVOpWatch() storeWatcher = store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) - processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) - processorManager3 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) - processorManager4 = processor.NewProcessorManager(ctx, storeWatcher, partitionCount) + processorManager1 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + processorManager2 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + processorManager3 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) + processorManager4 = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", partitionCount) ) getHeadWMBTest1(ctx, processorManager1) @@ -552,7 +552,7 @@ func Test_edgeFetcher_GetHeadWMB(t *testing.T) { }, { name: "all pods empty timeline", - processorManager: processorManager3, + processorManager: processorManager4, want: wmb.WMB{}, }, } @@ -578,9 +578,9 @@ func Test_edgeFetcher_GetHeadWMB(t *testing.T) { func getHeadWMBTest1(ctx context.Context, processorManager1 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: true, @@ -660,9 +660,9 @@ func getHeadWMBTest1(ctx context.Context, processorManager1 *processor.Processor func getHeadWMBTest2(ctx context.Context, processorManager2 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = 
processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -742,9 +742,9 @@ func getHeadWMBTest2(ctx context.Context, processorManager2 *processor.Processor func getHeadWMBTest3(ctx context.Context, processorManager3 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) pod0Timeline = []wmb.WMB{ { Idle: false, @@ -824,9 +824,9 @@ func getHeadWMBTest3(ctx context.Context, processorManager3 *processor.Processor func getHeadWMBTest4(ctx context.Context, processorManager4 *processor.ProcessorManager) { var ( partitionCount = int32(3) - testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, partitionCount) - testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, partitionCount) - testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, partitionCount) + testPod0 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), "test-bucket", 5, partitionCount) + testPod1 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), "test-bucket", 5, partitionCount) + testPod2 = processor.NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), "test-bucket", 5, partitionCount) ) processorManager4.AddProcessor("testPod0", testPod0) processorManager4.AddProcessor("testPod1", testPod1) @@ -873,7 +873,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = processor.NewProcessorManager(ctx, storeWatcher, 1) + var processorManager = processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) var fetcher = NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 1) var heartBeatManagerMap = make(map[string]*heartBeatManager) @@ -948,14 +948,14 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { } } - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 2, len(allProcessors)) assert.True(t, allProcessors["p1"].IsDeleted()) assert.True(t, allProcessors["p2"].IsActive()) // "p1" should be deleted after this GetWatermark offset=103 // because "p1" offsetTimeline's head offset=100, which is < inputOffset 103 - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return 
strconv.FormatInt(testOffset+3, 10) }), 0) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 1, len(allProcessors)) assert.True(t, allProcessors["p2"].IsActive()) @@ -989,7 +989,7 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) { } // "p1" has been deleted from vertex.Processors // so "p1" will be considered as a new processors with a new default offset timeline - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) p1 := processorManager.GetProcessor("p1") assert.NotNil(t, p1) assert.True(t, p1.IsActive()) @@ -1111,7 +1111,7 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) { otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, 1) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 1) fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 1) var heartBeatManagerMap = make(map[string]*heartBeatManager) @@ -1201,14 +1201,14 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) { assert.True(t, allProcessors["p1"].IsDeleted()) assert.True(t, allProcessors["p2"].IsActive()) - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 2, len(allProcessors)) assert.True(t, allProcessors["p1"].IsDeleted()) assert.True(t, allProcessors["p2"].IsActive()) // "p1" should be deleted after this GetWatermark offset=103 // because "p1" offsetTimeline's head offset=102, which is < inputOffset 103 - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }), 0) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 1, len(allProcessors)) assert.True(t, allProcessors["p2"].IsActive()) @@ -1233,7 +1233,7 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) { assert.True(t, allProcessors["p2"].IsActive()) // "p1" has been deleted from vertex.Processors // so "p1" will be considered as a new processors with a new default offset timeline - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) p1 := processorManager.GetProcessor("p1") assert.NotNil(t, p1) assert.True(t, p1.IsActive()) @@ -1402,7 +1402,7 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) { otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3) + processorManager := processor.NewProcessorManager(ctx, storeWatcher, "test-bucket", 3) fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, 
processorManager, 3) var heartBeatManagerMap = make(map[string]*heartBeatManager) @@ -1520,9 +1520,9 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) { assert.True(t, allProcessors["p1"].IsDeleted()) assert.True(t, allProcessors["p2"].IsActive()) - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 1) - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 2) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 1) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset, 10) }), 2) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 2, len(allProcessors)) @@ -1530,7 +1530,7 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) { assert.True(t, allProcessors["p2"].IsActive()) // "p1" should be deleted after this GetWatermark offset=103 // because "p1" offsetTimeline's head offset=102, which is < inputOffset 103 - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+3, 10) }), 0) allProcessors = processorManager.GetAllProcessors() assert.Equal(t, 1, len(allProcessors)) assert.True(t, allProcessors["p2"].IsActive()) @@ -1555,7 +1555,7 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) { assert.True(t, allProcessors["p2"].IsActive()) // "p1" has been deleted from vertex.Processors // so "p1" will be considered as a new processors with a new default offset timeline - _ = fetcher.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) + _ = fetcher.ComputeWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }), 0) p1 := processorManager.GetProcessor("p1") assert.NotNil(t, p1) assert.True(t, p1.IsActive()) diff --git a/pkg/watermark/fetch/interface.go b/pkg/watermark/fetch/interface.go index 384403f4a5..12588c3eda 100644 --- a/pkg/watermark/fetch/interface.go +++ b/pkg/watermark/fetch/interface.go @@ -26,8 +26,9 @@ import ( // Fetcher fetches watermark data from Vn-1 vertex. type Fetcher interface { io.Closer - // GetWatermark returns the inorder monotonically increasing watermark of the edge connected to Vn-1. 
- GetWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark + // ComputeWatermark processes the offset on the partition indicated and returns the overall Watermark + // from all Partitions + ComputeWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark // GetHeadWatermark returns the latest watermark among all processors GetHeadWatermark(fromPartitionIdx int32) wmb.Watermark // GetHeadWMB returns the latest idle WMB among all processors diff --git a/pkg/watermark/fetch/source_fetcher.go b/pkg/watermark/fetch/source_fetcher.go index 0b7c5ccbb1..7ef9874e77 100644 --- a/pkg/watermark/fetch/source_fetcher.go +++ b/pkg/watermark/fetch/source_fetcher.go @@ -56,19 +56,28 @@ func NewSourceFetcher(ctx context.Context, sourceBufferName string, storeWatcher } } -// GetWatermark returns the lowest of the latest Watermark of all the processors, +func (e *sourceFetcher) ComputeWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark { + return e.getWatermark() +} + +// getWatermark returns the lowest of the latest Watermark of all the processors, // it ignores the input Offset. -func (e *sourceFetcher) GetWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark { +func (e *sourceFetcher) getWatermark() wmb.Watermark { var epoch int64 = math.MaxInt64 var debugString strings.Builder for _, p := range e.processorManager.GetAllProcessors() { + + if len(p.GetOffsetTimelines()) != 1 { + e.log.Fatalf("sourceFetcher %+v has %d offset timelines, expected 1", e, len(p.GetOffsetTimelines())) + } + offsetTimeline := p.GetOffsetTimelines()[0] debugString.WriteString(fmt.Sprintf("[Processor: %v] \n", p)) if !p.IsActive() { continue } - if p.GetOffsetTimelines()[fromPartitionIdx].GetHeadWatermark() < epoch { - epoch = p.GetOffsetTimelines()[fromPartitionIdx].GetHeadWatermark() + if offsetTimeline.GetHeadWatermark() < epoch { + epoch = offsetTimeline.GetHeadWatermark() } } if epoch == math.MaxInt64 { diff --git a/pkg/watermark/generic/jetstream/generic.go b/pkg/watermark/generic/jetstream/generic.go index e9dbe52822..8bccd25338 100644 --- a/pkg/watermark/generic/jetstream/generic.go +++ b/pkg/watermark/generic/jetstream/generic.go @@ -51,25 +51,75 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver } pipelineName := vertexInstance.Vertex.Spec.PipelineName - fromBucket := vertexInstance.Vertex.GetFromBuckets()[0] + fetchWatermark, err := buildFetcher(ctx, vertexInstance, client) + if err != nil { + return nil, nil, err + } + + publishWatermark, err := buildPublishers(ctx, pipelineName, vertexInstance, client) + if err != nil { + return nil, nil, err + } + + return fetchWatermark, publishWatermark, nil +} + +// buildFetcher creates a Fetcher (implemented by EdgeFetcherSet) which is used to fetch the Watermarks for a given Vertex +// (for all incoming Edges) and resolve the overall Watermark for the Vertex +func buildFetcher(ctx context.Context, vertexInstance *v1alpha1.VertexInstance, client *jsclient.NATSClient) (fetch.Fetcher, error) { + // if watermark is not enabled, use no-op. 
+ if vertexInstance.Vertex.Spec.Watermark.Disabled { + return nil, fmt.Errorf("watermark disabled") + } + + pipelineName := vertexInstance.Vertex.Spec.PipelineName + edgeFetchers := make(map[string]fetch.Fetcher) + + vertex := vertexInstance.Vertex + if vertex.IsASource() { + fromBucket := v1alpha1.GenerateSourceBucketName(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name) + edgeFetcher, err := buildFetcherForBucket(ctx, vertexInstance, fromBucket, client) + if err != nil { + return nil, err + } + // For source vertex, we use the vertex name as the from buffer name + edgeFetchers[vertex.Spec.Name] = edgeFetcher + } else { + for _, e := range vertex.Spec.FromEdges { + fromBucket := v1alpha1.GenerateEdgeBucketName(vertexInstance.Vertex.Namespace, pipelineName, e.From, e.To) + edgeFetcher, err := buildFetcherForBucket(ctx, vertexInstance, fromBucket, client) + if err != nil { + return nil, err + } + edgeFetchers[e.From] = edgeFetcher + } + } + + return fetch.NewEdgeFetcherSet(ctx, edgeFetchers), nil +} + +// buildFetcherForBucket creates a Fetcher (implemented by EdgeFetcher) which is used to fetch the Watermarks for a single incoming Edge +// to a Vertex (a single Edge has a single Bucket) +func buildFetcherForBucket(ctx context.Context, vertexInstance *v1alpha1.VertexInstance, fromBucket string, client *jsclient.NATSClient) (fetch.Fetcher, error) { var fetchWatermark fetch.Fetcher + pipelineName := vertexInstance.Vertex.Spec.PipelineName hbBucketName := isbsvc.JetStreamProcessorBucket(fromBucket) hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new HB KVJetStreamKVWatch, HeartbeatBucket: %s, %w", hbBucketName, err) + return nil, fmt.Errorf("failed at new HB KVJetStreamKVWatch, HeartbeatBucket: %s, %w", hbBucketName, err) } otBucketName := isbsvc.JetStreamOTBucket(fromBucket) otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new OT KVJetStreamKVWatch, OTBucket: %s, %w", otBucketName, err) + return nil, fmt.Errorf("failed at new OT KVJetStreamKVWatch, OTBucket: %s, %w", otBucketName, err) } // create a store watcher that watches the heartbeat and ot store. storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch) // create processor manager with the store watcher which will keep track of all the active processors and updates the offset timelines accordingly. - processManager := processor.NewProcessorManager(ctx, storeWatcher, int32(len(vertexInstance.Vertex.OwnedBuffers())), + processManager := processor.NewProcessorManager(ctx, storeWatcher, fromBucket, int32(len(vertexInstance.Vertex.OwnedBuffers())), processor.WithVertexReplica(vertexInstance.Replica), processor.WithIsReduce(vertexInstance.Vertex.IsReduceUDF()), processor.WithIsSource(vertexInstance.Vertex.IsASource())) // create a fetcher that fetches watermark. 
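buildFetcher above wires one edge fetcher per incoming edge (or a single one for a source bucket) and hands the map to fetch.NewEdgeFetcherSet, which is expected to resolve a single overall watermark for the vertex. The EdgeFetcherSet implementation is not part of this diff, so the following is only an illustrative sketch under the assumption that the overall watermark is the minimum across the per-edge fetchers; the type name and field are made up, and only the ComputeWatermark method of the Fetcher interface is shown.

package fetchsketch

import (
	"github.com/numaproj/numaflow/pkg/isb"
	"github.com/numaproj/numaflow/pkg/watermark/fetch"
	"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

// edgeFetcherSetSketch holds one Fetcher per incoming edge, keyed by the
// from-vertex name, mirroring the map built in buildFetcher.
type edgeFetcherSetSketch struct {
	edgeFetchers map[string]fetch.Fetcher
}

// ComputeWatermark resolves the vertex-level watermark as the minimum of the
// per-edge watermarks: a vertex can only be as far along as its slowest edge.
func (s *edgeFetcherSetSketch) ComputeWatermark(offset isb.Offset, fromPartitionIdx int32) wmb.Watermark {
	var overall wmb.Watermark
	first := true
	for _, f := range s.edgeFetchers {
		wm := f.ComputeWatermark(offset, fromPartitionIdx)
		if first || wm.BeforeWatermark(overall) {
			overall = wm
			first = false
		}
	}
	return overall
}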
@@ -81,6 +131,11 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver fetchWatermark = fetch.NewEdgeFetcher(ctx, fromBucket, storeWatcher, processManager, vertexInstance.Vertex.Spec.GetPartitionCount()) } + return fetchWatermark, nil +} + +// buildPublishers creates the Watermark Publishers for a given Vertex, one per Edge +func buildPublishers(ctx context.Context, pipelineName string, vertexInstance *v1alpha1.VertexInstance, client *jsclient.NATSClient) (map[string]publish.Publisher, error) { // Publisher map creation, we need a publisher per out buffer. var publishWatermark = make(map[string]publish.Publisher) var processorName = fmt.Sprintf("%s-%d", vertexInstance.Vertex.Name, vertexInstance.Replica) @@ -90,13 +145,13 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver hbPublisherBucketName := isbsvc.JetStreamProcessorBucket(toBucket) hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbPublisherBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherBucket: %s, %w", hbPublisherBucketName, err) + return nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherBucket: %s, %w", hbPublisherBucketName, err) } otStoreBucketName := isbsvc.JetStreamOTBucket(toBucket) otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, OTBucket: %s, %w", otStoreBucketName, err) + return nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, OTBucket: %s, %w", otStoreBucketName, err) } // For sink vertex, we use the vertex name as the to buffer name, which is the key for the publisher map. publishWatermark[vertexInstance.Vertex.Spec.Name] = publish.NewPublish(ctx, publishEntity, store.BuildWatermarkStore(hbStore, otStore), 1, publish.IsSink()) @@ -106,18 +161,19 @@ func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.Ver hbPublisherBucketName := isbsvc.JetStreamProcessorBucket(toBucket) hbStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbPublisherBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherBucket: %s, %w", hbPublisherBucketName, err) + return nil, fmt.Errorf("failed at new HB Publish JetStreamKVStore, HeartbeatPublisherBucket: %s, %w", hbPublisherBucketName, err) } otStoreBucketName := isbsvc.JetStreamOTBucket(toBucket) otStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, otStoreBucketName, client) if err != nil { - return nil, nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, OTBucket: %s, %w", otStoreBucketName, err) + return nil, fmt.Errorf("failed at new OT Publish JetStreamKVStore, OTBucket: %s, %w", otStoreBucketName, err) } publishWatermark[e.To] = publish.NewPublish(ctx, publishEntity, store.BuildWatermarkStore(hbStore, otStore), int32(e.GetToVertexPartitionCount())) } } - return fetchWatermark, publishWatermark, nil + + return publishWatermark, nil } // BuildSourcePublisherStores builds the watermark stores for source publisher. diff --git a/pkg/watermark/generic/noop.go b/pkg/watermark/generic/noop.go index ad75b968c8..a66b49914a 100644 --- a/pkg/watermark/generic/noop.go +++ b/pkg/watermark/generic/noop.go @@ -36,8 +36,7 @@ func NewNoOpWMProgressor() *NoOpWMProgressor { return &NoOpWMProgressor{} } -// GetWatermark returns the default watermark. 
-func (n NoOpWMProgressor) GetWatermark(isb.Offset, int32) wmb.Watermark { +func (n NoOpWMProgressor) ComputeWatermark(isb.Offset, int32) wmb.Watermark { return wmb.Watermark{} } diff --git a/pkg/watermark/processor/processor_manager.go b/pkg/watermark/processor/processor_manager.go index 009fd77fee..a7514fbb18 100644 --- a/pkg/watermark/processor/processor_manager.go +++ b/pkg/watermark/processor/processor_manager.go @@ -48,6 +48,8 @@ type ProcessorManager struct { // processors has reference to the actual processing unit (ProcessorEntitier) which includes offset timeline which is // used for tracking watermark. processors map[string]*ProcessorToFetch + // name of the bucket, used for logging + bucket string // fromBufferPartitionCount is the number of partitions in the fromBuffer fromBufferPartitionCount int32 lock sync.RWMutex @@ -58,7 +60,7 @@ type ProcessorManager struct { } // NewProcessorManager returns a new ProcessorManager instance -func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.WatermarkStoreWatcher, fromBufferPartitionCount int32, inputOpts ...ProcessorManagerOption) *ProcessorManager { +func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.WatermarkStoreWatcher, bucket string, fromBufferPartitionCount int32, inputOpts ...ProcessorManagerOption) *ProcessorManager { opts := &processorManagerOptions{ podHeartbeatRate: 5, refreshingProcessorsRate: 5, @@ -75,8 +77,9 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm otWatcher: watermarkStoreWatcher.OffsetTimelineWatcher(), heartbeat: NewProcessorHeartbeat(), processors: make(map[string]*ProcessorToFetch), + bucket: bucket, fromBufferPartitionCount: fromBufferPartitionCount, - log: logging.FromContext(ctx), + log: logging.FromContext(ctx).With("bucket", bucket), opts: opts, } if v.opts.isReduce || v.opts.isSource { @@ -196,7 +199,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() { var entity = NewProcessorEntity(value.Key()) // if the processor is a reduce or source processor, then we only need one fromProcessor // because the reduce or source will read from only one partition. 
- fromProcessor := NewProcessorToFetch(v.ctx, entity, 10, v.fromBufferPartitionCount) + fromProcessor := NewProcessorToFetch(v.ctx, entity, v.bucket, 10, v.fromBufferPartitionCount) v.AddProcessor(value.Key(), fromProcessor) v.log.Infow("v.AddProcessor successfully added a new fromProcessor", zap.String("fromProcessor", value.Key())) } else { // else just make a note that this processor is still active diff --git a/pkg/watermark/processor/processor_manager_test.go b/pkg/watermark/processor/processor_manager_test.go index 1d5a765253..634f6380c5 100644 --- a/pkg/watermark/processor/processor_manager_test.go +++ b/pkg/watermark/processor/processor_manager_test.go @@ -67,7 +67,7 @@ func TestProcessorManager(t *testing.T) { otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, 1) + var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 1) // start p1 heartbeat for 3 loops then delete p1 go func() { var err error @@ -156,7 +156,7 @@ func TestProcessorManagerWatchForMapWithOnePartition(t *testing.T) { otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, 1) + var processorManager = NewProcessorManager(ctx, storeWatcher, "", 1) // start p1 heartbeat for 3 loops go func(ctx context.Context) { for { @@ -255,7 +255,7 @@ func TestProcessorManagerWatchForReduce(t *testing.T) { otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, 1, WithIsReduce(true), WithVertexReplica(2)) + var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 1, WithIsReduce(true), WithVertexReplica(2)) // start p1 heartbeat for 3 loops go func(ctx context.Context) { for { @@ -368,7 +368,7 @@ func TestProcessorManagerWatchForMapWithMultiplePartition(t *testing.T) { otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh) assert.NoError(t, err) storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher) - var processorManager = NewProcessorManager(ctx, storeWatcher, 3) + var processorManager = NewProcessorManager(ctx, storeWatcher, "my-bucket", 3) // start p1 heartbeat for 3 loops go func(ctx context.Context) { var err error diff --git a/pkg/watermark/processor/processor_to_fetch.go b/pkg/watermark/processor/processor_to_fetch.go index 1ab1cec19d..9e0e808306 100644 --- a/pkg/watermark/processor/processor_to_fetch.go +++ b/pkg/watermark/processor/processor_to_fetch.go @@ -79,11 +79,11 @@ func (p *ProcessorToFetch) String() string { } // NewProcessorToFetch creates ProcessorToFetch. 
-func NewProcessorToFetch(ctx context.Context, processor ProcessorEntitier, capacity int, fromBufferPartitionCount int32) *ProcessorToFetch { +func NewProcessorToFetch(ctx context.Context, processor ProcessorEntitier, bucket string, capacity int, fromBufferPartitionCount int32) *ProcessorToFetch { var offsetTimelines []*timeline.OffsetTimeline for i := int32(0); i < fromBufferPartitionCount; i++ { - t := timeline.NewOffsetTimeline(ctx, capacity) + t := timeline.NewOffsetTimeline(ctx, capacity, bucket) offsetTimelines = append(offsetTimelines, t) } p := &ProcessorToFetch{ diff --git a/pkg/watermark/processor/processor_to_fetch_test.go b/pkg/watermark/processor/processor_to_fetch_test.go index 859be7e88b..dd121e5cae 100644 --- a/pkg/watermark/processor/processor_to_fetch_test.go +++ b/pkg/watermark/processor/processor_to_fetch_test.go @@ -25,7 +25,7 @@ import ( func TestFromProcessor_setStatus(t *testing.T) { var ctx = context.Background() - p := NewProcessorToFetch(ctx, NewProcessorEntity("test-pod"), 5, 1) + p := NewProcessorToFetch(ctx, NewProcessorEntity("test-pod"), "test-bucket", 5, 1) p.setStatus(_inactive) assert.Equal(t, _inactive, p.status) } diff --git a/pkg/watermark/publish/publisher.go b/pkg/watermark/publish/publisher.go index 994b076610..6643e59b34 100644 --- a/pkg/watermark/publish/publisher.go +++ b/pkg/watermark/publish/publisher.go @@ -148,17 +148,17 @@ func (p *publish) PublishWatermark(wm wmb.Watermark, offset isb.Offset, toVertex value, err := otValue.EncodeToBytes() if err != nil { - p.log.Errorw("Unable to publish watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + p.log.Errorw("Unable to publish watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) } for { err := p.otStore.PutKV(p.ctx, key, value) if err != nil { - p.log.Errorw("Unable to publish watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + p.log.Errorw("Unable to publish watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) // TODO: better exponential backoff time.Sleep(time.Millisecond * 250) } else { - p.log.Debugw("New watermark published with offset", zap.Int64("head", p.GetHeadWM(toVertexPartitionIdx).UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) + p.log.Debugw("New watermark published with offset", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.Int64("head", p.GetHeadWM(toVertexPartitionIdx).UnixMilli()), zap.Int64("new", validWM.UnixMilli()), zap.Int64("offset", seq)) break } } @@ -172,14 +172,14 @@ func (p *publish) validateWatermark(wm wmb.Watermark, toVertexPartitionIdx int32 } // update p.headWatermarks only if wm > p.headWatermarks headWM := p.GetHeadWM(toVertexPartitionIdx) - if wm.After(time.Time(headWM)) { - p.log.Debugw("New watermark is updated for the head watermark", zap.String("head", headWM.String()), zap.String("new", wm.String())) + if wm.AfterWatermark(headWM) { + p.log.Debugw("New watermark is updated for the head watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.Int64("head", headWM.UnixMilli()), zap.Int64("new", wm.UnixMilli())) 
p.SetHeadWM(wm, toVertexPartitionIdx) - } else if wm.Before(time.Time(headWM)) { - p.log.Infow("Skip publishing the new watermark because it's older than the current watermark", zap.String("entity", p.entity.GetName()), zap.Int64("head", headWM.UnixMilli()), zap.Int64("new", wm.UnixMilli())) + } else if wm.BeforeWatermark(headWM) { + p.log.Infow("Skip publishing the new watermark because it's older than the current watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("entity", p.entity.GetName()), zap.Int64("head", headWM.UnixMilli()), zap.Int64("new", wm.UnixMilli())) return wmb.Watermark{}, true } else { - p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.String("entity", p.entity.GetName()), zap.Int64("head", headWM.UnixMilli()), zap.Int64("new", wm.UnixMilli())) + p.log.Debugw("Skip publishing the new watermark because it's the same as the current watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("entity", p.entity.GetName()), zap.Int64("head", headWM.UnixMilli()), zap.Int64("new", wm.UnixMilli())) return wmb.Watermark{}, true } return wm, false @@ -210,17 +210,17 @@ func (p *publish) PublishIdleWatermark(wm wmb.Watermark, offset isb.Offset, toVe value, err := otValue.EncodeToBytes() if err != nil { - p.log.Errorw("Unable to publish idle watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + p.log.Errorw("Unable to publish idle watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) } for { err := p.otStore.PutKV(p.ctx, key, value) if err != nil { - p.log.Errorw("Unable to publish idle watermark", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) + p.log.Errorw("Unable to publish idle watermark", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Error(err)) // TODO: better exponential backoff time.Sleep(time.Millisecond * 250) } else { - p.log.Debugw("New idle watermark published", zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Int64("offset", seq), zap.Int64("watermark", validWM.UnixMilli())) + p.log.Debugw("New idle watermark published", zap.Int32("toVertexPartitionIdx", toVertexPartitionIdx), zap.String("HB", p.heartbeatStore.GetStoreName()), zap.String("OT", p.otStore.GetStoreName()), zap.String("key", key), zap.Int64("offset", seq), zap.Int64("watermark", validWM.UnixMilli())) break } } @@ -252,7 +252,7 @@ func (p *publish) loadLatestFromStore() wmb.Watermark { func (p *publish) GetLatestWatermark() wmb.Watermark { var latestWatermark = wmb.InitialWatermark for _, wm := range p.headWatermarks { - if wm.After(time.Time(latestWatermark)) { + if wm.AfterWatermark(latestWatermark) { latestWatermark = wm } } diff --git a/pkg/watermark/timeline/offset_timeline.go b/pkg/watermark/timeline/offset_timeline.go index 1aeec9bb0a..d20991781e 100644 --- a/pkg/watermark/timeline/offset_timeline.go +++ b/pkg/watermark/timeline/offset_timeline.go @@ -42,13 +42,13 @@ type OffsetTimeline struct { } // NewOffsetTimeline returns OffsetTimeline. 
-func NewOffsetTimeline(ctx context.Context, c int) *OffsetTimeline { +func NewOffsetTimeline(ctx context.Context, c int, bucket string) *OffsetTimeline { // Initialize a new empty watermarks DLL with nil values of the size capacity. // This is to avoid length check: when a new element is added, the tail element will be deleted. offsetTimeline := OffsetTimeline{ ctx: ctx, capacity: c, - log: logging.FromContext(ctx), + log: logging.FromContext(ctx).With("bucket", bucket), } for i := 0; i < c; i++ { @@ -97,7 +97,7 @@ func (t *OffsetTimeline) Put(node wmb.WMB) { } } else if node.Watermark > elementNode.Watermark { if node.Offset < elementNode.Offset { - t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.Watermark), + t.log.Errorw("The new input offset should never be smaller than the existing offset", zap.Int64("watermark", node.Watermark), zap.Int64("existingWatermark", elementNode.Watermark), zap.Int64("existingOffset", elementNode.Offset), zap.Int64("inputOffset", node.Offset)) return } diff --git a/pkg/watermark/timeline/offset_timeline_test.go b/pkg/watermark/timeline/offset_timeline_test.go index 13d8b93eea..9263fd3507 100644 --- a/pkg/watermark/timeline/offset_timeline_test.go +++ b/pkg/watermark/timeline/offset_timeline_test.go @@ -29,8 +29,8 @@ import ( func TestTimeline_GetEventTime(t *testing.T) { var ( ctx = context.Background() - emptyTimeline = NewOffsetTimeline(ctx, 5) - testTimeline = NewOffsetTimeline(ctx, 10) + emptyTimeline = NewOffsetTimeline(ctx, 5, "myBucket") + testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") testwatermarks = []wmb.WMB{ {Watermark: 10, Offset: 9}, {Watermark: 12, Offset: 10}, @@ -127,7 +127,7 @@ func TestTimeline_GetEventTime(t *testing.T) { func TestOffsetTimeline_GetOffset(t *testing.T) { var ( ctx = context.Background() - testTimeline = NewOffsetTimeline(ctx, 10) + testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") testwatermarks = []wmb.WMB{ {Watermark: 10, Offset: 9}, {Watermark: 12, Offset: 20}, @@ -223,7 +223,7 @@ func TestOffsetTimeline_GetOffset(t *testing.T) { func TestOffsetTimeline_PutIdle(t *testing.T) { var ( ctx = context.Background() - testTimeline = NewOffsetTimeline(ctx, 10) + testTimeline = NewOffsetTimeline(ctx, 10, "myBucket") setUps = []wmb.WMB{ {Idle: false, Watermark: 10, Offset: 9}, {Idle: false, Watermark: 12, Offset: 20}, diff --git a/pkg/watermark/wmb/watermark.go b/pkg/watermark/wmb/watermark.go index f26119b43f..d920e93e60 100644 --- a/pkg/watermark/wmb/watermark.go +++ b/pkg/watermark/wmb/watermark.go @@ -39,6 +39,14 @@ func (w Watermark) After(t time.Time) bool { return time.Time(w).After(t) } +func (w Watermark) AfterWatermark(compare Watermark) bool { + return w.After(time.Time(compare)) +} + func (w Watermark) Before(t time.Time) bool { return time.Time(w).Before(t) } + +func (w Watermark) BeforeWatermark(compare Watermark) bool { + return w.Before(time.Time(compare)) +} diff --git a/test/e2e-suite-2/functional_test.go b/test/e2e-suite-2/functional_test.go index 17ee32d1ab..81652a36ab 100644 --- a/test/e2e-suite-2/functional_test.go +++ b/test/e2e-suite-2/functional_test.go @@ -75,6 +75,22 @@ func (s *FunctionalSuite) TestDropOnFull() { } } } +func (s *FunctionalSuite) TestJoinSinkVertex() { + w := s.Given().Pipeline("@testdata/join-on-sink.yaml"). + When(). 
+ CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "join-on-sink" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888888"))). + SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888889"))) + + w.Expect().SinkContains("out", "888888") + w.Expect().SinkContains("out", "888889") +} func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) diff --git a/test/e2e-suite-2/testdata/join-on-sink.yaml b/test/e2e-suite-2/testdata/join-on-sink.yaml new file mode 100644 index 0000000000..67944b5044 --- /dev/null +++ b/test/e2e-suite-2/testdata/join-on-sink.yaml @@ -0,0 +1,50 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: join-on-sink +spec: + vertices: + - name: in + source: + http: {} + - name: even-or-odd + udf: + container: + # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd + image: quay.io/numaio/numaflow-go/map-even-odd + - name: even-cat + udf: + builtin: + name: cat + - name: odd-cat + udf: + builtin: + name: cat + - name: out + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + + edges: + - from: in + to: even-or-odd + - from: even-or-odd + to: even-cat + conditions: + tags: + operator: or + values: + - even-tag + - from: even-or-odd + to: odd-cat + conditions: + tags: + operator: or + values: + - odd-tag + - from: even-cat + to: out + - from: odd-cat + to: out diff --git a/test/reduce-e2e/reduce_test.go b/test/reduce-e2e/reduce_test.go index 36bddd2638..451027167c 100644 --- a/test/reduce-e2e/reduce_test.go +++ b/test/reduce-e2e/reduce_test.go @@ -147,6 +147,87 @@ func (r *ReduceSuite) TestComplexReducePipelineKeyedNonKeyed() { done <- struct{}{} } +func (r *ReduceSuite) TestJoinOnReducePipeline() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + w := r.Given().Pipeline("@testdata/join-on-reduce-pipeline.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "join-on-reduce" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + done := make(chan struct{}) + go func() { + // publish messages to source vertex, with event time starting from 60000 + startTime := 60000 + for i := 0; true; i++ { + select { + case <-ctx.Done(): + return + case <-done: + return + default: + eventTime := strconv.Itoa(startTime + i*1000) + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime)) + } + } + }() + + // todo: this only tests for one occurrence: ideally should verify all + w.Expect(). + SinkContains("sink", "40"). 
// per 10 second window: (10 * 2) * 2 atoi vertices + SinkContains("sink", "80") // per 10 second window: 10 * (1 + 3) * 2 atoi vertices + done <- struct{}{} +} + +func (r *ReduceSuite) TestJoinOnMapPipeline() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + w := r.Given().Pipeline("@testdata/join-on-map-pipeline.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "join-on-map" + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + done := make(chan struct{}) + go func() { + // publish messages to source vertex, with event time starting from 60000 + startTime := 60000 + for i := 0; true; i++ { + select { + case <-ctx.Done(): + return + case <-done: + return + default: + eventTime := strconv.Itoa(startTime + i*1000) + w.SendMessageTo(pipelineName, "in-0", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in-0", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in-0", NewHttpPostRequest().WithBody([]byte("3")).WithHeader("X-Numaflow-Event-Time", eventTime)) + w.SendMessageTo(pipelineName, "in-1", NewHttpPostRequest().WithBody([]byte("5")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in-1", NewHttpPostRequest().WithBody([]byte("6")).WithHeader("X-Numaflow-Event-Time", eventTime)). + SendMessageTo(pipelineName, "in-1", NewHttpPostRequest().WithBody([]byte("7")).WithHeader("X-Numaflow-Event-Time", eventTime)) + + } + } + }() + + // todo: this only tests for one occurrence: consider verifying that all match? (but test will take longer) + w.Expect(). + SinkContains("sink", "80"). 
// per 10 second window: 10 * (2 + 6) = 80 + SinkContains("sink", "160") // per 10 second window: 10 * (1 + 3 + 5 + 7) = 160 + + done <- struct{}{} +} + func (r *ReduceSuite) TestSimpleReducePipelineFailOverUsingWAL() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() diff --git a/test/reduce-e2e/testdata/join-on-map-pipeline.yaml b/test/reduce-e2e/testdata/join-on-map-pipeline.yaml new file mode 100644 index 0000000000..65f127d72d --- /dev/null +++ b/test/reduce-e2e/testdata/join-on-map-pipeline.yaml @@ -0,0 +1,58 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: join-on-map +spec: + limits: + readBatchSize: 50 + vertices: + - name: in-0 + scale: + min: 1 + source: + http: {} + - name: in-1 + scale: + min: 1 + source: + http: {} + - name: atoi + partitions: 2 + scale: + min: 1 + udf: + container: + # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd + image: quay.io/numaio/numaflow-go/map-even-odd + - name: compute-sum + partitions: 2 + udf: + container: + # Compute the sum, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/sum + image: quay.io/numaio/numaflow-go/reduce-sum + groupBy: + window: + fixed: + length: 10s + keyed: true + storage: + persistentVolumeClaim: + volumeSize: 10Gi + accessMode: ReadWriteOnce + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in-0 + to: atoi + - from: in-1 + to: atoi + - from: atoi + to: compute-sum + - from: compute-sum + to: sink diff --git a/test/reduce-e2e/testdata/join-on-reduce-pipeline.yaml b/test/reduce-e2e/testdata/join-on-reduce-pipeline.yaml new file mode 100644 index 0000000000..1e045f7408 --- /dev/null +++ b/test/reduce-e2e/testdata/join-on-reduce-pipeline.yaml @@ -0,0 +1,62 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: join-on-reduce +spec: + limits: + readBatchSize: 50 + vertices: + - name: in + scale: + min: 1 + source: + http: {} + - name: atoi-0 + scale: + min: 1 + udf: + container: + # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd + image: quay.io/numaio/numaflow-go/map-even-odd + - name: atoi-1 + partitions: 2 + scale: + min: 1 + udf: + container: + # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd + image: quay.io/numaio/numaflow-go/map-even-odd + - name: compute-sum + partitions: 2 + udf: + container: + # Compute the sum, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/sum + image: quay.io/numaio/numaflow-go/reduce-sum + groupBy: + window: + fixed: + length: 10s + keyed: true + storage: + persistentVolumeClaim: + volumeSize: 10Gi + accessMode: ReadWriteOnce + - name: sink + scale: + min: 1 + sink: + udsink: + container: + # A redis sink for e2e testing, see https://github.com/numaproj/numaflow-sinks/tree/main/redis-e2e-test-sink + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: atoi-0 + - from: in + to: atoi-1 + - from: atoi-0 + to: compute-sum + - from: atoi-1 + to: compute-sum + - from: compute-sum + to: sink
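The AfterWatermark and BeforeWatermark helpers added to wmb.Watermark are thin wrappers over the existing time.Time based After/Before, so call sites such as publish.validateWatermark can compare two watermarks directly without converting at the call site. A minimal usage sketch with made-up timestamps:

package main

import (
	"fmt"
	"time"

	"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

func main() {
	head := wmb.Watermark(time.UnixMilli(60000))     // current head watermark
	incoming := wmb.Watermark(time.UnixMilli(61000)) // newly computed watermark

	// Mirrors the branching in validateWatermark: advance, skip-older, or skip-equal.
	switch {
	case incoming.AfterWatermark(head):
		fmt.Println("publish: new watermark advances the head watermark")
	case incoming.BeforeWatermark(head):
		fmt.Println("skip: new watermark is older than the head watermark")
	default:
		fmt.Println("skip: new watermark equals the head watermark")
	}
}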