Skip to content

Commit

Permalink
playground/pdms: wait for tso component ready (#2394)
Browse files Browse the repository at this point in the history
* wait for tso

Signed-off-by: husharp <[email protected]>

* change api

Signed-off-by: husharp <[email protected]>

* fix test

Signed-off-by: husharp <[email protected]>

---------

Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp authored Apr 9, 2024
1 parent ea64109 commit 11f6005
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
37 changes: 30 additions & 7 deletions components/playground/instance/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,26 @@ import (
"fmt"
"path/filepath"
"strings"
"time"

"github.com/pingcap/tiup/pkg/cluster/api"
tiupexec "github.com/pingcap/tiup/pkg/exec"
"github.com/pingcap/tiup/pkg/utils"
)

// TiKVInstance represent a running tikv-server
type TiKVInstance struct {
instance
pds []*PDInstance
pds []*PDInstance
tsos []*PDInstance
Process
isCSEMode bool
cseOpts CSEOptions
isCSEMode bool
cseOpts CSEOptions
isPDMSMode bool
}

// NewTiKVInstance return a TiKVInstance
func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port int, pds []*PDInstance, isCSEMode bool, cseOptions CSEOptions) *TiKVInstance {
func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port int, pds []*PDInstance, tsos []*PDInstance, isCSEMode bool, cseOptions CSEOptions, isPDMSMode bool) *TiKVInstance {
if port <= 0 {
port = 20160
}
Expand All @@ -47,9 +51,11 @@ func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port
StatusPort: utils.MustGetFreePort(host, 20180),
ConfigPath: configPath,
},
pds: pds,
isCSEMode: isCSEMode,
cseOpts: cseOptions,
pds: pds,
tsos: tsos,
isCSEMode: isCSEMode,
cseOpts: cseOptions,
isPDMSMode: isPDMSMode,
}
}

Expand All @@ -69,6 +75,23 @@ func (inst *TiKVInstance) Start(ctx context.Context, version utils.Version) erro
return err
}

// Need to check tso status
if inst.isPDMSMode {
var tsoEnds []string
for _, pd := range inst.tsos {
tsoEnds = append(tsoEnds, fmt.Sprintf("%s:%d", AdvertiseHost(pd.Host), pd.StatusPort))
}
pdcli := api.NewPDClient(ctx,
tsoEnds, 10*time.Second, nil,
)
if err := pdcli.CheckTSOHealth(&utils.RetryOption{
Delay: time.Second * 5,
Timeout: time.Second * 300,
}); err != nil {
return err
}
}

endpoints := pdEndpoints(inst.pds, true)
args := []string{
fmt.Sprintf("--addr=%s", utils.JoinHostPort(inst.Host, inst.Port)),
Expand Down
2 changes: 1 addition & 1 deletion components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif
ins = inst
p.tidbs = append(p.tidbs, inst)
case spec.ComponentTiKV:
inst := instance.NewTiKVInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.bootOptions.Mode == "tidb-cse", p.bootOptions.CSEOpts)
inst := instance.NewTiKVInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.tsos, p.bootOptions.Mode == "tidb-cse", p.bootOptions.CSEOpts, p.bootOptions.PDMode == "ms")
ins = inst
p.tikvs = append(p.tikvs, inst)
case spec.ComponentTiFlash:
Expand Down
28 changes: 28 additions & 0 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
pdStoresURI = "pd/api/v1/stores"
pdStoresLimitURI = "pd/api/v1/stores/limit"
pdRegionsCheckURI = "pd/api/v1/regions/check"
tsoHealthPrefix = "tso/api/v1/health"
)

func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byte, error) {
Expand Down Expand Up @@ -198,6 +199,33 @@ func (pc *PDClient) CheckHealth() error {
return nil
}

// CheckTSOHealth checks the health of TSO service(which is a Micro Service component of PD)
func (pc *PDClient) CheckTSOHealth(retryOpt *utils.RetryOption) error {
servicePrefix := fmt.Sprintf("tso/%s", tsoHealthPrefix)
endpoints := pc.getEndpoints(servicePrefix)

if err := utils.Retry(func() error {
var err error
for _, endpoint := range endpoints {
_, err = pc.httpClient.Get(pc.ctx, endpoint)
if err != nil {
return err
}
}
if err == nil {
return nil
}

// return error by default, to make the retry work
pc.l().Debugf("Still waiting for the PD Micro Service's TSO health")
return perrs.New("Still waiting for the PD Micro Service's TSO health")
}, *retryOpt); err != nil {
return fmt.Errorf("error check PD Micro Service's TSO health, %v", err)
}

return nil
}

// GetStores queries the stores info from PD server
func (pc *PDClient) GetStores() (*StoresInfo, error) {
// Return all stores
Expand Down

0 comments on commit 11f6005

Please sign in to comment.