Skip to content

Commit

Permalink
support configuring backoff for client
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 9, 2024
1 parent 697d907 commit 34b0620
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 122 deletions.
93 changes: 70 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -304,6 +307,13 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithBackoffer configures the client with backoffer.
func WithBackoffer(bo *retry.Backoffer) ClientOption {
return func(c *client) {
c.option.bo = bo
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down Expand Up @@ -1033,35 +1043,72 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
}
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
execFunc := func() (any, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.GetRegionRequest{
Header: c.requestHeader(),
RegionKey: key,
NeedBuckets: options.needBuckets,
}
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegion(cctx, req)
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
}
if c.option.bo == nil {
resp, err := execFunc()
if err != nil {
return nil, err
}
region, ok := resp.(*Region)
if !ok {
return nil, errs.ErrClientInvalidResponseType
}
return region, err
}
// Copy a new backoffer for each request.
bo := *c.option.bo
// Set the retryable checker for the backoffer if it's not set.
bo.SetRetryableChecker(func(err error) bool {
return err != nil && needRetry(err)
}, false)
resp, err := bo.ExecWithResult(ctx, execFunc)
if err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
region, ok := resp.(*Region)
if !ok {
return nil, errs.ErrClientInvalidResponseType
}
return region, err
}

func needRetry(err error) bool {
st, ok := status.FromError(err)
if !ok {
return false
}
return st.Code() == codes.ResourceExhausted
}

// GetPrevRegion implements the RPCClient interface.
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientInvalidResponseType = errors.Normalize("invalid response type", errors.RFCCodeText("PD:client:ErrClientInvalidResponseType"))
ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
Expand Down
2 changes: 2 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/retry"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ type option struct {
useTSOServerProxy bool
metricsLabels prometheus.Labels
initMetrics bool
bo *retry.Backoffer

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
51 changes: 51 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,57 @@ func (bo *Backoffer) Exec(
return err
}

// ExecWithResult is a helper function to exec backoff with result.
func (bo *Backoffer) ExecWithResult(
ctx context.Context,
fn func() (any, error),
) (any, error) {
defer bo.resetBackoff()
var (
resp any
err error
after *time.Timer
)
fnName := getFunctionName(fn)
for {
resp, err = fn()
bo.attempt++
if err == nil || !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
bo.nextLogTime += currentInterval
if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval {
bo.nextLogTime %= bo.logInterval
log.Warn("[pd.backoffer] exec fn failed and retrying",
zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err))
}
if after == nil {
after = time.NewTimer(currentInterval)
} else {
after.Reset(currentInterval)
}
select {
case <-ctx.Done():
after.Stop()
return nil, errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
after.Stop()
// If the current total time exceeds the maximum total time, return the last error.
if bo.total > 0 {
bo.currentTotal += currentInterval
if bo.currentTotal >= bo.total {
break
}
}
}
return resp, err
}

// InitialBackoffer make the initial state for retrying.
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
Expand Down
Loading

0 comments on commit 34b0620

Please sign in to comment.