
Commit b3ed767

extract schedulerConfig interface
Signed-off-by: okJiang <[email protected]>
okJiang committed Aug 12, 2024
1 parent 92adb24 commit b3ed767
Showing 20 changed files with 219 additions and 307 deletions.
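
The common thread across the changed files: each scheduler config drops its private storage endpoint.ConfigStorage field and hand-rolled persistLocked()/LoadSchedulerConfig code, embeds the new schedulerConfig interface instead, and delegates persistence to its save/load methods. A minimal sketch of the resulting pattern (illustrative, not part of the diff; exampleSchedulerConfig is a made-up type that would sit in package schedulers next to the real configs):

type exampleSchedulerConfig struct {
    syncutil.RWMutex
    schedulerConfig // embedded interface; a *baseSchedulerConfig is plugged in at construction time

    Batch int `json:"batch"`
}

// persist serializes the whole config and stores it under the scheduler's name.
func (conf *exampleSchedulerConfig) persist() error {
    return conf.save(conf)
}

// reload fetches the stored JSON and copies the mutable fields back in.
func (conf *exampleSchedulerConfig) reload() error {
    conf.Lock()
    defer conf.Unlock()
    newCfg := &exampleSchedulerConfig{}
    if err := conf.load(newCfg); err != nil {
        return err
    }
    conf.Batch = newCfg.Batch
    return nil
}

The balance-leader, balance-witness, evict-leader, and evict-slow-store configs below all follow this shape.
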
26 changes: 6 additions & 20 deletions pkg/schedule/schedulers/balance_leader.go
@@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
@@ -56,8 +55,9 @@ const (

type balanceLeaderSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
schedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}
@@ -79,7 +79,7 @@ func (conf *balanceLeaderSchedulerConfig) update(data []byte) (int, any) {
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
if err := conf.persistLocked(); err != nil {
if err := conf.schedulerConfig.save(conf); err != nil {
log.Warn("failed to save balance-leader-scheduler config", errs.ZapError(err))
}
log.Info("balance-leader-scheduler config is updated", zap.ByteString("old", oldConfig), zap.ByteString("new", newConfig))
@@ -111,14 +111,6 @@ func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig
}
}

func (conf *balanceLeaderSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceLeaderName, data)
}

func (conf *balanceLeaderSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
@@ -216,15 +208,9 @@ func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
func (l *balanceLeaderScheduler) ReloadConfig() error {
l.conf.Lock()
defer l.conf.Unlock()
cfgData, err := l.conf.storage.LoadSchedulerConfig(l.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}

newCfg := &balanceLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := l.conf.load(newCfg); err != nil {
return err
}
l.conf.Ranges = newCfg.Ranges
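
For readability, the new ReloadConfig body above reduces to roughly the following once the removed lines are taken out (a reconstruction; the assignments hidden behind the collapsed tail of the hunk are elided):

func (l *balanceLeaderScheduler) ReloadConfig() error {
    l.conf.Lock()
    defer l.conf.Unlock()

    newCfg := &balanceLeaderSchedulerConfig{}
    if err := l.conf.load(newCfg); err != nil {
        return err
    }
    l.conf.Ranges = newCfg.Ranges
    // ... remaining field assignments are in the collapsed part of the hunk
    return nil
}
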
26 changes: 6 additions & 20 deletions pkg/schedule/schedulers/balance_witness.go
@@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/reflectutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
@@ -53,8 +52,9 @@ const (

type balanceWitnessSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
Ranges []core.KeyRange `json:"ranges"`
schedulerConfig

Ranges []core.KeyRange `json:"ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
}
@@ -76,7 +76,7 @@ func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) {
}
return http.StatusBadRequest, "invalid batch size which should be an integer between 1 and 10"
}
if err := conf.persistLocked(); err != nil {
if err := conf.schedulerConfig.save(conf); err != nil {
log.Warn("failed to persist config", zap.Error(err))
}
log.Info("balance-witness-scheduler config is updated", zap.ByteString("old", oldc), zap.ByteString("new", newc))
@@ -108,14 +108,6 @@ func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfi
}
}

func (conf *balanceWitnessSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(BalanceWitnessName, data)
}

func (conf *balanceWitnessSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
@@ -215,15 +207,9 @@ func (b *balanceWitnessScheduler) EncodeConfig() ([]byte, error) {
func (b *balanceWitnessScheduler) ReloadConfig() error {
b.conf.Lock()
defer b.conf.Unlock()
cfgData, err := b.conf.storage.LoadSchedulerConfig(b.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}

newCfg := &balanceWitnessSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := b.conf.load(newCfg); err != nil {
return err
}
b.conf.Ranges = newCfg.Ranges
63 changes: 63 additions & 0 deletions pkg/schedule/schedulers/config.go
@@ -0,0 +1,63 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedulers

import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/storage/endpoint"
)

type schedulerConfig interface {
save(any) error
load(any) error
setStorage(endpoint.ConfigStorage)
setName(string)
}

type baseSchedulerConfig struct {

Check failure on line 30 (GitHub Actions / statics): type `baseSchedulerConfig` is unused (unused)
name string
storage endpoint.ConfigStorage
}

func (b *baseSchedulerConfig) setStorage(storage endpoint.ConfigStorage) {

Check failure on line 35 (GitHub Actions / statics): func `(*baseSchedulerConfig).setStorage` is unused (unused)
b.storage = storage
}

func (b *baseSchedulerConfig) setName(name string) {

Check failure on line 39 (GitHub Actions / statics): func `(*baseSchedulerConfig).setName` is unused (unused)
b.name = name
}

func (b *baseSchedulerConfig) save(v any) error {

Check failure on line 43 (GitHub Actions / statics): func `(*baseSchedulerConfig).save` is unused (unused)
data, err := EncodeConfig(v)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return b.storage.SaveSchedulerConfig(b.name, data)
}

func (b *baseSchedulerConfig) load(v any) error {

Check failure on line 54 (GitHub Actions / statics): func `(*baseSchedulerConfig).load` is unused (unused)
data, err := b.storage.LoadSchedulerConfig(b.name)
if err != nil {
return err
}
if err = DecodeConfig([]byte(data), v); err != nil {

Check failure on line 59 (GitHub Actions / statics): if-return: redundant if ...; err != nil check, just return error instead. (revive)
return err
}
return nil
}
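
None of the hunks expanded on this page wire a baseSchedulerConfig into a scheduler (the constructor changes are in files that are not shown), so the following is only an assumption about the intended use: set the persistence key and the backing storage once, then call save/load everywhere else. The function name newExampleBalanceLeaderConfig and the batch value are illustrative.

func newExampleBalanceLeaderConfig(name string, s endpoint.ConfigStorage) *balanceLeaderSchedulerConfig {
    conf := &balanceLeaderSchedulerConfig{
        schedulerConfig: &baseSchedulerConfig{},
        Ranges:          []core.KeyRange{core.NewKeyRange("", "")},
        Batch:           4, // illustrative default; the real default lives elsewhere
    }
    conf.setName(name) // key used by SaveSchedulerConfig / LoadSchedulerConfig
    conf.setStorage(s) // backing endpoint.ConfigStorage
    return conf
}

With that wiring in place, every conf.save(conf) and conf.load(newCfg) call in the hunks on this page routes through this single implementation, which also carries the persistFail failpoint that some of the per-scheduler persistLocked helpers used to inject themselves.
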
32 changes: 9 additions & 23 deletions pkg/schedule/schedulers/evict_leader.go
@@ -30,7 +30,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
@@ -48,7 +47,8 @@ const (

type evictLeaderSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
schedulerConfig

StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
@@ -85,17 +85,6 @@ func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig {
}
}

func (conf *evictLeaderSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(types.EvictLeaderScheduler.String(), data)
}

func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
conf.RLock()
defer conf.RUnlock()
@@ -148,15 +137,8 @@ func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) {
func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error {

Check failure on line 137 (GitHub Actions / statics): unused-parameter: parameter 'name' seems to be unused, consider removing or renaming it to match ^_ (revive)
conf.Lock()
defer conf.Unlock()
cfgData, err := conf.storage.LoadSchedulerConfig(name)
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &evictLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
@@ -203,7 +185,11 @@ func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRa
conf.StoreIDWithRanges[id] = newRanges
}
conf.Batch = batch
err := conf.persistLocked()
var err error
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
err = conf.save(conf)
if err != nil && id != 0 {
_, _ = conf.removeStoreLocked(id)
}
@@ -220,7 +206,7 @@ func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) {
}

keyRanges := conf.StoreIDWithRanges[id]
err = conf.persistLocked()
err = conf.save(conf)
if err != nil {
conf.resetStoreLocked(id, keyRanges)
conf.Unlock()
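
Both baseSchedulerConfig.save and the update call above inject the persistFail failpoint, so the error path can be exercised without breaking the storage backend. A sketch of such a test, assuming it lives in package schedulers with testify/require, pingcap/failpoint, and pkg/storage imported; the failpoint path string follows the usual full-package-path convention and is an assumption here:

func TestPersistFailSketch(t *testing.T) {
    re := require.New(t)
    fp := "github.com/tikv/pd/pkg/schedule/schedulers/persistFail"
    re.NoError(failpoint.Enable(fp, "return(true)"))
    defer func() {
        re.NoError(failpoint.Disable(fp))
    }()

    cfg := &baseSchedulerConfig{}
    cfg.setName("evict-leader-scheduler") // illustrative persistence key
    cfg.setStorage(storage.NewStorageWithMemoryBackend())

    // With the failpoint enabled, save should surface "fail to persist".
    re.Error(cfg.save(struct {
        Batch int `json:"batch"`
    }{Batch: 3}))
}
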
38 changes: 12 additions & 26 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -27,7 +27,6 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/unrolled/render"
@@ -44,18 +43,18 @@ const (

type evictSlowStoreSchedulerConfig struct {
syncutil.RWMutex
schedulerConfig

cluster *core.BasicCluster
storage endpoint.ConfigStorage
// Last timestamp of the chosen slow store for eviction.
lastSlowStoreCaptureTS time.Time
// Duration gap for recovering the candidate, unit: s.
RecoveryDurationGap uint64 `json:"recovery-duration"`
EvictedStores []uint64 `json:"evict-stores"`
}

func initEvictSlowStoreSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowStoreSchedulerConfig {
func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig {
return &evictSlowStoreSchedulerConfig{
storage: storage,
lastSlowStoreCaptureTS: time.Time{},
RecoveryDurationGap: defaultRecoveryDurationGap,
EvictedStores: make([]uint64, 0),
@@ -70,17 +69,6 @@ func (conf *evictSlowStoreSchedulerConfig) clone() *evictSlowStoreSchedulerConfi
}
}

func (conf *evictSlowStoreSchedulerConfig) persistLocked() error {
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
return conf.storage.SaveSchedulerConfig(types.EvictSlowStoreScheduler.String(), data)
}

func (conf *evictSlowStoreSchedulerConfig) getStores() []uint64 {
conf.RLock()
defer conf.RUnlock()
@@ -121,7 +109,11 @@ func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
defer conf.Unlock()
conf.EvictedStores = []uint64{id}
conf.lastSlowStoreCaptureTS = time.Now()
return conf.persistLocked()
return conf.persist()
}

func (conf *evictSlowStoreSchedulerConfig) persist() error {
return conf.schedulerConfig.save(conf)
}

func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
@@ -131,7 +123,7 @@ func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err
if oldID > 0 {
conf.EvictedStores = []uint64{}
conf.lastSlowStoreCaptureTS = time.Time{}
err = conf.persistLocked()
err = conf.persist()
}
return
}
@@ -167,7 +159,7 @@ func (handler *evictSlowStoreHandler) updateConfig(w http.ResponseWriter, r *htt
prevRecoveryDurationGap := handler.config.RecoveryDurationGap
recoveryDurationGap := uint64(recoveryDurationGapFloat)
handler.config.RecoveryDurationGap = recoveryDurationGap
if err := handler.config.persistLocked(); err != nil {
if err := handler.config.persist(); err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
handler.config.RecoveryDurationGap = prevRecoveryDurationGap
return
@@ -201,15 +193,9 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
func (s *evictSlowStoreScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()
cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}

newCfg := &evictSlowStoreSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
if err := s.conf.load(newCfg); err != nil {
return err
}
old := make(map[uint64]struct{})
14 changes: 2 additions & 12 deletions pkg/schedule/schedulers/evict_slow_store_test.go
@@ -16,8 +16,6 @@ package schedulers

import (
"context"
"encoding/json"
"strings"
"testing"

"github.com/pingcap/failpoint"
@@ -103,18 +101,10 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStore() {
re.Zero(es2.conf.evictStore())

// check the value from storage.
sches, vs, err := es2.conf.storage.LoadAllSchedulerConfigs()
re.NoError(err)
valueStr := ""
for id, sche := range sches {
if strings.EqualFold(sche, EvictSlowStoreName) {
valueStr = vs[id]
}
}

var persistValue evictSlowStoreSchedulerConfig
err = json.Unmarshal([]byte(valueStr), &persistValue)
err := es2.conf.load(&persistValue)
re.NoError(err)

re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores)
re.Zero(persistValue.evictStore())
re.True(persistValue.readyForRecovery())
(The remaining 14 changed files are not shown here.)
