Only return the latest error in backoffer
Signed-off-by: JmPotato <[email protected]>
JmPotato committed May 30, 2024
1 parent c498063 commit 612870a
Showing 4 changed files with 75 additions and 19 deletions.
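In short, Backoffer.Exec now returns only the error from the final retry attempt instead of a multierr aggregate of every attempt. A minimal usage sketch of the new behavior, assuming the client/retry package path implied by this diff (github.com/tikv/pd/client/retry); the constructor and Exec calls mirror the tests below, and the exact durations are chosen only for illustration:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	// Backoffer with a 100ms base interval, a 1s cap, and a 1s total budget.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, time.Second)

	attempt := 0
	err := bo.Exec(context.Background(), func() error {
		attempt++
		return fmt.Errorf("attempt %d failed", attempt)
	})
	// After this commit, err is just the error from the last attempt
	// (e.g. "attempt 4 failed"), not all attempts joined together.
	fmt.Println(err)
}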
2 changes: 1 addition & 1 deletion client/go.mod
@@ -16,7 +16,6 @@ require (
 	github.com/stretchr/testify v1.8.2
 	go.uber.org/atomic v1.10.0
 	go.uber.org/goleak v1.1.11
-	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.24.0
 	golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
 	google.golang.org/grpc v1.62.1
@@ -34,6 +33,7 @@ require (
 	github.com/prometheus/client_model v0.5.0 // indirect
 	github.com/prometheus/common v0.46.0 // indirect
 	github.com/prometheus/procfs v0.12.0 // indirect
+	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/net v0.23.0 // indirect
 	golang.org/x/sys v0.18.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
16 changes: 4 additions & 12 deletions client/retry/backoff.go
@@ -24,12 +24,9 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
-	"go.uber.org/multierr"
 	"go.uber.org/zap"
 )
 
-const maxRecordErrorCount = 20
-
 // Option is used to customize the backoffer.
 type Option func(*Backoffer)
 
@@ -69,18 +66,13 @@ func (bo *Backoffer) Exec(
 ) error {
 	defer bo.resetBackoff()
 	var (
-		allErrors error
-		err       error
-		after     *time.Timer
+		err   error
+		after *time.Timer
 	)
 	fnName := getFunctionName(fn)
 	for {
 		err = fn()
 		bo.attempt++
-		if bo.attempt < maxRecordErrorCount {
-			// multierr.Append will ignore nil error.
-			allErrors = multierr.Append(allErrors, err)
-		}
 		if !bo.isRetryable(err) {
 			break
 		}
@@ -100,7 +92,7 @@
 		select {
 		case <-ctx.Done():
 			after.Stop()
-			return multierr.Append(allErrors, errors.Trace(ctx.Err()))
+			return errors.Trace(ctx.Err())
 		case <-after.C:
 			failpoint.Inject("backOffExecute", func() {
 				testBackOffExecuteFlag = true
@@ -115,7 +107,7 @@
 			}
 		}
 	}
-	return allErrors
+	return err
 }
 
 // InitialBackoffer make the initial state for retrying.
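For comparison, the removed aggregation used go.uber.org/multierr, whose combined error joins the individual messages with "; ", which is why the old test expected "test; test; test; test". A small illustrative sketch of the old accumulation pattern (not part of this commit):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	var allErrors error
	for i := 0; i < 4; i++ {
		// multierr.Append ignores nil errors and accumulates the rest.
		allErrors = multierr.Append(allErrors, errors.New("test"))
	}
	fmt.Println(allErrors) // test; test; test; test
}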
26 changes: 24 additions & 2 deletions client/retry/backoff_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"errors"
+	"fmt"
 	"testing"
 	"time"
 
@@ -87,11 +88,33 @@ func TestBackoffer(t *testing.T) {
 		return expectedErr
 	})
 	re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
-	re.ErrorContains(err, "test; test; test; test")
+	re.ErrorContains(err, "test")
 	re.ErrorIs(err, expectedErr)
 	re.Equal(4, execCount)
 	re.True(isBackofferReset(bo))
 
+	// Test the error returned.
+	execCount = 0
+	err = bo.Exec(ctx, func() error {
+		execCount++
+		return fmt.Errorf("test %d", execCount)
+	})
+	re.Error(err)
+	re.Equal("test 4", err.Error())
+	re.Equal(4, execCount)
+	re.True(isBackofferReset(bo))
+	execCount = 0
+	err = bo.Exec(ctx, func() error {
+		if execCount == 1 {
+			return nil
+		}
+		execCount++
+		return expectedErr
+	})
+	re.Equal(1, execCount)
+	re.NoError(err)
+	re.True(isBackofferReset(bo))
+
 	// Test the retryable checker.
 	execCount = 0
 	bo = InitialBackoffer(base, max, total)
@@ -135,7 +158,6 @@ func TestBackofferWithLog(t *testing.T) {
 	rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]`
 	re.Contains(ms[0], rfc)
 
-	bo.resetBackoff()
 	err = bo.Exec(ctx, testFn)
 	re.ErrorIs(err, errTest)
 
50 changes: 46 additions & 4 deletions tests/integrations/client/http_client_test.go
@@ -21,6 +21,7 @@ import (
 	"net/url"
 	"sort"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -531,14 +532,15 @@ func (suite *httpClientTestSuite) TestSchedulers() {
 	defer cancel()
 	schedulers, err := client.GetSchedulers(ctx)
 	re.NoError(err)
-	re.Empty(schedulers)
+	const schedulerName = "evict-leader-scheduler"
+	re.NotContains(schedulers, schedulerName)
 
-	err = client.CreateScheduler(ctx, "evict-leader-scheduler", 1)
+	err = client.CreateScheduler(ctx, schedulerName, 1)
 	re.NoError(err)
 	schedulers, err = client.GetSchedulers(ctx)
 	re.NoError(err)
-	re.Len(schedulers, 1)
-	err = client.SetSchedulerDelay(ctx, "evict-leader-scheduler", 100)
+	re.Contains(schedulers, schedulerName)
+	err = client.SetSchedulerDelay(ctx, schedulerName, 100)
 	re.NoError(err)
 	err = client.SetSchedulerDelay(ctx, "not-exist", 100)
 	re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message
@@ -757,3 +759,43 @@ func (suite *httpClientTestSuite) TestGetHealthStatus() {
 	re.Equal("pd2", healths[1].Name)
 	re.True(healths[0].Health && healths[1].Health)
 }
+
+func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
+	re := suite.Require()
+	ctx, cancel := context.WithCancel(suite.ctx)
+	defer cancel()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
+		client := suite.client.WithBackoffer(bo)
+		for {
+			healths, err := client.GetHealthStatus(ctx)
+			if err != nil && strings.Contains(err.Error(), "context canceled") {
+				return
+			}
+			re.NoError(err)
+			re.Len(healths, 2)
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+		}
+	}()
+
+	leader := suite.cluster.GetLeaderServer()
+	re.NotNil(leader)
+	for i := 0; i < 3; i++ {
+		leader.ResignLeader()
+		re.NotEmpty(suite.cluster.WaitLeader())
+		leader = suite.cluster.GetLeaderServer()
+		re.NotNil(leader)
+	}
+
+	// Cancel the context to stop the goroutine.
+	cancel()
+	wg.Wait()
+}
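The new integration test exercises retries across a PD leader change by attaching a backoffer to the HTTP client. A hedged sketch of the same wiring outside the test suite; the NewClient constructor, its arguments, and the import paths are assumptions based on the repository layout, while WithBackoffer, GetHealthStatus, and InitialBackoffer come from the diff above:

package main

import (
	"context"
	"log"
	"time"

	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/retry"
)

func main() {
	// Assumed constructor: a caller name plus the PD endpoints.
	cli := pdhttp.NewClient("example-caller", []string{"http://127.0.0.1:2379"})
	defer cli.Close()

	// Retry transient failures (e.g. during a PD leader transfer) with a
	// 100ms base interval, a 1s cap, and a zero total budget, as in the test above.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
	retryingCli := cli.WithBackoffer(bo)

	healths, err := retryingCli.GetHealthStatus(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("health status: %+v", healths)
}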
