From b02094315e41586f554ac033ec38e76ff8c66583 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 10 Jul 2023 16:17:27 +0800 Subject: [PATCH] support microservices mode for playground Signed-off-by: Ryan Leung --- components/playground/command.go | 1 - components/playground/instance/instance.go | 3 + components/playground/instance/pd.go | 110 +++++++++++++++------ components/playground/main.go | 32 +++++- components/playground/playground.go | 84 +++++++++++----- pkg/tidbver/tidbver.go | 5 + 6 files changed, 176 insertions(+), 59 deletions(-) diff --git a/components/playground/command.go b/components/playground/command.go index 49bf1a649c..31f9a19718 100644 --- a/components/playground/command.go +++ b/components/playground/command.go @@ -102,7 +102,6 @@ func newScaleOut() *cobra.Command { cmd.Flags().IntVarP(&opt.TiKVCDC.Num, "kvcdc", "", opt.TiKVCDC.Num, "TiKV-CDC instance number") cmd.Flags().IntVarP(&opt.Pump.Num, "pump", "", opt.Pump.Num, "Pump instance number") cmd.Flags().IntVarP(&opt.Drainer.Num, "drainer", "", opt.Pump.Num, "Drainer instance number") - cmd.Flags().StringVarP(&opt.TiDB.Host, "db.host", "", opt.TiDB.Host, "Playground TiDB host. If not provided, TiDB will still use `host` flag as its host") cmd.Flags().StringVarP(&opt.PD.Host, "pd.host", "", opt.PD.Host, "Playground PD host. If not provided, PD will still use `host` flag as its host") diff --git a/components/playground/instance/instance.go b/components/playground/instance/instance.go index 3a87555588..3071a7a230 100644 --- a/components/playground/instance/instance.go +++ b/components/playground/instance/instance.go @@ -109,6 +109,9 @@ func logIfErr(err error) { func pdEndpoints(pds []*PDInstance, isHTTP bool) []string { var endpoints []string for _, pd := range pds { + if pd.Role == PDRoleTSO { + continue + } if isHTTP { endpoints = append(endpoints, "http://"+utils.JoinHostPort(AdvertiseHost(pd.Host), pd.StatusPort)) } else { diff --git a/components/playground/instance/pd.go b/components/playground/instance/pd.go index b37bee57de..945442a03b 100644 --- a/components/playground/instance/pd.go +++ b/components/playground/instance/pd.go @@ -24,16 +24,32 @@ import ( "github.com/pingcap/tiup/pkg/utils" ) +// PDRole is the role of PD. +type PDRole string + +const ( + // PDRoleNormal is the default role of PD + PDRoleNormal PDRole = "pd" + // PDRoleAPI is the role of PD API + PDRoleAPI PDRole = "api" + // PDRoleTSO is the role of PD TSO + PDRoleTSO PDRole = "tso" + // PDRoleResourceManager is the role of PD resource manager + PDRoleResourceManager PDRole = "resource manager" +) + // PDInstance represent a running pd-server type PDInstance struct { instance + Role PDRole initEndpoints []*PDInstance joinEndpoints []*PDInstance + pds []*PDInstance Process } // NewPDInstance return a PDInstance -func NewPDInstance(binPath, dir, host, configPath string, id, port int) *PDInstance { +func NewPDInstance(role PDRole, binPath, dir, host, configPath string, id int, pds []*PDInstance, port int) *PDInstance { if port <= 0 { port = 2379 } @@ -47,6 +63,8 @@ func NewPDInstance(binPath, dir, host, configPath string, id, port int) *PDInsta StatusPort: utils.MustGetFreePort(host, port), ConfigPath: configPath, }, + Role: role, + pds: pds, } } @@ -70,35 +88,67 @@ func (inst *PDInstance) Name() string { // Start calls set inst.cmd and Start func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error { uid := inst.Name() - args := []string{ - "--name=" + uid, - fmt.Sprintf("--data-dir=%s", filepath.Join(inst.Dir, "data")), - fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), - fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), - fmt.Sprintf("--client-urls=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), - fmt.Sprintf("--advertise-client-urls=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), - fmt.Sprintf("--log-file=%s", inst.LogFile()), - } - if inst.ConfigPath != "" { - args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) - } - - switch { - case len(inst.initEndpoints) > 0: - endpoints := make([]string, 0) - for _, pd := range inst.initEndpoints { - uid := fmt.Sprintf("pd-%d", pd.ID) - endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", uid, utils.JoinHostPort(AdvertiseHost(inst.Host), pd.Port))) + var args []string + switch inst.Role { + case PDRoleNormal, PDRoleAPI: + if inst.Role == PDRoleAPI { + args = []string{"services", "api"} + } + args = append(args, []string{ + "--name=" + uid, + fmt.Sprintf("--data-dir=%s", filepath.Join(inst.Dir, "data")), + fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), + fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--client-urls=http://%s", utils.JoinHostPort(inst.Host, inst.StatusPort)), + fmt.Sprintf("--advertise-client-urls=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.StatusPort)), + fmt.Sprintf("--log-file=%s", inst.LogFile()), + }...) + if inst.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) + } + switch { + case len(inst.initEndpoints) > 0: + endpoints := make([]string, 0) + for _, pd := range inst.initEndpoints { + uid := fmt.Sprintf("pd-%d", pd.ID) + endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", uid, utils.JoinHostPort(AdvertiseHost(inst.Host), pd.Port))) + } + args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ","))) + case len(inst.joinEndpoints) > 0: + endpoints := make([]string, 0) + for _, pd := range inst.joinEndpoints { + endpoints = append(endpoints, fmt.Sprintf("http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), pd.Port))) + } + args = append(args, fmt.Sprintf("--join=%s", strings.Join(endpoints, ","))) + default: + return errors.Errorf("must set the init or join instances") + } + case PDRoleTSO: + endpoints := pdEndpoints(inst.pds, true) + args = []string{ + "services", + "tso", + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", inst.LogFile()), + } + if inst.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) + } + case PDRoleResourceManager: + endpoints := pdEndpoints(inst.pds, true) + args = []string{ + "services", + "resource-manager", + fmt.Sprintf("--listen-addr=http://%s", utils.JoinHostPort(inst.Host, inst.Port)), + fmt.Sprintf("--advertise-listen-addr=http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), inst.Port)), + fmt.Sprintf("--backend-endpoints=%s", strings.Join(endpoints, ",")), + fmt.Sprintf("--log-file=%s", inst.LogFile()), } - args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ","))) - case len(inst.joinEndpoints) > 0: - endpoints := make([]string, 0) - for _, pd := range inst.joinEndpoints { - endpoints = append(endpoints, fmt.Sprintf("http://%s", utils.JoinHostPort(AdvertiseHost(inst.Host), pd.Port))) + if inst.ConfigPath != "" { + args = append(args, fmt.Sprintf("--config=%s", inst.ConfigPath)) } - args = append(args, fmt.Sprintf("--join=%s", strings.Join(endpoints, ","))) - default: - return errors.Errorf("must set the init or join instances") } var err error @@ -113,12 +163,12 @@ func (inst *PDInstance) Start(ctx context.Context, version utils.Version) error // Component return the component name. func (inst *PDInstance) Component() string { - return "pd" + return string(inst.Role) } // LogFile return the log file. func (inst *PDInstance) LogFile() string { - return filepath.Join(inst.Dir, "pd.log") + return filepath.Join(inst.Dir, fmt.Sprintf("%s.log", string(inst.Role))) } // Addr return the listen address of PD diff --git a/components/playground/main.go b/components/playground/main.go index 9ef6a16cdf..5768354e91 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -56,7 +56,10 @@ import ( type BootOptions struct { Mode string `yaml:"mode"` Version string `yaml:"version"` - PD instance.Config `yaml:"pd"` + PD instance.Config `yaml:"pd"` // ignored when mode == pd-ms + API instance.Config `yaml:"api"` // Only available when mode == pd-ms + TSO instance.Config `yaml:"tso"` // Only available when mode == pd-ms + RM instance.Config `yaml:"rc"` // Only available when mode == pd-ms TiDB instance.Config `yaml:"tidb"` TiKV instance.Config `yaml:"tikv"` TiFlash instance.Config `yaml:"tiflash"` // ignored when mode == tidb-disagg @@ -266,7 +269,7 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol }, } - rootCmd.Flags().StringVar(&options.Mode, "mode", "tidb", "TiUP playground mode: 'tidb', 'tidb-disagg', 'tikv-slim'") + rootCmd.Flags().StringVar(&options.Mode, "mode", "tidb", "TiUP playground mode: 'tidb', 'tidb-disagg', 'tikv-slim', 'pd-ms'") rootCmd.PersistentFlags().StringVarP(&tag, "tag", "T", "", "Specify a tag for playground") // Use `PersistentFlags()` to make it available to subcommands. rootCmd.Flags().Bool("without-monitor", false, "Don't start prometheus and grafana component") rootCmd.Flags().BoolVar(&options.Monitor, "monitor", true, "Start prometheus and grafana component") @@ -285,6 +288,10 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().IntVar(&options.Pump.Num, "pump", 0, "Pump instance number") rootCmd.Flags().IntVar(&options.Drainer.Num, "drainer", 0, "Drainer instance number") + rootCmd.Flags().IntVar(&options.API.Num, "api", 0, "API instance number") + rootCmd.Flags().IntVar(&options.TSO.Num, "tso", 0, "TSO instance number") + rootCmd.Flags().IntVar(&options.RM.Num, "rc", 0, "Resource manager instance number") + rootCmd.Flags().IntVar(&options.TiDB.UpTimeout, "db.timeout", 60, "TiDB max wait time in seconds for starting, 0 means no limit") rootCmd.Flags().IntVar(&options.TiFlash.UpTimeout, "tiflash.timeout", 120, "TiFlash max wait time in seconds for starting, 0 means no limit") @@ -309,6 +316,10 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.TiCDC.ConfigPath, "ticdc.config", "", "TiCDC instance configuration file") rootCmd.Flags().StringVar(&options.TiKVCDC.ConfigPath, "kvcdc.config", "", "TiKV-CDC instance configuration file") + rootCmd.Flags().StringVar(&options.API.ConfigPath, "api.config", "", "API instance configuration file") + rootCmd.Flags().StringVar(&options.TSO.ConfigPath, "tso.config", "", "TSO instance configuration file") + rootCmd.Flags().StringVar(&options.RM.ConfigPath, "rc.config", "", "Resource manager instance configuration file") + rootCmd.Flags().StringVar(&options.TiDB.BinPath, "db.binpath", "", "TiDB instance binary path") rootCmd.Flags().StringVar(&options.TiKV.BinPath, "kv.binpath", "", "TiKV instance binary path") rootCmd.Flags().StringVar(&options.PD.BinPath, "pd.binpath", "", "PD instance binary path") @@ -320,6 +331,10 @@ If you'd like to use a TiDB version other than %s, cancel and retry with the fol rootCmd.Flags().StringVar(&options.Pump.BinPath, "pump.binpath", "", "Pump instance binary path") rootCmd.Flags().StringVar(&options.Drainer.BinPath, "drainer.binpath", "", "Drainer instance binary path") + rootCmd.Flags().StringVar(&options.API.BinPath, "api.binpath", "", "API instance binary path") + rootCmd.Flags().StringVar(&options.TSO.BinPath, "tso.binpath", "", "TSO instance binary path") + rootCmd.Flags().StringVar(&options.RM.BinPath, "rc.binpath", "", "Resource manager instance binary path") + rootCmd.Flags().StringVar(&options.TiKVCDC.Version, "kvcdc.version", "", "TiKV-CDC instance version") rootCmd.Flags().StringVar(&options.DisaggOpts.S3Endpoint, "disagg.s3_endpoint", "127.0.0.1:9000", "Object store URL for the disaggregated TiFlash, available when --mode=tidb-disagg") @@ -374,6 +389,19 @@ func populateDefaultOpt(flagSet *pflag.FlagSet) error { defaultStr(&options.TiFlashCompute.BinPath, "tiflash.compute.binpath", options.TiFlash.BinPath) defaultStr(&options.TiFlashCompute.ConfigPath, "tiflash.compute.config", options.TiFlash.ConfigPath) options.TiFlashCompute.UpTimeout = options.TiFlash.UpTimeout + case "pd-ms": + defaultInt(&options.TiDB.Num, "db", 1) + defaultInt(&options.TiKV.Num, "kv", 1) + defaultInt(&options.API.Num, "api", 1) + defaultStr(&options.API.BinPath, "api.binpath", options.API.BinPath) + defaultStr(&options.API.ConfigPath, "api.config", options.API.ConfigPath) + defaultInt(&options.TSO.Num, "tso", 1) + defaultStr(&options.TSO.BinPath, "tso.binpath", options.TSO.BinPath) + defaultStr(&options.TSO.ConfigPath, "tso.config", options.TSO.ConfigPath) + defaultInt(&options.RM.Num, "rc", 1) + defaultStr(&options.RM.BinPath, "rc.binpath", options.RM.BinPath) + defaultStr(&options.RM.ConfigPath, "rc.config", options.RM.ConfigPath) + defaultInt(&options.TiFlash.Num, "tiflash", 1) default: return errors.Errorf("Unknown --mode %s", options.Mode) } diff --git a/components/playground/playground.go b/components/playground/playground.go index 51e0dedc9e..ff0848767e 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -430,7 +430,11 @@ func (p *Playground) sanitizeComponentConfig(cid string, cfg *instance.Config) e func (p *Playground) startInstance(ctx context.Context, inst instance.Instance) error { boundVersion := p.bindVersion(inst.Component(), p.bootOptions.Version) - version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(inst.Component(), boundVersion) + component := inst.Component() + if component == string(instance.PDRoleAPI) || component == string(instance.PDRoleTSO) || component == string(instance.PDRoleResourceManager) { + component = string(instance.PDRoleNormal) + } + version, err := environment.GlobalEnv().V1Repository().ResolveComponentVersion(component, boundVersion) if err != nil { return err } @@ -469,7 +473,7 @@ func (p *Playground) handleScaleOut(w io.Writer, cmd *Command) error { return err } // TODO: Support scale-out in disaggregated mode - inst, err := p.addInstance(cmd.ComponentID, instance.TiFlashRoleNormal, cmd.Config) + inst, err := p.addInstance(cmd.ComponentID, instance.PDRoleNormal, instance.TiFlashRoleNormal, cmd.Config) if err != nil { return err } @@ -633,7 +637,7 @@ func (p *Playground) enableBinlog() bool { return p.bootOptions.Pump.Num > 0 } -func (p *Playground) addInstance(componentID string, tiflashRole instance.TiFlashRole, cfg instance.Config) (ins instance.Instance, err error) { +func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tiflashRole instance.TiFlashRole, cfg instance.Config) (ins instance.Instance, err error) { if cfg.BinPath != "" { cfg.BinPath, err = getAbsolutePath(cfg.BinPath) if err != nil { @@ -651,7 +655,7 @@ func (p *Playground) addInstance(componentID string, tiflashRole instance.TiFlas dataDir := p.dataDir id := p.allocID(componentID) - dir := filepath.Join(dataDir, fmt.Sprintf("%s-%d", componentID, id)) + dir := filepath.Join(dataDir, fmt.Sprintf("%s-%d", pdRole, id)) if err = utils.MkdirAll(dir, 0755); err != nil { return nil, err } @@ -663,16 +667,20 @@ func (p *Playground) addInstance(componentID string, tiflashRole instance.TiFlas switch componentID { case spec.ComponentPD: - inst := instance.NewPDInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port) + inst := instance.NewPDInstance(pdRole, cfg.BinPath, dir, host, cfg.ConfigPath, id, p.pds, cfg.Port) ins = inst - if p.booted { - inst.Join(p.pds) - p.pds = append(p.pds, inst) + if pdRole == instance.PDRoleNormal || pdRole == instance.PDRoleAPI { + if p.booted { + inst.Join(p.pds) + p.pds = append(p.pds, inst) + } else { + p.pds = append(p.pds, inst) + for _, pd := range p.pds { + pd.InitCluster(p.pds) + } + } } else { p.pds = append(p.pds, inst) - for _, pd := range p.pds { - pd.InitCluster(p.pds) - } } case spec.ComponentTiDB: inst := instance.NewTiDBInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.enableBinlog(), p.bootOptions.Mode == "tidb-disagg") @@ -801,6 +809,9 @@ func (p *Playground) bindVersion(comp string, version string) (bindVersion strin func (p *Playground) bootCluster(ctx context.Context, env *environment.Environment, options *BootOptions) error { for _, cfg := range []*instance.Config{ &options.PD, + &options.API, + &options.TSO, + &options.RM, &options.TiDB, &options.TiKV, &options.TiFlash, @@ -820,7 +831,7 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme p.bootOptions = options // All others components depend on the pd, we just ensure the pd count must be great than 0 - if options.PD.Num < 1 { + if options.Mode != "pd-ms" && options.PD.Num < 1 { return fmt.Errorf("all components count must be great than 0 (pd=%v)", options.PD.Num) } @@ -837,24 +848,26 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme type InstancePair struct { comp string + pdRole instance.PDRole tiflashRole instance.TiFlashRole instance.Config } instances := []InstancePair{ - {spec.ComponentPD, "", options.PD}, - {spec.ComponentTiKV, "", options.TiKV}, - {spec.ComponentPump, "", options.Pump}, - {spec.ComponentTiDB, "", options.TiDB}, - {spec.ComponentCDC, "", options.TiCDC}, - {spec.ComponentTiKVCDC, "", options.TiKVCDC}, - {spec.ComponentDrainer, "", options.Drainer}, + {spec.ComponentTiKV, "", "", options.TiKV}, + {spec.ComponentPump, "", "", options.Pump}, + {spec.ComponentTiDB, "", "", options.TiDB}, + {spec.ComponentCDC, "", "", options.TiCDC}, + {spec.ComponentTiKVCDC, "", "", options.TiKVCDC}, + {spec.ComponentDrainer, "", "", options.Drainer}, } if options.Mode == "tidb" { - instances = append( - instances, - InstancePair{spec.ComponentTiFlash, instance.TiFlashRoleNormal, options.TiFlash}, + instances = append([]InstancePair{{spec.ComponentPD, instance.PDRoleNormal, instance.TiFlashRoleNormal, options.PD}}, + instances..., + ) + instances = append(instances, + InstancePair{spec.ComponentTiFlash, instance.PDRoleNormal, instance.TiFlashRoleNormal, options.TiFlash}, ) } else if options.Mode == "tidb-disagg" { if !tidbver.TiDBSupportDisagg(options.Version) { @@ -889,17 +902,36 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme return fmt.Errorf("Disaggregate mode preflight check failed: Bucket %s doesn't exist", options.DisaggOpts.Bucket) } } - + instances = append([]InstancePair{{spec.ComponentPD, instance.PDRoleNormal, instance.TiFlashRoleNormal, options.PD}}, + instances..., + ) instances = append( instances, - InstancePair{spec.ComponentTiFlash, instance.TiFlashRoleDisaggWrite, options.TiFlashWrite}, - InstancePair{spec.ComponentTiFlash, instance.TiFlashRoleDisaggCompute, options.TiFlashCompute}, + InstancePair{spec.ComponentTiFlash, instance.PDRoleNormal, instance.TiFlashRoleDisaggWrite, options.TiFlashWrite}, + InstancePair{spec.ComponentTiFlash, instance.PDRoleNormal, instance.TiFlashRoleDisaggCompute, options.TiFlashCompute}, + ) + } else if options.Mode == "pd-ms" { + if !tidbver.PDSupportMicroServices(options.Version) { + return fmt.Errorf("PD cluster doesn't support microservices mode in version %s", options.Version) + } + instances = append([]InstancePair{ + {spec.ComponentPD, instance.PDRoleAPI, instance.TiFlashRoleNormal, options.API}, + {spec.ComponentPD, instance.PDRoleTSO, instance.TiFlashRoleNormal, options.TSO}, + {spec.ComponentPD, instance.PDRoleResourceManager, instance.TiFlashRoleNormal, options.RM}}, + instances..., + ) + instances = append(instances, + InstancePair{spec.ComponentTiFlash, instance.PDRoleNormal, instance.TiFlashRoleNormal, options.TiFlash}, + ) + } else { + instances = append([]InstancePair{{spec.ComponentPD, instance.PDRoleNormal, instance.TiFlashRoleNormal, options.PD}}, + instances..., ) } for _, inst := range instances { for i := 0; i < inst.Num; i++ { - _, err := p.addInstance(inst.comp, inst.tiflashRole, inst.Config) + _, err := p.addInstance(inst.comp, inst.pdRole, inst.tiflashRole, inst.Config) if err != nil { return err } diff --git a/pkg/tidbver/tidbver.go b/pkg/tidbver/tidbver.go index ff1d263426..8ac987645f 100644 --- a/pkg/tidbver/tidbver.go +++ b/pkg/tidbver/tidbver.go @@ -93,6 +93,11 @@ func TiDBSupportDisagg(version string) bool { return semver.Compare(version, "v7.0.0") >= 0 || strings.Contains(version, "nightly") } +// PDSupportMicroServices returns true if the given version of PD supports micro services. +func PDSupportMicroServices(version string) bool { + return semver.Compare(version, "v7.3.0") >= 0 || strings.Contains(version, "nightly") +} + // TiCDCSupportConfigFile return if given version of TiCDC support config file func TiCDCSupportConfigFile(version string) bool { // config support since v4.0.13, ignore v5.0.0-rc