Skip to content

Commit

Permalink
add global component_sources (#2320)
Browse files Browse the repository at this point in the history
  • Loading branch information
nexustar authored Nov 15, 2023
1 parent 520c05a commit c0abd84
Show file tree
Hide file tree
Showing 18 changed files with 171 additions and 105 deletions.
22 changes: 20 additions & 2 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (c *DMMasterComponent) Role() string {
return ComponentDMMaster
}

// Source implements Component interface.
func (c *DMMasterComponent) Source() string {
source := c.Topology.ComponentSources.Master
if source != "" {
return source
}
return ComponentDMMaster
}

// CalculateVersion implements the Component interface
func (c *DMMasterComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
Expand All @@ -93,7 +102,7 @@ func (c *DMMasterComponent) Instances() []Instance {
ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,

Ports: []int{
s.Port,
Expand Down Expand Up @@ -283,6 +292,15 @@ func (c *DMWorkerComponent) Role() string {
return ComponentDMWorker
}

// Source implements Component interface.
func (c *DMWorkerComponent) Source() string {
source := c.Topology.ComponentSources.Worker
if source != "" {
return source
}
return ComponentDMWorker
}

// CalculateVersion implements the Component interface
func (c *DMWorkerComponent) CalculateVersion(clusterVersion string) string {
return clusterVersion
Expand All @@ -308,7 +326,7 @@ func (c *DMWorkerComponent) Instances() []Instance {
ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,

Ports: []int{
s.Port,
Expand Down
32 changes: 12 additions & 20 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const (
)

var (
globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name()
monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name()
serverConfigsTypeName = reflect.TypeOf(DMServerConfigs{}).Name()
globalOptionTypeName = reflect.TypeOf(GlobalOptions{}).Name()
monitorOptionTypeName = reflect.TypeOf(MonitoredOptions{}).Name()
serverConfigsTypeName = reflect.TypeOf(DMServerConfigs{}).Name()
componentSourcesTypeName = reflect.TypeOf(ComponentSources{}).Name()
)

func setDefaultDir(parent, role, port string, field reflect.Value) {
Expand Down Expand Up @@ -69,7 +70,7 @@ func isSkipField(field reflect.Value) bool {
field = field.Elem()
}
tp := field.Type().Name()
return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName
return tp == globalOptionTypeName || tp == monitorOptionTypeName || tp == serverConfigsTypeName || tp == componentSourcesTypeName
}

type (
Expand All @@ -95,10 +96,17 @@ type (
Grafana map[string]string `yaml:"grafana"`
}

// ComponentSources represents the source of components
ComponentSources struct {
Master string `yaml:"master,omitempty"`
Worker string `yaml:"worker,omitempty"`
}

// Specification represents the specification of topology.yaml
Specification struct {
GlobalOptions GlobalOptions `yaml:"global,omitempty" validate:"global:editable"`
MonitoredOptions *MonitoredOptions `yaml:"monitored,omitempty" validate:"monitored:editable"`
ComponentSources ComponentSources `yaml:"component_sources,omitempty" validate:"component_sources:editable"`
ServerConfigs DMServerConfigs `yaml:"server_configs,omitempty" validate:"server_configs:ignore"`
Masters []*MasterSpec `yaml:"master_servers"`
Workers []*WorkerSpec `yaml:"worker_servers"`
Expand Down Expand Up @@ -203,14 +211,6 @@ func (s *MasterSpec) GetAdvertisePeerURL(enableTLS bool) string {
return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort))
}

// GetSource returns source to download the component
func (s *MasterSpec) GetSource() string {
if s.Source == "" {
return ComponentDMMaster
}
return s.Source
}

// WorkerSpec represents the Master topology specification in topology.yaml
type WorkerSpec struct {
Host string `yaml:"host"`
Expand Down Expand Up @@ -282,14 +282,6 @@ func (s *WorkerSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *WorkerSpec) GetSource() string {
if s.Source == "" {
return ComponentDMWorker
}
return s.Source
}

// UnmarshalYAML sets default values when unmarshaling the topology file
func (s *Specification) UnmarshalYAML(unmarshal func(any) error) error {
type topology Specification
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (c *AlertManagerComponent) Role() string {
return RoleMonitor
}

// Source implements Component interface.
func (c *AlertManagerComponent) Source() string {
return ComponentAlertmanager
}

// CalculateVersion implements the Component interface
func (c *AlertManagerComponent) CalculateVersion(_ string) string {
// always not follow cluster version, use ""(latest) by default
Expand Down
19 changes: 10 additions & 9 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,6 @@ func (s *CDCSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *CDCSpec) GetSource() string {
if s.Source == "" {
return ComponentCDC
}
return s.Source
}

// CDCComponent represents CDC component.
type CDCComponent struct{ Topology *Specification }

Expand All @@ -113,6 +105,15 @@ func (c *CDCComponent) Role() string {
return ComponentCDC
}

// Source implements Component interface.
func (c *CDCComponent) Source() string {
source := c.Topology.ComponentSources.CDC
if source != "" {
return source
}
return ComponentCDC
}

// CalculateVersion implements the Component interface
func (c *CDCComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.CDC
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *CDCComponent) Instances() []Instance {
ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,
NumaNode: s.NumaNode,
NumaCores: "",

Expand Down
17 changes: 9 additions & 8 deletions pkg/cluster/spec/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,6 @@ func (s *DashboardSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *DashboardSpec) GetSource() string {
if s.Source == "" {
return ComponentDashboard
}
return s.Source
}

// DashboardComponent represents Drainer component.
type DashboardComponent struct{ Topology *Specification }

Expand All @@ -115,6 +107,15 @@ func (c *DashboardComponent) Role() string {
return ComponentDashboard
}

// Source implements Component interface.
func (c *DashboardComponent) Source() string {
source := c.Topology.ComponentSources.Dashboard
if source != "" {
return source
}
return ComponentDashboard
}

// CalculateVersion implements the Component interface
func (c *DashboardComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.Dashboard
Expand Down
19 changes: 10 additions & 9 deletions pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,6 @@ func (s *DrainerSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *DrainerSpec) GetSource() string {
if s.Source == "" {
return ComponentDrainer
}
return s.Source
}

// DrainerComponent represents Drainer component.
type DrainerComponent struct{ Topology *Specification }

Expand All @@ -134,6 +126,15 @@ func (c *DrainerComponent) Role() string {
return ComponentDrainer
}

// Source implements Component interface.
func (c *DrainerComponent) Source() string {
source := c.Topology.ComponentSources.Drainer
if source != "" {
return source
}
return ComponentDrainer
}

// CalculateVersion implements the Component interface
func (c *DrainerComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.Drainer
Expand Down Expand Up @@ -161,7 +162,7 @@ func (c *DrainerComponent) Instances() []Instance {
ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,
NumaNode: s.NumaNode,
NumaCores: "",

Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/spec/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (c *GrafanaComponent) Role() string {
return RoleMonitor
}

// Source implements Component interface.
func (c *GrafanaComponent) Source() string {
return ComponentGrafana
}

// CalculateVersion implements the Component interface
func (c *GrafanaComponent) CalculateVersion(clusterVersion string) string {
// always not follow cluster version, use ""(latest) by default
Expand Down
9 changes: 6 additions & 3 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
type Component interface {
Name() string
Role() string
Source() string
Instances() []Instance
CalculateVersion(string) string
SetVersion(string)
Expand Down Expand Up @@ -316,10 +317,12 @@ func (i *BaseInstance) ComponentName() string {

// ComponentSource implements Instance interface
func (i *BaseInstance) ComponentSource() string {
if i.Source == "" {
return i.Name
if i.Source != "" {
return i.Source
} else if i.Component.Source() != "" {
return i.Component.Source()
}
return i.Source
return i.ComponentName()
}

// InstanceName implements Instance interface
Expand Down
5 changes: 5 additions & 0 deletions pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (c *MonitorComponent) Role() string {
return RoleMonitor
}

// Source implements Component interface.
func (c *MonitorComponent) Source() string {
return ComponentPrometheus
}

// CalculateVersion implements the Component interface
func (c *MonitorComponent) CalculateVersion(clusterVersion string) string {
// always not follow cluster version, use ""(latest) by default
Expand Down
19 changes: 10 additions & 9 deletions pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,6 @@ func (s *PDSpec) GetAdvertisePeerURL(enableTLS bool) string {
return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort))
}

// GetSource returns source to download the component
func (s *PDSpec) GetSource() string {
if s.Source == "" {
return ComponentPD
}
return s.Source
}

// PDComponent represents PD component.
type PDComponent struct{ Topology *Specification }

Expand All @@ -159,6 +151,15 @@ func (c *PDComponent) Role() string {
return ComponentPD
}

// Source implements Component interface.
func (c *PDComponent) Source() string {
source := c.Topology.ComponentSources.PD
if source != "" {
return source
}
return ComponentPD
}

// CalculateVersion implements the Component interface
func (c *PDComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.PD
Expand Down Expand Up @@ -188,7 +189,7 @@ func (c *PDComponent) Instances() []Instance {
ListenHost: utils.Ternary(s.ListenHost != "", s.ListenHost, c.Topology.BaseTopo().GlobalOptions.ListenHost).(string),
Port: s.ClientPort,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,
NumaNode: s.NumaNode,
NumaCores: "",

Expand Down
19 changes: 10 additions & 9 deletions pkg/cluster/spec/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ func (s *PumpSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *PumpSpec) GetSource() string {
if s.Source == "" {
return ComponentPump
}
return s.Source
}

// PumpComponent represents Pump component.
type PumpComponent struct{ Topology *Specification }

Expand All @@ -133,6 +125,15 @@ func (c *PumpComponent) Role() string {
return ComponentPump
}

// Source implements Component interface.
func (c *PumpComponent) Source() string {
source := c.Topology.ComponentSources.Pump
if source != "" {
return source
}
return ComponentPump
}

// CalculateVersion implements the Component interface
func (c *PumpComponent) CalculateVersion(clusterVersion string) string {
version := c.Topology.ComponentVersions.Pump
Expand Down Expand Up @@ -160,7 +161,7 @@ func (c *PumpComponent) Instances() []Instance {
ListenHost: c.Topology.BaseTopo().GlobalOptions.ListenHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),
Source: s.Source,
NumaNode: s.NumaNode,
NumaCores: "",

Expand Down
Loading

0 comments on commit c0abd84

Please sign in to comment.