Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 2be2c5a
Merge: 21914c5 3117f99
Author: okJiang <[email protected]>
Date:   Wed Aug 14 13:11:03 2024 +0800

    Merge branch 'master' of github.com:tikv/pd into deprecated-scheduler-v2

    Signed-off-by: okJiang <[email protected]>

commit 21914c5
Merge: fe528a7 0d73b58
Author: okJiang <[email protected]>
Date:   Wed Aug 14 13:02:59 2024 +0800

    Merge branch 'unify-scheduler-config' into deprecated-scheduler-v2

    Signed-off-by: okJiang <[email protected]>

commit 0d73b58
Author: okJiang <[email protected]>
Date:   Wed Aug 14 11:25:26 2024 +0800

    trigger ci

    Signed-off-by: okJiang <[email protected]>

commit ef974fc
Author: okJiang <[email protected]>
Date:   Wed Aug 14 11:17:52 2024 +0800

    trigger ci

    Signed-off-by: okJiang <[email protected]>

commit fa25ca0
Merge: fa73a15 4ba7249
Author: okJiang <[email protected]>
Date:   Wed Aug 14 11:11:46 2024 +0800

    Merge branch 'unify-scheduler-config' of github.com:okJiang/pd into unify-scheduler-config

commit fa73a15
Author: okJiang <[email protected]>
Date:   Wed Aug 14 11:11:25 2024 +0800

    fix comment

    Signed-off-by: okJiang <[email protected]>

commit 4ba7249
Merge: 96ef87e 1c1cd99
Author: okJiang <[email protected]>
Date:   Tue Aug 13 17:08:41 2024 +0800

    Merge branch 'master' into unify-scheduler-config

commit 96ef87e
Author: okJiang <[email protected]>
Date:   Tue Aug 13 16:56:22 2024 +0800

    fix comment: update save(conf) to save()

    Signed-off-by: okJiang <[email protected]>

commit fe528a7
Author: okJiang <[email protected]>
Date:   Tue Aug 13 16:34:12 2024 +0800

    save work

    Signed-off-by: okJiang <[email protected]>

commit 5832835
Author: okJiang <[email protected]>
Date:   Tue Aug 13 10:11:38 2024 +0800

    trigger ci

    Signed-off-by: okJiang <[email protected]>

commit 9c61f70
Merge: 1dfc4e6 dc5f3ca
Author: okJiang <[email protected]>
Date:   Mon Aug 12 17:31:44 2024 +0800

    Merge branch 'unify-scheduler-config' of github.com:okJiang/pd into unify-scheduler-config

commit 1dfc4e6
Author: okJiang <[email protected]>
Date:   Mon Aug 12 17:31:23 2024 +0800

    trigger ci

    Signed-off-by: okJiang <[email protected]>

commit dc5f3ca
Merge: 33b1c7c f3602e3
Author: okJiang <[email protected]>
Date:   Mon Aug 12 16:21:14 2024 +0800

    Merge branch 'master' into unify-scheduler-config

commit 33b1c7c
Author: okJiang <[email protected]>
Date:   Mon Aug 12 16:14:40 2024 +0800

    fix comment

    Signed-off-by: okJiang <[email protected]>

commit 2498970
Author: okJiang <[email protected]>
Date:   Mon Aug 12 14:26:40 2024 +0800

    revert a commit

    Signed-off-by: okJiang <[email protected]>

commit b2fc257
Author: okJiang <[email protected]>
Date:   Mon Aug 12 14:23:05 2024 +0800

    add more ut

    Signed-off-by: okJiang <[email protected]>

commit c6267ff
Author: okJiang <[email protected]>
Date:   Mon Aug 12 13:11:30 2024 +0800

    add an ut

    Signed-off-by: okJiang <[email protected]>

commit 1483339
Author: okJiang <[email protected]>
Date:   Mon Aug 12 11:45:17 2024 +0800

    optimize

    Signed-off-by: okJiang <[email protected]>

commit b3ed767
Author: okJiang <[email protected]>
Date:   Mon Aug 12 11:21:40 2024 +0800

    exact schedulerConfig interface

    Signed-off-by: okJiang <[email protected]>

Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Aug 14, 2024
1 parent 3117f99 commit a574dc7
Show file tree
Hide file tree
Showing 28 changed files with 297 additions and 336 deletions.
155 changes: 97 additions & 58 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
Expand Down Expand Up @@ -62,6 +61,27 @@ type Cluster struct {
miscRunner ratelimit.Runner
// logRunner is used to process the log asynchronously.
logRunner ratelimit.Runner

schedulerNotifier schedulerNotifier
}

