Commit
Revert "Revert "fix: remove retry when the processor is not found. (n…
Browse files Browse the repository at this point in the history
…umaproj#868)""

This reverts commit cf8a7d5.
KeranYang committed Jul 20, 2023
1 parent cf8a7d5 commit fc724df
Showing 3 changed files with 95 additions and 99 deletions.
15 changes: 6 additions & 9 deletions pkg/reduce/pnf/ordered.go
@@ -130,23 +130,20 @@ func (op *OrderedProcessor) SchedulePnF(
// to wait for the close-of-book on the PBQ to materialize the writeMessages.
func (op *OrderedProcessor) reduceOp(ctx context.Context, t *ForwardTask) {
start := time.Now()
for {
// FIXME: this error handling won't work with streams. We cannot do infinite retries
// because whatever is written to the stream is lost between retries.
err := t.pf.Process(ctx)
if err == nil {
break
} else if err == ctx.Err() {
err := t.pf.Process(ctx)
if err != nil {
if err == ctx.Err() {
udfError.With(map[string]string{
metrics.LabelVertex: op.vertexName,
metrics.LabelPipeline: op.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(op.vertexReplica)),
}).Inc()
op.log.Infow("ReduceOp exiting", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(ctx.Err()))
return
} else {
// panic since we cannot retry, when the pod restarts, it will replay the records from the PBQ.
op.log.Panic("Process failed exiting...", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
}
op.log.Errorw("Process failed", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
time.Sleep(retryDelay)
}

// indicate that we are done with reduce UDF invocation.
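To untangle the flattened diff above: this commit restores the version of reduceOp that calls t.pf.Process exactly once instead of looping with retries. A cancelled context increments the UDF error metric and returns; any other error panics, relying on the PBQ to replay the records once the pod restarts (per the removed FIXME, retries are unsafe for streams because data already written between attempts would be lost). A minimal, self-contained sketch of that pattern follows; runOnce, processFn, and partitionID are hypothetical stand-ins, not types from the repository.

package main

import (
	"context"
	"log"
)

// runOnce invokes processFn a single time. A cancelled context leads to a
// graceful return; any other error panics so that a durable store (the PBQ in
// Numaflow's case) can replay the records after the pod restarts.
func runOnce(ctx context.Context, partitionID string, processFn func(context.Context) error) {
	err := processFn(ctx)
	if err != nil {
		if err == ctx.Err() {
			// context cancelled or deadline exceeded: log and exit cleanly
			log.Printf("reduce exiting, partition=%s: %v", partitionID, ctx.Err())
			return
		}
		// no retry: crash here and let replay handle recovery
		log.Panicf("process failed, partition=%s: %v", partitionID, err)
	}
}

func main() {
	ctx := context.Background()
	runOnce(ctx, "p-0", func(context.Context) error { return nil })
}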
163 changes: 86 additions & 77 deletions pkg/watermark/fetch/edge_fetcher_test.go
@@ -866,18 +866,8 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh)
assert.NoError(t, err)
otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh)
@@ -901,7 +891,6 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
//println("waiting for processors to be added")
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
@@ -910,42 +899,52 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.True(t, allProcessors["p1"].IsActive())
assert.True(t, allProcessors["p2"].IsActive())

heartBeatManagerMap["p1"].stop()
otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

// "p1" status becomes deleted since we stopped the heartbeat
for !allProcessors["p1"].IsDeleted() {
otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

// "p2" will become active since we are sending heartbeat for it
for !allProcessors["p2"].IsActive() {
heartBeatManagerMap["p1"].stop()

// "p1" status becomes deleted since we stopped the heartbeat
for !allProcessors["p1"].IsDeleted() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p2 to be active: %s", ctx.Err())
t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
// "p2" will become active since we are sending heartbeat for it
for !allProcessors["p2"].IsActive() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
t.Fatalf("expected p2 to be active: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

@@ -1107,30 +1106,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

// put values into otStore
// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
@@ -1161,7 +1136,42 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
for !allProcessors["p1"].IsActive() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 to be active, got %t: %s", allProcessors["p1"].IsActive(), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
}
}

// put values into otStore
// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
Expand Down Expand Up @@ -1215,7 +1225,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
t.Fatalf("expected p1 to be active: %s", ctx.Err())
}
default:
//println("waiting for p1 to be active line 1221")
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
@@ -1389,6 +1398,36 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)

var heartBeatManagerMap = make(map[string]*heartBeatManager)
heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)

// start the heartbeats for p1 and p2
heartBeatManagerMap["p1"].start()
heartBeatManagerMap["p2"].start()

allProcessors := processorManager.GetAllProcessors()
for len(allProcessors) != 2 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

// put values into otStore
otValueByteOne, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
@@ -1446,36 +1485,6 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
err = otStore.PutKV(ctx, "p2", otValueByteThree)
assert.NoError(t, err)

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)

var heartBeatManagerMap = make(map[string]*heartBeatManager)
heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)

// start the heartbeats for p1 and p2
heartBeatManagerMap["p1"].start()
heartBeatManagerMap["p2"].start()

allProcessors := processorManager.GetAllProcessors()
for len(allProcessors) != 2 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" &&
allProcessors["p1"].GetOffsetTimelines()[1].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" &&
allProcessors["p1"].GetOffsetTimelines()[2].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
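The test changes above mostly move the otStore.PutKV calls so that offset-timeline values are written only after the processors have been discovered through their heartbeats, and every wait in these tests uses the same poll-until-condition-or-deadline idiom. A generic, self-contained sketch of that idiom is below; waitUntil is a hypothetical helper, not something defined in the repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil polls cond every millisecond until it returns true or the context
// expires, mirroring the for/select/default loops in the tests above.
func waitUntil(ctx context.Context, cond func() bool) error {
	for !cond() {
		select {
		case <-ctx.Done():
			return fmt.Errorf("condition not met before deadline: %w", ctx.Err())
		default:
			time.Sleep(1 * time.Millisecond)
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	start := time.Now()
	if err := waitUntil(ctx, func() bool { return time.Since(start) > 10*time.Millisecond }); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("condition met")
}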
16 changes: 3 additions & 13 deletions pkg/watermark/processor/processor_manager.go
@@ -31,7 +31,6 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/store"
@@ -243,19 +242,10 @@ func (v *ProcessorManager) startTimeLineWatcher() {
}
switch value.Operation() {
case store.KVPut:
// a new processor's OT might take up to 5 secs to be reflected because we are not waiting for it to be added.
// This should not be a problem because the processor will send heartbeat as soon as it boots up.
// In case we miss it, we might see a delay.
p := v.GetProcessor(value.Key())
_ = wait.ExponentialBackoffWithContext(v.ctx, wait.Backoff{
// default heartbeat rate is set to 5 seconds, so retry every "duration * factor + [0, jitter]" interval for 5 times
Duration: 1 * time.Second,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}, func() (done bool, err error) {
if p = v.GetProcessor(value.Key()); p == nil {
return false, nil
}
return true, nil
})
if p == nil {
v.log.Errorw("Unable to find the processor", zap.String("processorEntity", value.Key()))
continue
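The last hunk removes the k8s.io/apimachinery wait import and the exponential backoff (up to five attempts, roughly a second apart) that used to run when an OT update arrived for a processor the manager did not yet know about. After this change the lookup happens once: if the processor is missing, the watcher logs an error and skips the event, counting on the processor's own heartbeat to register it. A small, self-contained sketch of the resulting behavior, using a hypothetical registry type rather than the real ProcessorManager:

package main

import "fmt"

// registry is a hypothetical stand-in for ProcessorManager; the point is only
// that an OT update for an unknown processor is handled in a single pass.
type registry struct {
	processors map[string]struct{}
}

// handleOTPut processes one KVPut event from the offset-timeline watcher.
func (r *registry) handleOTPut(key string) {
	if _, ok := r.processors[key]; !ok {
		// no retry or backoff: the processor's heartbeat will add it shortly,
		// and later OT updates will then be applied normally
		fmt.Printf("unable to find the processor %q, skipping the update\n", key)
		return
	}
	fmt.Printf("updating offset timeline for %q\n", key)
}

func main() {
	r := &registry{processors: map[string]struct{}{"p1": {}}}
	r.handleOTPut("p1") // known processor: timeline updated
	r.handleOTPut("p2") // unknown processor: logged and skipped
}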
