Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add disable to independent default scheduler config #8531

Closed
wants to merge 19 commits into from
Closed
2 changes: 2 additions & 0 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func (s *StoresInfo) PauseLeaderTransfer(storeID uint64) error {
if !store.AllowLeaderTransfer() {
return errs.ErrPauseLeaderTransfer.FastGenByArgs(storeID)
}
log.Info("pause leader transfer", zap.Uint64("store-id", storeID))
s.stores[storeID] = store.Clone(PauseLeaderTransfer())
return nil
}
Expand All @@ -801,6 +802,7 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
zap.Uint64("store-id", storeID))
return
}
log.Info("resume leader transfer", zap.Uint64("store-id", storeID))
s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,18 +647,6 @@ func (o *PersistConfig) SetMaxReplicas(replicas int) {
o.SetReplicationConfig(v)
}

// IsSchedulerDisabled reports whether the scheduler of the given type is
// flagged as disabled in the schedule config. The type is first mapped to
// its legacy name through types.SchedulerTypeCompatibleMap; the Disable flag
// of the first entry whose Type equals that name is returned, and false is
// returned when no entry matches.
func (o *PersistConfig) IsSchedulerDisabled(tp types.CheckerSchedulerType) bool {
	legacyName := types.SchedulerTypeCompatibleMap[tp]
	for _, entry := range o.GetScheduleConfig().Schedulers {
		if entry.Type != legacyName {
			continue
		}
		return entry.Disable
	}
	return false
}

