diff --git a/client/go.mod b/client/go.mod
index 89799796521..6baa2f112f4 100644
--- a/client/go.mod
+++ b/client/go.mod
@@ -16,7 +16,6 @@ require (
 	github.com/stretchr/testify v1.8.2
 	go.uber.org/atomic v1.10.0
 	go.uber.org/goleak v1.1.11
-	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.24.0
 	golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
 	google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
 	github.com/prometheus/client_model v0.5.0 // indirect
 	github.com/prometheus/common v0.46.0 // indirect
 	github.com/prometheus/procfs v0.12.0 // indirect
+	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/net v0.23.0 // indirect
 	golang.org/x/sys v0.18.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
diff --git a/client/http/client.go b/client/http/client.go
index 30144ebe2c5..7b34193c2a4 100644
--- a/client/http/client.go
+++ b/client/http/client.go
@@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry(
 	}
 	// Copy a new backoffer for each request.
 	bo := *reqInfo.bo
-	// Backoffer also needs to check the status code to determine whether to retry.
+	// Set the retryable checker for the backoffer if it's not set.
 	bo.SetRetryableChecker(func(err error) bool {
+		// Backoffer also needs to check the status code to determine whether to retry.
 		return err != nil && !noNeedRetry(statusCode)
-	})
+	}, false)
 	return bo.Exec(ctx, execFunc)
 }
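Note on the `client/http` change above: `requestWithRetry` now passes `overwrite=false`, so its status-code checker is only installed when the caller has not already attached one to the backoffer. A minimal sketch of how that precedence plays out, using the exported `retry` API from this patch; the sentinel error and the durations are illustrative, not part of the patch:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

// errLeaderChanged is a hypothetical sentinel error, used only for this sketch.
var errLeaderChanged = errors.New("leader changed")

func main() {
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 2*time.Second)

	// The caller installs its own retry policy first.
	bo.SetRetryableChecker(func(err error) bool {
		return errors.Is(err, errLeaderChanged)
	}, true)

	// A later call with overwrite=false, the pattern requestWithRetry now uses,
	// is a no-op because a checker is already present: the caller's policy wins.
	bo.SetRetryableChecker(func(err error) bool { return err != nil }, false)

	err := bo.Exec(context.Background(), func() error {
		return errLeaderChanged
	})
	// Retries until the 2s total budget is exhausted, then returns the last error.
	fmt.Println(err)
}
```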
diff --git a/client/retry/backoff.go b/client/retry/backoff.go
index 580e466badb..9161ad0fea1 100644
--- a/client/retry/backoff.go
+++ b/client/retry/backoff.go
@@ -24,12 +24,9 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
-	"go.uber.org/multierr"
 	"go.uber.org/zap"
 )
 
-const maxRecordErrorCount = 20
-
 // Option is used to customize the backoffer.
 type Option func(*Backoffer)
 
@@ -50,7 +47,7 @@ type Backoffer struct {
 	// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
 	total time.Duration
 	// retryableChecker is used to check if the error is retryable.
-	// By default, all errors are retryable.
+	// If it's not set, the backoffer retries unconditionally, regardless of the error.
 	retryableChecker func(err error) bool
 	// logInterval defines the log interval for retrying.
 	logInterval time.Duration
@@ -69,28 +66,22 @@ func (bo *Backoffer) Exec(
 ) error {
 	defer bo.resetBackoff()
 	var (
-		allErrors error
-		err       error
-		after     *time.Timer
+		err   error
+		after *time.Timer
 	)
 	fnName := getFunctionName(fn)
 	for {
 		err = fn()
 		bo.attempt++
-		if bo.attempt < maxRecordErrorCount {
-			// multierr.Append will ignore nil error.
-			allErrors = multierr.Append(allErrors, err)
-		}
-		if !bo.isRetryable(err) {
+		if err == nil || !bo.isRetryable(err) {
 			break
 		}
 		currentInterval := bo.nextInterval()
 		bo.nextLogTime += currentInterval
-		if err != nil {
-			if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
-				bo.nextLogTime %= bo.logInterval
-				log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
-			}
+		if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
+			bo.nextLogTime %= bo.logInterval
+			log.Warn("[pd.backoffer] exec fn failed and retrying",
+				zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
 		}
 		if after == nil {
 			after = time.NewTimer(currentInterval)
@@ -100,7 +91,7 @@ func (bo *Backoffer) Exec(
 		select {
 		case <-ctx.Done():
 			after.Stop()
-			return multierr.Append(allErrors, errors.Trace(ctx.Err()))
+			return errors.Trace(ctx.Err())
 		case <-after.C:
 			failpoint.Inject("backOffExecute", func() {
 				testBackOffExecuteFlag = true
@@ -115,7 +106,7 @@ func (bo *Backoffer) Exec(
 			}
 		}
 	}
-	return allErrors
+	return err
 }
 
 // InitialBackoffer make the initial state for retrying.
@@ -132,12 +123,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
 		total = base
 	}
 	bo := &Backoffer{
-		base:  base,
-		max:   max,
-		total: total,
-		retryableChecker: func(err error) bool {
-			return err != nil
-		},
+		base:         base,
+		max:          max,
+		total:        total,
 		next:         base,
 		currentTotal: 0,
 		attempt:      0,
@@ -148,8 +136,11 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer
 	return bo
 }
 
-// SetRetryableChecker sets the retryable checker.
-func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
+// SetRetryableChecker sets the retryable checker. The `overwrite` flag indicates whether to overwrite an existing checker.
+func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) {
+	if !overwrite && bo.retryableChecker != nil {
+		return
+	}
 	bo.retryableChecker = checker
 }
 
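The behavioral change in `Exec` is the core of this patch: errors are no longer accumulated with `multierr`, only the last attempt's error (or the traced context error) is returned, and a nil result now breaks the loop before the retryable checker is consulted. A small sketch of the new contract; the durations and the failing function are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	// No retryable checker is set, so every non-nil error is retried until the
	// 200ms total budget runs out.
	bo := retry.InitialBackoffer(10*time.Millisecond, 50*time.Millisecond, 200*time.Millisecond)

	attempt := 0
	err := bo.Exec(context.Background(), func() error {
		attempt++
		return fmt.Errorf("attempt %d failed", attempt)
	})

	// Before this patch, err aggregated up to maxRecordErrorCount attempts via
	// multierr ("attempt 1 failed; attempt 2 failed; ..."); now it is only the
	// final attempt's error.
	fmt.Println(err)
}
```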
diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go
index 8df06b75f94..22d487b1885 100644
--- a/client/retry/backoff_test.go
+++ b/client/retry/backoff_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -87,24 +88,64 @@ func TestBackoffer(t *testing.T) {
 		return expectedErr
 	})
 	re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
-	re.ErrorContains(err, "test; test; test; test")
+	re.ErrorContains(err, "test")
 	re.ErrorIs(err, expectedErr)
 	re.Equal(4, execCount)
 	re.True(isBackofferReset(bo))
 
-	// Test the retryable checker.
+	// Test the error returned.
 	execCount = 0
-	bo = InitialBackoffer(base, max, total)
-	bo.SetRetryableChecker(func(error) bool {
-		return execCount < 2
+	err = bo.Exec(ctx, func() error {
+		execCount++
+		return fmt.Errorf("test %d", execCount)
 	})
+	re.Error(err)
+	re.Equal("test 4", err.Error())
+	re.Equal(4, execCount)
+	re.True(isBackofferReset(bo))
+	execCount = 0
 	err = bo.Exec(ctx, func() error {
+		if execCount == 1 {
+			return nil
+		}
 		execCount++
-		return nil
+		return expectedErr
 	})
+	re.Equal(1, execCount)
 	re.NoError(err)
+	re.True(isBackofferReset(bo))
+
+	// Test the retryable checker.
+	execCount = 0
+	bo = InitialBackoffer(base, max, total)
+	retryableChecker := func(error) bool {
+		return execCount < 2
+	}
+	bo.SetRetryableChecker(retryableChecker, false)
+	execFunc := func() error {
+		execCount++
+		return expectedErr
+	}
+	err = bo.Exec(ctx, execFunc)
+	re.ErrorIs(err, expectedErr)
+	re.Equal(2, execCount)
+	re.True(isBackofferReset(bo))
+	// Test the retryable checker with overwrite.
+	execCount = 0
+	retryableChecker = func(error) bool {
+		return execCount < 4
+	}
+	bo.SetRetryableChecker(retryableChecker, false)
+	err = bo.Exec(ctx, execFunc)
+	re.ErrorIs(err, expectedErr)
 	re.Equal(2, execCount)
 	re.True(isBackofferReset(bo))
+	execCount = 0
+	bo.SetRetryableChecker(retryableChecker, true)
+	err = bo.Exec(ctx, execFunc)
+	re.ErrorIs(err, expectedErr)
+	re.Equal(4, execCount)
+	re.True(isBackofferReset(bo))
 }
 
 func isBackofferReset(bo *Backoffer) bool {
@@ -129,21 +170,20 @@ func TestBackofferWithLog(t *testing.T) {
 
 	// 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times.
 	re.Len(ms, 10)
 	// 10 + 20 + 40 + 80 + 100 * 9, 13 times retry.
-	rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
+	rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
 	re.Contains(ms[len(ms)-1], rfc)
 	// 10 + 20 + 40 + 80(log), 4 times retry.
-	rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
+	rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
 	re.Contains(ms[0], rfc)
 
-	bo.resetBackoff()
 	err = bo.Exec(ctx, testFn)
 	re.ErrorIs(err, errTest)
 
 	ms = lg.Messages()
 	re.Len(ms, 20)
-	rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]`
+	rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]`
 	re.Contains(ms[len(ms)-1], rfc)
-	rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
+	rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]`
 	re.Contains(ms[len1], rfc)
 }
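The new test cases pin down the overwrite semantics: a second `SetRetryableChecker` call only takes effect when `overwrite` is true, and the checker survives across `Exec` calls since resetting the backoffer only touches its timing state. Condensed into a standalone sketch; the checker names and durations are illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	errTest := errors.New("test")
	execCount := 0
	failingFn := func() error {
		execCount++
		return errTest
	}
	stopAfterTwo := func(error) bool { return execCount < 2 }
	stopAfterFour := func(error) bool { return execCount < 4 }

	bo := retry.InitialBackoffer(time.Millisecond, 5*time.Millisecond, 0)

	bo.SetRetryableChecker(stopAfterTwo, false)  // takes effect: no checker yet
	bo.SetRetryableChecker(stopAfterFour, false) // ignored: a checker is already set
	_ = bo.Exec(context.Background(), failingFn)
	fmt.Println(execCount) // 2

	execCount = 0
	bo.SetRetryableChecker(stopAfterFour, true) // overwrite=true replaces the checker
	_ = bo.Exec(context.Background(), failingFn)
	fmt.Println(execCount) // 4
}
```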
diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go
index fa109946e4b..9d7e0985940 100644
--- a/tests/integrations/client/http_client_test.go
+++ b/tests/integrations/client/http_client_test.go
@@ -21,6 +21,7 @@ import (
 	"net/url"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() {
 	defer cancel()
 	schedulers, err := client.GetSchedulers(ctx)
 	re.NoError(err)
-	re.Empty(schedulers)
+	const schedulerName = "evict-leader-scheduler"
+	re.NotContains(schedulers, schedulerName)
 
-	err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1)
+	err = client.CreateScheduler(ctx, schedulerName, 1)
 	re.NoError(err)
 	schedulers, err = client.GetSchedulers(ctx)
 	re.NoError(err)
-	re.Len(schedulers, 1)
-	err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100)
+	re.Contains(schedulers, schedulerName)
+	err = client.SetSchedulerDelay(ctx, schedulerName, 100)
 	re.NoError(err)
 	err = client.SetSchedulerDelay(ctx, "not-exist", 100)
 	re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message
@@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
 	re.Equal("pd2", healths[1].Name)
 	re.True(healths[0].Health && healths[1].Health)
 }
+
+func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
+	re := suite.Require()
+	ctx, cancel := context.WithCancel(suite.ctx)
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
+		client := suite.client.WithBackoffer(bo)
+		for {
+			healths, err := client.GetHealthStatus(ctx)
+			if err != nil && strings.Contains(err.Error(), "context canceled") {
+				return
+			}
+			re.NoError(err)
+			re.Len(healths, 2)
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+		}
+	}()
+
+	leader := suite.cluster.GetLeaderServer()
+	re.NotNil(leader)
+	for i := 0; i < 3; i++ {
+		leader.ResignLeader()
+		re.NotEmpty(suite.cluster.WaitLeader())
+		leader = suite.cluster.GetLeaderServer()
+		re.NotNil(leader)
+	}
+
+	// Cancel the context to stop the goroutine.
+	cancel()
+	wg.Wait()
+}
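For reference, `TestRetryOnLeaderChange` exercises `WithBackoffer` end to end: a total budget of 0 keeps retrying until success or context cancellation, which is what lets in-flight requests ride out a leader re-election. A minimal client-side sketch of the same idea; the `NewClient` call, source tag, and endpoint are assumptions for illustration, not part of this patch:

```go
package main

import (
	"context"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/retry"
)

func main() {
	// The source tag and endpoint are placeholders; point them at a real PD deployment.
	client := pdhttp.NewClient("example-caller", []string{"http://127.0.0.1:2379"})
	defer client.Close()

	// total=0 means retry until success or context cancellation, so a request
	// issued during a leader re-election keeps backing off until the new
	// leader can serve it.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
	cli := client.WithBackoffer(bo)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := cli.GetHealthStatus(ctx); err != nil {
		panic(err)
	}
}
```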