Commit

This is an automated cherry-pick of tikv#6314
close tikv#6297, ref tikv#6328, ref tikv/tikv#14458

Signed-off-by: ti-chi-bot <[email protected]>
bufferflies authored and ti-chi-bot committed Jul 14, 2023
1 parent bd94a98 commit 0261305
Showing 12 changed files with 575 additions and 53 deletions.
103 changes: 103 additions & 0 deletions pkg/schedule/config/config.go
@@ -0,0 +1,103 @@
package config

import (
"sync"
"time"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
)

// RejectLeader is the label property type that suggests a store should not
// have any region leaders.
const RejectLeader = "reject-leader"

var schedulerMap sync.Map

// RegisterScheduler registers the scheduler type.
func RegisterScheduler(typ string) {
schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(name string) bool {
_, ok := schedulerMap.Load(name)
return ok
}

// Config is the interface that wraps the Config related methods.
type Config interface {
GetReplicaScheduleLimit() uint64
GetRegionScheduleLimit() uint64
GetMergeScheduleLimit() uint64
GetLeaderScheduleLimit() uint64
GetHotRegionScheduleLimit() uint64
GetWitnessScheduleLimit() uint64

GetHotRegionCacheHitsThreshold() int
GetMaxMovableHotPeerSize() int64
IsTraceRegionFlow() bool

GetSplitMergeInterval() time.Duration
GetMaxMergeRegionSize() uint64
GetMaxMergeRegionKeys() uint64
GetKeyType() constant.KeyType
IsOneWayMergeEnabled() bool
IsCrossTableMergeEnabled() bool

IsPlacementRulesEnabled() bool
IsPlacementRulesCacheEnabled() bool

GetMaxReplicas() int
GetPatrolRegionInterval() time.Duration
GetMaxStoreDownTime() time.Duration
GetLocationLabels() []string
GetIsolationLevel() string
IsReplaceOfflineReplicaEnabled() bool
IsMakeUpReplicaEnabled() bool
IsRemoveExtraReplicaEnabled() bool
IsLocationReplacementEnabled() bool
IsRemoveDownReplicaEnabled() bool

GetSwitchWitnessInterval() time.Duration
IsWitnessAllowed() bool

GetLowSpaceRatio() float64
GetHighSpaceRatio() float64
GetTolerantSizeRatio() float64
GetLeaderSchedulePolicy() constant.SchedulePolicy
GetRegionScoreFormulaVersion() string

GetMaxSnapshotCount() uint64
GetMaxPendingPeerCount() uint64
GetSchedulerMaxWaitingOperator() uint64
GetStoreLimitByType(uint64, storelimit.Type) float64
SetAllStoresLimit(storelimit.Type, float64)
GetSlowStoreEvictingAffectedStoreRatioThreshold() float64
IsUseJointConsensus() bool
CheckLabelProperty(string, []*metapb.StoreLabel) bool
IsDebugMetricsEnabled() bool
GetClusterVersion() *semver.Version
GetStoreLimitVersion() string
// for test purpose
SetPlacementRuleEnabled(bool)
SetSplitMergeInterval(time.Duration)
SetMaxReplicas(int)
SetPlacementRulesCacheEnabled(bool)
SetWitnessEnabled(bool)
// only for store configuration
UseRaftV2()
}

// StoreConfig is the interface that wraps the StoreConfig related methods.
type StoreConfig interface {
GetRegionMaxSize() uint64
CheckRegionSize(uint64, uint64) error
CheckRegionKeys(uint64, uint64) error
IsEnableRegionBucket() bool
IsRaftKV2() bool
// for test purpose
SetRegionBucketEnabled(bool)
}
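The new pkg/schedule/config package above also carries a small concurrency-safe registry of scheduler types (RegisterScheduler / IsSchedulerRegistered). A minimal usage sketch follows; the "balance-leader" name and the idea of registering from an init function are illustrative assumptions, not part of this commit:

```go
package main

import (
	"fmt"
	"sync"
)

// schedulerMap mirrors the sync.Map-backed registry added in pkg/schedule/config.
var schedulerMap sync.Map

// RegisterScheduler registers the scheduler type.
func RegisterScheduler(typ string) {
	schedulerMap.Store(typ, struct{}{})
}

// IsSchedulerRegistered checks if the named scheduler type is registered.
func IsSchedulerRegistered(name string) bool {
	_, ok := schedulerMap.Load(name)
	return ok
}

func main() {
	// A scheduler package would typically call this from its init function.
	RegisterScheduler("balance-leader")

	fmt.Println(IsSchedulerRegistered("balance-leader")) // true
	fmt.Println(IsSchedulerRegistered("unknown"))        // false
}
```

Storing an empty struct as the value keeps the registry a pure set: only membership matters.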
19 changes: 19 additions & 0 deletions server/config/store_config.go
@@ -121,6 +121,25 @@ func (c *StoreConfig) IsEnableRegionBucket() bool {
return c.Coprocessor.EnableRegionBucket
}

// IsRaftKV2 returns true if the raft kv is v2.
func (c *StoreConfig) IsRaftKV2() bool {
if c == nil {
return false
}
return c.Storage.Engine == raftStoreV2
}

// SetRegionBucketEnabled sets if the region bucket is enabled.
func (c *StoreConfig) SetRegionBucketEnabled(enabled bool) {
if c == nil {
return
}
c.Coprocessor.EnableRegionBucket = enabled
}

// GetRegionBucketSize returns the region bucket size if region buckets are enabled.
func (c *StoreConfig) GetRegionBucketSize() uint64 {
if c == nil || !c.Coprocessor.EnableRegionBucket {
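Both new methods follow the nil-receiver guard used throughout StoreConfig: accessors return a zero value instead of panicking when no store configuration has been loaded yet. A minimal sketch of that pattern; the Cfg type and the raftStoreV2 value below are illustrative stand-ins rather than PD's real definitions:

```go
package main

import "fmt"

// raftStoreV2 is an illustrative engine name; the real constant is defined in server/config.
const raftStoreV2 = "raft-kv2"

// Cfg stands in for *StoreConfig to show the nil-receiver guard.
type Cfg struct {
	Engine string
}

// IsRaftKV2 returns false rather than panicking when no config has been loaded.
func (c *Cfg) IsRaftKV2() bool {
	if c == nil {
		return false
	}
	return c.Engine == raftStoreV2
}

func main() {
	var missing *Cfg // nil: config not fetched yet
	fmt.Println(missing.IsRaftKV2())                     // false, no nil-pointer panic
	fmt.Println((&Cfg{Engine: raftStoreV2}).IsRaftKV2()) // true
}
```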
5 changes: 5 additions & 0 deletions server/config/store_config_test.go
@@ -108,6 +108,11 @@ func TestParseConfig(t *testing.T) {
re.NoError(json.Unmarshal([]byte(body), &config))
m.update(&config)
re.Equal(uint64(96), config.GetRegionBucketSize())
re.True(config.IsRaftKV2())
re.Equal(raftStoreV2, config.Storage.Engine)
}

func TestMergeCheck(t *testing.T) {
8 changes: 8 additions & 0 deletions server/core/kind.go
@@ -47,6 +47,14 @@ const (
LeaderKind ResourceKind = iota
// RegionKind indicates the region kind resource
RegionKind
// WitnessKind indicates the witness kind resource
WitnessKind

// ResourceKindLen represents the ResourceKind count
ResourceKindLen
)

func (k ResourceKind) String() string {
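WitnessKind extends the iota block, and ResourceKindLen is the usual trailing sentinel: declared last, its value equals the number of real kinds, which makes it handy for sizing arrays indexed by ResourceKind. A small illustrative sketch (the per-kind counter is hypothetical, not from this commit):

```go
package main

import "fmt"

// ResourceKind mirrors the enumeration in pkg/core/constant/kind.go.
type ResourceKind int

const (
	LeaderKind ResourceKind = iota
	RegionKind
	WitnessKind
	// ResourceKindLen is one past the last real kind, i.e. the kind count.
	ResourceKindLen
)

func main() {
	// The sentinel lets per-kind state live in a fixed-size array instead of a map.
	var opCounts [ResourceKindLen]int // hypothetical per-kind operator counters
	opCounts[LeaderKind]++
	opCounts[WitnessKind] += 2
	fmt.Println(ResourceKindLen, opCounts) // 3 [1 0 2]
}
```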
94 changes: 81 additions & 13 deletions server/schedulers/hot_region.go
@@ -45,7 +45,8 @@ type baseHotScheduler struct {
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Every time `Schedule()` will recalculate it.
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
stHistoryLoads *statistics.StoreHistoryLoads
// temporary states
// Every time `Schedule()` will recalculate it.
storesLoads map[uint64][]float64
@@ -63,6 +64,7 @@ func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotSche
BaseScheduler: base,
types: []statistics.RWType{statistics.Write, statistics.Read},
regionPendings: make(map[uint64]*pendingInfluence),
stHistoryLoads: statistics.NewStoreHistoryLoads(statistics.DimLen),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
@@ -84,6 +86,7 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
h.stHistoryLoads,
regionStats,
isTraceRegionFlow,
rw, resource)
@@ -257,7 +260,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
if h.conf.IsForbidRWType(typ) {
return nil
}

switch typ {
case statistics.Read:
return h.balanceHotReadRegions(cluster)
@@ -441,6 +443,8 @@ type balanceSolver struct {
minorDecRatio float64
maxPeerNum int
minHotDegree int
// TODO: remove this after testing more scenarios with the single-RocksDB engine
isRaftKV2 bool

firstPriorityV2Ratios *rankV2Ratios
secondPriorityV2Ratios *rankV2Ratios
@@ -454,6 +458,7 @@ type balanceSolver struct {
betterThan func(*solution) bool
rankToDimString func() string
checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -494,6 +499,7 @@ func (bs *balanceSolver) init() {
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
@@ -518,10 +524,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly
}
}
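pickCheckPolicyV1 now wires two function-valued fields in parallel: the existing per-dimension check on current loads and a new one on the history-load matrix, both chosen once at init time so the hot path simply calls whichever policy was selected. A compressed sketch of that selection with simplified stand-in types; it ignores the isSelectedDim filtering and priority bookkeeping of the real balanceSolver:

```go
package main

import "fmt"

// solver is a simplified stand-in for balanceSolver: the check policy is picked
// once and stored as function values that later checks call directly.
type solver struct {
	writeLeader   bool
	strictPicking bool
	firstPriority int

	checkLoads        func(loads []float64, f func(int) bool) bool
	checkHistoryLoads func(loads [][]float64, f func(int) bool) bool
}

func (s *solver) pickCheckPolicy() {
	switch {
	case s.writeLeader, !s.strictPicking:
		// "first only": consult just the first-priority dimension.
		s.checkLoads = func(_ []float64, f func(int) bool) bool { return f(s.firstPriority) }
		s.checkHistoryLoads = func(_ [][]float64, f func(int) bool) bool { return f(s.firstPriority) }
	default:
		// strict picking: every dimension must satisfy the predicate.
		s.checkLoads = func(loads []float64, f func(int) bool) bool {
			for i := range loads {
				if !f(i) {
					return false
				}
			}
			return true
		}
		s.checkHistoryLoads = func(loads [][]float64, f func(int) bool) bool {
			for i := range loads {
				if !f(i) {
					return false
				}
			}
			return true
		}
	}
}

func main() {
	s := &solver{strictPicking: true}
	s.pickCheckPolicy()
	// Both dimensions pass the (trivial) predicate, so the strict policy accepts.
	fmt.Println(s.checkLoads([]float64{1, 2}, func(int) bool { return true })) // true
}
```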

@@ -590,7 +599,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if !bs.isValid() {
return nil
}

bs.cur = &solution{}
tryUpdateBestSolution := func() {
if label, ok := bs.filterUniformStore(); ok {
@@ -742,12 +750,20 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
continue
}

if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
// only raftkv2 needs to check the history loads.
if bs.isRaftKV2 {
if !bs.checkSrcHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
}

ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
}
return ret
}
@@ -758,6 +774,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta
})
}

func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j]
})
})
}

// filterHotPeers filters hot peers from statistics.HotPeerStat and deletes the peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
@@ -956,12 +983,20 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}
if filter.Target(bs.GetOpts(), store, filters) {
id := store.GetID()
if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
// only raftkv2 needs to check history loads
if bs.isRaftKV2 {
if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
}

hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
ret[id] = detail
}
}
return ret
@@ -973,6 +1008,17 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
})
}

func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j]
})
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -982,6 +1028,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -991,10 +1046,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) enableExpectation() bool {
return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0
}
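Taken together: when the store engine is raft-kv v2, a store stays a source candidate only if every sample in its history window exceeds the expected load scaled by the tolerance ratio in the checked dimensions, and a destination qualifies only if every scaled sample stays below the expectation. A self-contained sketch of the source-side check with made-up numbers; unlike the real code it checks all dimensions rather than only the rank-selected ones:

```go
package main

import "fmt"

// checkSrcHistoryLoads is a simplified stand-in for
// checkSrcHistoryLoadsByPriorityAndTolerance: the store qualifies as a source
// only if every historical sample exceeds the expected load times the
// tolerance ratio, in every checked dimension.
func checkSrcHistoryLoads(current, expect [][]float64, toleranceRatio float64) bool {
	if len(current) == 0 {
		return true // no history collected yet: rely on the current-load check alone
	}
	for dim := range current {
		for i, load := range current[dim] {
			if load <= toleranceRatio*expect[dim][i] {
				return false
			}
		}
	}
	return true
}

func main() {
	// Two dimensions (say byte rate and key rate), three history samples each.
	current := [][]float64{{120, 130, 125}, {90, 95, 92}}
	expect := [][]float64{{100, 100, 100}, {80, 80, 80}}

	fmt.Println(checkSrcHistoryLoads(current, expect, 1.0)) // true: consistently hotter than expected
	fmt.Println(checkSrcHistoryLoads(current, expect, 1.3)) // false: 120 <= 1.3*100 in the first window
}
```

Requiring the whole window to agree makes the decision less sensitive to a single spiky sample than the instantaneous load check alone.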
(Diffs for the remaining 7 changed files are not shown here.)