// schedulerNotifier bundles the two string channels used to announce
// scheduler changes: one for additions and one for removals.
type schedulerNotifier struct {
// addNotifier receives the names passed to addScheduler.
addNotifier chan string
// removeNotifier receives the names passed to removeScheduler.
removeNotifier chan string
}

// addScheduler offers name on the add channel without blocking. If the send
// cannot proceed immediately, the name is dropped.
func (s *schedulerNotifier) addScheduler(name string) {
	ch := s.addNotifier
	select {
	case ch <- name:
		return
	default:
		// The send would block; give up instead of waiting.
		return
	}
}

// removeScheduler offers name on the remove channel without blocking. If the
// send cannot proceed immediately, the name is dropped.
func (s *schedulerNotifier) removeScheduler(name string) {
	ch := s.removeNotifier
	select {
	case ch <- name:
		return
	default:
		// The send would block; give up instead of waiting.
		return
	}
}

const (
Expand Down Expand Up @@ -258,9 +278,10 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
// trySend tries to send all default schedulers.
func trySend(notifier chan string) {
select {
case notifier <- struct{}{}:
case notifier <- "":
// If the channel is not empty, it means the check is triggered.
default:
}
Expand All @@ -271,86 +292,104 @@ func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
defer c.wg.Done()

var (
schedulerName string
addNotifier = make(chan string, 1)
removeNotifier = make(chan string, 1)
)
c.schedulerNotifier = schedulerNotifier{
addNotifier: addNotifier,
removeNotifier: removeNotifier,
}
// Make sure the coordinator has initialized all the existing schedulers.
c.waitSchedulersInitialized()
// Establish a notifier to listen for scheduler updates.
notifier := make(chan struct{}, 1)

// Make sure the check will be triggered once later.
trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

loop:
for {
retryOnNotRunning := func() {
if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue loop
}
}
}
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
case schedulerName = <-addNotifier:
retryOnNotRunning()
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue
}
case schedulerName = <-removeNotifier:
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
type schedulerConfig struct {
Args []string `json:"args"`
Disable bool `json:"disable"`
}
var (
schedulersController = c.coordinator.GetSchedulersController()
latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
schedulersController = c.coordinator.GetSchedulersController()
)

// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.String("scheduler-type", scheduler.Type),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
continue
}
if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
schedulerType := types.ConvertOldStrToType[schedulerName]
cfg, err := c.storage.LoadSchedulerConfig(schedulerName)
if err != nil {
log.Error("failed to load scheduler config",
zap.String("scheduler-name", schedulerName),
errs.ZapError(err))
continue
}

s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigJSONDecoder([]byte(cfg)),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.Stringer("scheduler-type", schedulerType),
zap.String("scheduler-cfg", cfg),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.String("scheduler-cfg", cfg))
continue
}
if err := schedulersController.AddScheduler(s); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
zap.String("scheduler-cfg", cfg),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
zap.String("scheduler-name", name),
zap.String("scheduler-cfg", cfg))

