diff --git a/pkg/reduce/pnf/ordered.go b/pkg/reduce/pnf/ordered.go
index f75f9c86b5..8f15dba4a0 100644
--- a/pkg/reduce/pnf/ordered.go
+++ b/pkg/reduce/pnf/ordered.go
@@ -130,13 +130,9 @@ func (op *OrderedProcessor) SchedulePnF(
 // to wait for the close-of-book on the PBQ to materialize the writeMessages.
 func (op *OrderedProcessor) reduceOp(ctx context.Context, t *ForwardTask) {
 	start := time.Now()
-	for {
-		// FIXME: this error handling won't work with streams. We cannot do infinite retries
-		// because whatever is written to the stream is lost between retries.
-		err := t.pf.Process(ctx)
-		if err == nil {
-			break
-		} else if err == ctx.Err() {
+	err := t.pf.Process(ctx)
+	if err != nil {
+		if err == ctx.Err() {
 			udfError.With(map[string]string{
 				metrics.LabelVertex:   op.vertexName,
 				metrics.LabelPipeline: op.pipelineName,
@@ -144,9 +140,10 @@ func (op *OrderedProcessor) reduceOp(ctx context.Context, t *ForwardTask) {
 			}).Inc()
 			op.log.Infow("ReduceOp exiting", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(ctx.Err()))
 			return
+		} else {
+			// panic since we cannot retry, when the pod restarts, it will replay the records from the PBQ.
+			op.log.Panic("Process failed exiting...", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
 		}
-		op.log.Errorw("Process failed", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
-		time.Sleep(retryDelay)
 	}
 
 	// indicate that we are done with reduce UDF invocation.
diff --git a/pkg/watermark/fetch/edge_fetcher_test.go b/pkg/watermark/fetch/edge_fetcher_test.go
index cfcd5f7158..d262e29906 100644
--- a/pkg/watermark/fetch/edge_fetcher_test.go
+++ b/pkg/watermark/fetch/edge_fetcher_test.go
@@ -866,18 +866,8 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
 	assert.NoError(t, err)
 	defer otStore.Close()
 
-	otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p1", otValueByte)
-	assert.NoError(t, err)
-
 	epoch += 60000
-	otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p2", otValueByte)
-	assert.NoError(t, err)
-
 	hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh)
 	assert.NoError(t, err)
 	otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh)
@@ -901,7 +891,6 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
 				t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
 			}
 		default:
-			//println("waiting for processors to be added")
 			time.Sleep(1 * time.Millisecond)
 			allProcessors = processorManager.GetAllProcessors()
 		}
@@ -910,27 +899,35 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
 	assert.True(t, allProcessors["p1"].IsActive())
 	assert.True(t, allProcessors["p2"].IsActive())
 
-	heartBeatManagerMap["p1"].stop()
+	otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p1", otValueByte)
+	assert.NoError(t, err)
 
-	// "p1" status becomes deleted since we stopped the heartbeat
-	for !allProcessors["p1"].IsDeleted() {
+	otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p2", otValueByte)
+	assert.NoError(t, err)
+
+	for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
 		select {
 		case <-ctx.Done():
 			if ctx.Err() == context.DeadlineExceeded {
-				t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
+				t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
 			}
 		default:
 			time.Sleep(1 * time.Millisecond)
-			allProcessors = processorManager.GetAllProcessors()
 		}
 	}
 
-	// "p2" will become active since we are sending heartbeat for it
-	for !allProcessors["p2"].IsActive() {
+	heartBeatManagerMap["p1"].stop()
+
+	// "p1" status becomes deleted since we stopped the heartbeat
+	for !allProcessors["p1"].IsDeleted() {
 		select {
 		case <-ctx.Done():
 			if ctx.Err() == context.DeadlineExceeded {
-				t.Fatalf("expected p2 to be active: %s", ctx.Err())
+				t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
 			}
 		default:
 			time.Sleep(1 * time.Millisecond)
@@ -938,14 +935,16 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
 		}
 	}
 
-	for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
+	// "p2" will become active since we are sending heartbeat for it
+	for !allProcessors["p2"].IsActive() {
 		select {
 		case <-ctx.Done():
 			if ctx.Err() == context.DeadlineExceeded {
-				t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
+				t.Fatalf("expected p2 to be active: %s", ctx.Err())
 			}
 		default:
 			time.Sleep(1 * time.Millisecond)
+			allProcessors = processorManager.GetAllProcessors()
 		}
 	}
 
@@ -1107,30 +1106,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
 	assert.NoError(t, err)
 	defer otStore.Close()
 
-	// put values into otStore
-	// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
-	otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p1", otValueByte)
-	assert.NoError(t, err)
-
-	otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p1", otValueByte)
-	assert.NoError(t, err)
-
-	otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p1", otValueByte)
-	assert.NoError(t, err)
-
-	epoch += 60000
-
-	otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
-	assert.NoError(t, err)
-	err = otStore.PutKV(ctx, "p2", otValueByte)
-	assert.NoError(t, err)
-
 	// create watchers for heartbeat and offset timeline
 	hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
 	assert.NoError(t, err)
@@ -1161,7 +1136,42 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
 		}
 	}
 
-	for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
+	for !allProcessors["p1"].IsActive() {
+		select {
+		case <-ctx.Done():
+			if ctx.Err() == context.DeadlineExceeded {
+				t.Fatalf("expected p1 to be active, got %t: %s", allProcessors["p1"].IsActive(), ctx.Err())
+			}
+		default:
+			time.Sleep(1 * time.Millisecond)
+		}
+	}
+
+	// put values into otStore
+	// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
+	otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p1", otValueByte)
+	assert.NoError(t, err)
+
+	otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p1", otValueByte)
+	assert.NoError(t, err)
+
+	otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p1", otValueByte)
+	assert.NoError(t, err)
+
+	epoch += 60000
+
+	otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
+	assert.NoError(t, err)
+	err = otStore.PutKV(ctx, "p2", otValueByte)
+	assert.NoError(t, err)
+
+	for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
 		select {
 		case <-ctx.Done():
 			if ctx.Err() == context.DeadlineExceeded {
@@ -1215,7 +1225,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
 				t.Fatalf("expected p1 to be active: %s", ctx.Err())
 			}
 		default:
-			//println("waiting for p1 to be active line 1221")
 			time.Sleep(1 * time.Millisecond)
 			allProcessors = processorManager.GetAllProcessors()
 		}
@@ -1389,6 +1398,36 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
 	assert.NoError(t, err)
 	defer otStore.Close()
 
+	// create watchers for heartbeat and offset timeline
+	hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
+	assert.NoError(t, err)
+	otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
+	assert.NoError(t, err)
+	storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
+	processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
+	fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)
+
+	var heartBeatManagerMap = make(map[string]*heartBeatManager)
+	heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
+	heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)
+
+	// start the heartbeats for p1 and p2
+	heartBeatManagerMap["p1"].start()
+	heartBeatManagerMap["p2"].start()
+
+	allProcessors := processorManager.GetAllProcessors()
+	for len(allProcessors) != 2 {
+		select {
+		case <-ctx.Done():
+			if ctx.Err() == context.DeadlineExceeded {
+				t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
+			}
+		default:
+			time.Sleep(1 * time.Millisecond)
+			allProcessors = processorManager.GetAllProcessors()
+		}
+	}
+
 	// put values into otStore
 	otValueByteOne, err := otValueToBytes(testOffset, epoch+100, false, 0)
 	assert.NoError(t, err)
