From 34b062028c8b3a03e1ed1db75d9e72cc984e59bf Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 9 Oct 2024 15:47:26 +0800 Subject: [PATCH] support configuring backoff for client Signed-off-by: Ryan Leung --- client/client.go | 93 ++++++++++++---- client/errs/errno.go | 1 + client/option.go | 2 + client/retry/backoff.go | 51 +++++++++ server/grpc_service.go | 135 ++++++----------------- tests/integrations/client/client_test.go | 41 +++++++ 6 files changed, 201 insertions(+), 122 deletions(-) diff --git a/client/client.go b/client/client.go index 3faa3a09215..99dbb044c40 100644 --- a/client/client.go +++ b/client/client.go @@ -33,10 +33,13 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/tlsutil" "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -304,6 +307,13 @@ func WithInitMetricsOption(initMetrics bool) ClientOption { } } +// WithBackoffer configures the client with backoffer. +func WithBackoffer(bo *retry.Backoffer) ClientOption { + return func(c *client) { + c.option.bo = bo + } +} + var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. @@ -1033,35 +1043,72 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt } start := time.Now() defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) - defer cancel() - options := &GetRegionOp{} - for _, opt := range opts { - opt(options) - } - req := &pdpb.GetRegionRequest{ - Header: c.requestHeader(), - RegionKey: key, - NeedBuckets: options.needBuckets, - } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) - if serviceClient == nil { - return nil, errs.ErrClientGetProtoClient - } - resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) - if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { - protoClient, cctx := c.getClientAndContext(ctx) - if protoClient == nil { + execFunc := func() (any, error) { + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + defer cancel() + + options := &GetRegionOp{} + for _, opt := range opts { + opt(options) + } + req := &pdpb.GetRegionRequest{ + Header: c.requestHeader(), + RegionKey: key, + NeedBuckets: options.needBuckets, + } + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } - resp, err = protoClient.GetRegion(cctx, req) - } + resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) + if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { + protoClient, cctx := c.getClientAndContext(ctx) + if protoClient == nil { + return nil, errs.ErrClientGetProtoClient + } + resp, err = protoClient.GetRegion(cctx, req) + } - if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { + if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil { + return nil, err + } + return handleRegionResponse(resp), nil + } + if c.option.bo == nil { + resp, err := execFunc() + if err != nil { + return nil, err + } + region, ok := resp.(*Region) + if !ok { + return nil, 
errs.ErrClientInvalidResponseType + } + return region, err + } + // Copy a new backoffer for each request. + bo := *c.option.bo + // Set the retryable checker for the backoffer if it's not set. + bo.SetRetryableChecker(func(err error) bool { + return err != nil && needRetry(err) + }, false) + resp, err := bo.ExecWithResult(ctx, execFunc) + if err != nil { return nil, err } - return handleRegionResponse(resp), nil + region, ok := resp.(*Region) + if !ok { + return nil, errs.ErrClientInvalidResponseType + } + return region, err +} + +func needRetry(err error) bool { + st, ok := status.FromError(err) + if !ok { + return false + } + return st.Code() == codes.ResourceExhausted } // GetPrevRegion implements the RPCClient interface. diff --git a/client/errs/errno.go b/client/errs/errno.go index 95c6bffdfa4..5b46af10da6 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -39,6 +39,7 @@ const ( // client errors var ( ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient")) + ErrClientInvalidResponseType = errors.Normalize("invalid response type", errors.RFCCodeText("PD:client:ErrClientInvalidResponseType")) ErrClientGetMetaStorageClient = errors.Normalize("failed to get meta storage client", errors.RFCCodeText("PD:client:ErrClientGetMetaStorageClient")) ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream")) ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed")) diff --git a/client/option.go b/client/option.go index 3f2b7119b52..2759180e7e6 100644 --- a/client/option.go +++ b/client/option.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/client/retry" "google.golang.org/grpc" ) @@ -61,6 +62,7 @@ type option struct { useTSOServerProxy bool metricsLabels prometheus.Labels initMetrics bool + bo *retry.Backoffer // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 9161ad0fea1..4331982b2ca 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -109,6 +109,57 @@ func (bo *Backoffer) Exec( return err } +// ExecWithResult is a helper function to exec backoff with result. +func (bo *Backoffer) ExecWithResult( + ctx context.Context, + fn func() (any, error), +) (any, error) { + defer bo.resetBackoff() + var ( + resp any + err error + after *time.Timer + ) + fnName := getFunctionName(fn) + for { + resp, err = fn() + bo.attempt++ + if err == nil || !bo.isRetryable(err) { + break + } + currentInterval := bo.nextInterval() + bo.nextLogTime += currentInterval + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("[pd.backoffer] exec fn failed and retrying", + zap.String("fn-name", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) + } + if after == nil { + after = time.NewTimer(currentInterval) + } else { + after.Reset(currentInterval) + } + select { + case <-ctx.Done(): + after.Stop() + return nil, errors.Trace(ctx.Err()) + case <-after.C: + failpoint.Inject("backOffExecute", func() { + testBackOffExecuteFlag = true + }) + } + after.Stop() + // If the current total time exceeds the maximum total time, return the last error. 
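+		// Only elapsed waits are accumulated here, so `total` bounds the overall
+		// wall-clock retry budget rather than the number of attempts.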
+ if bo.total > 0 { + bo.currentTotal += currentInterval + if bo.currentTotal >= bo.total { + break + } + } + } + return resp, err +} + // InitialBackoffer make the initial state for retrying. // - `base` defines the initial time interval to wait before each retry. // - `max` defines the max time interval to wait before each retry. diff --git a/server/grpc_service.go b/server/grpc_service.go index 9e892dda161..de4cfef43d3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -300,9 +300,7 @@ func (s *GrpcServer) GetMinTS( if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMinTSResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -457,9 +455,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMembersResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID @@ -605,9 +601,7 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BootstrapResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -649,9 +643,7 @@ func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstr if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsBootstrappedResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -678,9 +670,7 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AllocIDResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -714,9 +704,7 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsSnapshotRecoveringResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // recovering mark is stored in etcd directly, there's no need to forward. 
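Note: every rate-limited gRPC entry point in this file follows the same pattern, replacing the old "OK response carrying an error header" with a proper ResourceExhausted status; the remaining handlers below repeat it. For context, here is a minimal sketch of how the new ExecWithResult helper drives a retry loop against such errors, using only the retry APIs introduced above; doRPC is a hypothetical stand-in and not part of this patch:

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/tikv/pd/client/retry"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    )

    // doRPC is a hypothetical fallible call; here it always reports rate exhaustion.
    func doRPC(context.Context) (string, error) {
    	return "", status.Error(codes.ResourceExhausted, "rate limit exceeded")
    }

    func main() {
    	ctx := context.Background()
    	// Wait 100ms initially, cap each wait at 1s, and give up after ~3s in total.
    	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 3*time.Second)
    	// Retry only rate-limit rejections; any other error fails fast.
    	bo.SetRetryableChecker(func(err error) bool {
    		st, ok := status.FromError(err)
    		return ok && st.Code() == codes.ResourceExhausted
    	}, false)
    	resp, err := bo.ExecWithResult(ctx, func() (any, error) {
    		return doRPC(ctx)
    	})
    	// Prints the last response and the ResourceExhausted error once the
    	// total backoff budget is spent.
    	fmt.Println(resp, err)
    }
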
@@ -740,9 +728,7 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetStoreResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -796,9 +782,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutStoreResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -853,9 +837,7 @@ func (s *GrpcServer) GetAllStores(ctx context.Context, request *pdpb.GetAllStore if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetAllStoresResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -898,9 +880,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.StoreHeartbeatResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1394,15 +1374,16 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error // GetRegion implements gRPC PDServer. 
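+// It returns a gRPC ResourceExhausted status when the rate limiter rejects the
+// request, so clients configured with a backoffer (see client/client.go) can retry.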
func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest) (*pdpb.GetRegionResponse, error) { + failpoint.Inject("rateLimit", func() { + failpoint.Return(nil, status.Error(codes.ResourceExhausted, errs.ErrRateLimitExceeded.Error())) + }) if s.GetServiceMiddlewarePersistOptions().IsGRPCRateLimitEnabled() { fName := currentFunction() limiter := s.GetGRPCRateLimiter() if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1464,9 +1445,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1523,9 +1502,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1585,9 +1562,7 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScanRegionsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1643,9 +1618,7 @@ func (s *GrpcServer) BatchScanRegions(ctx context.Context, request *pdpb.BatchSc if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BatchScanRegionsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1735,9 +1708,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskSplitResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1781,9 +1752,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskBatchSplitResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -1857,9 +1826,7 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportSplitResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, 
status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1895,9 +1862,7 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportBatchSplitResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1934,9 +1899,7 @@ func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetClusterConfigResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1966,9 +1929,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutClusterConfigResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2007,9 +1968,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScatterRegionResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2121,9 +2080,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetGCSafePointResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2180,9 +2137,7 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateGCSafePointResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2229,9 +2184,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateServiceGCSafePointResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2285,9 +2238,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetOperatorResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2500,9 +2451,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request 
*pdpb.SyncMaxTSRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SyncMaxTSResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } tsoAllocatorManager := s.GetTSOAllocatorManager() @@ -2605,9 +2554,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitRegionsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2671,9 +2618,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitAndScatterRegionsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2737,9 +2682,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetDCLocationInfoResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } am := s.GetTSOAllocatorManager() @@ -2999,9 +2942,7 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportMinResolvedTsResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3039,9 +2980,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SetExternalTimestampResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3077,9 +3016,7 @@ func (s *GrpcServer) GetExternalTimestamp(ctx context.Context, request *pdpb.Get if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetExternalTimestampResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9f0b5f8d523..7554a115863 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2178,3 +2178,44 @@ func (suite *clientTestSuite) TestBatchScanRegions() { return err != nil && strings.Contains(err.Error(), "found a hole region between") }) } + +func TestGetRegionWithBackoff(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/rateLimit", "return(true)")) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer 
cluster.Destroy()
+	endpoints := runServer(re, cluster)
+
+	// Define the backoff parameters.
+	base := 100 * time.Millisecond
+	max := 500 * time.Millisecond
+	total := 3 * time.Second
+
+	// Create a backoff strategy.
+	bo := retry.InitialBackoffer(base, max, total)
+
+	// Initialize the client with context and backoff.
+	client, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, pd.WithBackoffer(bo))
+	re.NoError(err)
+	defer client.Close()
+
+	// Record the start time.
+	start := time.Now()
+
+	// Call GetRegion; the rateLimit failpoint forces the server to reject it, so
+	// the client should keep retrying until the backoffer's total budget is spent.
+	_, err = client.GetRegion(ctx, []byte("key"))
+	re.Error(err)
+	// Verify that some backoff occurred: the elapsed time must exceed the base interval.
+	elapsed := time.Since(start)
+	re.Greater(elapsed, base, "expected some backoff to have occurred")
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/rateLimit"))
+	// Call GetRegion again and expect it to succeed.
+	region, err := client.GetRegion(ctx, []byte("key"))
+	re.NoError(err)
+	// runServer bootstraps the cluster with a single region, whose ID is expected to be 2.
+	re.Equal(uint64(2), region.Meta.Id)
+}
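Taken together, an application opts in as sketched below. This is a minimal sketch assuming only the APIs added in this patch; the endpoint address is a placeholder. Because the client installs its rate-limit checker with the overwrite flag set to false, a checker pre-configured on the supplied backoffer takes precedence.

    package main

    import (
    	"context"
    	"time"

    	pd "github.com/tikv/pd/client"
    	"github.com/tikv/pd/client/retry"
    )

    func main() {
    	ctx := context.Background()
    	// Exponential backoff: 100ms initial wait, 500ms cap, 3s total budget.
    	bo := retry.InitialBackoffer(100*time.Millisecond, 500*time.Millisecond, 3*time.Second)
    	// The client copies this backoffer for each request, so retry state such as
    	// the attempt counter is never shared between concurrent GetRegion calls.
    	cli, err := pd.NewClientWithContext(ctx, []string{"127.0.0.1:2379"},
    		pd.SecurityOption{}, pd.WithBackoffer(bo))
    	if err != nil {
    		panic(err)
    	}
    	defer cli.Close()

    	// ResourceExhausted rejections from the server's rate limiter are now
    	// retried transparently until the total budget is exhausted.
    	region, err := cli.GetRegion(ctx, []byte("key"))
    	_, _ = region, err
    }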