// Remove the deleted schedulers.
for _, name := range schedulersController.GetSchedulerNames() {
scheduler := schedulersController.GetScheduler(name)
oldType := types.SchedulerTypeCompatibleMap[scheduler.GetType()]
if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
return latestSchedulersConfig[i].Type == oldType
}) {
continue
}
if err := schedulersController.RemoveScheduler(name); err != nil {
log.Error("failed to remove scheduler",
zap.String("scheduler-name", name),
Expand Down
55 changes: 0 additions & 55 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -230,9 +229,6 @@ type PersistConfig struct {
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// schedulersUpdatingNotifier is used to notify that the schedulers have been updated.
// Store as `chan<- struct{}`.
schedulersUpdatingNotifier atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
Expand All @@ -248,27 +244,6 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
return o
}

// SetSchedulersUpdatingNotifier sets the schedulers updating notifier.
// The channel is kept in the schedulersUpdatingNotifier atomic value as a
// send-only `chan<- struct{}`, the same type getSchedulersUpdatingNotifier
// asserts when it loads the value back.
func (o *PersistConfig) SetSchedulersUpdatingNotifier(notifier chan<- struct{}) {
o.schedulersUpdatingNotifier.Store(notifier)
}

// getSchedulersUpdatingNotifier returns the stored notifier channel, or nil
// when nothing has been stored yet.
func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
	if stored := o.schedulersUpdatingNotifier.Load(); stored != nil {
		// The value is always stored as a send-only channel.
		return stored.(chan<- struct{})
	}
	return nil
}

// tryNotifySchedulersUpdating sends one signal on the schedulers updating
// notifier. It does nothing when no notifier is available.
func (o *PersistConfig) tryNotifySchedulersUpdating() {
	if ch := o.getSchedulersUpdatingNotifier(); ch != nil {
		ch <- struct{}{}
	}
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand All @@ -286,25 +261,7 @@ func (o *PersistConfig) GetScheduleConfig() *sc.ScheduleConfig {

// SetScheduleConfig replaces the scheduling configuration at runtime and
// signals the schedulers updating notifier when the Schedulers list changed.
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
	// Capture the previous config before it is overwritten.
	prev := o.GetScheduleConfig()
	o.schedule.Store(cfg)
	if reflect.DeepEqual(prev.Schedulers, cfg.Schedulers) {
		return
	}
	// The coordinator does not observe scheduler config changes on its own,
	// so it has to be told proactively.
	o.tryNotifySchedulersUpdating()
}

// AdjustScheduleCfg is applied to the schedule config during initialization.
// It walks sc.DefaultSchedulers and appends each entry for which
// slice.NoneOf finds no existing scheduler with the same Type, so newly
// introduced default schedulers get picked up.
func AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
	for _, def := range sc.DefaultSchedulers {
		missing := slice.NoneOf(scheduleCfg.Schedulers, func(idx int) bool {
			return scheduleCfg.Schedulers[idx].Type == def.Type
		})
		if !missing {
			continue
		}
		scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, def)
	}
}

// GetReplicationConfig returns replication configurations.
Expand Down Expand Up @@ -647,18 +604,6 @@ func (o *PersistConfig) SetMaxReplicas(replicas int) {
o.SetReplicationConfig(v)
}

// IsSchedulerDisabled reports the Disable flag of the first configured
// scheduler whose Type equals the types.SchedulerTypeCompatibleMap entry for
// tp. It returns false when no configured scheduler matches.
func (o *PersistConfig) IsSchedulerDisabled(tp types.CheckerSchedulerType) bool {
	legacyName := types.SchedulerTypeCompatibleMap[tp]
	for _, entry := range o.GetScheduleConfig().Schedulers {
		if entry.Type == legacyName {
			return entry.Disable
		}
	}
	return false
}

// SetPlacementRulesCacheEnabled sets if the placement rules cache is enabled.
func (o *PersistConfig) SetPlacementRulesCacheEnabled(enabled bool) {
v := o.GetReplicationConfig().Clone()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is an in-memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down Expand Up @@ -129,7 +130,6 @@ func (cw *Watcher) initializeConfigWatcher() error {
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
Expand Down
12 changes: 0 additions & 12 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,6 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
c.HaltScheduling = defaultHaltScheduling
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
v, err := parseDeprecatedFlag(meta, k, *b[0], *b[1])
if err != nil {
Expand Down Expand Up @@ -574,16 +572,6 @@ var DefaultSchedulers = SchedulerConfigs{
{Type: types.SchedulerTypeCompatibleMap[types.EvictSlowStoreScheduler]},
}

// IsDefaultScheduler reports whether typ matches the Type of any entry in
// DefaultSchedulers, i.e. whether the scheduler is enabled by default.
func IsDefaultScheduler(typ string) bool {
	for i := range DefaultSchedulers {
		if DefaultSchedulers[i].Type == typ {
			return true
		}
	}
	return false
}

// ReplicationConfig is the replication configuration.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type ReplicationConfig struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type SchedulerConfigProvider interface {
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(types.CheckerSchedulerType) bool
AddSchedulerCfg(types.CheckerSchedulerType, []string)
RemoveSchedulerCfg(types.CheckerSchedulerType)
Persist(endpoint.ConfigStorage) error

GetRegionScheduleLimit() uint64
Expand Down
Loading

0 comments on commit a574dc7

Please sign in to comment.