@@ -1446,36 +1485,6 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
 	err = otStore.PutKV(ctx, "p2", otValueByteThree)
 	assert.NoError(t, err)
 
-	// create watchers for heartbeat and offset timeline
-	hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
-	assert.NoError(t, err)
-	otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
-	assert.NoError(t, err)
-	storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
-	processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
-	fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)
-
-	var heartBeatManagerMap = make(map[string]*heartBeatManager)
-	heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
-	heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)
-
-	// start the heartbeats for p1 and p2
-	heartBeatManagerMap["p1"].start()
-	heartBeatManagerMap["p2"].start()
-
-	allProcessors := processorManager.GetAllProcessors()
-	for len(allProcessors) != 2 {
-		select {
-		case <-ctx.Done():
-			if ctx.Err() == context.DeadlineExceeded {
-				t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
-			}
-		default:
-			time.Sleep(1 * time.Millisecond)
-			allProcessors = processorManager.GetAllProcessors()
-		}
-	}
-
 	for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" && allProcessors["p1"].GetOffsetTimelines()[1].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" && allProcessors["p1"].GetOffsetTimelines()[2].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
diff --git a/pkg/watermark/processor/processor_manager.go b/pkg/watermark/processor/processor_manager.go
index ca3c4c1212..dedb0bc4cc 100644
--- a/pkg/watermark/processor/processor_manager.go
+++ b/pkg/watermark/processor/processor_manager.go
@@ -31,7 +31,6 @@ import (
 	"time"
 
 	"go.uber.org/zap"
-	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 	"github.com/numaproj/numaflow/pkg/watermark/store"
@@ -243,19 +242,10 @@ func (v *ProcessorManager) startTimeLineWatcher() {
 			}
 			switch value.Operation() {
 			case store.KVPut:
+				// a new processor's OT might take up to 5 secs to be reflected because we are not waiting for it to be added.
+				// This should not be a problem because the processor will send heartbeat as soon as it boots up.
+				// In case we miss it, we might see a delay.
 				p := v.GetProcessor(value.Key())
-				_ = wait.ExponentialBackoffWithContext(v.ctx, wait.Backoff{
-					// default heartbeat rate is set to 5 seconds, so retry every "duration * factor + [0, jitter]" interval for 5 times
-					Duration: 1 * time.Second,
-					Factor:   1,
-					Jitter:   0.1,
-					Steps:    5,
-				}, func() (done bool, err error) {
-					if p = v.GetProcessor(value.Key()); p == nil {
-						return false, nil
-					}
-					return true, nil
-				})
 				if p == nil {
 					v.log.Errorw("Unable to find the processor", zap.String("processorEntity", value.Key()))
 					continue