Commit: remove function
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Aug 5, 2024
1 parent 4cb4135 commit 9940598
Showing 3 changed files with 10 additions and 118 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/utils/constant/constant.go
@@ -19,9 +19,9 @@ import "time"
 const (
 	// ClusterIDPath is the path to store cluster id
 	ClusterIDPath = "/pd/cluster_id"
-	// RetryIntervalWaitAPIService is the interval to retry.
+	// RetryInterval is the interval to retry.
 	// Note: the interval must be less than the timeout of tidb and tikv, which is 2s by default in tikv.
-	RetryIntervalWaitAPIService = 500 * time.Millisecond
+	RetryInterval = 500 * time.Millisecond
 
 	// TCPNetworkStr is the string of tcp network
 	TCPNetworkStr = "tcp"
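The renamed constant keeps the same 500ms value, and the note above it still applies: the polling interval must stay below the 2s default timeout in tidb and tikv, so a waiter gets several attempts in before the downstream deadline. A minimal standalone sketch of that idea (not part of this commit; pollUntilReady, checkReady, and the concrete values are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollUntilReady retries checkReady at the given interval until it succeeds
// or the context is done. With a 500ms interval and a 2s deadline, roughly
// three to four attempts fit before the caller's timeout fires.
func pollUntilReady(ctx context.Context, interval time.Duration, checkReady func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := checkReady(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	attempts := 0
	err := pollUntilReady(ctx, 500*time.Millisecond, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("not ready yet")
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```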
73 changes: 8 additions & 65 deletions pkg/mcs/utils/util.go
@@ -26,9 +26,7 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
-	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/diagnosticspb"
-	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -49,28 +47,26 @@ import (
 	"google.golang.org/grpc/keepalive"
 )
 
-const (
-	// maxRetryTimes is the max retry times for initializing the cluster ID.
-	maxRetryTimes = 5
-	// retryInterval is the interval to retry.
-	retryInterval = time.Second
-)
-
 // InitClusterID initializes the cluster ID.
 func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err error) {
-	ticker := time.NewTicker(retryInterval)
+	ticker := time.NewTicker(constant.RetryInterval)
 	defer ticker.Stop()
-	for i := 0; i < maxRetryTimes; i++ {
+	retryTimes := 0
+	for {
 		if clusterID, err := etcdutil.GetClusterID(client, constant.ClusterIDPath); err == nil && clusterID != 0 {
 			return clusterID, nil
 		}
 		select {
 		case <-ctx.Done():
 			return 0, err
 		case <-ticker.C:
+			retryTimes++
+			if retryTimes/500 > 0 {
+				log.Warn("etcd is not ready, retrying", errs.ZapError(err))
+				retryTimes /= 500
+			}
 		}
 	}
-	return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
 }
 
 // PromHandler is a handler to get prometheus metrics.
@@ -121,59 +117,6 @@ type server interface {
 	Name() string
 }
 
-// WaitAPIServiceReady waits for the api service ready.
-func WaitAPIServiceReady(s server) error {
-	var (
-		ready bool
-		err   error
-	)
-	ticker := time.NewTicker(constant.RetryIntervalWaitAPIService)
-	defer ticker.Stop()
-	retryTimes := 0
-	for {
-		ready, err = isAPIServiceReady(s)
-		if err == nil && ready {
-			return nil
-		}
-		select {
-		case <-s.Context().Done():
-			return errors.New("context canceled while waiting api server ready")
-		case <-ticker.C:
-			retryTimes++
-			if retryTimes/500 > 0 {
-				log.Warn("api server is not ready, retrying", errs.ZapError(err))
-				retryTimes /= 500
-			}
-		}
-	}
-}
-
-func isAPIServiceReady(s server) (bool, error) {
-	urls := strings.Split(s.GetBackendEndpoints(), ",")
-	if len(urls) == 0 {
-		return false, errors.New("no backend endpoints")
-	}
-	cc, err := s.GetDelegateClient(s.Context(), s.GetTLSConfig(), urls[0])
-	if err != nil {
-		return false, err
-	}
-	clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(s.Context(), &pdpb.GetClusterInfoRequest{})
-	if err != nil {
-		return false, err
-	}
-	if clusterInfo.GetHeader().GetError() != nil {
-		return false, errors.Errorf(clusterInfo.GetHeader().GetError().String())
-	}
-	modes := clusterInfo.ServiceModes
-	if len(modes) == 0 {
-		return false, errors.New("no service mode")
-	}
-	if modes[0] == pdpb.ServiceMode_API_SVC_MODE {
-		return true, nil
-	}
-	return false, nil
-}
-
 // InitClient initializes the etcd and http clients.
 func InitClient(s server) error {
 	tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
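With the bounded retry gone, InitClusterID now polls etcd until its context is canceled, and it warns only about once every 500 ticks (roughly every four minutes at the 500ms RetryInterval) instead of on every failed attempt. A minimal standalone sketch of that throttled-warning retry loop, using only the standard library (retryForever, fetch, and warnEvery are illustrative names, not PD code):

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// retryForever polls fetch at the given interval until it returns a non-zero
// value or ctx is done. It logs a warning only about once every warnEvery
// ticks, mirroring the retryTimes/500 throttling in the diff above.
func retryForever(ctx context.Context, interval time.Duration, warnEvery int, fetch func() (uint64, error)) (uint64, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	retryTimes := 0
	var err error
	for {
		var id uint64
		if id, err = fetch(); err == nil && id != 0 {
			return id, nil
		}
		select {
		case <-ctx.Done():
			return 0, err
		case <-ticker.C:
			retryTimes++
			if retryTimes/warnEvery > 0 {
				log.Printf("still not ready, retrying: %v", err)
				retryTimes /= warnEvery // reset so the next warning is about warnEvery ticks away
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	calls := 0
	id, err := retryForever(ctx, 200*time.Millisecond, 5, func() (uint64, error) {
		calls++
		if calls < 4 {
			return 0, errors.New("cluster id not written yet")
		}
		return 42, nil // arbitrary non-zero id for the demo
	})
	log.Println("cluster id:", id, "err:", err)
}
```

Dividing the counter instead of zeroing it matches the retryTimes /= 500 reset in the diff above; either way the next warning fires roughly warnEvery ticks later.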
51 changes: 0 additions & 51 deletions tests/integrations/mcs/tso/server_test.go
@@ -210,57 +210,6 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int
 	return count
 }
 
-func TestWaitAPIServiceReady(t *testing.T) {
-	re := require.New(t)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	startCluster := func(isAPIServiceMode bool) (cluster *tests.TestCluster, backendEndpoints string) {
-		var err error
-		if isAPIServiceMode {
-			cluster, err = tests.NewTestAPICluster(ctx, 1)
-		} else {
-			cluster, err = tests.NewTestCluster(ctx, 1)
-		}
-		re.NoError(err)
-		err = cluster.RunInitialServers()
-		re.NoError(err)
-		leaderName := cluster.WaitLeader()
-		re.NotEmpty(leaderName)
-		pdLeader := cluster.GetServer(leaderName)
-		return cluster, pdLeader.GetAddr()
-	}
-
-	// tso server cannot be started because the pd server is not ready as api service.
-	cluster, backendEndpoints := startCluster(false /*isAPIServiceMode*/)
-	sctx, scancel := context.WithTimeout(ctx, time.Second*10)
-	defer scancel()
-	s, _, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
-	re.Error(err)
-	re.Nil(s)
-	cluster.Destroy()
-
-	// tso server can be started because the pd server is ready as api service.
-	cluster, backendEndpoints = startCluster(true /*isAPIServiceMode*/)
-	sctx, scancel = context.WithTimeout(ctx, time.Second*10)
-	defer scancel()
-	s, cleanup, err := tests.StartSingleTSOTestServerWithoutCheck(sctx, re, backendEndpoints, tempurl.Alloc())
-	re.NoError(err)
-	defer cluster.Destroy()
-	defer cleanup()
-
-	for i := 0; i < 12; i++ {
-		select {
-		case <-time.After(time.Second):
-		case <-sctx.Done():
-			return
-		}
-		if s != nil && s.IsServing() {
-			break
-		}
-	}
-}
-
 type APIServerForward struct {
 	re  *require.Assertions
 	ctx context.Context
