diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index 9716cac8449..b562df305e7 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -44,6 +44,7 @@ import ( ) var ( + topnPosition = 10 statisticsInterval = time.Second // WithLabelValues is a heavy operation, define variable to avoid call it every time. hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule") @@ -159,6 +160,9 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched // It makes each dim rate or count become `weight` times to the origin value. func (h *baseHotScheduler) summaryPendingInfluence(cluster schedule.Cluster) { for id, p := range h.regionPendings { + if p.op == nil { + continue + } from := h.stInfos[p.from] to := h.stInfos[p.to] maxZombieDur := p.maxZombieDuration @@ -439,11 +443,13 @@ func isAvailableV1(s *solution) bool { type balanceSolver struct { schedule.Cluster - sche *hotScheduler - stLoadDetail map[uint64]*statistics.StoreLoadDetail - rwTy statistics.RWType - opTy opType - resourceTy resourceType + sche *hotScheduler + stLoadDetail map[uint64]*statistics.StoreLoadDetail + filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered) + nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers + rwTy statistics.RWType + opTy opType + resourceTy resourceType cur *solution @@ -482,8 +488,21 @@ type balanceSolver struct { } func (bs *balanceSolver) init() { - // Init store load detail according to the type. + // Load the configuration items of the scheduler. bs.resourceTy = toResourceType(bs.rwTy, bs.opTy) + bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber() + bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold() + bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) + bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio() + bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2() + switch bs.sche.conf.GetRankFormulaVersion() { + case "v1": + bs.initRankV1() + default: + bs.initRankV2() + } + + // Init store load detail according to the type. 
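Aside: the new `if p.op == nil` guard in `summaryPendingInfluence` matters because the tests added later in this patch register bare `&pendingInfluence{}` entries that carry no operator. A minimal standalone sketch of that skip-nil pattern, using hypothetical stand-in types (`pending`, `summarize`) rather than the real `pendingInfluence`:

```go
package main

import "fmt"

// pending is a stand-in for the scheduler's pendingInfluence (hypothetical,
// illustration only): an entry may be registered without an operator,
// as the new tests do with `&pendingInfluence{}`.
type pending struct {
	op       *string // nil means "no operator attached"
	from, to uint64
}

func summarize(pendings map[uint64]*pending) int {
	summarized := 0
	for _, p := range pendings {
		if p.op == nil {
			// Skip entries with no operator, mirroring the new
			// `if p.op == nil { continue }` guard in summaryPendingInfluence.
			continue
		}
		summarized++
	}
	return summarized
}

func main() {
	op := "move-hot-read-peer"
	pendings := map[uint64]*pending{
		1: {op: &op, from: 1, to: 2},
		2: {}, // registered by a test, carries no operator
	}
	fmt.Println(summarize(pendings)) // 1
}
```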
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy] bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} @@ -496,10 +515,14 @@ func (bs *balanceSolver) init() { } maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} + bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat) + bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat) for _, detail := range bs.stLoadDetail { bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min()) bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max()) maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current) + bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen) + bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail) } rankStepRatios := []float64{ @@ -514,19 +537,6 @@ func (bs *balanceSolver) init() { Loads: stepLoads, Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } - - bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) - bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio() - bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber() - bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold() - bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2() - - switch bs.sche.conf.GetRankFormulaVersion() { - case "v1": - bs.initRankV1() - default: - bs.initRankV2() - } } func (bs *balanceSolver) initRankV1() { @@ -649,7 +659,7 @@ func (bs *balanceSolver) solve() []*operator.Operator { for _, srcStore := range bs.filterSrcStores() { bs.cur.srcStore = srcStore srcStoreID := srcStore.GetID() - for _, mainPeerStat := range bs.filterHotPeers(srcStore) { + for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] { if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil { continue } else if bs.opTy == movePeer { @@ -667,7 +677,7 @@ func (bs *balanceSolver) solve() []*operator.Operator { if bs.needSearchRevertRegions() { hotSchedulerSearchRevertRegionsCounter.Inc() dstStoreID := dstStore.GetID() - for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) { + for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] { revertRegion := bs.getRegion(revertPeerStat, dstStoreID) if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() || !allowRevertRegion(revertRegion, srcStoreID) { @@ -834,7 +844,9 @@ func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, exp // filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status. // The returned hotPeer count in controlled by `max-peer-number`. -func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) { +func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat { + hotPeers := storeLoad.HotPeers + ret := make([]*statistics.HotPeerStat, 0, len(hotPeers)) appendItem := func(item *statistics.HotPeerStat) { if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) { // no in pending operator and no need cool down after transfer leader @@ -842,36 +854,42 @@ func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) ( } } - src := storeLoad.HotPeers - // At most MaxPeerNum peers, to prevent balanceSolver.solve() too slow. 
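Aside: `init` now loads the scheduler configuration first and then filters the hot peers once per store, caching both the filtered slice and the per-dimension n-th hottest peer, so `solve` and the revert-region search can simply look them up instead of re-filtering on every visit. A minimal sketch of that precompute-then-reuse shape, with hypothetical types (a cut-down `solver`, plain `int` loads) rather than the real `balanceSolver`:

```go
package main

import "fmt"

type solver struct {
	maxPeerNum       int
	filteredHotPeers map[uint64][]int // storeID -> filtered peer loads
	nthHotPeer       map[uint64][]int // storeID -> per-dimension n-th hottest load
}

func (s *solver) init(storePeers map[uint64][]int) {
	// 1. Load configuration before any filtering happens; the filtering
	//    below depends on it, which is why the patch moves this step up.
	s.maxPeerNum = 2
	// 2. Filter once per store and cache the result.
	s.filteredHotPeers = make(map[uint64][]int)
	s.nthHotPeer = make(map[uint64][]int)
	for id, peers := range storePeers {
		s.nthHotPeer[id] = make([]int, 2) // one slot per dimension, filled by the filter (elided here)
		if len(peers) > s.maxPeerNum {
			peers = peers[:s.maxPeerNum]
		}
		s.filteredHotPeers[id] = peers
	}
}

func (s *solver) solve() {
	for id, peers := range s.filteredHotPeers {
		// solve() now reads the cached slice instead of re-filtering.
		fmt.Println(id, peers)
	}
}

func main() {
	s := &solver{}
	s.init(map[uint64][]int{1: {9, 7, 5}, 2: {3}})
	s.solve()
}
```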
- if len(src) <= bs.maxPeerNum { - ret = make([]*statistics.HotPeerStat, 0, len(src)) - for _, peer := range src { - appendItem(peer) - } - } else { - union := bs.sortHotPeers(src) + var firstSort, secondSort []*statistics.HotPeerStat + if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum { + firstSort = make([]*statistics.HotPeerStat, len(hotPeers)) + copy(firstSort, hotPeers) + sort.Slice(firstSort, func(i, j int) bool { + return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority) + }) + secondSort = make([]*statistics.HotPeerStat, len(hotPeers)) + copy(secondSort, hotPeers) + sort.Slice(secondSort, func(i, j int) bool { + return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority) + }) + } + if len(hotPeers) >= topnPosition { + storeID := storeLoad.GetID() + bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1] + bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1] + } + if len(hotPeers) > bs.maxPeerNum { + union := bs.sortHotPeers(firstSort, secondSort) ret = make([]*statistics.HotPeerStat, 0, len(union)) for peer := range union { appendItem(peer) } + return ret } - return + for _, peer := range hotPeers { + appendItem(peer) + } + return ret } -func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} { - firstSort := make([]*statistics.HotPeerStat, len(ret)) - copy(firstSort, ret) - sort.Slice(firstSort, func(i, j int) bool { - return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority) - }) - secondSort := make([]*statistics.HotPeerStat, len(ret)) - copy(secondSort, ret) - sort.Slice(secondSort, func(i, j int) bool { - return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority) - }) +func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} { union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum) + // At most MaxPeerNum peers, to prevent balanceSolver.solve() too slow. 
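Aside: the reworked `filterHotPeers` only sorts when it has to (at least `topnPosition` peers, or more than `max-peer-number`), remembers the `topnPosition`-th hottest peer per dimension for the rank-v2 scoring, and still caps the returned slice at `max-peer-number`. A runnable sketch of that control flow with a hypothetical `peer` type; the pending/cool-down filter and the union step are elided:

```go
package main

import (
	"fmt"
	"sort"
)

// peer is a stand-in for statistics.HotPeerStat (hypothetical, illustration only).
type peer struct {
	id    uint64
	loads [2]float64 // loads[0] = first priority, loads[1] = second priority
}

const (
	topnPosition = 10 // same default the patch introduces
	maxPeerNum   = 1000
)

func filterHotPeers(hotPeers []peer) (ret []peer, nth [2]*peer) {
	var firstSort, secondSort []peer
	if len(hotPeers) >= topnPosition || len(hotPeers) > maxPeerNum {
		// Sort copies by each priority only when either threshold is reached.
		firstSort = append([]peer(nil), hotPeers...)
		sort.Slice(firstSort, func(i, j int) bool { return firstSort[i].loads[0] > firstSort[j].loads[0] })
		secondSort = append([]peer(nil), hotPeers...)
		sort.Slice(secondSort, func(i, j int) bool { return secondSort[i].loads[1] > secondSort[j].loads[1] })
	}
	if len(hotPeers) >= topnPosition {
		// Remember the topnPosition-th hottest peer per dimension.
		nth[0] = &firstSort[topnPosition-1]
		nth[1] = &secondSort[topnPosition-1]
	}
	if len(hotPeers) > maxPeerNum {
		// The real code unions the hottest peers of both orderings (elided here).
		return firstSort[:maxPeerNum], nth
	}
	return hotPeers, nth
}

func main() {
	peers := make([]peer, 12)
	for i := range peers {
		peers[i] = peer{id: uint64(i), loads: [2]float64{float64(100 - i), float64(i)}}
	}
	ret, nth := filterHotPeers(peers)
	fmt.Println(len(ret), nth[0].id, nth[1].id) // 12 9 2
}
```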
for len(union) < bs.maxPeerNum { for len(firstSort) > 0 { peer := firstSort[0] diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index abd1a272478..5fc1cffb8ba 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1834,20 +1834,23 @@ func TestHotCacheSortHotPeer(t *testing.T) { }, }} + st := &statistics.StoreLoadDetail{ + HotPeers: hotPeers, + } leaderSolver.maxPeerNum = 1 - u := leaderSolver.sortHotPeers(hotPeers) + u := leaderSolver.filterHotPeers(st) checkSortResult(re, []uint64{1}, u) leaderSolver.maxPeerNum = 2 - u = leaderSolver.sortHotPeers(hotPeers) + u = leaderSolver.filterHotPeers(st) checkSortResult(re, []uint64{1, 2}, u) } -func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) { +func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) { re.Equal(len(hotPeers), len(regions)) for _, region := range regions { in := false - for hotPeer := range hotPeers { + for _, hotPeer := range hotPeers { if hotPeer.RegionID == region { in = true break diff --git a/pkg/schedule/schedulers/hot_region_v2.go b/pkg/schedule/schedulers/hot_region_v2.go index 2b7e96af0fb..689c42b65f0 100644 --- a/pkg/schedule/schedulers/hot_region_v2.go +++ b/pkg/schedule/schedulers/hot_region_v2.go @@ -206,11 +206,17 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int { srcPendingRate, dstPendingRate := bs.cur.getPendingLoad(dim) peersRate := bs.cur.getPeersRateFromCache(dim) highRate, lowRate := srcRate, dstRate + topnHotPeer := bs.nthHotPeer[bs.cur.srcStore.GetID()][dim] reverse := false if srcRate < dstRate { highRate, lowRate = dstRate, srcRate peersRate = -peersRate reverse = true + topnHotPeer = bs.nthHotPeer[bs.cur.dstStore.GetID()][dim] + } + topnRate := math.MaxFloat64 + if topnHotPeer != nil { + topnRate = topnHotPeer.GetLoad(dim) } if highRate*rs.balancedCheckRatio <= lowRate { @@ -262,6 +268,7 @@ func (bs *balanceSolver) getScoreByPriorities(dim int, rs *rankV2Ratios) int { // maxBetterRate may be less than minBetterRate, in which case a positive fraction cannot be produced. 
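Aside: `sortHotPeers` now receives the two pre-sorted slices instead of sorting them itself; its remaining job is the round-robin union that keeps the hottest peers of both dimensions, up to `max-peer-number`. A standalone sketch of that merge (hypothetical `unionHottest` helper, region names as strings):

```go
package main

import "fmt"

func unionHottest(firstSort, secondSort []string, maxPeerNum int) map[string]struct{} {
	union := make(map[string]struct{}, maxPeerNum)
	for len(union) < maxPeerNum {
		// Take the next unseen peer from the first-priority ordering.
		for len(firstSort) > 0 {
			peer := firstSort[0]
			firstSort = firstSort[1:]
			if _, ok := union[peer]; !ok {
				union[peer] = struct{}{}
				break
			}
		}
		if len(union) >= maxPeerNum {
			break
		}
		// Then the next unseen peer from the second-priority ordering.
		for len(secondSort) > 0 {
			peer := secondSort[0]
			secondSort = secondSort[1:]
			if _, ok := union[peer]; !ok {
				union[peer] = struct{}{}
				break
			}
		}
		if len(firstSort) == 0 && len(secondSort) == 0 {
			break
		}
	}
	return union
}

func main() {
	byBytes := []string{"a", "b", "c", "d"} // hottest by first priority
	byQuery := []string{"d", "e", "a", "b"} // hottest by second priority
	fmt.Println(len(unionHottest(byBytes, byQuery, 3))) // 3: a and b from the byte ordering, d from the query ordering
}
```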
minNotWorsenedRate = -bs.getMinRate(dim) minBetterRate = math.Min(minBalancedRate*rs.perceivedRatio, lowRate*rs.minHotRatio) + minBetterRate = math.Min(minBetterRate, topnRate) maxBetterRate = maxBalancedRate + (highRate-lowRate-minBetterRate-maxBalancedRate)*rs.perceivedRatio maxNotWorsenedRate = maxBalancedRate + (highRate-lowRate-minNotWorsenedRate-maxBalancedRate)*rs.perceivedRatio } diff --git a/pkg/schedule/schedulers/hot_region_v2_test.go b/pkg/schedule/schedulers/hot_region_v2_test.go index 1449b3a5946..14ec3edca8b 100644 --- a/pkg/schedule/schedulers/hot_region_v2_test.go +++ b/pkg/schedule/schedulers/hot_region_v2_test.go @@ -19,6 +19,7 @@ import ( "github.com/docker/go-units" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics" @@ -33,6 +34,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() statistics.Denoising = false + statisticsInterval = 0 sche, err := schedule.CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil) re.NoError(err) @@ -146,6 +148,7 @@ func TestHotWriteRegionScheduleWithRevertRegionsDimFirstOnly(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -2. re := require.New(t) statistics.Denoising = false + statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -208,6 +211,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { // This is a test that searchRevertRegions finds a solution of rank -1. re := require.New(t) statistics.Denoising = false + statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -269,6 +273,7 @@ func TestHotReadRegionScheduleWithRevertRegionsDimSecond(t *testing.T) { func TestSkipUniformStore(t *testing.T) { re := require.New(t) statistics.Denoising = false + statisticsInterval = 0 cancel, _, tc, oc := prepareSchedulersTest() defer cancel() @@ -346,3 +351,123 @@ func TestSkipUniformStore(t *testing.T) { operatorutil.CheckTransferLeader(re, ops[0], operator.OpHotRegion, 3, 2) clearPendingInfluence(hb.(*hotScheduler)) } + +func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) { + // This is a test that we can schedule small hot region, + // which is smaller than 20% of diff or 2% of low node. (#6645) + // 20% is from `firstPriorityPerceivedRatio`, 2% is from `firstPriorityMinHotRatio`. + // The byte of high node is 2000MB/s, the low node is 200MB/s. + // The query of high node is 2000qps, the low node is 200qps. + // There are all small hot regions in the cluster, which are smaller than 20% of diff or 2% of low node. + re := require.New(t) + emptyFunc := func(*mockcluster.Cluster, *hotScheduler) {} + highLoad, lowLoad := uint64(2000), uint64(200) + bigHotRegionByte := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * units.MiB * statistics.ReadReportInterval) + bigHotRegionQuery := uint64(float64(lowLoad) * firstPriorityMinHotRatio * 10 * statistics.ReadReportInterval) + + // Case1: Before #6827, we only use minHotRatio, so cannot schedule small hot region in this case. + // Because 10000 is larger than the length of hotRegions, so `filterHotPeers` will skip the topn calculation. 
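Aside: to make Case1 vs. Case2 concrete with this test's numbers (low store at 200 qps, every hot peer at 0.9 × `firstPriorityMinHotRatio` of that, i.e. 3.6 qps), here is the arithmetic around the new `minBetterRate = math.Min(minBetterRate, topnRate)` clamp, where `topnRate` is the load of the 10th hottest peer on the higher-loaded side. The `minBalancedRate*perceivedRatio` term is assumed to be the larger one and is left out of this sketch:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	lowRate := 200.0
	minHotRatio := 0.02                     // firstPriorityMinHotRatio, per the test comment
	peerRate := lowRate * minHotRatio * 0.9 // 3.6 qps, like smallHotPeerQuery

	// Before the patch (simplified): the threshold is at least lowRate*minHotRatio.
	oldMinBetterRate := lowRate * minHotRatio // 4 qps
	fmt.Println(peerRate >= oldMinBetterRate) // false: no peer clears the threshold, no operator

	// After the patch, the 10th hottest peer also bounds the threshold.
	// Here every peer runs at 3.6 qps, so topnRate is 3.6.
	topnRate := peerRate
	newMinBetterRate := math.Min(oldMinBetterRate, topnRate) // 3.6 qps
	fmt.Println(peerRate >= newMinBetterRate)                // true: small hot peers can now be picked
}
```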
+	origin := topnPosition
+	topnPosition = 10000
+	ops := checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
+	re.Empty(ops)
+	topnPosition = origin
+
+	// Case2: After #6827, we also use the load of the top-10 hot peer as the threshold.
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, emptyFunc)
+	re.Len(ops, 1)
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, lowLoad, highLoad, emptyFunc)
+	re.Len(ops, 0)
+
+	// Case3: If there is a larger hot region, we will schedule it.
+	hotRegionID := uint64(100)
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
+		tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+	})
+	re.Len(ops, 1)
+	re.Equal(hotRegionID, ops[0].RegionID())
+
+	// Case4: If there is a larger hot region but it needs to cool down, we will schedule a small hot region instead.
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
+		// just transfer leader
+		tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
+		tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+	})
+	re.Len(ops, 1)
+	re.NotEqual(hotRegionID, ops[0].RegionID())
+
+	// Case5: If there is a larger hot region but it is pending, we will schedule a small hot region instead.
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
+		tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+		hb.regionPendings[hotRegionID] = &pendingInfluence{}
+	})
+	re.Len(ops, 1)
+	re.NotEqual(hotRegionID, ops[0].RegionID())
+
+	// Case6: If there are more than topnPosition hot regions, but they need to cool down,
+	// we will try to schedule the large hot regions rather than the small ones, so there is no operator.
+	topnPosition = 2
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
+		// just transfer leader
+		tc.AddRegionWithReadInfo(hotRegionID, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
+		tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+		// just transfer leader
+		tc.AddRegionWithReadInfo(hotRegionID+1, 2, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{1, 3})
+		tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+	})
+	re.Len(ops, 0)
+	topnPosition = origin
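Aside: Case4 through Case6 above exercise the filter inside `filterHotPeers`'s `appendItem`: a peer is kept only when its region has no pending operator and it does not need to cool down after a recent leader transfer. A minimal sketch of that predicate with hypothetical stand-in types:

```go
package main

import "fmt"

// hotPeer is a stand-in for statistics.HotPeerStat (hypothetical, illustration only).
type hotPeer struct {
	regionID      uint64
	needsCoolDown bool // stand-in for IsNeedCoolDownTransferLeader(...)
}

func keep(p hotPeer, regionPendings map[uint64]struct{}) bool {
	if _, pending := regionPendings[p.regionID]; pending {
		return false // a pending operator already touches this region
	}
	return !p.needsCoolDown
}

func main() {
	pendings := map[uint64]struct{}{100: {}}
	fmt.Println(keep(hotPeer{regionID: 100}, pendings))                      // false: pending operator
	fmt.Println(keep(hotPeer{regionID: 101, needsCoolDown: true}, pendings)) // false: just transferred leader
	fmt.Println(keep(hotPeer{regionID: 102}, pendings))                      // true: schedulable
}
```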
+
+	// Case7: If there are more than topnPosition hot regions, but they are pending,
+	// we will try to schedule the large hot regions rather than the small ones, so there is no operator.
+	topnPosition = 2
+	ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
+		tc.AddRegionWithReadInfo(hotRegionID, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+		hb.regionPendings[hotRegionID] = &pendingInfluence{}
+		tc.AddRegionWithReadInfo(hotRegionID+1, 1, bigHotRegionByte, 0, bigHotRegionQuery, statistics.ReadReportInterval, []uint64{2, 3})
+		hb.regionPendings[hotRegionID+1] = &pendingInfluence{}
+	})
+	re.Len(ops, 0)
+	topnPosition = origin
+}
+
+func checkHotReadRegionScheduleWithSmallHotRegion(re *require.Assertions, highLoad, lowLoad uint64,
+	addOtherRegions func(*mockcluster.Cluster, *hotScheduler)) []*operator.Operator {
+	cancel, _, tc, oc := prepareSchedulersTest()
+	defer cancel()
+	statistics.Denoising = false
+	sche, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+	hb := sche.(*hotScheduler)
+	hb.conf.SetSrcToleranceRatio(1)
+	hb.conf.SetDstToleranceRatio(1)
+	hb.conf.SetRankFormulaVersion("v2")
+	hb.conf.ReadPriorities = []string{statistics.QueryPriority, statistics.BytePriority}
+	tc.SetHotRegionCacheHitsThreshold(0)
+	tc.AddRegionStore(1, 40)
+	tc.AddRegionStore(2, 10)
+	tc.AddRegionStore(3, 10)
+
+	tc.UpdateStorageReadQuery(1, highLoad*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadQuery(2, lowLoad*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadQuery(3, (highLoad+lowLoad)/2*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadStats(1, highLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
+	tc.UpdateStorageReadStats(2, lowLoad*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
+	tc.UpdateStorageReadStats(3, (highLoad+lowLoad)/2*units.MiB*statistics.StoreHeartBeatReportInterval, 0)
+
+	smallHotPeerQuery := float64(lowLoad) * firstPriorityMinHotRatio * 0.9              // a hot peer smaller than the firstPriorityMinHotRatio threshold
+	smallHotPeerByte := float64(lowLoad) * secondPriorityMinHotRatio * 0.9 * units.MiB // a hot peer smaller than the secondPriorityMinHotRatio threshold
+	regions := make([]testRegionInfo, 0)
+	for i := 10; i < 50; i++ {
+		regions = append(regions, testRegionInfo{uint64(i), []uint64{1, 2, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
+		if i < 20 {
+			regions = append(regions, testRegionInfo{uint64(i), []uint64{2, 1, 3}, smallHotPeerByte, 0, smallHotPeerQuery})
+			regions = append(regions, testRegionInfo{uint64(i), []uint64{3, 1, 2}, smallHotPeerByte, 0, smallHotPeerQuery})
+		}
+	}
+	addRegionInfo(tc, statistics.Read, regions)
+	tc.SetHotRegionCacheHitsThreshold(1)
+	addOtherRegions(tc, hb)
+	ops, _ := hb.Schedule(tc, false)
+	return ops
+}
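Aside: the test toggles the package-level `topnPosition` knob with a save/override/restore dance. A possible helper for that pattern (not part of this patch) using `t.Cleanup`, which restores the value even when an assertion aborts the test early:

```go
package schedulers

import "testing"

// withTopNPosition is a hypothetical test helper: it overrides the
// package-level topnPosition knob and restores it when the test finishes.
func withTopNPosition(t *testing.T, n int) {
	t.Helper()
	origin := topnPosition
	topnPosition = n
	t.Cleanup(func() { topnPosition = origin })
}
```

Because `topnPosition` is package state, tests that override it should not run in parallel with other hot-region scheduler tests.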