From 199b01792159e5d8e83ef419a5053401e998bb0e Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 31 May 2024 16:29:52 +0800 Subject: [PATCH] client/retry: only return the latest error in backoffer (#8227) ref tikv/pd#8142 Due to the return of historical errors causing the client's retry logic to fail, and since we currently do not need to obtain all errors during retries, this PR removes `multierr` from backoffer and add tests to ensure the correctness of the retry logic. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/go.mod | 2 +- client/http/client.go | 5 +- client/retry/backoff.go | 45 ++++++-------- client/retry/backoff_test.go | 62 +++++++++++++++---- tests/integrations/client/http_client_test.go | 50 +++++++++++++-- 5 files changed, 119 insertions(+), 45 deletions(-) diff --git a/client/go.mod b/client/go.mod index 89799796521..6baa2f112f4 100644 --- a/client/go.mod +++ b/client/go.mod @@ -16,7 +16,6 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 - go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 google.golang.org/grpc v1.62.1 @@ -34,6 +33,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/client/http/client.go b/client/http/client.go index 30144ebe2c5..7b34193c2a4 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -153,10 +153,11 @@ func (ci *clientInner) requestWithRetry( } // Copy a new backoffer for each request. bo := *reqInfo.bo - // Backoffer also needs to check the status code to determine whether to retry. + // Set the retryable checker for the backoffer if it's not set. bo.SetRetryableChecker(func(err error) bool { + // Backoffer also needs to check the status code to determine whether to retry. return err != nil && !noNeedRetry(statusCode) - }) + }, false) return bo.Exec(ctx, execFunc) } diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 580e466badb..9161ad0fea1 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -24,12 +24,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" - "go.uber.org/multierr" "go.uber.org/zap" ) -const maxRecordErrorCount = 20 - // Option is used to customize the backoffer. type Option func(*Backoffer) @@ -50,7 +47,7 @@ type Backoffer struct { // total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. total time.Duration // retryableChecker is used to check if the error is retryable. - // By default, all errors are retryable. + // If it's not set, it will always retry unconditionally no matter what the error is. retryableChecker func(err error) bool // logInterval defines the log interval for retrying. logInterval time.Duration @@ -69,28 +66,22 @@ func (bo *Backoffer) Exec( ) error { defer bo.resetBackoff() var ( - allErrors error - err error - after *time.Timer + err error + after *time.Timer ) fnName := getFunctionName(fn) for { err = fn() bo.attempt++ - if bo.attempt < maxRecordErrorCount { - // multierr.Append will ignore nil error. - allErrors = multierr.Append(allErrors, err) - } - if !bo.isRetryable(err) { + if err == nil || !bo.isRetryable(err) { break } currentInterval := bo.nextInterval() bo.nextLogTime += currentInterval - if err != nil { - if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { - bo.nextLogTime %= bo.logInterval - log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) - } + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("[pd.backoffer] exec fn failed and retrying", + zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) } if after == nil { after = time.NewTimer(currentInterval) @@ -100,7 +91,7 @@ func (bo *Backoffer) Exec( select { case <-ctx.Done(): after.Stop() - return multierr.Append(allErrors, errors.Trace(ctx.Err())) + return errors.Trace(ctx.Err()) case <-after.C: failpoint.Inject("backOffExecute", func() { testBackOffExecuteFlag = true @@ -115,7 +106,7 @@ func (bo *Backoffer) Exec( } } } - return allErrors + return err } // InitialBackoffer make the initial state for retrying. @@ -132,12 +123,9 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer total = base } bo := &Backoffer{ - base: base, - max: max, - total: total, - retryableChecker: func(err error) bool { - return err != nil - }, + base: base, + max: max, + total: total, next: base, currentTotal: 0, attempt: 0, @@ -148,8 +136,11 @@ func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer return bo } -// SetRetryableChecker sets the retryable checker. -func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) { +// SetRetryableChecker sets the retryable checker, `overwrite` flag is used to indicate whether to overwrite the existing checker. +func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool, overwrite bool) { + if !overwrite && bo.retryableChecker != nil { + return + } bo.retryableChecker = checker } diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 8df06b75f94..22d487b1885 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "errors" + "fmt" "testing" "time" @@ -87,24 +88,64 @@ func TestBackoffer(t *testing.T) { return expectedErr }) re.InDelta(total, time.Since(start), float64(250*time.Millisecond)) - re.ErrorContains(err, "test; test; test; test") + re.ErrorContains(err, "test") re.ErrorIs(err, expectedErr) re.Equal(4, execCount) re.True(isBackofferReset(bo)) - // Test the retryable checker. + // Test the error returned. execCount = 0 - bo = InitialBackoffer(base, max, total) - bo.SetRetryableChecker(func(error) bool { - return execCount < 2 + err = bo.Exec(ctx, func() error { + execCount++ + return fmt.Errorf("test %d", execCount) }) + re.Error(err) + re.Equal("test 4", err.Error()) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) + execCount = 0 err = bo.Exec(ctx, func() error { + if execCount == 1 { + return nil + } execCount++ - return nil + return expectedErr }) + re.Equal(1, execCount) re.NoError(err) + re.True(isBackofferReset(bo)) + + // Test the retryable checker. + execCount = 0 + bo = InitialBackoffer(base, max, total) + retryableChecker := func(error) bool { + return execCount < 2 + } + bo.SetRetryableChecker(retryableChecker, false) + execFunc := func() error { + execCount++ + return expectedErr + } + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(2, execCount) + re.True(isBackofferReset(bo)) + // Test the retryable checker with overwrite. + execCount = 0 + retryableChecker = func(error) bool { + return execCount < 4 + } + bo.SetRetryableChecker(retryableChecker, false) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) re.Equal(2, execCount) re.True(isBackofferReset(bo)) + execCount = 0 + bo.SetRetryableChecker(retryableChecker, true) + err = bo.Exec(ctx, execFunc) + re.ErrorIs(err, expectedErr) + re.Equal(4, execCount) + re.True(isBackofferReset(bo)) } func isBackofferReset(bo *Backoffer) bool { @@ -129,21 +170,20 @@ func TestBackofferWithLog(t *testing.T) { // 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times. re.Len(ms, 10) // 10 + 20 + 40 + 80 + 100 * 9, 13 times retry. - rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + rfc := `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]` re.Contains(ms[len(ms)-1], rfc) // 10 + 20 + 40 + 80(log), 4 times retry. - rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]` re.Contains(ms[0], rfc) - bo.resetBackoff() err = bo.Exec(ctx, testFn) re.ErrorIs(err, errTest) ms = lg.Messages() re.Len(ms, 20) - rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=13] [error=test]` re.Contains(ms[len(ms)-1], rfc) - rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + rfc = `["[pd.backoffer] exec fn failed and retrying"] [fn-name=testFn] [retry-time=4] [error=test]` re.Contains(ms[len1], rfc) } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index fa109946e4b..9d7e0985940 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -21,6 +21,7 @@ import ( "net/url" "sort" "strings" + "sync" "testing" "time" @@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() { defer cancel() schedulers, err := client.GetSchedulers(ctx) re.NoError(err) - re.Empty(schedulers) + const schedulerName = "evict-leader-scheduler" + re.NotContains(schedulers, schedulerName) - err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1) + err = client.CreateScheduler(ctx, schedulerName, 1) re.NoError(err) schedulers, err = client.GetSchedulers(ctx) re.NoError(err) - re.Len(schedulers, 1) - err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100) + re.Contains(schedulers, schedulerName) + err = client.SetSchedulerDelay(ctx, schedulerName, 100) re.NoError(err) err = client.SetSchedulerDelay(ctx, "not-exist", 100) re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message @@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() { re.Equal("pd2", healths[1].Name) re.True(healths[0].Health && healths[1].Health) } + +func (suite *httpClientTestSuite) TestRetryOnLeaderChange() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0) + client := suite.client.WithBackoffer(bo) + for { + healths, err := client.GetHealthStatus(ctx) + if err != nil && strings.Contains(err.Error(), "context canceled") { + return + } + re.NoError(err) + re.Len(healths, 2) + select { + case <-ctx.Done(): + return + default: + } + } + }() + + leader := suite.cluster.GetLeaderServer() + re.NotNil(leader) + for i := 0; i < 3; i++ { + leader.ResignLeader() + re.NotEmpty(suite.cluster.WaitLeader()) + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + } + + // Cancel the context to stop the goroutine. + cancel() + wg.Wait() +}