Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] *: Deprecated scheduler v2 #8528

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 97 additions & 58 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/statistics/utils"
Expand Down Expand Up @@ -62,6 +61,27 @@ type Cluster struct {
miscRunner ratelimit.Runner
// logRunner is used to process the log asynchronously.
logRunner ratelimit.Runner

schedulerNotifier schedulerNotifier
}

type schedulerNotifier struct {
addNotifier chan string
removeNotifier chan string
}

func (s *schedulerNotifier) addScheduler(name string) {
select {
case s.addNotifier <- name:
default:
}
}

func (s *schedulerNotifier) removeScheduler(name string) {
select {
case s.removeNotifier <- name:
default:
}
}

const (
Expand Down Expand Up @@ -258,9 +278,10 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
// trySend try to send all default schedulers.
func trySend(notifier chan string) {
select {
case notifier <- struct{}{}:
case notifier <- "":
// If the channel is not empty, it means the check is triggered.
default:
}
Expand All @@ -271,86 +292,104 @@ func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
defer c.wg.Done()

var (
schedulerName string
addNotifier = make(chan string, 1)
removeNotifier = make(chan string, 1)
)
c.schedulerNotifier = schedulerNotifier{
addNotifier: addNotifier,
removeNotifier: removeNotifier,
}
// Make sure the coordinator has initialized all the existing schedulers.
c.waitSchedulersInitialized()
// Establish a notifier to listen the schedulers updating.
notifier := make(chan struct{}, 1)

// Make sure the check will be triggered once later.
trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

loop:
for {
retryOnNotRunning := func() {
if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue loop
}
}
}
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-notifier:
case schedulerName = <-addNotifier:
retryOnNotRunning()
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue
}
case schedulerName = <-removeNotifier:
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
type schedulerConfig struct {
Args []string `json:"args"`
Disable bool `json:"disable"`
}
var (
schedulersController = c.coordinator.GetSchedulersController()
latestSchedulersConfig = c.persistConfig.GetScheduleConfig().Schedulers
schedulersController = c.coordinator.GetSchedulersController()
)

// Create the newly added schedulers.
for _, scheduler := range latestSchedulersConfig {
schedulerType := types.ConvertOldStrToType[scheduler.Type]
s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigSliceDecoder(schedulerType, scheduler.Args),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.String("scheduler-type", scheduler.Type),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
continue
}
if err := schedulersController.AddScheduler(s, scheduler.Args...); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
schedulerType := types.ConvertOldStrToType[schedulerName]
cfg, err := c.storage.LoadSchedulerConfig(schedulerName)
if err != nil {
log.Error("failed to load scheduler config",
zap.String("scheduler-name", schedulerName),
errs.ZapError(err))
continue
}

s, err := schedulers.CreateScheduler(
schedulerType,
c.coordinator.GetOperatorController(),
c.storage,
schedulers.ConfigJSONDecoder([]byte(cfg)),
schedulersController.RemoveScheduler,
)
if err != nil {
log.Error("failed to create scheduler",
zap.Stringer("scheduler-type", schedulerType),
zap.String("scheduler-cfg", cfg),
errs.ZapError(err))
continue
}
name := s.GetName()
if existed, _ := schedulersController.IsSchedulerExisted(name); existed {
log.Info("scheduler has already existed, skip adding it",
zap.String("scheduler-name", name),
zap.String("scheduler-cfg", cfg))
continue
}
if err := schedulersController.AddScheduler(s); err != nil {
log.Error("failed to add scheduler",
zap.String("scheduler-name", name),
zap.Strings("scheduler-args", scheduler.Args))
zap.String("scheduler-cfg", cfg),
errs.ZapError(err))
continue
}
log.Info("add scheduler successfully",
zap.String("scheduler-name", name),
zap.String("scheduler-cfg", cfg))

