diff --git a/pkg/mcs/utils/constant/constant.go b/pkg/mcs/utils/constant/constant.go
index b064926b7b17..cd01c94f3e0f 100644
--- a/pkg/mcs/utils/constant/constant.go
+++ b/pkg/mcs/utils/constant/constant.go
@@ -19,9 +19,9 @@ import "time"
 const (
 	// ClusterIDPath is the path to store cluster id
 	ClusterIDPath = "/pd/cluster_id"
-	// RetryIntervalWaitAPIService is the interval to retry.
+	// RetryInterval is the interval to retry.
 	// Note: the interval must be less than the timeout of tidb and tikv, which is 2s by default in tikv.
-	RetryIntervalWaitAPIService = 500 * time.Millisecond
+	RetryInterval = 500 * time.Millisecond
 	// TCPNetworkStr is the string of tcp network
 	TCPNetworkStr = "tcp"
diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go
index e5e00cb8e83b..a60ddc12b475 100644
--- a/pkg/mcs/utils/util.go
+++ b/pkg/mcs/utils/util.go
@@ -26,9 +26,7 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
-	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/diagnosticspb"
-	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -49,18 +47,12 @@ import (
 	"google.golang.org/grpc/keepalive"
 )
 
-const (
-	// maxRetryTimes is the max retry times for initializing the cluster ID.
-	maxRetryTimes = 5
-	// retryInterval is the interval to retry.
-	retryInterval = time.Second
-)
-
 // InitClusterID initializes the cluster ID.
 func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) {
-	ticker := time.NewTicker(retryInterval)
+	ticker := time.NewTicker(constant.RetryInterval)
 	defer ticker.Stop()
-	for i := 0; i < maxRetryTimes; i++ {
+	retryTimes := 0
+	for {
 		if clusterID, err := etcdutil.GetClusterID(client, constant.ClusterIDPath); err == nil && clusterID != 0 {
 			return clusterID, nil
 		}
@@ -68,9 +60,13 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
 		case <-ctx.Done():
 			return 0, err
 		case <-ticker.C:
+			retryTimes++
+			if retryTimes/500 > 0 {
+				log.Warn("etcd is not ready, retrying", errs.ZapError(err))
+				retryTimes /= 500
+			}
 		}
 	}
-	return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
 }
 
 // PromHandler is a handler to get prometheus metrics.
@@ -121,59 +117,6 @@ type server interface {
 	Name() string
 }
 
-// WaitAPIServiceReady waits for the api service ready.
-func WaitAPIServiceReady(s server) error {
-	var (
-		ready bool
-		err   error
-	)
-	ticker := time.NewTicker(constant.RetryIntervalWaitAPIService)
-	defer ticker.Stop()
-	retryTimes := 0
-	for {
-		ready, err = isAPIServiceReady(s)
-		if err == nil && ready {
-			return nil
-		}
-		select {
-		case <-s.Context().Done():
-			return errors.New("context canceled while waiting api server ready")
-		case <-ticker.C:
-			retryTimes++
-			if retryTimes/500 > 0 {
-				log.Warn("api server is not ready, retrying", errs.ZapError(err))
-				retryTimes /= 500
-			}
-		}
-	}
-}
-
-func isAPIServiceReady(s server) (bool, error) {
-	urls := strings.Split(s.GetBackendEndpoints(), ",")
-	if len(urls) == 0 {
-		return false, errors.New("no backend endpoints")
-	}
-	cc, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), urls[0])
-	if err != nil {
-		return false, err
-	}
-	clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(s.Context(), &pdpb.GetClusterInfoRequest{})
-	if err != nil {
-		return false, err
-	}
-	if clusterInfo.GetHeader().GetError() != nil {
-		return false, errors.Errorf(clusterInfo.GetHeader().GetError().String())
-	}
-	modes := clusterInfo.ServiceModes
-	if len(modes) == 0 {
-		return false, errors.New("no service mode")
-	}
-	if modes[0] == pdpb.ServiceMode_API_SVC_MODE {
-		return true, nil
-	}
-	return false, nil
-}
-
 // InitClient initializes the etcd and http clients.
 func InitClient(s server) error {
 	tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index 0fe64b8d329e..dc792d7a4621 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -210,57 +210,6 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int
 	return count
 }
 
-func TestWaitAPIServiceReady(t *testing.T) {
-	re := require.New(t)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	startCluster := func(isAPIServiceMode bool) (cluster *tests.TestCluster, backendEndpoints string) {
-		var err error
-		if isAPIServiceMode {
-			cluster, err = tests.NewTestAPICluster(ctx, 1)
-		} else {
-			cluster, err = tests.NewTestCluster(ctx, 1)
-		}
-		re.NoError(err)
-		err = cluster.RunInitialServers()
-		re.NoError(err)
-		leaderName := cluster.WaitLeader()
-		re.NotEmpty(leaderName)
-		pdLeader := cluster.GetServer(leaderName)
-		return cluster, pdLeader.GetAddr()
-	}
-
-	// tso server cannot be started because the pd server is not ready as api service.
-	cluster, backendEndpoints := startCluster(false /*isAPIServiceMode*/)
-	sctx, scancel := context.WithTimeout(ctx, time.Second*10)
-	defer scancel()
-	s, _, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
-	re.Error(err)
-	re.Nil(s)
-	cluster.Destroy()
-
-	// tso server can be started because the pd server is ready as api service.
-	cluster, backendEndpoints = startCluster(true /*isAPIServiceMode*/)
-	sctx, scancel = context.WithTimeout(ctx, time.Second*10)
-	defer scancel()
-	s, cleanup, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
-	re.NoError(err)
-	defer cluster.Destroy()
-	defer cleanup()
-
-	for i := 0; i < 12; i++ {
-		select {
-		case <-time.After(time.Second):
-		case <-sctx.Done():
-			return
-		}
-		if s != nil && s.IsServing() {
-			break
-		}
-	}
-}
-
 type APIServerForward struct {
 	re               *require.Assertions
 	ctx              context.Context
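For context, the behavioral change in `InitClusterID` is that it now retries indefinitely on a fixed ticker instead of giving up after `maxRetryTimes`, throttling its warning log with the same `retryTimes/500` trick the removed `WaitAPIServiceReady` used. Below is a minimal, self-contained sketch of that pattern under stated assumptions: `fetchClusterID`, the constant names, and the plain `fmt` logging are hypothetical stand-ins for `etcdutil.GetClusterID`, `constant.RetryInterval`, and PD's `log.Warn`, not the actual PD code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	retryInterval = 500 * time.Millisecond // mirrors constant.RetryInterval
	logEveryN     = 500                    // mirrors the retryTimes/500 throttle
)

// fetchClusterID is a hypothetical stand-in for etcdutil.GetClusterID;
// here it always fails, as etcd would while still starting up.
func fetchClusterID() (uint64, error) {
	return 0, errors.New("etcd not ready")
}

// initClusterID retries until the lookup succeeds or ctx is canceled,
// warning only about once every logEveryN ticks to avoid log flooding.
func initClusterID(ctx context.Context) (uint64, error) {
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()
	retryTimes := 0
	for {
		if id, err := fetchClusterID(); err == nil && id != 0 {
			return id, nil
		}
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-ticker.C:
			retryTimes++
			if retryTimes/logEveryN > 0 {
				fmt.Println("etcd is not ready, retrying") // log.Warn in PD
				retryTimes /= logEveryN
			}
		}
	}
}

func main() {
	// Bound the demo with a timeout so the infinite retry loop terminates.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if _, err := initClusterID(ctx); err != nil {
		fmt.Println("init failed:", err)
	}
}
```

One quirk worth noting: `retryTimes /= logEveryN` resets the counter to 1 rather than 0, so after the first warning each subsequent one fires a tick sooner; with the 500ms `RetryInterval` that still works out to a warning roughly every 250 seconds.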