Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove retry when the processor is not found. #868

Merged
merged 7 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/reduce/pnf/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ func (op *OrderedProcessor) reduceOp(ctx context.Context, t *ForwardTask) {
op.log.Infow("ReduceOp exiting", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(ctx.Err()))
return
}
op.log.Errorw("Process failed", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
time.Sleep(retryDelay)
KeranYang marked this conversation as resolved.
Show resolved Hide resolved
op.log.Panic("Process failed exiting...", zap.String("partitionID", t.pf.PartitionID.String()), zap.Error(err))
}

// indicate that we are done with reduce UDF invocation.
Expand Down
163 changes: 86 additions & 77 deletions pkg/watermark/fetch/edge_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,18 +866,8 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

hbWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_PROCESSORS", hbWatcherCh)
assert.NoError(t, err)
otWatcher, err := inmem.NewInMemWatch(ctx, "testFetch", keyspace+"_OT", otWatcherCh)
Expand All @@ -901,7 +891,6 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
//println("waiting for processors to be added")
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
Expand All @@ -910,42 +899,52 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.True(t, allProcessors["p1"].IsActive())
assert.True(t, allProcessors["p2"].IsActive())

heartBeatManagerMap["p1"].stop()
otValueByte, err := otValueToBytes(testOffset, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

// "p1" status becomes deleted since we stopped the heartbeat
for !allProcessors["p1"].IsDeleted() {
otValueByte, err = otValueToBytes(testOffset+5, epoch, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

// "p2" will become active since we are sending heartbeat for it
for !allProcessors["p2"].IsActive() {
heartBeatManagerMap["p1"].stop()

// "p1" status becomes deleted since we stopped the heartbeat
for !allProcessors["p1"].IsDeleted() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p2 to be active: %s", ctx.Err())
t.Fatalf("expected p1 to be deleted: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].GetHeadOffset() != 100 {
// "p2" will become active since we are sending heartbeat for it
for !allProcessors["p2"].IsActive() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 head offset to be 100: %s", ctx.Err())
t.Fatalf("expected p2 to be active: %s", ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

Expand Down Expand Up @@ -1107,30 +1106,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

// put values into otStore
// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
Expand Down Expand Up @@ -1161,7 +1136,42 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
for !allProcessors["p1"].IsActive() {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 to be active, got %t: %s", allProcessors["p1"].IsActive(), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
}
}

// put values into otStore
// this first entry should not be in the offset timeline because we set the wmb bucket history to 2
otValueByte, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+1, epoch+200, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

otValueByte, err = otValueToBytes(testOffset+2, epoch+300, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p1", otValueByte)
assert.NoError(t, err)

epoch += 60000

otValueByte, err = otValueToBytes(testOffset+5, epoch+500, false, 0)
assert.NoError(t, err)
err = otStore.PutKV(ctx, "p2", otValueByte)
assert.NoError(t, err)

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
Expand Down Expand Up @@ -1215,7 +1225,6 @@ func TestFetcherWithSameOTBucketWithSinglePartition(t *testing.T) {
t.Fatalf("expected p1 to be active: %s", ctx.Err())
}
default:
//println("waiting for p1 to be active line 1221")
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
Expand Down Expand Up @@ -1389,6 +1398,36 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
assert.NoError(t, err)
defer otStore.Close()

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)

var heartBeatManagerMap = make(map[string]*heartBeatManager)
heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)

// start the heartbeats for p1 and p2
heartBeatManagerMap["p1"].start()
heartBeatManagerMap["p2"].start()

allProcessors := processorManager.GetAllProcessors()
for len(allProcessors) != 2 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

// put values into otStore
otValueByteOne, err := otValueToBytes(testOffset, epoch+100, false, 0)
assert.NoError(t, err)
Expand Down Expand Up @@ -1446,36 +1485,6 @@ func TestFetcherWithSameOTBucketWithMultiplePartition(t *testing.T) {
err = otStore.PutKV(ctx, "p2", otValueByteThree)
assert.NoError(t, err)

// create watchers for heartbeat and offset timeline
hbWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_PROCESSORS", defaultJetStreamClient)
assert.NoError(t, err)
otWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
storeWatcher := store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher)
processorManager := processor.NewProcessorManager(ctx, storeWatcher, 3)
fetcher := NewEdgeFetcher(ctx, "testBuffer", storeWatcher, processorManager, 3)

var heartBeatManagerMap = make(map[string]*heartBeatManager)
heartBeatManagerMap["p1"] = manageHeartbeat(ctx, "p1", hbStore, &wg)
heartBeatManagerMap["p2"] = manageHeartbeat(ctx, "p2", hbStore, &wg)

// start the heartbeats for p1 and p2
heartBeatManagerMap["p1"].start()
heartBeatManagerMap["p2"].start()

allProcessors := processorManager.GetAllProcessors()
for len(allProcessors) != 2 {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected 2 processors, got %d: %s", len(allProcessors), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
allProcessors = processorManager.GetAllProcessors()
}
}

for allProcessors["p1"].GetOffsetTimelines()[0].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" &&
allProcessors["p1"].GetOffsetTimelines()[1].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" &&
allProcessors["p1"].GetOffsetTimelines()[2].Dump() != "[1651161600300:102] -> [1651161600200:101] -> [1651161600100:100] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
Expand Down
16 changes: 1 addition & 15 deletions pkg/watermark/processor/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ import (
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
"go.uber.org/zap"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't look right

)

// ProcessorManager manages the point of view of Vn-1 from Vn vertex processors (or source processor). The code is running on Vn vertex.
Expand Down Expand Up @@ -244,18 +242,6 @@ func (v *ProcessorManager) startTimeLineWatcher() {
switch value.Operation() {
case store.KVPut:
p := v.GetProcessor(value.Key())
yhl25 marked this conversation as resolved.
Show resolved Hide resolved
_ = wait.ExponentialBackoffWithContext(v.ctx, wait.Backoff{
// default heartbeat rate is set to 5 seconds, so retry every "duration * factor + [0, jitter]" interval for 5 times
Duration: 1 * time.Second,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}, func() (done bool, err error) {
if p = v.GetProcessor(value.Key()); p == nil {
return false, nil
}
return true, nil
})
if p == nil {
v.log.Errorw("Unable to find the processor", zap.String("processorEntity", value.Key()))
continue
Expand Down
Loading