// Remove the deleted schedulers.
for _, name := range schedulersController.GetSchedulerNames() {
scheduler := schedulersController.GetScheduler(name)
oldType := types.SchedulerTypeCompatibleMap[scheduler.GetType()]
if slice.AnyOf(latestSchedulersConfig, func(i int) bool {
return latestSchedulersConfig[i].Type == oldType
}) {
continue
}
if err := schedulersController.RemoveScheduler(name); err != nil {
log.Error("failed to remove scheduler",
zap.String("scheduler-name", name),
Expand Down
55 changes: 0 additions & 55 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"fmt"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -37,7 +36,7 @@
mcsconstant "github.com/tikv/pd/pkg/mcs/utils/constant"
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/slice"

Check failure on line 39 in pkg/mcs/scheduling/server/config/config.go

View workflow job for this annotation

GitHub Actions / statics

"github.com/tikv/pd/pkg/slice" imported and not used

Check failure on line 39 in pkg/mcs/scheduling/server/config/config.go

View workflow job for this annotation

GitHub Actions / tso-function-test

"github.com/tikv/pd/pkg/slice" imported and not used

Check failure on line 39 in pkg/mcs/scheduling/server/config/config.go

View workflow job for this annotation

GitHub Actions / chunks (3, Unit Test(3))

"github.com/tikv/pd/pkg/slice" imported and not used

Check failure on line 39 in pkg/mcs/scheduling/server/config/config.go

View workflow job for this annotation

GitHub Actions / chunks (4, Tests(1))

"github.com/tikv/pd/pkg/slice" imported and not used

Check failure on line 39 in pkg/mcs/scheduling/server/config/config.go

View workflow job for this annotation

GitHub Actions / chunks (5, Tests(2))

"github.com/tikv/pd/pkg/slice" imported and not used
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
Expand Down Expand Up @@ -230,9 +229,6 @@
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// schedulersUpdatingNotifier is used to notify that the schedulers have been updated.
// Store as `chan<- struct{}`.
schedulersUpdatingNotifier atomic.Value
}

// NewPersistConfig creates a new PersistConfig instance.
Expand All @@ -248,27 +244,6 @@
return o
}

// SetSchedulersUpdatingNotifier sets the schedulers updating notifier.
func (o *PersistConfig) SetSchedulersUpdatingNotifier(notifier chan<- struct{}) {
o.schedulersUpdatingNotifier.Store(notifier)
}

func (o *PersistConfig) getSchedulersUpdatingNotifier() chan<- struct{} {
v := o.schedulersUpdatingNotifier.Load()
if v == nil {
return nil
}
return v.(chan<- struct{})
}

func (o *PersistConfig) tryNotifySchedulersUpdating() {
notifier := o.getSchedulersUpdatingNotifier()
if notifier == nil {
return
}
notifier <- struct{}{}
}

// GetClusterVersion returns the cluster version.
func (o *PersistConfig) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand All @@ -286,25 +261,7 @@

// SetScheduleConfig sets the scheduling configuration dynamically.
func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
old := o.GetScheduleConfig()
o.schedule.Store(cfg)
// The coordinator is not aware of the underlying scheduler config changes,
// we should notify it to update the schedulers proactively.
if !reflect.DeepEqual(old.Schedulers, cfg.Schedulers) {
o.tryNotifySchedulersUpdating()
}
}

// AdjustScheduleCfg adjusts the schedule config during the initialization.
func AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool {
return scheduleCfg.Schedulers[i].Type == ps.Type
}) {
scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps)
}
}
}

// GetReplicationConfig returns replication configurations.
Expand Down Expand Up @@ -647,18 +604,6 @@
o.SetReplicationConfig(v)
}

// IsSchedulerDisabled returns if the scheduler is disabled.
func (o *PersistConfig) IsSchedulerDisabled(tp types.CheckerSchedulerType) bool {
oldType := types.SchedulerTypeCompatibleMap[tp]
schedulers := o.GetScheduleConfig().Schedulers
for _, s := range schedulers {
if oldType == s.Type {
return s.Disable
}
}
return false
}

// SetPlacementRulesCacheEnabled sets if the placement rules cache is enabled.
func (o *PersistConfig) SetPlacementRulesCacheEnabled(enabled bool) {
v := o.GetReplicationConfig().Clone()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Watcher struct {
*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
// It is a memory storage.
storage storage.Storage
// schedulersController is used to trigger the scheduler's config reloading.
// Store as `*schedulers.Controller`.
Expand Down Expand Up @@ -129,7 +130,6 @@ func (cw *Watcher) initializeConfigWatcher() error {
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
cw.SetReplicationConfig(&cfg.Replication)
Expand Down
12 changes: 0 additions & 12 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,6 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
c.HaltScheduling = defaultHaltScheduling
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
v, err := parseDeprecatedFlag(meta, k, *b[0], *b[1])
if err != nil {
Expand Down Expand Up @@ -574,16 +572,6 @@ var DefaultSchedulers = SchedulerConfigs{
{Type: types.SchedulerTypeCompatibleMap[types.EvictSlowStoreScheduler]},
}

// IsDefaultScheduler checks whether the scheduler is enabled by default.
func IsDefaultScheduler(typ string) bool {
for _, c := range DefaultSchedulers {
if typ == c.Type {
return true
}
}
return false
}

// ReplicationConfig is the replication configuration.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type ReplicationConfig struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type SchedulerConfigProvider interface {
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(types.CheckerSchedulerType) bool
AddSchedulerCfg(types.CheckerSchedulerType, []string)
RemoveSchedulerCfg(types.CheckerSchedulerType)
Persist(endpoint.ConfigStorage) error

GetRegionScheduleLimit() uint64
Expand Down
Loading
Loading