Skip to content

Commit

Permalink
use version per component instead
Browse files Browse the repository at this point in the history
  • Loading branch information
nexustar committed Sep 17, 2023
1 parent e899ab5 commit 9e0a52f
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 80 deletions.
20 changes: 20 additions & 0 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ func (c *DMMasterComponent) Role() string {
return ComponentDMMaster
}

// CalculateVersion implements the Component interface
func (c *DMMasterComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
}

// SetVersion implements Component interface.
func (c *DMMasterComponent) SetVersion(version string) {
// not supported now
}

// Instances implements Component interface.
func (c *DMMasterComponent) Instances() []Instance {
ins := make([]Instance, 0)
Expand Down Expand Up @@ -271,6 +281,16 @@ func (c *DMWorkerComponent) Role() string {
return ComponentDMWorker
}

// CalculateVersion implements the Component interface
func (c *DMWorkerComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
}

// SetVersion implements Component interface.
func (c *DMWorkerComponent) SetVersion(version string) {
// not supported now
}

// Instances implements Component interface.
func (c *DMWorkerComponent) Instances() []Instance {
ins := make([]Instance, 0)
Expand Down
16 changes: 8 additions & 8 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ Do you want to continue? [y/N]:`,

hasImported := false
for _, comp := range topo.ComponentsByUpdateOrder(base.Version) {
for _, inst := range comp.Instances() {
compName := inst.ComponentName()
compName := comp.Name()

// if component version is not specified, use the cluster version or latest("")
version := componentVersions[compName]
if version != "" {
inst.SetVersion(version)
}
version = inst.CalculateVersion(clusterVersion)
// if component version is not specified, use the cluster version or latest("")
version := componentVersions[compName]
if version != "" {
comp.SetVersion(version)
}
version = comp.CalculateVersion(clusterVersion)

for _, inst := range comp.Instances() {
// Download component from repository
key := fmt.Sprintf("%s-%s-%s-%s", compName, version, inst.OS(), inst.Arch())
if _, found := uniqueComps[key]; !found {
Expand Down
23 changes: 16 additions & 7 deletions pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type AlertmanagerSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Version string `yaml:"version,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Expand Down Expand Up @@ -99,6 +98,21 @@ func (c *AlertManagerComponent) Role() string {
return RoleMonitor
}

// CalculateVersion implements the Component interface
func (c *AlertManagerComponent) CalculateVersion(_ string) string {
// always not follow cluster version, use ""(latest) by default
version := c.Topology.BaseTopo().AlertManagerVersion
if version != nil {
return *version
}
return ""
}

// SetVersion implements Component interface.
func (c *AlertManagerComponent) SetVersion(version string) {
*c.Topology.BaseTopo().AlertManagerVersion = version
}

// Instances implements Component interface.
func (c *AlertManagerComponent) Instances() []Instance {
alertmanagers := c.Topology.BaseTopo().Alertmanagers
Expand Down Expand Up @@ -131,6 +145,7 @@ func (c *AlertManagerComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.WebPort, timeout, tlsCfg)
},
Component: c,
},
topo: c.Topology,
})
Expand Down Expand Up @@ -233,9 +248,3 @@ func (i *AlertManagerInstance) ScaleConfig(
func (i *AlertManagerInstance) setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error) {
return nil, nil
}

// CalculateVersion implements the Instance interface
func (i *AlertManagerInstance) CalculateVersion(_ string) string {
// always not follow global version, use ""(latest) by default
return i.InstanceSpec.(*AlertmanagerSpec).Version
}
16 changes: 15 additions & 1 deletion pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type CDCSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Version string `yaml:"version,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Expand Down Expand Up @@ -114,6 +113,20 @@ func (c *CDCComponent) Role() string {
return ComponentCDC
}

// CalculateVersion implements the Component interface
func (c *CDCComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.CDC
if version == "" {
version = clusterVersion
}
return version
}

// SetVersion implements Component interface.
func (c *CDCComponent) SetVersion(version string) {
c.Topology.ComponentVersions.CDC = version
}

// Instances implements Component interface.
func (c *CDCComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.CDCServers))
Expand All @@ -140,6 +153,7 @@ func (c *CDCComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
Component: c,
}, c.Topology}
if s.DataDir != "" {
instance.Dirs = append(instance.Dirs, s.DataDir)
Expand Down
16 changes: 15 additions & 1 deletion pkg/cluster/spec/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type DashboardSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Version string `yaml:"version,omitempty"`
Patched bool `yaml:"patched,omitempty"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Port int `yaml:"port" default:"12333"`
Expand Down Expand Up @@ -116,6 +115,20 @@ func (c *DashboardComponent) Role() string {
return ComponentDashboard
}

// CalculateVersion implements the Component interface
func (c *DashboardComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.Dashboard
if version == "" {
version = clusterVersion
}
return version
}

// SetVersion implements Component interface.
func (c *DashboardComponent) SetVersion(version string) {
c.Topology.ComponentVersions.Dashboard = version
}

// Instances implements Component interface.
func (c *DashboardComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.Drainers))
Expand All @@ -141,6 +154,7 @@ func (c *DashboardComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
Component: c,
}, c.Topology})
}
return ins
Expand Down
16 changes: 15 additions & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
type DrainerSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
Version string `yaml:"version,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
Expand Down Expand Up @@ -135,6 +134,20 @@ func (c *DrainerComponent) Role() string {
return ComponentDrainer
}

// CalculateVersion implements the Component interface
func (c *DrainerComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.Drainer
if version == "" {
version = clusterVersion
}
return version
}

// SetVersion implements Component interface.
func (c *DrainerComponent) SetVersion(version string) {
c.Topology.ComponentVersions.Drainer = version
}

// Instances implements Component interface.
func (c *DrainerComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.Drainers))
Expand All @@ -160,6 +173,7 @@ func (c *DrainerComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
Component: c,
}, c.Topology})
}
return ins
Expand Down
17 changes: 16 additions & 1 deletion pkg/cluster/spec/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type GrafanaSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Version string `yaml:"version,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Expand Down Expand Up @@ -108,6 +107,21 @@ func (c *GrafanaComponent) Role() string {
return RoleMonitor
}

