From 9e0a52f473c38091af1b5dc9f0cc47f2afb7f20e Mon Sep 17 00:00:00 2001 From: nexustar Date: Mon, 18 Sep 2023 02:18:17 +0800 Subject: [PATCH] use version per component instead --- components/dm/spec/logic.go | 20 ++++++++ pkg/cluster/manager/upgrade.go | 16 +++--- pkg/cluster/spec/alertmanager.go | 23 ++++++--- pkg/cluster/spec/cdc.go | 16 +++++- pkg/cluster/spec/dashboard.go | 16 +++++- pkg/cluster/spec/drainer.go | 16 +++++- pkg/cluster/spec/grafana.go | 17 ++++++- pkg/cluster/spec/instance.go | 24 +++------ pkg/cluster/spec/monitoring.go | 17 ++++++- pkg/cluster/spec/pd.go | 16 +++++- pkg/cluster/spec/pump.go | 16 +++++- pkg/cluster/spec/spec.go | 86 +++++++++++++++++++++----------- pkg/cluster/spec/tidb.go | 16 +++++- pkg/cluster/spec/tiflash.go | 16 +++++- pkg/cluster/spec/tikv.go | 16 +++++- pkg/cluster/spec/tikv_cdc.go | 20 +++++--- pkg/cluster/spec/tispark.go | 22 ++++++++ 17 files changed, 293 insertions(+), 80 deletions(-) diff --git a/components/dm/spec/logic.go b/components/dm/spec/logic.go index b48db093cb..03e6017550 100644 --- a/components/dm/spec/logic.go +++ b/components/dm/spec/logic.go @@ -68,6 +68,16 @@ func (c *DMMasterComponent) Role() string { return ComponentDMMaster } +// CalculateVersion implements the Component interface +func (c *DMMasterComponent) CalculateVersion(clusterVersion string) string { + return clusterVersion +} + +// SetVersion implements Component interface. +func (c *DMMasterComponent) SetVersion(version string) { + // not supported now +} + // Instances implements Component interface. func (c *DMMasterComponent) Instances() []Instance { ins := make([]Instance, 0) @@ -271,6 +281,16 @@ func (c *DMWorkerComponent) Role() string { return ComponentDMWorker } +// CalculateVersion implements the Component interface +func (c *DMWorkerComponent) CalculateVersion(clusterVersion string) string { + return clusterVersion +} + +// SetVersion implements Component interface. +func (c *DMWorkerComponent) SetVersion(version string) { + // not supported now +} + // Instances implements Component interface. func (c *DMWorkerComponent) Instances() []Instance { ins := make([]Instance, 0) diff --git a/pkg/cluster/manager/upgrade.go b/pkg/cluster/manager/upgrade.go index 42754d1292..341f3601c9 100644 --- a/pkg/cluster/manager/upgrade.go +++ b/pkg/cluster/manager/upgrade.go @@ -101,16 +101,16 @@ Do you want to continue? [y/N]:`, hasImported := false for _, comp := range topo.ComponentsByUpdateOrder(base.Version) { - for _, inst := range comp.Instances() { - compName := inst.ComponentName() + compName := comp.Name() - // if component version is not specified, use the cluster version or latest("") - version := componentVersions[compName] - if version != "" { - inst.SetVersion(version) - } - version = inst.CalculateVersion(clusterVersion) + // if component version is not specified, use the cluster version or latest("") + version := componentVersions[compName] + if version != "" { + comp.SetVersion(version) + } + version = comp.CalculateVersion(clusterVersion) + for _, inst := range comp.Instances() { // Download component from repository key := fmt.Sprintf("%s-%s-%s-%s", compName, version, inst.OS(), inst.Arch()) if _, found := uniqueComps[key]; !found { diff --git a/pkg/cluster/spec/alertmanager.go b/pkg/cluster/spec/alertmanager.go index 1bb777a01f..075a2f70f6 100644 --- a/pkg/cluster/spec/alertmanager.go +++ b/pkg/cluster/spec/alertmanager.go @@ -32,7 +32,6 @@ type AlertmanagerSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -99,6 +98,21 @@ func (c *AlertManagerComponent) Role() string { return RoleMonitor } +// CalculateVersion implements the Component interface +func (c *AlertManagerComponent) CalculateVersion(_ string) string { + // always not follow cluster version, use ""(latest) by default + version := c.Topology.BaseTopo().AlertManagerVersion + if version != nil { + return *version + } + return "" +} + +// SetVersion implements Component interface. +func (c *AlertManagerComponent) SetVersion(version string) { + *c.Topology.BaseTopo().AlertManagerVersion = version +} + // Instances implements Component interface. func (c *AlertManagerComponent) Instances() []Instance { alertmanagers := c.Topology.BaseTopo().Alertmanagers @@ -131,6 +145,7 @@ func (c *AlertManagerComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.WebPort, timeout, tlsCfg) }, + Component: c, }, topo: c.Topology, }) @@ -233,9 +248,3 @@ func (i *AlertManagerInstance) ScaleConfig( func (i *AlertManagerInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) { return nil, nil } - -// CalculateVersion implements the Instance interface -func (i *AlertManagerInstance) CalculateVersion(_ string) string { - // always not follow global version, use ""(latest) by default - return i.InstanceSpec.(*AlertmanagerSpec).Version -} diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go index 102780353c..de818ddfbe 100644 --- a/pkg/cluster/spec/cdc.go +++ b/pkg/cluster/spec/cdc.go @@ -36,7 +36,6 @@ type CDCSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -114,6 +113,20 @@ func (c *CDCComponent) Role() string { return ComponentCDC } +// CalculateVersion implements the Component interface +func (c *CDCComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.CDC + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *CDCComponent) SetVersion(version string) { + c.Topology.ComponentVersions.CDC = version +} + // Instances implements Component interface. func (c *CDCComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.CDCServers)) @@ -140,6 +153,7 @@ func (c *CDCComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology} if s.DataDir != "" { instance.Dirs = append(instance.Dirs, s.DataDir) diff --git a/pkg/cluster/spec/dashboard.go b/pkg/cluster/spec/dashboard.go index 123e7a090e..7c12b9d0a0 100644 --- a/pkg/cluster/spec/dashboard.go +++ b/pkg/cluster/spec/dashboard.go @@ -31,7 +31,6 @@ type DashboardSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` Port int `yaml:"port" default:"12333"` @@ -116,6 +115,20 @@ func (c *DashboardComponent) Role() string { return ComponentDashboard } +// CalculateVersion implements the Component interface +func (c *DashboardComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Dashboard + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *DashboardComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Dashboard = version +} + // Instances implements Component interface. func (c *DashboardComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.Drainers)) @@ -141,6 +154,7 @@ func (c *DashboardComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology}) } return ins diff --git a/pkg/cluster/spec/drainer.go b/pkg/cluster/spec/drainer.go index f9686982dd..e81d284ccc 100644 --- a/pkg/cluster/spec/drainer.go +++ b/pkg/cluster/spec/drainer.go @@ -33,7 +33,6 @@ import ( type DrainerSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` - Version string `yaml:"version,omitempty"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` @@ -135,6 +134,20 @@ func (c *DrainerComponent) Role() string { return ComponentDrainer } +// CalculateVersion implements the Component interface +func (c *DrainerComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Drainer + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *DrainerComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Drainer = version +} + // Instances implements Component interface. func (c *DrainerComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.Drainers)) @@ -160,6 +173,7 @@ func (c *DrainerComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology}) } return ins diff --git a/pkg/cluster/spec/grafana.go b/pkg/cluster/spec/grafana.go index eace8dd0f7..1b80c34d43 100644 --- a/pkg/cluster/spec/grafana.go +++ b/pkg/cluster/spec/grafana.go @@ -37,7 +37,6 @@ type GrafanaSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -108,6 +107,21 @@ func (c *GrafanaComponent) Role() string { return RoleMonitor } +// CalculateVersion implements the Component interface +func (c *GrafanaComponent) CalculateVersion(_ string) string { + // always not follow cluster version, use ""(latest) by default + version := c.Topology.BaseTopo().GrafanaVersion + if version != nil { + return *version + } + return "" +} + +// SetVersion implements Component interface. +func (c *GrafanaComponent) SetVersion(version string) { + *c.Topology.BaseTopo().GrafanaVersion = version +} + // Instances implements Component interface. func (c *GrafanaComponent) Instances() []Instance { servers := c.BaseTopo().Grafanas @@ -136,6 +150,7 @@ func (c *GrafanaComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, topo: c.Topology, }) diff --git a/pkg/cluster/spec/instance.go b/pkg/cluster/spec/instance.go index 34ae48d951..6298456a3a 100644 --- a/pkg/cluster/spec/instance.go +++ b/pkg/cluster/spec/instance.go @@ -71,6 +71,8 @@ type Component interface { Name() string Role() string Instances() []Instance + CalculateVersion(string) string + SetVersion(string) } // RollingUpdateInstance represent a instance need to transfer state when restart. @@ -109,7 +111,7 @@ type Instance interface { IsPatched() bool SetPatched(bool) CalculateVersion(string) string - SetVersion(string) + // SetVersion(string) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) } @@ -151,6 +153,8 @@ type BaseInstance struct { Dirs []string StatusFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdHosts ...string) string UptimeFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration + + Component Component } // Ready implements Instance interface @@ -448,23 +452,7 @@ func (i *BaseInstance) SetPatched(p bool) { // CalculateVersion implements the Instance interface func (i *BaseInstance) CalculateVersion(globalVersion string) string { - v := reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("Version") - if !v.IsValid() { - return globalVersion - } - if v.String() == "" { - return globalVersion - } - return v.String() -} - -// SetVersion implements the Instance interface -func (i *BaseInstance) SetVersion(version string) { - v := reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("Version") - if !v.CanSet() { - return - } - v.SetString(version) + return i.Component.CalculateVersion(globalVersion) } // PrepareStart checks instance requirements before starting diff --git a/pkg/cluster/spec/monitoring.go b/pkg/cluster/spec/monitoring.go index 7a0954c1c6..8d3550a2c7 100644 --- a/pkg/cluster/spec/monitoring.go +++ b/pkg/cluster/spec/monitoring.go @@ -40,7 +40,6 @@ type PrometheusSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -125,6 +124,21 @@ func (c *MonitorComponent) Role() string { return RoleMonitor } +// CalculateVersion implements the Component interface +func (c *MonitorComponent) CalculateVersion(_ string) string { + // always not follow cluster version, use ""(latest) by default + version := c.Topology.BaseTopo().PrometheusVersion + if version != nil { + return *version + } + return "" +} + +// SetVersion implements Component interface. +func (c *MonitorComponent) SetVersion(version string) { + *c.Topology.BaseTopo().PrometheusVersion = version +} + // Instances implements Component interface. func (c *MonitorComponent) Instances() []Instance { servers := c.BaseTopo().Monitors @@ -153,6 +167,7 @@ func (c *MonitorComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology} if s.NgPort > 0 { mi.BaseInstance.Ports = append(mi.BaseInstance.Ports, s.NgPort) diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go index b5fa0887e5..24151208dc 100644 --- a/pkg/cluster/spec/pd.go +++ b/pkg/cluster/spec/pd.go @@ -37,7 +37,6 @@ type PDSpec struct { ListenHost string `yaml:"listen_host,omitempty"` AdvertiseClientAddr string `yaml:"advertise_client_addr,omitempty"` AdvertisePeerAddr string `yaml:"advertise_peer_addr,omitempty"` - Version string `yaml:"version,omitempty"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` @@ -160,6 +159,20 @@ func (c *PDComponent) Role() string { return ComponentPD } +// CalculateVersion implements the Component interface +func (c *PDComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.PD + if version != "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *PDComponent) SetVersion(version string) { + c.Topology.ComponentVersions.PD = version +} + // Instances implements Component interface. func (c *PDComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.PDServers)) @@ -189,6 +202,7 @@ func (c *PDComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.ClientPort, timeout, tlsCfg) }, + Component: c, }, topo: c.Topology, }) diff --git a/pkg/cluster/spec/pump.go b/pkg/cluster/spec/pump.go index 1e1b680f9d..a7b6122d9b 100644 --- a/pkg/cluster/spec/pump.go +++ b/pkg/cluster/spec/pump.go @@ -34,7 +34,6 @@ type PumpSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -134,6 +133,20 @@ func (c *PumpComponent) Role() string { return ComponentPump } +// CalculateVersion implements the Component interface +func (c *PumpComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.Pump + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *PumpComponent) SetVersion(version string) { + c.Topology.ComponentVersions.Pump = version +} + // Instances implements Component interface. func (c *PumpComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.PumpServers)) @@ -159,6 +172,7 @@ func (c *PumpComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology}) } return ins diff --git a/pkg/cluster/spec/spec.go b/pkg/cluster/spec/spec.go index a6a76088d3..d0a7089973 100644 --- a/pkg/cluster/spec/spec.go +++ b/pkg/cluster/spec/spec.go @@ -119,25 +119,44 @@ type ( Grafana map[string]string `yaml:"grafana"` } + // ComponentVersions represents the versions of components + ComponentVersions struct { + TiDB string `yaml:"tidb"` + TiKV string `yaml:"tikv"` + TiFlash string `yaml:"tiflash"` + PD string `yaml:"pd"` + Dashboard string `yaml:"tidb_dashboard"` + Pump string `yaml:"pump"` + Drainer string `yaml:"drainer"` + CDC string `yaml:"cdc"` + TiKVCDC string `yaml:"kvcdc"` + Prometheus string `yaml:"prometheus"` + Grafana string `yaml:"grafana"` + AlertManager string `yaml:"alertmanager"` + NodeExporter string `yaml:"node_exporter"` + BlackboxExporter string `yaml:"blackbox_exporter"` + } + // Specification represents the specification of topology.yaml Specification struct { - GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` - MonitoredOptions MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` - ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` - TiDBServers []*TiDBSpec `yaml:"tidb_servers"` - TiKVServers []*TiKVSpec `yaml:"tikv_servers"` - TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` - PDServers []*PDSpec `yaml:"pd_servers"` - DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` - PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` - Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` - CDCServers []*CDCSpec `yaml:"cdc_servers,omitempty"` - TiKVCDCServers []*TiKVCDCSpec `yaml:"kvcdc_servers,omitempty"` - TiSparkMasters []*TiSparkMasterSpec `yaml:"tispark_masters,omitempty"` - TiSparkWorkers []*TiSparkWorkerSpec `yaml:"tispark_workers,omitempty"` - Monitors []*PrometheusSpec `yaml:"monitoring_servers"` - Grafanas []*GrafanaSpec `yaml:"grafana_servers,omitempty"` - Alertmanagers []*AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"` + GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"` + MonitoredOptions MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"` + ComponentVersions ComponentVersions `yaml:"component_versions,omitempty" validate:"component_versions:editable"` + ServerConfigs ServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"` + TiDBServers []*TiDBSpec `yaml:"tidb_servers"` + TiKVServers []*TiKVSpec `yaml:"tikv_servers"` + TiFlashServers []*TiFlashSpec `yaml:"tiflash_servers"` + PDServers []*PDSpec `yaml:"pd_servers"` + DashboardServers []*DashboardSpec `yaml:"tidb_dashboard_servers,omitempty"` + PumpServers []*PumpSpec `yaml:"pump_servers,omitempty"` + Drainers []*DrainerSpec `yaml:"drainer_servers,omitempty"` + CDCServers []*CDCSpec `yaml:"cdc_servers,omitempty"` + TiKVCDCServers []*TiKVCDCSpec `yaml:"kvcdc_servers,omitempty"` + TiSparkMasters []*TiSparkMasterSpec `yaml:"tispark_masters,omitempty"` + TiSparkWorkers []*TiSparkWorkerSpec `yaml:"tispark_workers,omitempty"` + Monitors []*PrometheusSpec `yaml:"monitoring_servers"` + Grafanas []*GrafanaSpec `yaml:"grafana_servers,omitempty"` + Alertmanagers []*AlertmanagerSpec `yaml:"alertmanager_servers,omitempty"` } ) @@ -147,9 +166,12 @@ type BaseTopo struct { MonitoredOptions *MonitoredOptions MasterList []string - Monitors []*PrometheusSpec - Grafanas []*GrafanaSpec - Alertmanagers []*AlertmanagerSpec + PrometheusVersion *string + GrafanaVersion *string + AlertManagerVersion *string + Monitors []*PrometheusSpec + Grafanas []*GrafanaSpec + Alertmanagers []*AlertmanagerSpec } // Topology represents specification of the cluster. @@ -254,12 +276,15 @@ func (s *Specification) Type() string { // BaseTopo implements Topology interface. func (s *Specification) BaseTopo() *BaseTopo { return &BaseTopo{ - GlobalOptions: &s.GlobalOptions, - MonitoredOptions: s.GetMonitoredOptions(), - MasterList: s.GetPDListWithManageHost(), - Monitors: s.Monitors, - Grafanas: s.Grafanas, - Alertmanagers: s.Alertmanagers, + GlobalOptions: &s.GlobalOptions, + MonitoredOptions: s.GetMonitoredOptions(), + MasterList: s.GetPDListWithManageHost(), + PrometheusVersion: &s.ComponentVersions.Prometheus, + GrafanaVersion: &s.ComponentVersions.Grafana, + AlertManagerVersion: &s.ComponentVersions.AlertManager, + Monitors: s.Monitors, + Grafanas: s.Grafanas, + Alertmanagers: s.Alertmanagers, } } @@ -532,15 +557,16 @@ func fillCustomDefaults(globalOptions *GlobalOptions, data any) error { } var ( - globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name() - monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name() - serverConfigsTypeName = reflect.TypeOf(ServerConfigs{}).Name() + globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name() + monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name() + serverConfigsTypeName = reflect.TypeOf(ServerConfigs{}).Name() + componentVersionsTypeName = reflect.TypeOf(ComponentVersions{}).Name() ) // Skip global/monitored options func isSkipField(field reflect.Value) bool { tp := field.Type().Name() - return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName + return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName || tp == componentVersionsTypeName } func setDefaultDir(parent, role, port string, field reflect.Value) { diff --git a/pkg/cluster/spec/tidb.go b/pkg/cluster/spec/tidb.go index d0a7be6c64..e94fe21011 100644 --- a/pkg/cluster/spec/tidb.go +++ b/pkg/cluster/spec/tidb.go @@ -35,7 +35,6 @@ type TiDBSpec struct { ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` ListenHost string `yaml:"listen_host,omitempty"` AdvertiseAddr string `yaml:"advertise_address,omitempty"` - Version string `yaml:"version,omitempty"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` @@ -111,6 +110,20 @@ func (c *TiDBComponent) Role() string { return ComponentTiDB } +// CalculateVersion implements the Component interface +func (c *TiDBComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.TiDB + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *TiDBComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiDB = version +} + // Instances implements Component interface. func (c *TiDBComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiDBServers)) @@ -139,6 +152,7 @@ func (c *TiDBComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.StatusPort, timeout, tlsCfg) }, + Component: c, }, c.Topology}) } return ins diff --git a/pkg/cluster/spec/tiflash.go b/pkg/cluster/spec/tiflash.go index ac4f7c06fd..13d7f1696a 100644 --- a/pkg/cluster/spec/tiflash.go +++ b/pkg/cluster/spec/tiflash.go @@ -43,7 +43,6 @@ type TiFlashSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -283,6 +282,20 @@ func (c *TiFlashComponent) Role() string { return ComponentTiFlash } +// CalculateVersion implements the Component interface +func (c *TiFlashComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.TiFlash + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *TiFlashComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiFlash = version +} + // Instances implements Component interface. func (c *TiFlashComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiFlashServers)) @@ -312,6 +325,7 @@ func (c *TiFlashComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.StatusPort, timeout, tlsCfg) }, + Component: c, }, c.Topology}) } return ins diff --git a/pkg/cluster/spec/tikv.go b/pkg/cluster/spec/tikv.go index ce7ef82970..8fd4d511d0 100644 --- a/pkg/cluster/spec/tikv.go +++ b/pkg/cluster/spec/tikv.go @@ -51,7 +51,6 @@ type TiKVSpec struct { ListenHost string `yaml:"listen_host,omitempty"` AdvertiseAddr string `yaml:"advertise_addr,omitempty"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -186,6 +185,20 @@ func (c *TiKVComponent) Role() string { return ComponentTiKV } +// CalculateVersion implements the Component interface +func (c *TiKVComponent) CalculateVersion(clusterVersion string) string { + version := c.Topology.ComponentVersions.TiKV + if version == "" { + version = clusterVersion + } + return version +} + +// SetVersion implements Component interface. +func (c *TiKVComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiKV = version +} + // Instances implements Component interface. func (c *TiKVComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiKVServers)) @@ -213,6 +226,7 @@ func (c *TiKVComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.StatusPort, timeout, tlsCfg) }, + Component: c, }, c.Topology, 0}) } return ins diff --git a/pkg/cluster/spec/tikv_cdc.go b/pkg/cluster/spec/tikv_cdc.go index 2cfaaf5bae..51301e09c1 100644 --- a/pkg/cluster/spec/tikv_cdc.go +++ b/pkg/cluster/spec/tikv_cdc.go @@ -36,7 +36,6 @@ type TiKVCDCSpec struct { Host string `yaml:"host"` ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"` SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"` - Version string `yaml:"version,omitempty"` Imported bool `yaml:"imported,omitempty"` Patched bool `yaml:"patched,omitempty"` IgnoreExporter bool `yaml:"ignore_exporter,omitempty"` @@ -106,6 +105,18 @@ func (c *TiKVCDCComponent) Role() string { return ComponentTiKVCDC } +// CalculateVersion implements the Component interface +func (c *TiKVCDCComponent) CalculateVersion(clusterVersion string) string { + // always not follow global version, use ""(latest) by default + version := c.Topology.ComponentVersions.TiKVCDC + return version +} + +// SetVersion implements Component interface. +func (c *TiKVCDCComponent) SetVersion(version string) { + c.Topology.ComponentVersions.TiKVCDC = version +} + // GetSource returns source to download the component func (s *TiKVCDCSpec) GetSource() string { if s.Source == "" { @@ -140,6 +151,7 @@ func (c *TiKVCDCComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg) }, + Component: c, }, c.Topology} if s.DataDir != "" { instance.Dirs = append(instance.Dirs, s.DataDir) @@ -329,9 +341,3 @@ func (i *TiKVCDCInstance) PostRestart(ctx context.Context, topo Topology, tlsCfg logger.Debugf("tikv-cdc post-restart success, addr: %s, elapsed: %+v", address, time.Since(start)) return nil } - -// CalculateVersion implements the Instance interface -func (i *TiKVCDCInstance) CalculateVersion(_ string) string { - // always not follow global version, use ""(latest) by default - return i.InstanceSpec.(*TiKVCDCSpec).Version -} diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go index bacd9f4b04..fac2306c91 100644 --- a/pkg/cluster/spec/tispark.go +++ b/pkg/cluster/spec/tispark.go @@ -141,6 +141,16 @@ func (c *TiSparkMasterComponent) Role() string { return RoleTiSparkMaster } +// CalculateVersion implements the Component interface +func (c *TiSparkMasterComponent) CalculateVersion(clusterVersion string) string { + return "" +} + +// SetVersion implements Component interface. +func (c *TiSparkMasterComponent) SetVersion(version string) { + // should never be calles +} + // Instances implements Component interface. func (c *TiSparkMasterComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiSparkMasters)) @@ -168,6 +178,7 @@ func (c *TiSparkMasterComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return 0 }, + Component: c, }, topo: c.Topology, }) @@ -321,6 +332,16 @@ func (c *TiSparkWorkerComponent) Role() string { return RoleTiSparkWorker } +// CalculateVersion implements the Component interface +func (c *TiSparkWorkerComponent) CalculateVersion(clusterVersion string) string { + return "" +} + +// SetVersion implements Component interface. +func (c *TiSparkWorkerComponent) SetVersion(version string) { + // should never be called +} + // Instances implements Component interface. func (c *TiSparkWorkerComponent) Instances() []Instance { ins := make([]Instance, 0, len(c.Topology.TiSparkWorkers)) @@ -347,6 +368,7 @@ func (c *TiSparkWorkerComponent) Instances() []Instance { UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration { return 0 }, + Component: c, }, topo: c.Topology, })