client/retry: only return the latest error in backoffer (#8227)
ref #8142

Returning the accumulated historical errors caused the client's retry logic to fail, and since we
currently do not need to collect all errors during retries, this PR removes `multierr` from the
backoffer and adds tests to ensure the correctness of the retry logic.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
JmPotato and ti-chi-bot[bot] authored May 31, 2024
1 parent 19c9852 commit 199b017
Showing 5 changed files with 119 additions and 45 deletions.
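
Before the per-file diffs, a minimal usage sketch of the changed behavior (the durations and the failing function are illustrative, not taken from this PR): `Backoffer.Exec` now returns only the error from the final attempt, so callers can match it with `errors.Is` instead of receiving a `multierr`-aggregated history.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

var errUnavailable = errors.New("service unavailable") // illustrative error

func main() {
	// base 100ms, max 1s, total 3s: values are illustrative only.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)
	attempt := 0
	err := bo.Exec(context.Background(), func() error {
		attempt++
		return fmt.Errorf("attempt %d: %w", attempt, errUnavailable)
	})
	// Before this PR, err aggregated up to 20 historical errors via multierr;
	// now it is just the error from the last attempt, so errors.Is works directly.
	fmt.Println(errors.Is(err, errUnavailable)) // true
	fmt.Println(err)                            // attempt N: service unavailable
}
```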
2 changes: 1 addition & 1 deletion client/go.mod
@@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
5 changes: 3 additions & 2 deletions client/http/client.go
@@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}

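The `false` passed to `SetRetryableChecker` above means the HTTP client's status-code checker no longer clobbers a checker the caller may have already installed on its own backoffer. A rough sketch of that interaction, with illustrative checker bodies and durations:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)

	// The caller attaches its own retry policy first.
	attempts := 0
	bo.SetRetryableChecker(func(err error) bool { return attempts < 2 }, true)

	// Later, code like requestWithRetry passes overwrite=false; since a checker
	// is already set, this call is a no-op and the caller's policy wins.
	bo.SetRetryableChecker(func(err error) bool { return err != nil }, false)

	err := bo.Exec(context.Background(), func() error {
		attempts++
		return errors.New("always failing")
	})
	fmt.Println(attempts, err) // 2 always failing
}
```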
45 changes: 18 additions & 27 deletions client/retry/backoff.go
@@ -24,12 +24,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

@@ -50,7 +47,7 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, it will always retry unconditionally no matter what the error is.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
@@ -69,28 +66,22 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
if err == nil || !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if err != nil {
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("[pd.backoffer] exec fn failed and retrying",
zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if after == nil {
after = time.NewTimer(currentInterval)
@@ -100,7 +91,7 @@
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
@@ -115,7 +106,7 @@
}
}
}
return allErrors
return err
}

// InitialBackoffer make the initial state for retrying.
@@ -132,12 +123,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
total = base
}
bo := &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
base: base,
max: max,
total: total,
next: base,
currentTotal: 0,
attempt: 0,
@@ -148,8 +136,11 @@
return bo
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}

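With the default checker removed from `InitialBackoffer`, a backoffer whose checker was never set now retries every non-nil error until the `total` budget is spent (or forever, if `total` is 0), per the updated comment on `retryableChecker`. A sketch of that default, assuming the doubling-then-capped interval growth the tests below rely on; the durations are illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	// No retryable checker is set, so every non-nil error is retried until the
	// 300ms total budget is exhausted: waits of 50, 100, 100, 100ms; after the
	// fourth wait the budget is exceeded, so the loop stops at four attempts.
	bo := retry.InitialBackoffer(50*time.Millisecond, 100*time.Millisecond, 300*time.Millisecond)
	attempts := 0
	err := bo.Exec(context.Background(), func() error {
		attempts++
		return errors.New("transient failure")
	})
	// Only the final attempt's error comes back, not an aggregate.
	fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=4 err=transient failure
}
```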
62 changes: 51 additions & 11 deletions client/retry/backoff_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"testing"
"time"

@@ -87,24 +88,64 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
// Test the error returned.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(error) bool {
return execCount < 2
err = bo.Exec(ctx, func() error {
execCount++
return fmt.Errorf("test %d", execCount)
})
re.Error(err)
re.Equal("test 4", err.Error())
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
execCount = 0
err = bo.Exec(ctx, func() error {
if execCount == 1 {
return nil
}
execCount++
return nil
return expectedErr
})
re.Equal(1, execCount)
re.NoError(err)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
retryableChecker := func(error) bool {
return execCount < 2
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return expectedErr
}
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
@@ -129,21 +170,20 @@ func TestBackofferWithLog(t *testing.T) {
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

50 changes: 46 additions & 4 deletions tests/integrations/client/http_client_test.go
@@ -21,6 +21,7 @@ import (
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"

@@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() {
defer cancel()
schedulers, err := client.GetSchedulers(ctx)
re.NoError(err)
re.Empty(schedulers)
const schedulerName = "evict-leader-scheduler"
re.NotContains(schedulers, schedulerName)

err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1)
err = client.CreateScheduler(ctx, schedulerName, 1)
re.NoError(err)
schedulers, err = client.GetSchedulers(ctx)
re.NoError(err)
re.Len(schedulers, 1)
err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100)
re.Contains(schedulers, schedulerName)
err = client.SetSchedulerDelay(ctx, schedulerName, 100)
re.NoError(err)
err = client.SetSchedulerDelay(ctx, "not-exist", 100)
re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message
@@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
re.Equal("pd2", healths[1].Name)
re.True(healths[0].Health && healths[1].Health)
}

func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
client := suite.client.WithBackoffer(bo)
for {
healths, err := client.GetHealthStatus(ctx)
if err != nil && strings.Contains(err.Error(), "context canceled") {
return
}
re.NoError(err)
re.Len(healths, 2)
select {
case <-ctx.Done():
return
default:
}
}
}()

leader := suite.cluster.GetLeaderServer()
re.NotNil(leader)
for i := 0; i < 3; i++ {
leader.ResignLeader()
re.NotEmpty(suite.cluster.WaitLeader())
leader = suite.cluster.GetLeaderServer()
re.NotNil(leader)
}

// Cancel the context to stop the goroutine.
cancel()
wg.Wait()
}
