Skip to content

Commit

Permalink
deprecate the disable in old scheduler config
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Aug 14, 2024
1 parent 3117f99 commit 2ab6151
Show file tree
Hide file tree
Showing 35 changed files with 294 additions and 139 deletions.
12 changes: 0 additions & 12 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,18 +647,6 @@ func (o *PersistConfig) SetMaxReplicas(replicas int) {
o.SetReplicationConfig(v)
}

// IsSchedulerDisabled returns if the scheduler is disabled.
func (o *PersistConfig) IsSchedulerDisabled(tp types.CheckerSchedulerType) bool {
oldType := types.SchedulerTypeCompatibleMap[tp]
schedulers := o.GetScheduleConfig().Schedulers
for _, s := range schedulers {
if oldType == s.Type {
return s.Disable
}
}
return false
}

// SetPlacementRulesCacheEnabled sets if the placement rules cache is enabled.
func (o *PersistConfig) SetPlacementRulesCacheEnabled(enabled bool) {
v := o.GetReplicationConfig().Clone()
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is a memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type SchedulerConfigProvider interface {
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(types.CheckerSchedulerType) bool
AddSchedulerCfg(types.CheckerSchedulerType, []string)
RemoveSchedulerCfg(types.CheckerSchedulerType)
Persist(endpoint.ConfigStorage) error
Expand Down
22 changes: 11 additions & 11 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Error("the scheduler type not found", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerNotFound))
continue
}
if cfg.Disable {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(types.ConvertOldStrToType[cfg.Type], c.opController,
c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
if s.IsDisable() {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
Expand All @@ -313,13 +313,6 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
// The old way to create the scheduler.
k := 0
for _, schedulerCfg := range scheduleCfg.Schedulers {
if schedulerCfg.Disable {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args))
continue
}

tp := types.ConvertOldStrToType[schedulerCfg.Type]
s, err := schedulers.CreateScheduler(tp, c.opController,
c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(tp, schedulerCfg.Args), c.schedulers.RemoveScheduler)
Expand All @@ -329,6 +322,13 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

if s.IsDisable() {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args))
continue
}

if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()),
zap.Strings("scheduler-args", schedulerCfg.Args))
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/diagnostic/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
if isDisabled {

if scheduler.Scheduler.IsDisable() {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
Expand Down
12 changes: 2 additions & 10 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,11 +855,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (any, error)
case "disabled":
disabledSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if disabled {
if sc.GetScheduler(scheduler).IsDisable() {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
Expand All @@ -870,11 +866,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (any, error)
// We should not return the disabled schedulers here.
enabledSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
if !sc.GetScheduler(scheduler).IsDisable() {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
Expand Down
21 changes: 17 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand All @@ -54,8 +53,7 @@ const (
)

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
defaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Expand Down Expand Up @@ -166,7 +164,7 @@ type balanceLeaderScheduler struct {
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
s := &balanceLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceLeaderHandler(conf),
Expand Down Expand Up @@ -543,3 +541,18 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}

// IsDiable implements the Scheduler interface.
func (l *balanceLeaderScheduler) IsDisable() bool {
l.conf.RLock()
defer l.conf.RUnlock()
return l.conf.isDisable()
}

// SetDiable implements the Scheduler interface.
func (l *balanceLeaderScheduler) SetDisable(disable bool) {
l.conf.RLock()
defer l.conf.RUnlock()
l.conf.setDisable(disable)
l.conf.save()

Check failure on line 557 in pkg/schedule/schedulers/balance_leader.go

View workflow job for this annotation

GitHub Actions / statics

Error return value of `l.conf.save` is not checked (errcheck)
}
19 changes: 18 additions & 1 deletion pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
)

type balanceRegionSchedulerConfig struct {
defaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
}
Expand All @@ -53,7 +55,7 @@ type balanceRegionScheduler struct {
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
scheduler := &balanceRegionScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler, conf),
retryQuota: newRetryQuota(),
name: types.BalanceRegionScheduler.String(),
conf: conf,
Expand Down Expand Up @@ -271,3 +273,18 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
}
return nil
}

// IsDiable implements the Scheduler interface.
func (s *balanceRegionScheduler) IsDisable() bool {
s.conf.Lock()
defer s.conf.Unlock()
return s.conf.isDisable()
}

// SetDiable implements the Scheduler interface.
func (s *balanceRegionScheduler) SetDisable(disable bool) {
s.conf.Lock()
defer s.conf.Unlock()
s.conf.setDisable(disable)
s.conf.save()

Check failure on line 289 in pkg/schedule/schedulers/balance_region.go

View workflow job for this annotation

GitHub Actions / statics

Error return value of `s.conf.save` is not checked (errcheck)
}
4 changes: 1 addition & 3 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
"go.uber.org/zap"
)
Expand All @@ -51,7 +50,6 @@ const (
)

type balanceWitnessSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig

Ranges []core.KeyRange `json:"ranges"`
Expand Down Expand Up @@ -164,7 +162,7 @@ type balanceWitnessScheduler struct {
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
s := &balanceWitnessScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceWitnessHandler(conf),
Expand Down
19 changes: 16 additions & 3 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@ type BaseScheduler struct {

name string
tp types.CheckerSchedulerType
conf schedulerConfig
}

// NewBaseScheduler returns a basic scheduler
func NewBaseScheduler(opController *operator.Controller, tp types.CheckerSchedulerType) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp}
func NewBaseScheduler(
opController *operator.Controller,
tp types.CheckerSchedulerType,
conf schedulerConfig,
) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp, conf: conf}
}

