From b3ed7671474ecab781cd77be26e3b0d1a9e5b219 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Mon, 12 Aug 2024 11:21:40 +0800 Subject: [PATCH] extract schedulerConfig interface Signed-off-by: okJiang <819421878@qq.com> --- pkg/schedule/schedulers/balance_leader.go | 26 ++---- pkg/schedule/schedulers/balance_witness.go | 26 ++---- pkg/schedule/schedulers/config.go | 63 ++++++++++++++ pkg/schedule/schedulers/evict_leader.go | 32 ++----- pkg/schedule/schedulers/evict_slow_store.go | 38 +++------ .../schedulers/evict_slow_store_test.go | 14 +-- pkg/schedule/schedulers/evict_slow_trend.go | 48 ++++------- .../schedulers/evict_slow_trend_test.go | 13 +-- pkg/schedule/schedulers/grant_hot_region.go | 19 +---- pkg/schedule/schedulers/grant_leader.go | 19 +---- pkg/schedule/schedulers/hot_region.go | 10 +-- pkg/schedule/schedulers/hot_region_config.go | 14 +-- pkg/schedule/schedulers/init.go | 85 +++++++++++++------ pkg/schedule/schedulers/scatter_range.go | 20 +---- pkg/schedule/schedulers/scheduler.go | 11 ++- pkg/schedule/schedulers/shuffle_hot_region.go | 37 +++----- pkg/schedule/schedulers/shuffle_leader.go | 7 +- pkg/schedule/schedulers/shuffle_region.go | 9 +- .../schedulers/shuffle_region_config.go | 13 +-- pkg/schedule/schedulers/split_bucket.go | 22 +---- 20 files changed, 219 insertions(+), 307 deletions(-) create mode 100644 pkg/schedule/schedulers/config.go diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index b434c7ad706..3802576ccb2 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -56,8 +55,9 @@ const ( type 
balanceLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Ranges []core.KeyRange `json:"ranges"` + schedulerConfig + + Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` } @@ -79,7 +79,7 @@ func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) { } return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } - if err := conf.persistLocked(); err != nil { + if err := conf.schedulerConfig.save(conf); err != nil { log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err)) } log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig)) @@ -111,14 +111,6 @@ func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig } } -func (conf *balanceLeaderSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data) -} - func (conf *balanceLeaderSchedulerConfig) getBatch() int { conf.RLock() defer conf.RUnlock() @@ -216,15 +208,9 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { func (l *balanceLeaderScheduler) ReloadConfig() error { l.conf.Lock() defer l.conf.Unlock() - cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &balanceLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := l.conf.load(newCfg); err != nil { return err } l.conf.Ranges = newCfg.Ranges diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go index 599af6df637..2bc0ad0f95f 100644 --- a/pkg/schedule/schedulers/balance_witness.go +++ b/pkg/schedule/schedulers/balance_witness.go @@ -33,7 +33,6 @@ import ( 
"github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -53,8 +52,9 @@ const ( type balanceWitnessSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Ranges []core.KeyRange `json:"ranges"` + schedulerConfig + + Ranges []core.KeyRange `json:"ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` } @@ -76,7 +76,7 @@ func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) { } return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10" } - if err := conf.persistLocked(); err != nil { + if err := conf.schedulerConfig.save(conf); err != nil { log.Warn("failed to persist config", zap.Error(err)) } log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc)) @@ -108,14 +108,6 @@ func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfi } } -func (conf *balanceWitnessSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data) -} - func (conf *balanceWitnessSchedulerConfig) getBatch() int { conf.RLock() defer conf.RUnlock() @@ -215,15 +207,9 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) { func (b *balanceWitnessScheduler) ReloadConfig() error { b.conf.Lock() defer b.conf.Unlock() - cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &balanceWitnessSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := b.conf.load(newCfg); err != nil { return err } b.conf.Ranges = newCfg.Ranges 
diff --git a/pkg/schedule/schedulers/config.go b/pkg/schedule/schedulers/config.go new file mode 100644 index 00000000000..724e59e77cc --- /dev/null +++ b/pkg/schedule/schedulers/config.go @@ -0,0 +1,63 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulers + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/tikv/pd/pkg/storage/endpoint" +) + +type schedulerConfig interface { + save(any) error + load(any) error + setStorage(endpoint.ConfigStorage) + setName(string) +} + +type baseSchedulerConfig struct { + name string + storage endpoint.ConfigStorage +} + +func (b *baseSchedulerConfig) setStorage(storage endpoint.ConfigStorage) { + b.storage = storage +} + +func (b *baseSchedulerConfig) setName(name string) { + b.name = name +} + +func (b *baseSchedulerConfig) save(v any) error { + data, err := EncodeConfig(v) + failpoint.Inject("persistFail", func() { + err = errors.New("fail to persist") + }) + if err != nil { + return err + } + return b.storage.SaveSchedulerConfig(b.name, data) +} + +func (b *baseSchedulerConfig) load(v any) error { + data, err := b.storage.LoadSchedulerConfig(b.name) + if err != nil { + return err + } + if err = DecodeConfig([]byte(data), v); err != nil { + return err + } + return nil +} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index d4e26cb1b68..bba6246710c 100644 --- 
a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -30,7 +30,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -48,7 +47,8 @@ const ( type evictLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` // Batch is used to generate multiple operators by one scheduling Batch int `json:"batch"` @@ -85,17 +85,6 @@ func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig { } } -func (conf *evictLeaderSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictLeaderScheduler.String(), data) -} - func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { conf.RLock() defer conf.RUnlock() @@ -148,15 +137,8 @@ func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error { conf.Lock() defer conf.Unlock() - cfgData, err := conf.storage.LoadSchedulerConfig(name) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &evictLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := conf.load(newCfg); err != nil { return err } pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) @@ -203,7 +185,11 @@ func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRa conf.StoreIDWithRanges[id] = newRanges } conf.Batch = batch - err := conf.persistLocked() + var err 
error + failpoint.Inject("persistFail", func() { + err = errors.New("fail to persist") + }) + err = conf.save(conf) if err != nil && id != 0 { _, _ = conf.removeStoreLocked(id) } @@ -220,7 +206,7 @@ func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { } keyRanges := conf.StoreIDWithRanges[id] - err = conf.persistLocked() + err = conf.save(conf) if err != nil { conf.resetStoreLocked(id, keyRanges) conf.Unlock() diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index d0fb963bd52..bc05b0946fb 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -27,7 +27,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -44,8 +43,9 @@ const ( type evictSlowStoreSchedulerConfig struct { syncutil.RWMutex + schedulerConfig + cluster *core.BasicCluster - storage endpoint.ConfigStorage // Last timestamp of the chosen slow store for eviction. lastSlowStoreCaptureTS time.Time // Duration gap for recovering the candidate, unit: s. 
@@ -53,9 +53,8 @@ type evictSlowStoreSchedulerConfig struct { EvictedStores []uint64 `json:"evict-stores"` } -func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig { +func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig { return &evictSlowStoreSchedulerConfig{ - storage: storage, lastSlowStoreCaptureTS: time.Time{}, RecoveryDurationGap: defaultRecoveryDurationGap, EvictedStores: make([]uint64, 0), @@ -70,17 +69,6 @@ func (conf *evictSlowStoreSchedulerConfig) clone() *evictSlowStoreSchedulerConfi } } -func (conf *evictSlowStoreSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictSlowStoreScheduler.String(), data) -} - func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 { conf.RLock() defer conf.RUnlock() @@ -121,7 +109,11 @@ func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error { defer conf.Unlock() conf.EvictedStores = []uint64{id} conf.lastSlowStoreCaptureTS = time.Now() - return conf.persistLocked() + return conf.persist() +} + +func (conf *evictSlowStoreSchedulerConfig) persist() error { + return conf.schedulerConfig.save(conf) } func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) { @@ -131,7 +123,7 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err if oldID > 0 { conf.EvictedStores = []uint64{} conf.lastSlowStoreCaptureTS = time.Time{} - err = conf.persistLocked() + err = conf.persist() } return } @@ -167,7 +159,7 @@ func (handler *evictSlowStoreHandler) updateConfig(w http.ResponseWriter, r *htt prevRecoveryDurationGap := handler.config.RecoveryDurationGap recoveryDurationGap := uint64(recoveryDurationGapFloat) handler.config.RecoveryDurationGap = recoveryDurationGap - if err := 
handler.config.persistLocked(); err != nil { + if err := handler.config.persist(); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) handler.config.RecoveryDurationGap = prevRecoveryDurationGap return @@ -201,15 +193,9 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { func (s *evictSlowStoreScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &evictSlowStoreSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } old := make(map[uint64]struct{}) diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index ad5b16e8ca3..406a08b9c99 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -16,8 +16,6 @@ package schedulers import ( "context" - "encoding/json" - "strings" "testing" "github.com/pingcap/failpoint" @@ -103,18 +101,10 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() { re.Zero(es2.conf.evictStore()) // check the value from storage. 
- sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() - re.NoError(err) - valueStr := "" - for id, sche := range sches { - if strings.EqualFold(sche, EvictSlowStoreName) { - valueStr = vs[id] - } - } - var persistValue evictSlowStoreSchedulerConfig - err = json.Unmarshal([]byte(valueStr), &persistValue) + err := es2.conf.load(&persistValue) re.NoError(err) + re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) re.Zero(persistValue.evictStore()) re.True(persistValue.readyForRecovery()) diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index 206791900c6..2de22466bbd 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -54,8 +53,9 @@ type slowCandidate struct { type evictSlowTrendSchedulerConfig struct { syncutil.RWMutex + schedulerConfig + cluster *core.BasicCluster - storage endpoint.ConfigStorage // Candidate for eviction in current tick. evictCandidate slowCandidate // Last chosen candidate for eviction. 
@@ -66,9 +66,8 @@ type evictSlowTrendSchedulerConfig struct { EvictedStores []uint64 `json:"evict-by-trend-stores"` } -func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig { +func initEvictSlowTrendSchedulerConfig() *evictSlowTrendSchedulerConfig { return &evictSlowTrendSchedulerConfig{ - storage: storage, evictCandidate: slowCandidate{}, lastEvictCandidate: slowCandidate{}, RecoveryDurationGap: defaultRecoveryDurationGap, @@ -84,17 +83,6 @@ func (conf *evictSlowTrendSchedulerConfig) clone() *evictSlowTrendSchedulerConfi } } -func (conf *evictSlowTrendSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - failpoint.Inject("persistFail", func() { - err = errors.New("fail to persist") - }) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.EvictSlowTrendScheduler.String(), data) -} - func (conf *evictSlowTrendSchedulerConfig) getStores() []uint64 { conf.RLock() defer conf.RUnlock() @@ -205,7 +193,7 @@ func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error { conf.Lock() defer conf.Unlock() conf.EvictedStores = []uint64{id} - return conf.persistLocked() + return conf.save(conf) } func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.SchedulerCluster) (oldID uint64, err error) { @@ -222,7 +210,7 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule conf.Lock() defer conf.Unlock() conf.EvictedStores = []uint64{} - return oldID, conf.persistLocked() + return oldID, conf.save(conf) } type evictSlowTrendHandler struct { @@ -251,14 +239,15 @@ func (handler *evictSlowTrendHandler) updateConfig(w http.ResponseWriter, r *htt handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error()) return } - handler.config.Lock() - defer handler.config.Unlock() - prevRecoveryDurationGap := handler.config.RecoveryDurationGap + conf := handler.config + conf.Lock() + 
defer conf.Unlock() + prevRecoveryDurationGap := conf.RecoveryDurationGap recoveryDurationGap := uint64(recoveryDurationGapFloat) - handler.config.RecoveryDurationGap = recoveryDurationGap - if err := handler.config.persistLocked(); err != nil { + conf.RecoveryDurationGap = recoveryDurationGap + if err := conf.save(conf); err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - handler.config.RecoveryDurationGap = prevRecoveryDurationGap + conf.RecoveryDurationGap = prevRecoveryDurationGap return } log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap)) @@ -304,17 +293,11 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { func (s *evictSlowTrendScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &evictSlowTrendSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } + old := make(map[uint64]struct{}) for _, id := range s.conf.EvictedStores { old[id] = struct{}{} @@ -456,11 +439,12 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, _ bool func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler { handler := newEvictSlowTrendHandler(conf) - return &evictSlowTrendScheduler{ + sche := &evictSlowTrendScheduler{ BaseScheduler: NewBaseScheduler(opController, types.EvictSlowTrendScheduler), conf: conf, handler: handler, } + return sche } func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) { diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index 10da5c91565..02cb65021eb 
100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -16,8 +16,6 @@ package schedulers import ( "context" - "encoding/json" - "strings" "testing" "time" @@ -186,17 +184,8 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrend() { re.Zero(es2.conf.evictedStore()) // check the value from storage. - sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs() - re.NoError(err) - valueStr := "" - for id, sche := range sches { - if strings.EqualFold(sche, EvictSlowTrendName) { - valueStr = vs[id] - } - } - var persistValue evictSlowTrendSchedulerConfig - err = json.Unmarshal([]byte(valueStr), &persistValue) + err := es2.conf.load(&persistValue) re.NoError(err) re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores) re.Zero(persistValue.evictedStore()) diff --git a/pkg/schedule/schedulers/grant_hot_region.go b/pkg/schedule/schedulers/grant_hot_region.go index 15a520f95d0..c90f0da1329 100644 --- a/pkg/schedule/schedulers/grant_hot_region.go +++ b/pkg/schedule/schedulers/grant_hot_region.go @@ -34,7 +34,6 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -48,7 +47,8 @@ const ( type grantHotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + cluster *core.BasicCluster StoreIDs []uint64 `json:"store-id"` StoreLeaderID uint64 `json:"store-leader-id"` @@ -93,11 +93,7 @@ func (conf *grantHotRegionSchedulerConfig) clone() *grantHotRegionSchedulerConfi func (conf *grantHotRegionSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.GrantHotRegionScheduler.String(), data) + return conf.save(conf) } func 
(conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool { @@ -146,15 +142,8 @@ func (s *grantHotRegionScheduler) EncodeConfig() ([]byte, error) { func (s *grantHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &grantHotRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.StoreIDs = newCfg.StoreIDs diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 134eddda880..96074f5d94d 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -43,7 +42,8 @@ const ( type grantLeaderSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` cluster *core.BasicCluster removeSchedulerCb func(name string) error @@ -83,11 +83,7 @@ func (conf *grantLeaderSchedulerConfig) clone() *grantLeaderSchedulerConfig { func (conf *grantLeaderSchedulerConfig) persist() error { conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.GrantLeaderScheduler.String(), data) + return conf.save(conf) } func (conf *grantLeaderSchedulerConfig) getRanges(id uint64) []string { @@ -176,15 +172,8 @@ func (s *grantLeaderScheduler) EncodeConfig() ([]byte, error) { func (s *grantLeaderScheduler) ReloadConfig() error { s.conf.Lock() defer 
s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &grantLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 717c1413ac4..ab595ec9058 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -222,15 +222,9 @@ func (h *hotScheduler) EncodeConfig() ([]byte, error) { func (h *hotScheduler) ReloadConfig() error { h.conf.Lock() defer h.conf.Unlock() - cfgData, err := h.conf.storage.LoadSchedulerConfig(h.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } + newCfg := &hotRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := h.conf.load(newCfg); err != nil { return err } h.conf.MinHotByteRate = newCfg.MinHotByteRate diff --git a/pkg/schedule/schedulers/hot_region_config.go b/pkg/schedule/schedulers/hot_region_config.go index 83121254cc0..cae57de361c 100644 --- a/pkg/schedule/schedulers/hot_region_config.go +++ b/pkg/schedule/schedulers/hot_region_config.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -114,7 +113,8 @@ func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig { type hotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + lastQuerySupported bool MinHotByteRate float64 `json:"min-hot-byte-rate"` @@ -455,7 +455,7 @@ 
func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * } newc, _ := json.Marshal(conf) if !bytes.Equal(oldc, newc) { - if err := conf.persistLocked(); err != nil { + if err := conf.save(conf); err != nil { log.Warn("failed to persist config", zap.Error(err)) } log.Info("hot-region-scheduler config is updated", zap.String("old", string(oldc)), zap.String("new", string(newc))) @@ -477,14 +477,6 @@ func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r * rd.Text(w, http.StatusBadRequest, "Config item is not found.") } -func (conf *hotRegionSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(HotRegionName, data) -} - func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster sche.SchedulerCluster) bool { version := cluster.GetSchedulerConfig().GetClusterVersion() querySupport := versioninfo.IsFeatureSupported(version, versioninfo.HotScheduleWithQuery) diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 0e1917e8552..250b362b905 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -55,14 +55,17 @@ func schedulersRegister() { RegisterScheduler(types.BalanceLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &balanceLeaderSchedulerConfig{storage: storage} + conf := &balanceLeaderSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err } if conf.Batch == 0 { conf.Batch = BalanceLeaderBatchSize } - return newBalanceLeaderScheduler(opController, conf), nil + sche := newBalanceLeaderScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // balance region @@ -109,14 +112,17 @@ func schedulersRegister() { RegisterScheduler(types.BalanceWitnessScheduler, func(opController 
*operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &balanceWitnessSchedulerConfig{storage: storage} + conf := &balanceWitnessSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err } if conf.Batch == 0 { conf.Batch = balanceWitnessBatchSize } - return newBalanceWitnessScheduler(opController, conf), nil + sche := newBalanceWitnessScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // evict leader @@ -147,13 +153,16 @@ func schedulersRegister() { RegisterScheduler(types.EvictLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + conf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)} if err := decoder(conf); err != nil { return nil, err } conf.cluster = opController.GetCluster() conf.removeSchedulerCb = removeSchedulerCb[0] - return newEvictLeaderScheduler(opController, conf), nil + sche := newEvictLeaderScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // evict slow store @@ -165,12 +174,15 @@ func schedulersRegister() { RegisterScheduler(types.EvictSlowStoreScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := initEvictSlowStoreSchedulerConfig(storage) + conf := initEvictSlowStoreSchedulerConfig() if err := decoder(conf); err != nil { return nil, err } conf.cluster = opController.GetCluster() - return newEvictSlowStoreScheduler(opController, conf), nil + sche := newEvictSlowStoreScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // 
grant hot region @@ -206,12 +218,15 @@ func schedulersRegister() { RegisterScheduler(types.GrantHotRegionScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0), storage: storage} + conf := &grantHotRegionSchedulerConfig{StoreIDs: make([]uint64, 0)} conf.cluster = opController.GetCluster() if err := decoder(conf); err != nil { return nil, err } - return newGrantHotRegionScheduler(opController, conf), nil + sche := newGrantHotRegionScheduler(opController, conf) + conf.setName(sche.GetName()) + conf.setStorage(storage) + return sche, nil }) // hot region @@ -238,8 +253,10 @@ func schedulersRegister() { return nil, err } } - conf.storage = storage - return newHotScheduler(opController, conf), nil + sche := newHotScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // grant leader @@ -269,13 +286,16 @@ func schedulersRegister() { RegisterScheduler(types.GrantLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) { - conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange), storage: storage} + conf := &grantLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)} conf.cluster = opController.GetCluster() conf.removeSchedulerCb = removeSchedulerCb[0] if err := decoder(conf); err != nil { return nil, err } - return newGrantLeaderScheduler(opController, conf), nil + sche := newGrantLeaderScheduler(opController, conf) + conf.setName(sche.GetName()) + conf.setStorage(storage) + return sche, nil }) // label @@ -351,9 +371,7 @@ func schedulersRegister() { RegisterScheduler(types.ScatterRangeScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ 
...func(string) error) (Scheduler, error) { - conf := &scatterRangeSchedulerConfig{ - storage: storage, - } + conf := &scatterRangeSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err } @@ -361,7 +379,10 @@ func schedulersRegister() { if len(rangeName) == 0 { return nil, errs.ErrSchedulerConfig.FastGenByArgs("range name") } - return newScatterRangeScheduler(opController, conf), nil + sche := newScatterRangeScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // shuffle hot region @@ -389,8 +410,10 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } - conf.storage = storage - return newShuffleHotRegionScheduler(opController, conf), nil + sche := newShuffleHotRegionScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // shuffle leader @@ -405,7 +428,6 @@ func schedulersRegister() { return err } conf.Ranges = ranges - conf.Name = ShuffleLeaderName return nil } }) @@ -438,11 +460,14 @@ func schedulersRegister() { RegisterScheduler(types.ShuffleRegionScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := &shuffleRegionSchedulerConfig{storage: storage} + conf := &shuffleRegionSchedulerConfig{} if err := decoder(conf); err != nil { return nil, err } - return newShuffleRegionScheduler(opController, conf), nil + sche := newShuffleRegionScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // split bucket @@ -458,8 +483,10 @@ func schedulersRegister() { if err := decoder(conf); err != nil { return nil, err } - conf.storage = storage - return newSplitBucketScheduler(opController, conf), nil + sche := newSplitBucketScheduler(opController, conf) + conf.setStorage(storage) + conf.setName(sche.GetName()) + return sche, nil }) // transfer witness leader @@ 
-483,10 +510,14 @@ func schedulersRegister() { RegisterScheduler(types.EvictSlowTrendScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, _ ...func(string) error) (Scheduler, error) { - conf := initEvictSlowTrendSchedulerConfig(storage) + conf := initEvictSlowTrendSchedulerConfig() if err := decoder(conf); err != nil { return nil, err } - return newEvictSlowTrendScheduler(opController, conf), nil + + conf.setStorage(storage) + sche := newEvictSlowTrendScheduler(opController, conf) + conf.setName(sche.GetName()) + return sche, nil }) } diff --git a/pkg/schedule/schedulers/scatter_range.go b/pkg/schedule/schedulers/scatter_range.go index 9c9606b29a9..b22c13c1873 100644 --- a/pkg/schedule/schedulers/scatter_range.go +++ b/pkg/schedule/schedulers/scatter_range.go @@ -26,7 +26,6 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -39,7 +38,8 @@ const ( type scatterRangeSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig + RangeName string `json:"range-name"` StartKey string `json:"start-key"` EndKey string `json:"end-key"` @@ -69,14 +69,9 @@ func (conf *scatterRangeSchedulerConfig) clone() *scatterRangeSchedulerConfig { } func (conf *scatterRangeSchedulerConfig) persist() error { - name := conf.getSchedulerName() conf.RLock() defer conf.RUnlock() - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(name, data) + return conf.save(conf) } func (conf *scatterRangeSchedulerConfig) getRangeName() string { @@ -153,15 +148,8 @@ func (l *scatterRangeScheduler) EncodeConfig() ([]byte, error) { func (l *scatterRangeScheduler) ReloadConfig() error { l.config.Lock() defer l.config.Unlock() - 
cfgData, err := l.config.storage.LoadSchedulerConfig(l.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &scatterRangeSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := l.config.load(newCfg); err != nil { return err } l.config.RangeName = newCfg.RangeName diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 7fce5d9c46e..8de9b2f07c7 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -34,10 +34,9 @@ import ( // Scheduler is an interface to schedule resources. type Scheduler interface { + baseScheduler http.Handler - GetName() string - // GetType should in accordance with the name passing to RegisterScheduler() - GetType() types.CheckerSchedulerType + EncodeConfig() ([]byte, error) // ReloadConfig reloads the config from the storage. ReloadConfig() error @@ -49,6 +48,12 @@ type Scheduler interface { IsScheduleAllowed(cluster sche.SchedulerCluster) bool } +type baseScheduler interface { + GetName() string + // GetType should in accordance with the name passing to RegisterScheduler() + GetType() types.CheckerSchedulerType +} + // EncodeConfig encode the custom config for each scheduler. 
func EncodeConfig(v any) ([]byte, error) { marshaled, err := json.Marshal(v) diff --git a/pkg/schedule/schedulers/shuffle_hot_region.go b/pkg/schedule/schedulers/shuffle_hot_region.go index 686322961cb..9e6338d877a 100644 --- a/pkg/schedule/schedulers/shuffle_hot_region.go +++ b/pkg/schedule/schedulers/shuffle_hot_region.go @@ -28,7 +28,6 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" "github.com/tikv/pd/pkg/statistics" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -42,8 +41,9 @@ const ( type shuffleHotRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage - Limit uint64 `json:"limit"` + schedulerConfig + + Limit uint64 `json:"limit"` } func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerConfig { @@ -54,14 +54,6 @@ func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerC } } -func (conf *shuffleHotRegionSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(types.ShuffleHotRegionScheduler.String(), data) -} - func (conf *shuffleHotRegionSchedulerConfig) getLimit() uint64 { conf.RLock() defer conf.RUnlock() @@ -106,15 +98,8 @@ func (s *shuffleHotRegionScheduler) EncodeConfig() ([]byte, error) { func (s *shuffleHotRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &shuffleHotRegionSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.Limit = newCfg.Limit @@ -224,14 +209,16 @@ func (handler *shuffleHotRegionHandler) updateConfig(w http.ResponseWriter, r *h handler.rd.JSON(w, 
http.StatusBadRequest, "invalid limit") return } - handler.config.Lock() - defer handler.config.Unlock() - previous := handler.config.Limit - handler.config.Limit = uint64(limit) - err := handler.config.persistLocked() + + conf := handler.config + conf.Lock() + defer conf.Unlock() + previous := conf.Limit + conf.Limit = uint64(limit) + err := conf.save(conf) if err != nil { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - handler.config.Limit = previous + conf.Limit = previous return } handler.rd.JSON(w, http.StatusOK, nil) diff --git a/pkg/schedule/schedulers/shuffle_leader.go b/pkg/schedule/schedulers/shuffle_leader.go index 2cd6c231a11..4270613667b 100644 --- a/pkg/schedule/schedulers/shuffle_leader.go +++ b/pkg/schedule/schedulers/shuffle_leader.go @@ -32,7 +32,6 @@ const ( ) type shuffleLeaderSchedulerConfig struct { - Name string `json:"name"` Ranges []core.KeyRange `json:"ranges"` // TODO: When we prepare to use Ranges, we will need to implement the ReloadConfig function for this scheduler. } @@ -46,11 +45,11 @@ type shuffleLeaderScheduler struct { // newShuffleLeaderScheduler creates an admin scheduler that shuffles leaders // between stores. 
func newShuffleLeaderScheduler(opController *operator.Controller, conf *shuffleLeaderSchedulerConfig) Scheduler { + base := NewBaseScheduler(opController, types.ShuffleLeaderScheduler) filters := []filter.Filter{ - &filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true, OperatorLevel: constant.Low}, - filter.NewSpecialUseFilter(conf.Name), + &filter.StoreStateFilter{ActionScope: base.GetName(), TransferLeader: true, OperatorLevel: constant.Low}, + filter.NewSpecialUseFilter(base.GetName()), } - base := NewBaseScheduler(opController, types.ShuffleLeaderScheduler) return &shuffleLeaderScheduler{ BaseScheduler: base, conf: conf, diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index c179efd32c1..5d4c49e0fcc 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -67,15 +67,8 @@ func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) { func (s *shuffleRegionScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &shuffleRegionSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.Roles = newCfg.Roles diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index fbf53cfeb4d..8b704284a48 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -21,7 +21,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/slice" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -37,7 +36,7 @@ var allRoles = []string{roleLeader, 
roleFollower, roleLearner} type shuffleRegionSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig Ranges []core.KeyRange `json:"ranges"` Roles []string `json:"roles"` // can include `leader`, `follower`, `learner`. @@ -100,18 +99,10 @@ func (conf *shuffleRegionSchedulerConfig) handleSetRoles(w http.ResponseWriter, defer conf.Unlock() old := conf.Roles conf.Roles = roles - if err := conf.persist(); err != nil { + if err := conf.save(conf); err != nil { conf.Roles = old // revert rd.Text(w, http.StatusInternalServerError, err.Error()) return } rd.Text(w, http.StatusOK, "Config is updated.") } - -func (conf *shuffleRegionSchedulerConfig) persist() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(ShuffleRegionName, data) -} diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 7b238890107..cd26cd86ea8 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -30,7 +30,6 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" types "github.com/tikv/pd/pkg/schedule/type" "github.com/tikv/pd/pkg/statistics/buckets" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/reflectutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/unrolled/render" @@ -53,7 +52,7 @@ func initSplitBucketConfig() *splitBucketSchedulerConfig { type splitBucketSchedulerConfig struct { syncutil.RWMutex - storage endpoint.ConfigStorage + schedulerConfig Degree int `json:"degree"` SplitLimit uint64 `json:"split-limit"` } @@ -66,14 +65,6 @@ func (conf *splitBucketSchedulerConfig) clone() *splitBucketSchedulerConfig { } } -func (conf *splitBucketSchedulerConfig) persistLocked() error { - data, err := EncodeConfig(conf) - if err != nil { - return err - } - return conf.storage.SaveSchedulerConfig(SplitBucketName, data) -} - func (conf *splitBucketSchedulerConfig) getDegree() int { 
conf.RLock() defer conf.RUnlock() @@ -120,7 +111,7 @@ func (h *splitBucketHandler) updateConfig(w http.ResponseWriter, r *http.Request } newc, _ := json.Marshal(h.conf) if !bytes.Equal(oldc, newc) { - if err := h.conf.persistLocked(); err != nil { + if err := h.conf.save(h.conf); err != nil { log.Warn("failed to save config", errs.ZapError(err)) } rd.Text(w, http.StatusOK, "Config is updated.") @@ -167,15 +158,8 @@ func newSplitBucketScheduler(opController *operator.Controller, conf *splitBucke func (s *splitBucketScheduler) ReloadConfig() error { s.conf.Lock() defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } newCfg := &splitBucketSchedulerConfig{} - if err := DecodeConfig([]byte(cfgData), newCfg); err != nil { + if err := s.conf.load(newCfg); err != nil { return err } s.conf.SplitLimit = newCfg.SplitLimit