fix: avoid unwanted watcher creation and reduce being stuck when udf is restarted #999

Merged: 12 commits, Aug 30, 2023
4 changes: 2 additions & 2 deletions pkg/shared/kvs/jetstream/kv_watch.go
@@ -167,9 +167,9 @@ func (jsw *jetStreamWatch) Watch(ctx context.Context) (<-chan kvs.KVEntry, <-cha
kvLastUpdatedTime := jsw.lastUpdateKVTime()

// if the last update time is zero, it means that there are no key-value pairs in the store yet, or the ctx was canceled; in both cases we should not recreate the watcher
// if the last update time is before the previous fetch time, it means that the store is not getting any updates
// if the last update time is not after the previous fetch time, it means that the store is not getting any updates (watermark is not getting updated)
// therefore, we don't have to recreate the watcher
if kvLastUpdatedTime.IsZero() || kvLastUpdatedTime.Before(jsw.previousFetchTime) {
if kvLastUpdatedTime.IsZero() || !kvLastUpdatedTime.After(jsw.previousFetchTime) {
jsw.log.Debug("The watcher is not receiving any updates, but the store is not getting any updates either", zap.String("watcher", jsw.GetKVName()), zap.Time("lastUpdateKVTime", kvLastUpdatedTime), zap.Time("previousFetchTime", jsw.previousFetchTime))
} else {
// if the last update time is after the previous fetch time, it means that the store is getting updates but the watcher is not receiving any
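The behavioral difference between the old and new condition is the equality case: when the last KV update time is exactly equal to the previous fetch time, Before() reports false, so the old check would fall through and recreate the watcher even though nothing new arrived, while !After() correctly treats it as "no new updates". A standalone illustration, assuming only the standard time and fmt packages (the values are made up):

last := time.Date(2023, 8, 30, 10, 0, 0, 0, time.UTC)
prevFetch := last // the store has not moved since the previous fetch

oldCheck := last.Before(prevFetch) // false -> old condition would recreate the watcher unnecessarily
newCheck := !last.After(prevFetch) // true  -> new condition skips the recreation
fmt.Println(oldCheck, newCheck)    // prints: false true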
194 changes: 111 additions & 83 deletions pkg/udf/rpc/grpc_reduce.go
@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
@@ -75,111 +74,103 @@ func (u *GRPCBasedReduce) WaitUntilReady(ctx context.Context) error {

// ApplyReduce accepts a channel of isbMessages and returns the aggregated result
func (u *GRPCBasedReduce) ApplyReduce(ctx context.Context, partitionID *partition.ID, messageStream <-chan *isb.ReadMessage) ([]*isb.WriteMessage, error) {
datumCh := make(chan *reducepb.ReduceRequest)
var wg sync.WaitGroup
var result *reducepb.ReduceResponse
var err error
var (
result *reducepb.ReduceResponse
err error
errCh = make(chan error, 1)
responseCh = make(chan *reducepb.ReduceResponse, 1)
datumCh = make(chan *reducepb.ReduceRequest)
)

// pass key and window information inside the context
mdMap := map[string]string{
shared.WinStartTime: strconv.FormatInt(partitionID.Start.UnixMilli(), 10),
shared.WinEndTime: strconv.FormatInt(partitionID.End.UnixMilli(), 10),
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(mdMap))
cctx, cancel := context.WithCancel(ctx)
Member:
why do you need a new context?

Contributor Author:
to exit from the readLoop goroutine when we return from ApplyReduce.

Member:
It should be okay to just do ctx, cancel := context.WithCancel(ctx); there is no need to introduce a new cctx.

This is a common pattern to make sure everything in this function that watches this ctx exits properly when the function returns, whether it does so expectedly or unexpectedly.
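For reference, a minimal sketch of the shadowing pattern the reviewer describes, assuming only the standard context package; process and doWork are illustrative names, not code from this PR:

func process(ctx context.Context) error {
	// shadow the caller's ctx; cancel fires on every return path of this function
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine can finish even if nobody reads
	go func() {
		done <- doWork(ctx) // hypothetical helper that returns once ctx is canceled
	}()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}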

defer cancel()
grpcCtx := metadata.NewOutgoingContext(ctx, metadata.New(mdMap))

// There can be two error scenarios:
// 1. The u.client.ReduceFn method returns an error before reading all the messages from the messageStream
// 2. The u.client.ReduceFn method returns an error after reading all the messages from the messageStream

// invoke the reduceFn method with datumCh channel
wg.Add(1)
go func() {
Member:
no need of wait group?

defer wg.Done()
// TODO handle this error here itself
result, err = u.client.ReduceFn(ctx, datumCh)
result, err = u.client.ReduceFn(grpcCtx, datumCh)
if err != nil {
errCh <- err
} else {
responseCh <- result
}
close(errCh)
close(responseCh)
}()

readLoop:
for {
select {
case msg, ok := <-messageStream:
if msg != nil {
// create datum from isbMessage and send it to datumCh channel for reduceFn
go func() {
Member:
go func() {
  defer close(datumCh)
  for {
    select {
      if xxx {
        return
      }
      xxxx
    }
  }
}()

Contributor Author:
done

// after reading all the messages from the messageStream, or if the ctx was canceled, close the datumCh channel
defer close(datumCh)
for {
select {
case msg, ok := <-messageStream:
// if the messageStream is closed, break the loop
if !ok {
return
}
// if the message is nil, break the loop
if msg == nil {
return
}
Member:
reverted this change?

Contributor Author:
fixed

d := createDatum(msg)

// send the datum to datumCh channel, handle the case when the context is canceled
select {
case datumCh <- d:
case <-ctx.Done():
close(datumCh)
return nil, ctx.Err()
case <-cctx.Done():
return
}
}
if !ok {
break readLoop
}
case <-ctx.Done():
close(datumCh)
return nil, ctx.Err()
}
}

// close the datumCh, let the reduceFn know that there are no more messages
close(datumCh)

wg.Wait()

if err != nil {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
default:
return nil, ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
case <-cctx.Done(): // if the context is done, break the loop
return
}
}
}
}()

taggedMessages := make([]*isb.WriteMessage, 0)
for _, response := range result.GetResults() {
keys := response.Keys
taggedMessage := &isb.WriteMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: partitionID.End.Add(-1 * time.Millisecond),
IsLate: false,
// wait for the reduceFn to finish
for {
select {
case err = <-errCh:
if err != nil {
return nil, convertToUdfError(err)
}
case result = <-responseCh:
taggedMessages := make([]*isb.WriteMessage, 0)
for _, response := range result.GetResults() {
keys := response.Keys
taggedMessage := &isb.WriteMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: partitionID.End.Add(-1 * time.Millisecond),
IsLate: false,
},
Keys: keys,
},
Body: isb.Body{
Payload: response.Value,
},
},
Keys: keys,
},
Body: isb.Body{
Payload: response.Value,
},
},
Tags: response.Tags,
Tags: response.Tags,
}
taggedMessages = append(taggedMessages, taggedMessage)
}
return taggedMessages, nil
case <-cctx.Done():
return nil, convertToUdfError(ctx.Err())
}
taggedMessages = append(taggedMessages, taggedMessage)
}
return taggedMessages, nil
}
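Taken together, the new ApplyReduce shape is: one goroutine feeds and closes datumCh, the blocking ReduceFn call runs in a second goroutine that reports through buffered channels, and the caller selects on error, result, or cancellation, which covers both error scenarios noted above without a WaitGroup. A condensed, generic sketch of that shape, assuming only the standard context package (runWithReduceShape, call, and the string payloads are illustrative, not the PR's code):

func runWithReduceShape(ctx context.Context, inputs []string, call func(context.Context, <-chan string) (string, error)) (string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	in := make(chan string)
	errCh := make(chan error, 1)  // buffered: the worker never blocks on reporting
	resCh := make(chan string, 1)

	// producer: feed the input channel and always close it, even on cancellation
	go func() {
		defer close(in)
		for _, item := range inputs {
			select {
			case in <- item:
			case <-ctx.Done():
				return
			}
		}
	}()

	// worker: the blocking call reports either an error or a result
	go func() {
		res, err := call(ctx, in)
		if err != nil {
			errCh <- err
			return
		}
		resCh <- res
	}()

	select {
	case err := <-errCh:
		return "", err
	case res := <-resCh:
		return res, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}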

func createDatum(readMessage *isb.ReadMessage) *reducepb.ReduceRequest {
@@ -194,3 +185,40 @@ func createDatum(readMessage *isb.ReadMessage) *reducepb.ReduceRequest {
}
return d
}

// convertToUdfError converts the error returned by the reduceFn to ApplyUDFErr
func convertToUdfError(err error) ApplyUDFErr {
// if any error happens in reduce, we will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
case sdkerr.NonRetryable:
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
default:
return ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
}
}
}
7 changes: 6 additions & 1 deletion pkg/udf/rpc/grpc_reduce_test.go
@@ -145,6 +145,7 @@ func TestGRPCBasedUDF_BasicReduceWithMockClient(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

go func() {
<-ctx.Done()
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -159,7 +160,11 @@

go func() {
for index := range messages {
messageCh <- &messages[index]
select {
case <-ctx.Done():
return
case messageCh <- &messages[index]:
}
}
close(messageCh)
}()
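The test change applies the same rule to the test's own producer goroutine: a bare messageCh <- send would block forever (and leak the goroutine) if the context deadline fires before the consumer reads everything, so the send is wrapped in a select on ctx.Done(). A standalone sketch of that pattern, assuming only the standard context package (produce is a hypothetical helper, not code from the test):

func produce(ctx context.Context, out chan<- int, items []int) {
	defer close(out) // always signal end-of-input, even when bailing out early
	for _, it := range items {
		select {
		case out <- it: // normal path: the consumer is still reading
		case <-ctx.Done(): // deadline or cancellation: exit instead of leaking
			return
		}
	}
}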