func (*BaseScheduler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -100,7 +105,9 @@ func (*BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
func (*BaseScheduler) PrepareConfig(sche.SchedulerCluster) error { return nil }

// CleanConfig does some cleanup work about config.
func (*BaseScheduler) CleanConfig(sche.SchedulerCluster) {}
func (s *BaseScheduler) CleanConfig(sche.SchedulerCluster) error {
return s.conf.clean()
}

// GetName returns the name of the scheduler
func (s *BaseScheduler) GetName() string {
Expand All @@ -114,3 +121,9 @@ func (s *BaseScheduler) GetName() string {
func (s *BaseScheduler) GetType() types.CheckerSchedulerType {
return s.tp
}

// IsDiable implements the Scheduler interface.
func (*BaseScheduler) IsDisable() bool { return false }

// SetDisable implements the Scheduler interface.
func (*BaseScheduler) SetDisable(bool) {}
61 changes: 60 additions & 1 deletion pkg/schedule/schedulers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
)

type schedulerConfig interface {
syncutil.RWMutexInterface

Check failure on line 25 in pkg/schedule/schedulers/config.go

View workflow job for this annotation

GitHub Actions / tso-function-test

undefined: syncutil.RWMutexInterface

Check failure on line 25 in pkg/schedule/schedulers/config.go

View workflow job for this annotation

GitHub Actions / chunks (1, Unit Test(1))

undefined: syncutil.RWMutexInterface

init(name string, storage endpoint.ConfigStorage, data any)
save() error
load(any) error
init(name string, storage endpoint.ConfigStorage, data any)
clean() error
// setArgs([]string)
// getArgs() []string
}

type baseSchedulerConfig struct {
syncutil.RWMutex

name string
storage endpoint.ConfigStorage

// Args is the input arguments of the scheduler.
// Args []string `json:"args"`
// data is the config of the scheduler.
data any
}
Expand Down Expand Up @@ -58,3 +68,52 @@ func (b *baseSchedulerConfig) load(v any) error {
}
return DecodeConfig([]byte(data), v)
}

func (b *baseSchedulerConfig) clean() error {
return b.storage.RemoveSchedulerConfig(b.name)
}

// func (b *baseSchedulerConfig) setArgs(args []string) {
// b.Args = args
// }

// func (b *baseSchedulerConfig) getArgs() []string {
// return b.Args
// }

type defaultSchedulerConfig interface {
schedulerConfig

isDisable() bool
setDisable(bool)
}

type baseDefaultSchedulerConfig struct {
schedulerConfig

Disabled bool `json:"disabled"`
}

func newBaseDefaultSchedulerConfig() *baseDefaultSchedulerConfig {
return &baseDefaultSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
}

func (b *baseDefaultSchedulerConfig) isDisable() bool {
b.load(b)

Check failure on line 104 in pkg/schedule/schedulers/config.go

View workflow job for this annotation

GitHub Actions / statics

Error return value of `b.load` is not checked (errcheck)
return b.Disabled
}

func (b *baseDefaultSchedulerConfig) setDisable(disabled bool) {
b.Lock()
defer b.Unlock()
b.Disabled = disabled
}

func (b *baseDefaultSchedulerConfig) clean() error {
b.Lock()
defer b.Unlock()
b.Disabled = false
return b.save()
}
23 changes: 23 additions & 0 deletions pkg/schedule/schedulers/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,26 @@ func TestSchedulerConfig(t *testing.T) {
// report error because the config is empty and cannot be decoded
require.Error(t, cfg2.load(newTc))
}

func TestDefaultSchedulerConfig(t *testing.T) {
s := storage.NewStorageWithMemoryBackend()

type testConfig struct {
defaultSchedulerConfig
Value string `json:"value"`
}

cfg := &testConfig{
defaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
}
cfg.init("test", s, cfg)
require.False(t, cfg.isDisable())
cfg.setDisable(true)
require.True(t, cfg.isDisable())

cfg2 := &testConfig{
defaultSchedulerConfig: newBaseDefaultSchedulerConfig(),
}
cfg.init("test", s, cfg2)
require.True(t, cfg.isDisable())
}
Loading

0 comments on commit 2ab6151

Please sign in to comment.