chore: add some small refactors mainly for source (#986)
1. Remove retry on Ack.
For most of our existing sources, the Ack() functions simply `return make([]error, len(offsets))`. The only exception is the RedisStream source, which uses `err := br.Client.XAck()` to acknowledge the whole batch and populates the error array with that single error. Hence, for ALL sources, acking either succeeds for every offset or fails for every offset, so there is no need to retry acking (see the first sketch below).
2. Replace `err ==` comparisons with `errors.Is` (see the second sketch below).
3. Fix some typos and minor grammar issues, since IntelliJ is giving me a lot of red lines :)
4. Remove some gRPC unit tests because I don't think they are necessary. The whole point of mocking a dependency is that we don't have to worry about how the dependency constructs its return value. Some of our gRPC unit tests implement the dependency and then verify the behavior of that implementation, which doesn't make sense to me. E.g., in `TestHGRPCBasedUDF_ApplyWithMockClient`, we implement `multiplyBy2` and verify that the result is `multipliedBy2` (see the third sketch below).
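
As a sketch of point 1 (simplified, with invented helper names rather than the actual source implementations), the two ack shapes we have today look like this: either every slot in the returned slice is nil, or every slot holds the same batch error.

```go
package main

import (
	"errors"
	"fmt"
)

// happyPathAck mirrors most sources: acking always succeeds, one nil per offset.
func happyPathAck(offsets []string) []error {
	return make([]error, len(offsets))
}

// redisStreamStyleAck mirrors the RedisStream source: a single XAck-style call
// acks the whole batch, and its one error (if any) is copied into every slot.
func redisStreamStyleAck(offsets []string, xack func(ids ...string) error) []error {
	errs := make([]error, len(offsets))
	if err := xack(offsets...); err != nil {
		for i := range errs {
			errs[i] = err
		}
	}
	return errs
}

func main() {
	offsets := []string{"1-0", "2-0", "3-0"}
	failingXAck := func(ids ...string) error { return errors.New("connection reset") }

	// In both cases, inspecting the first element tells the whole story.
	fmt.Println(happyPathAck(offsets)[0])                     // <nil>, all acked
	fmt.Println(redisStreamStyleAck(offsets, failingXAck)[0]) // connection reset, none acked
}
```

That all-or-nothing property is what the new `ackFromSource` relies on when it returns only the first element of the error slice.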
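
A sketch of point 2: `==` matches only the exact sentinel value, so it silently stops working once an error is wrapped, while `errors.Is` walks the wrap chain. The wrapping call below is made up for illustration; `errChecksumMismatch` is the WAL sentinel touched in this commit.

```go
package main

import (
	"errors"
	"fmt"
)

var errChecksumMismatch = errors.New("checksum mismatch")

func main() {
	// Some layer adds context by wrapping the sentinel with %w.
	err := fmt.Errorf("decode read message: %w", errChecksumMismatch)

	fmt.Println(err == errChecksumMismatch)          // false: wrapping breaks ==
	fmt.Println(errors.Is(err, errChecksumMismatch)) // true: Is unwraps the chain
}
```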
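
And a schematic of the test pattern removed in point 4 (invented types and names, not the real test code): the dependency is implemented inside the test itself, so the assertion only re-checks that stub.

```go
package main

import "fmt"

// applier stands in for the gRPC client interface that the UDF wraps.
type applier interface {
	apply(in int64) int64
}

// fakeClient is the mock: the test implements multiplyBy2 itself.
type fakeClient struct{}

func (fakeClient) apply(in int64) int64 { return in * 2 }

// udf only forwards to the client; it has no production logic of its own here.
type udf struct{ client applier }

func (u udf) apply(in int64) int64 { return u.client.apply(in) }

func main() {
	u := udf{client: fakeClient{}}
	// Asserting the result is multipliedBy2 only verifies the fake defined above.
	fmt.Println(u.apply(21) == 42) // true, yet it proves nothing about real code
}
```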

Signed-off-by: Keran Yang <[email protected]>
KeranYang committed Aug 25, 2023
1 parent d99480a commit e0cefe6
Showing 18 changed files with 164 additions and 470 deletions.
3 changes: 2 additions & 1 deletion pkg/reduce/pbq/store/wal/bootstrap.go
@@ -18,6 +18,7 @@ package wal
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -111,7 +112,7 @@ func (w *WAL) Read(size int64) ([]*isb.ReadMessage, bool, error) {
 	for size > w.rOffset-start && !w.isEnd() {
 		message, sizeRead, err := decodeReadMessage(w.fp)
 		if err != nil {
-			if err == errChecksumMismatch {
+			if errors.Is(err, errChecksumMismatch) {
 				w.corrupted = true
 			}
 			return nil, false, err
3 changes: 2 additions & 1 deletion pkg/reduce/pnf/ordered.go
@@ -19,6 +19,7 @@ package pnf
 import (
 	"container/list"
 	"context"
+	"errors"
 	"strconv"
 	"sync"
 	"time"
@@ -132,7 +133,7 @@ func (op *OrderedProcessor) reduceOp(ctx context.Context, t *ForwardTask) {
 	start := time.Now()
 	err := t.pf.Process(ctx)
 	if err != nil {
-		if err == ctx.Err() {
+		if errors.Is(err, ctx.Err()) {
 			udfError.With(map[string]string{
 				metrics.LabelVertex:   op.vertexName,
 				metrics.LabelPipeline: op.pipelineName,
5 changes: 3 additions & 2 deletions pkg/shared/kvs/jetstream/kv_watch.go
@@ -18,14 +18,15 @@ package jetstream
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
 	"github.com/nats-io/nats.go"
-	"github.com/numaproj/numaflow/pkg/shared/kvs"
 	"go.uber.org/zap"
 
 	jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats"
+	"github.com/numaproj/numaflow/pkg/shared/kvs"
 	"github.com/numaproj/numaflow/pkg/shared/logging"
 )

@@ -229,7 +230,7 @@ retryLoop:
 		} else {
 			// if there are no keys in the store, return zero time because there are no updates
 			// upstream will handle it
-			if err == nats.ErrNoKeysFound {
+			if errors.Is(err, nats.ErrNoKeysFound) {
 				return time.Time{}
 			}
 			jsw.log.Errorw("Failed to get keys", zap.String("watcher", jsw.GetKVName()), zap.Error(err))
7 changes: 4 additions & 3 deletions pkg/sinks/udsink/udsink_grpc_test.go
@@ -18,6 +18,7 @@ package udsink
 
 import (
	"context"
+	"errors"
	"fmt"
	"testing"
	"time"
@@ -47,7 +48,7 @@ func Test_gRPCBasedUDSink_WaitUntilReadyWithMockClient(t *testing.T) {
 	defer cancel()
 	go func() {
 		<-ctx.Done()
-		if ctx.Err() == context.DeadlineExceeded {
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
 			t.Log(t.Name(), "test timeout")
 		}
 	}()
@@ -103,7 +104,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
 	defer cancel()
 	go func() {
 		<-ctx.Done()
-		if ctx.Err() == context.DeadlineExceeded {
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
 			t.Log(t.Name(), "test timeout")
 		}
 	}()
@@ -145,7 +146,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
 	defer cancel()
 	go func() {
 		<-ctx.Done()
-		if ctx.Err() == context.DeadlineExceeded {
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
 			t.Log(t.Name(), "test timeout")
 		}
 	}()
59 changes: 7 additions & 52 deletions pkg/sources/forward/data_forward.go
@@ -20,12 +20,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math"
 	"sync"
 	"time"
 
 	"go.uber.org/zap"
-	"k8s.io/apimachinery/pkg/util/wait"
 
 	dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
 	"github.com/numaproj/numaflow/pkg/forward"
@@ -166,7 +164,7 @@ func (isdf *DataForward) Start() <-chan struct{} {
 		}
 	}
 
-	// publisher was created by the forwarder, so it should be closed by the forwarder.
+	// the publisher was created by the forwarder, so it should be closed by the forwarder.
 	for _, toVertexPublishers := range isdf.toVertexWMPublishers {
 		for _, pub := range toVertexPublishers {
 			if err := pub.Close(); err != nil {
@@ -374,7 +372,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
 
 	// when we apply transformer, we don't handle partial errors (it's either non or all, non will return early),
 	// so we should be able to ack all the readOffsets including data messages and control messages
-	err = isdf.ackFromBuffer(ctx, readOffsets)
+	err = isdf.ackFromSource(ctx, readOffsets)
 	// implicit return for posterity :-)
 	if err != nil {
 		isdf.opts.logger.Errorw("failed to ack from source", zap.Error(err))
@@ -387,54 +385,11 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
 	forwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelPartitionName: isdf.reader.GetName()}).Observe(float64(time.Since(start).Microseconds()))
 }
 
-// ackFromBuffer acknowledges an array of offsets back to the reader
-// and is a blocking call or until shutdown has been initiated.
-func (isdf *DataForward) ackFromBuffer(ctx context.Context, offsets []isb.Offset) error {
-	var ackRetryBackOff = wait.Backoff{
-		Factor:   1,
-		Jitter:   0.1,
-		Steps:    math.MaxInt,
-		Duration: time.Millisecond * 10,
-	}
-	var ackOffsets = offsets
-	attempt := 0
-
-	ctxClosedErr := wait.ExponentialBackoff(ackRetryBackOff, func() (done bool, err error) {
-		errs := isdf.reader.Ack(ctx, ackOffsets)
-		attempt += 1
-		summarizedErr := errorArrayToMap(errs)
-		var failedOffsets []isb.Offset
-		if len(summarizedErr) > 0 {
-			isdf.opts.logger.Errorw("Failed to ack from buffer, retrying", zap.Any("errors", summarizedErr), zap.Int("attempt", attempt))
-			// no point retrying if ctx.Done has been invoked
-			select {
-			case <-ctx.Done():
-				// no point in retrying after we have been asked to stop.
-				return false, ctx.Err()
-			default:
-				// retry only the failed offsets
-				for i, offset := range ackOffsets {
-					if errs[i] != nil {
-						failedOffsets = append(failedOffsets, offset)
-					}
-				}
-				ackOffsets = failedOffsets
-				if ok, _ := isdf.IsShuttingDown(); ok {
-					ackErr := fmt.Errorf("AckFromBuffer, Stop called while stuck on an internal error, %v", summarizedErr)
-					return false, ackErr
-				}
-				return false, nil
-			}
-		} else {
-			return true, nil
-		}
-	})
-
-	if ctxClosedErr != nil {
-		isdf.opts.logger.Errorw("Context closed while waiting to ack messages inside forward", zap.Error(ctxClosedErr))
-	}
-
-	return ctxClosedErr
+func (isdf *DataForward) ackFromSource(ctx context.Context, offsets []isb.Offset) error {
+	// for all the sources, we either ack all offsets or none.
+	// when a batch ack fails, the source Ack() function populate the error array with the same error;
+	// hence we can just return the first error.
+	return isdf.reader.Ack(ctx, offsets)[0]
 }
 
 // writeToBuffers is a blocking call until all the messages have been forwarded to all the toBuffers, or a shutdown
