client/retry: only return the latest error in backoffer #8227

Merged · 3 commits · May 31, 2024
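In short, when every attempt fails, `Backoffer.Exec` now returns only the error from the latest attempt instead of a `multierr` aggregate of all attempts. A minimal usage sketch of the new behavior (the `callPD` stub and the durations are made up for illustration; `InitialBackoffer` and `Exec` are the APIs touched by this diff):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

var errNotLeader = errors.New("not leader")

// callPD stands in for a PD request that keeps failing.
func callPD() error { return errNotLeader }

func main() {
	// Retry with a 100ms base interval, capped at 1s, for at most 3s in total.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)
	err := bo.Exec(context.Background(), callPD)
	// Only the error from the last attempt is returned, so errors.Is matches it
	// directly; previously the attempts were concatenated via multierr.
	fmt.Println(errors.Is(err, errNotLeader)) // true
}
```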
2 changes: 1 addition & 1 deletion client/go.mod
@@ -16,7 +16,6 @@ require (
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
5 changes: 3 additions & 2 deletions client/http/client.go
@@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry(
}
// Copy a new backoffer for each request.
bo := *reqInfo.bo
// Backoffer also needs to check the status code to determine whether to retry.
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
// Backoffer also needs to check the status code to determine whether to retry.
return err != nil && !noNeedRetry(statusCode)
})
}, false)
return bo.Exec(ctx, execFunc)
}
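Because the default checker is now installed with `overwrite=false`, a backoffer that the caller attaches via `WithBackoffer` keeps its own retryable checker instead of having it replaced by this status-code-based one. A hedged sketch of that caller-side pattern (the helper name and the error-matching policy are illustrative; `WithBackoffer`, `SetRetryableChecker`, and `InitialBackoffer` come from this PR and its tests):

```go
import (
	"strings"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/retry"
)

// withLeaderRetry gives an existing PD HTTP client a caller-owned retry policy.
// Since requestWithRetry now passes overwrite=false, the checker installed here
// is kept rather than replaced by the client's default, status-code-based one.
func withLeaderRetry(cli pdhttp.Client) pdhttp.Client {
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)
	bo.SetRetryableChecker(func(err error) bool {
		// Illustrative policy: only keep retrying leader-change style errors.
		return err != nil && strings.Contains(err.Error(), "not leader")
	}, true)
	return cli.WithBackoffer(bo)
}
```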

45 changes: 18 additions & 27 deletions client/retry/backoff.go
@@ -24,12 +24,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const maxRecordErrorCount = 20

// Option is used to customize the backoffer.
type Option func(*Backoffer)

@@ -50,7 +47,7 @@ type Backoffer struct {
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
// If it's not set, it will always retry unconditionally no matter what the error is.
retryableChecker func(err error) bool
// logInterval defines the log interval for retrying.
logInterval time.Duration
@@ -69,28 +66,22 @@ func (bo *Backoffer) Exec(
) error {
defer bo.resetBackoff()
var (
allErrors error
err error
after *time.Timer
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
err = fn()
bo.attempt++
if bo.attempt < maxRecordErrorCount {
// multierr.Append will ignore nil error.
allErrors = multierr.Append(allErrors, err)
}
if !bo.isRetryable(err) {
if err == nil || !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if err != nil {
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("[pd.backoffer] exec fn failed and retrying",
zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if after == nil {
after = time.NewTimer(currentInterval)
@@ -100,7 +91,7 @@
select {
case <-ctx.Done():
after.Stop()
return multierr.Append(allErrors, errors.Trace(ctx.Err()))
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
@@ -115,7 +106,7 @@
}
}
}
return allErrors
return err
}
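To make the loop's exit paths concrete: a nil error now breaks immediately, a non-retryable error is returned as-is, and a done context surfaces the traced `ctx.Err()`. A toy caller-side sketch, not part of the PR (it assumes the package is imported as `github.com/tikv/pd/client/retry` and the durations are arbitrary):

```go
import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func demoTermination() {
	// A total of 0 means retrying until success (see the `total` field comment).
	bo := retry.InitialBackoffer(10*time.Millisecond, 100*time.Millisecond, 0)

	// A nil error breaks the loop immediately, even with no checker set.
	_ = bo.Exec(context.Background(), func() error { return nil })

	// With no checker set, any non-nil error keeps retrying; once the context
	// is done, Exec returns the traced ctx.Err() alone, not an aggregate.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := bo.Exec(ctx, func() error { return errors.New("transient") })
	fmt.Println(err) // wraps context.DeadlineExceeded
}
```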

// InitialBackoffer make the initial state for retrying.
@@ -132,12 +123,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
total = base
}
bo := &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
base: base,
max: max,
total: total,
next: base,
currentTotal: 0,
attempt: 0,
@@ -148,8 +136,11 @@
return bo
}

// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
Review comment from @okJiang (Member), May 30, 2024:

Isn't `overwrite` over-designed? There is currently only one call site.

Author's reply:

The SetRetryableChecker function might be invoked by higher-level callers such as BR, Lightning, or TiDB. I added this flag to ensure the caller-set checker is not overwritten unexpectedly.

if !overwrite && bo.retryableChecker != nil {
return
}
bo.retryableChecker = checker
}
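A short sketch of the `overwrite` semantics added above (the checker functions are illustrative fragments with standard imports assumed; the method signature is the one introduced in this diff):

```go
bo := retry.InitialBackoffer(time.Millisecond, 10*time.Millisecond, 100*time.Millisecond)

onlyTimeouts := func(err error) bool { return errors.Is(err, context.DeadlineExceeded) }
anyError := func(err error) bool { return err != nil }

bo.SetRetryableChecker(onlyTimeouts, false) // no checker yet, so this one is installed
bo.SetRetryableChecker(anyError, false)     // already set and overwrite=false: onlyTimeouts is kept
bo.SetRetryableChecker(anyError, true)      // overwrite=true: replaced by anyError
```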

62 changes: 51 additions & 11 deletions client/retry/backoff_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"testing"
"time"

@@ -87,24 +88,64 @@ func TestBackoffer(t *testing.T) {
return expectedErr
})
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.ErrorContains(err, "test; test; test; test")
re.ErrorContains(err, "test")
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
// Test the error returned.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(error) bool {
return execCount < 2
err = bo.Exec(ctx, func() error {
execCount++
return fmt.Errorf("test %d", execCount)
})
re.Error(err)
re.Equal("test 4", err.Error())
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
execCount = 0
err = bo.Exec(ctx, func() error {
if execCount == 1 {
return nil
}
execCount++
return nil
return expectedErr
})
re.Equal(1, execCount)
re.NoError(err)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
retryableChecker := func(error) bool {
return execCount < 2
}
bo.SetRetryableChecker(retryableChecker, false)
execFunc := func() error {
execCount++
return expectedErr
}
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
// Test the retryable checker with overwrite.
execCount = 0
retryableChecker = func(error) bool {
return execCount < 4
}
bo.SetRetryableChecker(retryableChecker, false)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
execCount = 0
bo.SetRetryableChecker(retryableChecker, true)
err = bo.Exec(ctx, execFunc)
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
@@ -129,21 +170,20 @@ func TestBackofferWithLog(t *testing.T) {
// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
re.Len(ms, 10)
// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
// 10 + 20 + 40 + 80(log), 4 times retry.
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[0], rfc)

bo.resetBackoff()
err = bo.Exec(ctx, testFn)
re.ErrorIs(err, errTest)

ms = lg.Messages()
re.Len(ms, 20)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
re.Contains(ms[len(ms)-1], rfc)
rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
re.Contains(ms[len1], rfc)
}

50 changes: 46 additions & 4 deletions tests/integrations/client/http_client_test.go
@@ -21,6 +21,7 @@ import (
"net/url"
"sort"
"strings"
"sync"
"testing"
"time"

@@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() {
defer cancel()
schedulers, err := client.GetSchedulers(ctx)
re.NoError(err)
re.Empty(schedulers)
const schedulerName = "evict-leader-scheduler"
re.NotContains(schedulers, schedulerName)

err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1)
err = client.CreateScheduler(ctx, schedulerName, 1)
re.NoError(err)
schedulers, err = client.GetSchedulers(ctx)
re.NoError(err)
re.Len(schedulers, 1)
err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100)
re.Contains(schedulers, schedulerName)
err = client.SetSchedulerDelay(ctx, schedulerName, 100)
re.NoError(err)
err = client.SetSchedulerDelay(ctx, "not-exist", 100)
re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message
@@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
re.Equal("pd2", healths[1].Name)
re.True(healths[0].Health && healths[1].Health)
}

func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
re := suite.Require()
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
client := suite.client.WithBackoffer(bo)
for {
healths, err := client.GetHealthStatus(ctx)
if err != nil && strings.Contains(err.Error(), "context canceled") {
return
}
re.NoError(err)
re.Len(healths, 2)
select {
case <-ctx.Done():
return
default:
}
}
}()

leader := suite.cluster.GetLeaderServer()
re.NotNil(leader)
for i := 0; i < 3; i++ {
leader.ResignLeader()
re.NotEmpty(suite.cluster.WaitLeader())
leader = suite.cluster.GetLeaderServer()
re.NotNil(leader)
}

// Cancel the context to stop the goroutine.
cancel()
wg.Wait()
}
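Outside the test, the same pattern can be used by any consumer of the HTTP client. A hedged sketch (the helper name is made up, the `pdhttp` alias is assumed, and the caller is assumed to already hold a constructed client):

```go
import (
	"context"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/retry"
)

// waitHealthy polls PD health while riding out leader changes by giving the
// HTTP client a backoffer whose total of 0 means "retry until success".
func waitHealthy(ctx context.Context, cli pdhttp.Client) error {
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
	_, err := cli.WithBackoffer(bo).GetHealthStatus(ctx)
	return err
}
```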