// CalculateVersion implements the Component interface
func (c *GrafanaComponent) CalculateVersion(_ string) string {
// always not follow cluster version, use ""(latest) by default
version := c.Topology.BaseTopo().GrafanaVersion
if version != nil {
return *version
}
return ""
}

// SetVersion implements Component interface.
func (c *GrafanaComponent) SetVersion(version string) {
*c.Topology.BaseTopo().GrafanaVersion = version
}

// Instances implements Component interface.
func (c *GrafanaComponent) Instances() []Instance {
servers := c.BaseTopo().Grafanas
Expand Down Expand Up @@ -136,6 +150,7 @@ func (c *GrafanaComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
Component: c,
},
topo: c.Topology,
})
Expand Down
24 changes: 6 additions & 18 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Component interface {
Name() string
Role() string
Instances() []Instance
CalculateVersion(string) string
SetVersion(string)
}

// RollingUpdateInstance represent a instance need to transfer state when restart.
Expand Down Expand Up @@ -109,7 +111,7 @@ type Instance interface {
IsPatched() bool
SetPatched(bool)
CalculateVersion(string) string
SetVersion(string)
// SetVersion(string)
setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]any, paths meta.DirPaths) (map[string]any, error)
}

Expand Down Expand Up @@ -151,6 +153,8 @@ type BaseInstance struct {
Dirs []string
StatusFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdHosts ...string) string
UptimeFn func(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration

Component Component
}

// Ready implements Instance interface
Expand Down Expand Up @@ -448,23 +452,7 @@ func (i *BaseInstance) SetPatched(p bool) {

// CalculateVersion implements the Instance interface
func (i *BaseInstance) CalculateVersion(globalVersion string) string {
v := reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("Version")
if !v.IsValid() {
return globalVersion
}
if v.String() == "" {
return globalVersion
}
return v.String()
}

// SetVersion implements the Instance interface
func (i *BaseInstance) SetVersion(version string) {
v := reflect.Indirect(reflect.ValueOf(i.InstanceSpec)).FieldByName("Version")
if !v.CanSet() {
return
}
v.SetString(version)
return i.Component.CalculateVersion(globalVersion)
}

// PrepareStart checks instance requirements before starting
Expand Down
17 changes: 16 additions & 1 deletion pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type PrometheusSpec struct {
Host string `yaml:"host"`
ManageHost string `yaml:"manage_host,omitempty" validate:"manage_host:editable"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Version string `yaml:"version,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
IgnoreExporter bool `yaml:"ignore_exporter,omitempty"`
Expand Down Expand Up @@ -125,6 +124,21 @@ func (c *MonitorComponent) Role() string {
return RoleMonitor
}

// CalculateVersion implements the Component interface
func (c *MonitorComponent) CalculateVersion(_ string) string {
// always not follow cluster version, use ""(latest) by default
version := c.Topology.BaseTopo().PrometheusVersion
if version != nil {
return *version
}
return ""
}

// SetVersion implements Component interface.
func (c *MonitorComponent) SetVersion(version string) {
*c.Topology.BaseTopo().PrometheusVersion = version
}

// Instances implements Component interface.
func (c *MonitorComponent) Instances() []Instance {
servers := c.BaseTopo().Monitors
Expand Down Expand Up @@ -153,6 +167,7 @@ func (c *MonitorComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.Port, timeout, tlsCfg)
},
Component: c,
}, c.Topology}
if s.NgPort > 0 {
mi.BaseInstance.Ports = append(mi.BaseInstance.Ports, s.NgPort)
Expand Down
16 changes: 15 additions & 1 deletion pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type PDSpec struct {
ListenHost string `yaml:"listen_host,omitempty"`
AdvertiseClientAddr string `yaml:"advertise_client_addr,omitempty"`
AdvertisePeerAddr string `yaml:"advertise_peer_addr,omitempty"`
Version string `yaml:"version,omitempty"`
SSHPort int `yaml:"ssh_port,omitempty" validate:"ssh_port:editable"`
Imported bool `yaml:"imported,omitempty"`
Patched bool `yaml:"patched,omitempty"`
Expand Down Expand Up @@ -160,6 +159,20 @@ func (c *PDComponent) Role() string {
return ComponentPD
}

// CalculateVersion implements the Component interface
func (c *PDComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.PD
if version != "" {
version = clusterVersion
}
return version
}

// SetVersion implements Component interface.
func (c *PDComponent) SetVersion(version string) {
c.Topology.ComponentVersions.PD = version
}

// Instances implements Component interface.
func (c *PDComponent) Instances() []Instance {
ins := make([]Instance, 0, len(c.Topology.PDServers))
Expand Down Expand Up @@ -189,6 +202,7 @@ func (c *PDComponent) Instances() []Instance {
UptimeFn: func(_ context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration {
return UptimeByHost(s.GetManageHost(), s.ClientPort, timeout, tlsCfg)
},
Component: c,
},
topo: c.Topology,
})
Expand Down
Loading

0 comments on commit 9e0a52f

Please sign in to comment.