// SetPlacementRulesCacheEnabled sets if the placement rules cache is enabled.
func (o *PersistConfig) SetPlacementRulesCacheEnabled(enabled bool) {
v := o.GetReplicationConfig().Clone()
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is a memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type SchedulerConfigProvider interface {
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(types.CheckerSchedulerType) bool
AddSchedulerCfg(types.CheckerSchedulerType, []string)
RemoveSchedulerCfg(types.CheckerSchedulerType)
Persist(endpoint.ConfigStorage) error
Expand Down
22 changes: 11 additions & 11 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,16 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
log.Error("the scheduler type not found", zap.String("scheduler-name", name), errs.ZapError(errs.ErrSchedulerNotFound))
continue
}
if cfg.Disable {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
s, err := schedulers.CreateScheduler(types.ConvertOldStrToType[cfg.Type], c.opController,
c.cluster.GetStorage(), schedulers.ConfigJSONDecoder([]byte(data)), c.schedulers.RemoveScheduler)
if err != nil {
log.Error("can not create scheduler with independent configuration", zap.String("scheduler-name", name), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
continue
}
if s.IsDisable() {
log.Info("skip create scheduler with independent configuration", zap.String("scheduler-name", name), zap.String("scheduler-type", cfg.Type), zap.Strings("scheduler-args", cfg.Args))
continue
}
if needRun {
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
Expand All @@ -313,13 +313,6 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
// The old way to create the scheduler.
k := 0
for _, schedulerCfg := range scheduleCfg.Schedulers {
if schedulerCfg.Disable {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args))
continue
}

tp := types.ConvertOldStrToType[schedulerCfg.Type]
s, err := schedulers.CreateScheduler(tp, c.opController,
c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(tp, schedulerCfg.Args), c.schedulers.RemoveScheduler)
Expand All @@ -329,6 +322,13 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
continue
}

if s.IsDisable() {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Strings("scheduler-args", schedulerCfg.Args))
continue
}

if needRun {
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()),
zap.Strings("scheduler-args", schedulerCfg.Args))
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/diagnostic/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
if isDisabled {

if scheduler.Scheduler.IsDisable() {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
Expand Down
12 changes: 2 additions & 10 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,11 +856,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (any, error)
case "disabled":
disabledSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if disabled {
if sc.GetScheduler(scheduler).IsDisable() {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
Expand All @@ -871,11 +867,7 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (any, error)
// We should not return the disabled schedulers here.
enabledSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
if !sc.GetScheduler(scheduler).IsDisable() {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/unrolled/render"
"go.uber.org/zap"
Expand All @@ -52,8 +51,7 @@ const (
)

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
baseDefaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Expand Down Expand Up @@ -147,6 +145,9 @@ func (handler *balanceLeaderHandler) updateConfig(w http.ResponseWriter, r *http
}

// listConfig writes the scheduler configuration to w as JSON.
// When the config reports itself as disabled, it answers 404 with the
// ErrSchedulerNotFound message and writes nothing else; otherwise it answers
// 200 with a clone of the config.
func (handler *balanceLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
	if handler.config.isDisable() {
		handler.rd.JSON(w, http.StatusNotFound, errs.ErrSchedulerNotFound.Error())
		// Stop here: falling through would write a second response (200 plus
		// the config body) on top of the 404 that was just sent.
		return
	}
	conf := handler.config.clone()
	handler.rd.JSON(w, http.StatusOK, conf)
}
Expand All @@ -164,7 +165,7 @@ type balanceLeaderScheduler struct {
// each store balanced.
func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceLeaderSchedulerConfig, options ...BalanceLeaderCreateOption) Scheduler {
s := &balanceLeaderScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceLeaderScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceLeaderHandler(conf),
Expand Down Expand Up @@ -370,6 +371,11 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
return result, collector.GetPlans()
}

// Clean implements the Scheduler interface by delegating to the clean
// method of the scheduler's config and returning its error.
func (l *balanceLeaderScheduler) Clean() error {
return l.conf.clean()
}

func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler,
ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator {
store := cs.getStore()
Expand Down Expand Up @@ -541,3 +547,18 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
op.SetAdditionalInfo("targetScore", strconv.FormatFloat(solver.targetScore, 'f', 2, 64))
return op
}

// IsDisable implements the Scheduler interface. It reports the disabled
// state returned by the scheduler's config.
func (l *balanceLeaderScheduler) IsDisable() bool {
return l.conf.isDisable()
}

// SetDisable implements the Scheduler interface. It forwards the requested
// disabled state to the scheduler's config and returns any error from it.
func (l *balanceLeaderScheduler) SetDisable(disable bool) error {
return l.conf.setDisable(disable)
}

// IsDefault implements the Scheduler interface. It always returns true for
// balanceLeaderScheduler.
func (*balanceLeaderScheduler) IsDefault() bool {
return true
}
24 changes: 23 additions & 1 deletion pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
)

// balanceRegionSchedulerConfig is the config held by balanceRegionScheduler.
// It embeds baseDefaultSchedulerConfig, which supplies the Disabled flag and
// the isDisable/setDisable/clean helpers.
type balanceRegionSchedulerConfig struct {
baseDefaultSchedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler.
}
Expand All @@ -48,7 +50,7 @@ type balanceRegionScheduler struct {
// each store balanced.
func newBalanceRegionScheduler(opController *operator.Controller, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) Scheduler {
scheduler := &balanceRegionScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceRegionScheduler, conf),
retryQuota: newRetryQuota(),
name: types.BalanceRegionScheduler.String(),
conf: conf,
Expand Down Expand Up @@ -79,6 +81,11 @@ func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

// Clean implements the Scheduler interface by delegating to the clean
// method of the scheduler's config and returning its error.
func (s *balanceRegionScheduler) Clean() error {
return s.conf.clean()
}

// IsScheduleAllowed implements the Scheduler interface.
func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := s.OpController.OperatorCount(operator.OpRegion) < cluster.GetSchedulerConfig().GetRegionScheduleLimit()
Expand Down Expand Up @@ -266,3 +273,18 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
}
return nil
}

// IsDisable implements the Scheduler interface. It reports the disabled
// state returned by the scheduler's config.
func (s *balanceRegionScheduler) IsDisable() bool {
return s.conf.isDisable()
}

// SetDisable implements the Scheduler interface. It forwards the requested
// disabled state to the scheduler's config and returns any error from it.
func (s *balanceRegionScheduler) SetDisable(disable bool) error {
return s.conf.setDisable(disable)
}

// IsDefault implements the Scheduler interface. It always returns true for
// balanceRegionScheduler.
func (*balanceRegionScheduler) IsDefault() bool {
return true
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
)

type balanceWitnessSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig
syncutil.RWMutex

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Expand Down Expand Up @@ -162,7 +162,7 @@ type balanceWitnessScheduler struct {
// each store balanced.
func newBalanceWitnessScheduler(opController *operator.Controller, conf *balanceWitnessSchedulerConfig, options ...BalanceWitnessCreateOption) Scheduler {
s := &balanceWitnessScheduler{
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler),
BaseScheduler: NewBaseScheduler(opController, types.BalanceWitnessScheduler, conf),
retryQuota: newRetryQuota(),
conf: conf,
handler: newBalanceWitnessHandler(conf),
Expand Down
23 changes: 21 additions & 2 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,16 @@ type BaseScheduler struct {

name string
tp types.CheckerSchedulerType
conf schedulerConfig
}

// NewBaseScheduler returns a basic scheduler
func NewBaseScheduler(opController *operator.Controller, tp types.CheckerSchedulerType) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp}
func NewBaseScheduler(
opController *operator.Controller,
tp types.CheckerSchedulerType,
conf schedulerConfig,
) *BaseScheduler {
return &BaseScheduler{OpController: opController, tp: tp, conf: conf}
}

func (*BaseScheduler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -114,3 +119,17 @@ func (s *BaseScheduler) GetName() string {
func (s *BaseScheduler) GetType() types.CheckerSchedulerType {
return s.tp
}

// Clean implements the Scheduler interface. It delegates to the clean method
// of the config handed to NewBaseScheduler and returns its error. When no
// config was supplied there is nothing to clean, so it returns nil instead of
// dereferencing a nil interface; this keeps the base implementation a safe
// default like IsDisable, SetDisable and IsDefault.
func (s *BaseScheduler) Clean() error {
	if s.conf == nil {
		return nil
	}
	return s.conf.clean()
}

// IsDisable implements the Scheduler interface. The base implementation
// always returns false.
func (*BaseScheduler) IsDisable() bool { return false }

// SetDisable implements the Scheduler interface. The base implementation
// ignores its argument and always returns nil.
func (*BaseScheduler) SetDisable(bool) error { return nil }

// IsDefault returns if the scheduler is a default scheduler. The base
// implementation always returns false.
func (*BaseScheduler) IsDefault() bool { return false }
51 changes: 50 additions & 1 deletion pkg/schedule/schedulers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@ package schedulers
import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

type schedulerConfig interface {
init(name string, storage endpoint.ConfigStorage, data any)
save() error
load(any) error
init(name string, storage endpoint.ConfigStorage, data any)
clean() error
}

// Compile-time assertion that *baseSchedulerConfig satisfies schedulerConfig.
var _ schedulerConfig = &baseSchedulerConfig{}

type baseSchedulerConfig struct {
name string
storage endpoint.ConfigStorage
Expand All @@ -48,6 +55,7 @@ func (b *baseSchedulerConfig) save() error {
if err != nil {
return err
}
log.Info("save scheduler config", zap.String("scheduler-name", b.name), zap.String("config", string(data)))
return b.storage.SaveSchedulerConfig(b.name, data)
}

Expand All @@ -58,3 +66,44 @@ func (b *baseSchedulerConfig) load(v any) error {
}
return DecodeConfig([]byte(data), v)
}

// clean removes the config stored under b.name from the config storage and
// returns the storage's error.
func (b *baseSchedulerConfig) clean() error {
return b.storage.RemoveSchedulerConfig(b.name)
}

// baseDefaultSchedulerConfig is the config base embedded by schedulers that
// can be disabled (see isDisable/setDisable). It wraps a schedulerConfig for
// persistence and adds a Disabled flag; its methods take the embedded RWMutex
// before touching that flag.
type baseDefaultSchedulerConfig struct {
schedulerConfig
syncutil.RWMutex

// Disabled is serialized under the JSON key "disabled".
Disabled bool `json:"disabled"`
}

// newBaseDefaultSchedulerConfig returns a baseDefaultSchedulerConfig whose
// embedded schedulerConfig is a fresh, empty *baseSchedulerConfig. Disabled
// is left at its zero value (false).
func newBaseDefaultSchedulerConfig() baseDefaultSchedulerConfig {
return baseDefaultSchedulerConfig{
schedulerConfig: &baseSchedulerConfig{},
}
}

// isDisable reloads the config through load and then reports the Disabled
// flag. A load failure is only logged as a warning; the flag's current
// in-memory value is returned in that case.
func (b *baseDefaultSchedulerConfig) isDisable() bool {
// The exclusive lock (not RLock) is taken because b itself is passed to
// load as the destination, so this call may write to b.
b.Lock()
defer b.Unlock()
if err := b.load(b); err != nil {
log.Warn("failed to load scheduler config, maybe the config never persist", errs.ZapError(err))
}
return b.Disabled
}

// setDisable updates the Disabled flag under the write lock, logs the new
// value, and persists the config via save, returning save's error.
// The in-memory flag is changed before save runs and is not rolled back here
// if save fails.
func (b *baseDefaultSchedulerConfig) setDisable(disabled bool) error {
b.Lock()
defer b.Unlock()
b.Disabled = disabled
log.Info("set scheduler disable", zap.Bool("disabled", disabled))
return b.save()
}

// clean marks the config as disabled and persists it via save, returning
// save's error. Unlike baseSchedulerConfig.clean, it does not call
// RemoveSchedulerConfig itself; it sets Disabled to true and saves.
func (b *baseDefaultSchedulerConfig) clean() error {
b.Lock()
defer b.Unlock()
b.Disabled = true
return b.save()
}
Loading
Loading