This is an automated cherry-pick of tikv#6827
close tikv#6645

Signed-off-by: ti-chi-bot <[email protected]>
lhy1024 authored and ti-chi-bot committed Jul 31, 2023
1 parent f8bf1d7 commit 80ed204
Showing 4 changed files with 207 additions and 29 deletions.
90 changes: 65 additions & 25 deletions pkg/schedule/schedulers/hot_region.go
@@ -44,6 +44,7 @@ import (
)

var (
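// topnPosition is the position of the topn-th hot peer recorded per store and
// dimension; rank v2 uses that peer's load to bound minBetterRate in
// getScoreByPriorities so that small hot regions stay schedulable.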
topnPosition = 10
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
@@ -438,12 +439,23 @@ func isAvailableV1(s *solution) bool {
}

type balanceSolver struct {
schedule.Cluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered)
nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers
rwTy statistics.RWType
opTy opType
resourceTy resourceType

cur *solution

@@ -482,8 +494,21 @@ type balanceSolver struct {
}

func (bs *balanceSolver) init() {
// Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy)
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()
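// Choose the rank formula implementation; v2 additionally consumes the
// per-store topn hot peers collected later in this init.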
switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}

// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
@@ -496,10 +521,14 @@
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
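// For every store, cache its filtered hot peers for solve() and record the
// topn-th hot peer in each dimension for the rank v2 score calculation.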
for _, detail := range bs.stLoadDetail {
bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min())
bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max())
maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current)
bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen)
bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail)
}

rankStepRatios := []float64{
@@ -514,6 +543,7 @@
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}
}

func (bs *balanceSolver) initRankV1() {
@@ -649,7 +681,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
@@ -667,7 +699,7 @@
if bs.needSearchRevertRegions() {
hotSchedulerSearchRevertRegionsCounter.Inc()
dstStoreID := dstStore.GetID()
for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] {
revertRegion := bs.getRegion(revertPeerStat, dstStoreID)
if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() ||
!allowRevertRegion(revertRegion, srcStoreID) {
@@ -834,44 +866,52 @@ func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, exp

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The number of returned hot peers is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat {
hotPeers := storeLoad.HotPeers
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// not in a pending operator, and no need to cool down after a leader transfer
ret = append(ret, item)
}
}

var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(secondSort, hotPeers)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
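// Record the topn-th hottest peer in each dimension; rank v2 compares
// candidate loads against it in getScoreByPriorities.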
if len(hotPeers) >= topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return ret
}

for _, peer := range hotPeers {
appendItem(peer)
}
return ret
}

func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
// At most MaxPeerNum peers, to prevent balanceSolver.solve() from being too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
11 changes: 7 additions & 4 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1834,20 +1834,23 @@ func TestHotCacheSortHotPeer(t *testing.T) {
},
}}

st := &statistics.StoreLoadDetail{
HotPeers: hotPeers,
}
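// maxPeerNum caps how many hot peers filterHotPeers returns; the candidates
// are drawn from the tops of both dimension orderings via sortHotPeers.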
leaderSolver.maxPeerNum = 1
u := leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1}, u)

leaderSolver.maxPeerNum = 2
u = leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1, 2}, u)
}

func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) {
re.Equal(len(hotPeers), len(regions))
for _, region := range regions {
in := false
for _, hotPeer := range hotPeers {
if hotPeer.RegionID == region {
in = true
break
7 changes: 7 additions & 0 deletions pkg/schedule/schedulers/hot_region_v2.go
@@ -206,11 +206,17 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim)
peersRate := bs.cur.getPeersRateFromCache(dim)
highRate, lowRate := srcRate, dstRate
topnHotPeer := bs.nthHotPeer[bs.cur.srcStore.GetID()][dim]
reverse := false
if srcRate < dstRate {
highRate, lowRate = dstRate, srcRate
peersRate = -peersRate
reverse = true
topnHotPeer = bs.nthHotPeer[bs.cur.dstStore.GetID()][dim]
}
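// Fall back to MaxFloat64 when no topn peer was recorded (the store has fewer
// than topnPosition hot peers), which leaves minBetterRate unchanged below.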
topnRate := math.MaxFloat64
if topnHotPeer != nil {
topnRate = topnHotPeer.GetLoad(dim)
}

if highRate*rs.balancedCheckRatio <= lowRate {
@@ -262,6 +268,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int {
// maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced.
minNotWorsenedRate = -bs.getMinRate(dim)
minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio)
minBetterRate = math.Min(minBetterRate, topnRate)
maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio
maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio
}