diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go
new file mode 100644
index 00000000000..06dd3f31cfa
--- /dev/null
+++ b/pkg/schedule/config/config.go
@@ -0,0 +1,103 @@
+package config
+
+import (
+	"sync"
+	"time"
+
+	"github.com/coreos/go-semver/semver"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/tikv/pd/pkg/core/constant"
+	"github.com/tikv/pd/pkg/core/storelimit"
+)
+
+// RejectLeader is the label property type that suggests a store should not
+// have any region leaders.
+const RejectLeader = "reject-leader"
+
+var schedulerMap sync.Map
+
+// RegisterScheduler registers the scheduler type.
+func RegisterScheduler(typ string) {
+	schedulerMap.Store(typ, struct{}{})
+}
+
+// IsSchedulerRegistered checks if the named scheduler type is registered.
+func IsSchedulerRegistered(name string) bool {
+	_, ok := schedulerMap.Load(name)
+	return ok
+}
+
+// Config is the interface that wraps the Config related methods.
+type Config interface {
+	GetReplicaScheduleLimit() uint64
+	GetRegionScheduleLimit() uint64
+	GetMergeScheduleLimit() uint64
+	GetLeaderScheduleLimit() uint64
+	GetHotRegionScheduleLimit() uint64
+	GetWitnessScheduleLimit() uint64
+
+	GetHotRegionCacheHitsThreshold() int
+	GetMaxMovableHotPeerSize() int64
+	IsTraceRegionFlow() bool
+
+	GetSplitMergeInterval() time.Duration
+	GetMaxMergeRegionSize() uint64
+	GetMaxMergeRegionKeys() uint64
+	GetKeyType() constant.KeyType
+	IsOneWayMergeEnabled() bool
+	IsCrossTableMergeEnabled() bool
+
+	IsPlacementRulesEnabled() bool
+	IsPlacementRulesCacheEnabled() bool
+
+	GetMaxReplicas() int
+	GetPatrolRegionInterval() time.Duration
+	GetMaxStoreDownTime() time.Duration
+	GetLocationLabels() []string
+	GetIsolationLevel() string
+	IsReplaceOfflineReplicaEnabled() bool
+	IsMakeUpReplicaEnabled() bool
+	IsRemoveExtraReplicaEnabled() bool
+	IsLocationReplacementEnabled() bool
+	IsRemoveDownReplicaEnabled() bool
+
+	GetSwitchWitnessInterval() time.Duration
+	IsWitnessAllowed() bool
+
+	GetLowSpaceRatio() float64
+	GetHighSpaceRatio() float64
+	GetTolerantSizeRatio() float64
+	GetLeaderSchedulePolicy() constant.SchedulePolicy
+	GetRegionScoreFormulaVersion() string
+
+	GetMaxSnapshotCount() uint64
+	GetMaxPendingPeerCount() uint64
+	GetSchedulerMaxWaitingOperator() uint64
+	GetStoreLimitByType(uint64, storelimit.Type) float64
+	SetAllStoresLimit(storelimit.Type, float64)
+	GetSlowStoreEvictingAffectedStoreRatioThreshold() float64
+	IsUseJointConsensus() bool
+	CheckLabelProperty(string, []*metapb.StoreLabel) bool
+	IsDebugMetricsEnabled() bool
+	GetClusterVersion() *semver.Version
+	GetStoreLimitVersion() string
+	// for test purpose
+	SetPlacementRuleEnabled(bool)
+	SetSplitMergeInterval(time.Duration)
+	SetMaxReplicas(int)
+	SetPlacementRulesCacheEnabled(bool)
+	SetWitnessEnabled(bool)
+	// only for store configuration
+	UseRaftV2()
+}
+
+// StoreConfig is the interface that wraps the StoreConfig related methods.
+type StoreConfig interface { + GetRegionMaxSize() uint64 + CheckRegionSize(uint64, uint64) error + CheckRegionKeys(uint64, uint64) error + IsEnableRegionBucket() bool + IsRaftKV2() bool + // for test purpose + SetRegionBucketEnabled(bool) +} diff --git a/server/config/store_config.go b/server/config/store_config.go index 960ea6688e7..74f881058d4 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -121,6 +121,25 @@ func (c *StoreConfig) IsEnableRegionBucket() bool { return c.Coprocessor.EnableRegionBucket } +<<<<<<< HEAD +======= +// IsRaftKV2 returns true if the raft kv is v2. +func (c *StoreConfig) IsRaftKV2() bool { + if c == nil { + return false + } + return c.Storage.Engine == raftStoreV2 +} + +// SetRegionBucketEnabled sets if the region bucket is enabled. +func (c *StoreConfig) SetRegionBucketEnabled(enabled bool) { + if c == nil { + return + } + c.Coprocessor.EnableRegionBucket = enabled +} + +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)) // GetRegionBucketSize returns region bucket size if enable region buckets. func (c *StoreConfig) GetRegionBucketSize() uint64 { if c == nil || !c.Coprocessor.EnableRegionBucket { diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 6916fedc929..3183d3fcd10 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -108,6 +108,11 @@ func TestParseConfig(t *testing.T) { re.NoError(json.Unmarshal([]byte(body), &config)) m.update(&config) re.Equal(uint64(96), config.GetRegionBucketSize()) +<<<<<<< HEAD +======= + re.True(config.IsRaftKV2()) + re.Equal(raftStoreV2, config.Storage.Engine) +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)) } func TestMergeCheck(t *testing.T) { diff --git a/server/core/kind.go b/server/core/kind.go index 4c99bd605a8..a2e4c560e26 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -47,6 +47,14 @@ const ( LeaderKind ResourceKind = iota // RegionKind indicates the region kind resource RegionKind +<<<<<<< HEAD:server/core/kind.go +======= + // WitnessKind indicates the witness kind resource + WitnessKind + + // ResourceKindLen represents the ResourceKind count + ResourceKindLen +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)):pkg/core/constant/kind.go ) func (k ResourceKind) String() string { diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 8e3881bece7..f35a0856dfc 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -45,7 +45,8 @@ type baseHotScheduler struct { stInfos map[uint64]*statistics.StoreSummaryInfo // temporary states but exported to API or metrics // Every time `Schedule()` will recalculate it. - stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail + stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail + stHistoryLoads *statistics.StoreHistoryLoads // temporary states // Every time `Schedule()` will recalculate it. 
storesLoads map[uint64][]float64 @@ -63,6 +64,7 @@ func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotSche BaseScheduler: base, types: []statistics.RWType{statistics.Write, statistics.Read}, regionPendings: make(map[uint64]*pendingInfluence), + stHistoryLoads: statistics.NewStoreHistoryLoads(statistics.DimLen), r: rand.New(rand.NewSource(time.Now().UnixNano())), } for ty := resourceType(0); ty < resourceTypeLen; ty++ { @@ -84,6 +86,7 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched h.stLoadInfos[ty] = statistics.SummaryStoresLoad( h.stInfos, h.storesLoads, + h.stHistoryLoads, regionStats, isTraceRegionFlow, rw, resource) @@ -257,7 +260,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) if h.conf.IsForbidRWType(typ) { return nil } - switch typ { case statistics.Read: return h.balanceHotReadRegions(cluster) @@ -441,6 +443,8 @@ type balanceSolver struct { minorDecRatio float64 maxPeerNum int minHotDegree int + // todo: remove this after testing more scene in the single rocksdb + isRaftKV2 bool firstPriorityV2Ratios *rankV2Ratios secondPriorityV2Ratios *rankV2Ratios @@ -454,6 +458,7 @@ type balanceSolver struct { betterThan func(*solution) bool rankToDimString func() string checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool + checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool } func (bs *balanceSolver) init() { @@ -494,6 +499,7 @@ func (bs *balanceSolver) init() { bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio() bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber() bs.minHotDegree = bs.GetOpts().GetHotRegionCacheHitsThreshold() + bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2() switch bs.sche.conf.GetRankFormulaVersion() { case "v1": @@ -518,10 +524,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly case bs.sche.conf.IsStrictPickingStoreEnabled(): bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly } } @@ -590,7 +599,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { if !bs.isValid() { return nil } - bs.cur = &solution{} tryUpdateBestSolution := func() { if label, ok := bs.filterUniformStore(); ok { @@ -742,12 +750,20 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai continue } - if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { - hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc() + if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue } + // only raftkv2 needs to check the history loads. 
+ if bs.isRaftKV2 { + if !bs.checkSrcHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue + } + } + + ret[id] = detail + hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() } return ret } @@ -758,6 +774,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta }) } +func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { + if len(current.HistoryLoads) == 0 { + return true + } + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j] + }) + }) +} + // filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status. // The returned hotPeer count in controlled by `max-peer-number`. func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) { @@ -956,12 +983,20 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st } if filter.Target(bs.GetOpts(), store, filters) { id := store.GetID() - if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { - ret[id] = detail - hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() - } else { - hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc() + if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue } + // only raftkv2 needs to check history loads + if bs.isRaftKV2 { + if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) { + hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + continue + } + } + + hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc() + ret[id] = detail } } return ret @@ -973,6 +1008,17 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist }) } +func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool { + if len(current.HistoryLoads) == 0 { + return true + } + return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool { + return slice.AllOf(current.HistoryLoads[i], func(j int) bool { + return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j] + }) + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool { return slice.AllOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -982,6 +1028,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool { + return slice.AllOf(loads, func(i int) bool { + if 
bs.isSelectedDim(i) { + return f(i) + } + return true + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool { return slice.AnyOf(loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -991,10 +1046,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun }) } +func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool { + return slice.AnyOf(loads, func(i int) bool { + if bs.isSelectedDim(i) { + return f(i) + } + return false + }) +} + func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool { return f(bs.firstPriority) } +func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool { + return f(bs.firstPriority) +} + func (bs *balanceSolver) enableExpectation() bool { return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0 } diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index cdff13c550c..dd3273e6e02 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -193,7 +193,22 @@ func newTestRegion(id uint64) *core.RegionInfo { func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { re := require.New(t) +<<<<<<< HEAD:server/schedulers/hot_region_test.go ctx, cancel := context.WithCancel(context.Background()) +======= + statistics.Denoising = false + statistics.HistorySampleDuration = 0 + statisticsInterval = 0 + checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */) + checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) +} + +func TestSplitBuckets(t *testing.T) { + re := require.New(t) + statistics.Denoising = false + cancel, _, tc, oc := prepareSchedulersTest() + tc.SetHotRegionCacheHitsThreshold(1) +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)):pkg/schedule/schedulers/hot_region_test.go defer cancel() statistics.Denoising = false opt := config.NewTestOptions() @@ -524,11 +539,13 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) { tikvKeysSum += float64(storesBytes[i]/100) / 10 tikvQuerySum += float64(storesBytes[i]/100) / 10 } + for i := uint64(1); i <= storeCount; i++ { if i != downStoreID { tc.UpdateStorageWrittenBytes(i, storesBytes[i]) } } + { // Check the load expect aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1) allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader @@ -1813,6 +1830,7 @@ func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*sta func TestInfluenceByRWType(t *testing.T) { re := require.New(t) + statistics.HistorySampleDuration = 0 originValue := schedulePeerPr defer func() { schedulePeerPr = originValue @@ -2453,10 +2471,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // all dims are higher than expect, allow schedule - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2465,10 +2485,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: 
&statistics.StoreLoad{ // all dims are higher than expect, but lower than expect*toleranceRatio, not allow schedule - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, toleranceRatio: 2.2, @@ -2478,10 +2500,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // only queryDim is lower, but the dim is no selected, allow schedule - Loads: []float64{2.0, 2.0, 1.0}, + Loads: []float64{2.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2490,10 +2514,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: true, // all of load: &statistics.StoreLoad{ // only keyDim is lower, and the dim is selected, not allow schedule - Loads: []float64{2.0, 1.0, 2.0}, + Loads: []float64{2.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2502,10 +2528,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2514,10 +2542,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2526,10 +2556,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // although queryDim is higher, the dim is no selected, not allow schedule - Loads: []float64{1.0, 1.0, 2.0}, + Loads: []float64{1.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2538,10 +2570,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV1, strict: false, // first only load: &statistics.StoreLoad{ // all dims are lower than expect, not allow schedule - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + 
HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, isSrc: true, allow: false, @@ -2551,10 +2585,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only consider the first priority - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2564,10 +2600,27 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, + }, + isSrc: true, + allow: false, + }, + { + initFunc: (*balanceSolver).pickCheckPolicyV1, + strict: true, + rs: writeLeader, + load: &statistics.StoreLoad{ // history loads is not higher than the expected. + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 2.0}, {1.0, 2.0}}, + }, + expect: &statistics.StoreLoad{ + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 2.0}, {1.0, 2.0}, {1.0, 2.0}}, }, isSrc: true, allow: false, @@ -2577,10 +2630,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // keyDim is higher, and the dim is selected, allow schedule - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2589,10 +2644,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // byteDim is higher, and the dim is selected, allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2601,10 +2658,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: &statistics.StoreLoad{ // although queryDim is higher, the dim is no selected, not allow schedule - Loads: []float64{1.0, 1.0, 2.0}, + Loads: []float64{1.0, 1.0, 2.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {2.0, 2.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2613,10 +2672,12 @@ func TestExpect(t *testing.T) { initFunc: (*balanceSolver).pickCheckPolicyV2, strict: false, // any of load: 
&statistics.StoreLoad{ // all dims are lower than expect, not allow schedule - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{2.0, 2.0, 2.0}, + Loads: []float64{2.0, 2.0, 2.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {2.0, 2.0}, {2.0, 2.0}}, }, isSrc: true, allow: false, @@ -2626,10 +2687,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // only keyDim is higher, but write leader only consider the first priority - Loads: []float64{1.0, 2.0, 1.0}, + Loads: []float64{1.0, 2.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {2.0, 2.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: true, @@ -2639,10 +2702,12 @@ func TestExpect(t *testing.T) { strict: true, rs: writeLeader, load: &statistics.StoreLoad{ // although byteDim is higher, the dim is not first, not allow schedule - Loads: []float64{2.0, 1.0, 1.0}, + Loads: []float64{2.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{2.0, 2.0}, {1.0, 1.0}, {1.0, 1.0}}, }, expect: &statistics.StoreLoad{ - Loads: []float64{1.0, 1.0, 1.0}, + Loads: []float64{1.0, 1.0, 1.0}, + HistoryLoads: [][]float64{{1.0, 1.0}, {1.0, 1.0}, {1.0, 1.0}}, }, isSrc: true, allow: false, @@ -2654,8 +2719,16 @@ func TestExpect(t *testing.T) { for i, v := range src.Loads { dst[i] = 3.0 - v } + historyLoads := make([][]float64, len(src.HistoryLoads)) + for i, dim := range src.HistoryLoads { + historyLoads[i] = make([]float64, len(dim)) + for j, load := range dim { + historyLoads[i][j] = 3.0 - load + } + } return &statistics.StoreLoad{ - Loads: dst, + Loads: dst, + HistoryLoads: historyLoads, } } @@ -2674,6 +2747,8 @@ func TestExpect(t *testing.T) { testCase.initFunc(bs) re.Equal(testCase.allow, bs.checkSrcByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio)) re.Equal(testCase.allow, bs.checkDstByPriorityAndTolerance(srcToDst(testCase.load), srcToDst(testCase.expect), toleranceRatio)) + re.Equal(testCase.allow, bs.checkSrcHistoryLoadsByPriorityAndTolerance(testCase.load, testCase.expect, toleranceRatio)) + re.Equal(testCase.allow, bs.checkDstHistoryLoadsByPriorityAndTolerance(srcToDst(testCase.load), srcToDst(testCase.expect), toleranceRatio)) } } diff --git a/server/schedulers/hot_region_v2.go b/server/schedulers/hot_region_v2.go index 49b7f7042b0..4d3cb796eb1 100644 --- a/server/schedulers/hot_region_v2.go +++ b/server/schedulers/hot_region_v2.go @@ -86,8 +86,10 @@ func (bs *balanceSolver) pickCheckPolicyV2() { switch { case bs.resourceTy == writeLeader: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly + bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceFirstOnly default: bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAnyOf + bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceAnyOf } } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index 06f8f64ca07..a1bc65475ea 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -170,6 +170,7 @@ type RWType int const ( Write RWType = iota Read + RWTypeLen ) func (rw RWType) String() string { diff --git a/server/statistics/store_collection_test.go b/server/statistics/store_collection_test.go index 7fa5d9e94fc..92a0bb2d682 100644 --- 
a/server/statistics/store_collection_test.go +++ b/server/statistics/store_collection_test.go @@ -15,13 +15,20 @@ package statistics import ( + "strconv" "testing" "time" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" +<<<<<<< HEAD:server/statistics/store_collection_test.go "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" +======= + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" + "github.com/tikv/pd/pkg/mock/mockconfig" +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)):pkg/statistics/store_collection_test.go ) func TestStoreStatistics(t *testing.T) { @@ -80,3 +87,66 @@ func TestStoreStatistics(t *testing.T) { re.Equal(4, stats.LabelCounter["host:h2"]) re.Equal(2, stats.LabelCounter["zone:unknown"]) } + +func TestSummaryStoreInfos(t *testing.T) { + re := require.New(t) + rw := Read + kind := constant.LeaderKind + collector := newTikvCollector() + storeHistoryLoad := NewStoreHistoryLoads(DimLen) + storeInfos := make(map[uint64]*StoreSummaryInfo) + storeLoads := make(map[uint64][]float64) + for _, storeID := range []int{1, 3} { + storeInfos[uint64(storeID)] = &StoreSummaryInfo{ + isTiFlash: false, + StoreInfo: core.NewStoreInfo(&metapb.Store{Id: uint64(storeID), Address: "mock://tikv" + strconv.Itoa(storeID)}, core.SetLastHeartbeatTS(time.Now())), + } + storeLoads[uint64(storeID)] = []float64{1, 2, 0, 0, 5} + for i, v := range storeLoads[uint64(storeID)] { + storeLoads[uint64(storeID)][i] = v * float64(storeID) + } + } + + // case 1: put one element into history load + details := summaryStoresLoadByEngine(storeInfos, storeLoads, storeHistoryLoad, nil, rw, kind, collector) + re.Len(details, 2) + re.Empty(details[0].LoadPred.Current.HistoryLoads) + re.Empty(details[1].LoadPred.Current.HistoryLoads) + expectHistoryLoads := []float64{1, 2, 5} + for _, storeID := range []uint64{1, 3} { + loads := storeHistoryLoad.Get(storeID, rw, kind) + for i := 0; i < len(loads); i++ { + for j := 0; j < len(loads[0]); j++ { + if loads[i][j] != 0 { + re.Equal(loads[i][j]/float64(storeID), expectHistoryLoads[i]) + } + } + } + } + + // case 2: put many elements into history load + historySampleInterval = 0 + for i := 1; i < 10; i++ { + details = summaryStoresLoadByEngine(storeInfos, storeLoads, storeHistoryLoad, nil, rw, kind, collector) + expect := []float64{2, 4, 10} + for _, detail := range details { + loads := detail.LoadPred.Current.HistoryLoads + storeID := detail.GetID() + for i := 0; i < len(loads); i++ { + for j := 0; j < len(loads[0]); j++ { + if loads[i][j] != 0 { + re.Equal(loads[i][j]/float64(storeID), expectHistoryLoads[i]) + } + } + } + + for i, loads := range detail.LoadPred.Expect.HistoryLoads { + for _, load := range loads { + if load != 0 { + re.Equal(load, expect[i]) + } + } + } + } + } +} diff --git a/server/statistics/store_hot_peers_infos.go b/server/statistics/store_hot_peers_infos.go index 17ed6f47598..f0d618d5fef 100644 --- a/server/statistics/store_hot_peers_infos.go +++ b/server/statistics/store_hot_peers_infos.go @@ -74,12 +74,14 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re stLoadInfosAsLeader := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, core.LeaderKind) stLoadInfosAsPeer := SummaryStoresLoad( stInfos, storesLoads, + nil, regionStats, isTraceRegionFlow, typ, core.RegionKind) @@ -104,6 +106,7 @@ func GetHotStatus(stores []*core.StoreInfo, storesLoads map[uint64][]float64, re func SummaryStoresLoad( 
storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads *StoreHistoryLoads, storeHotPeers map[uint64][]*HotPeerStat, isTraceRegionFlow bool, rwTy RWType, @@ -115,6 +118,7 @@ func SummaryStoresLoad( tikvLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTikvCollector(), @@ -122,6 +126,7 @@ func SummaryStoresLoad( tiflashLoadDetail := summaryStoresLoadByEngine( storeInfos, storesLoads, + storesHistoryLoads, storeHotPeers, rwTy, kind, newTiFlashCollector(isTraceRegionFlow), @@ -136,6 +141,7 @@ func SummaryStoresLoad( func summaryStoresLoadByEngine( storeInfos map[uint64]*StoreSummaryInfo, storesLoads map[uint64][]float64, + storesHistoryLoads *StoreHistoryLoads, storeHotPeers map[uint64][]*HotPeerStat, rwTy RWType, kind core.ResourceKind, @@ -143,6 +149,7 @@ func summaryStoresLoadByEngine( ) []*StoreLoadDetail { loadDetail := make([]*StoreLoadDetail, 0, len(storeInfos)) allStoreLoadSum := make([]float64, DimLen) + allStoreHistoryLoadSum := make([][]float64, DimLen) allStoreCount := 0 allHotPeersCount := 0 @@ -174,8 +181,22 @@ func summaryStoresLoadByEngine( ty = "query-rate-" + rwTy.String() + "-" + kind.String() hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[QueryDim]) } - loads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) + + var historyLoads [][]float64 + if storesHistoryLoads != nil { + historyLoads = storesHistoryLoads.Get(id, rwTy, kind) + for i, loads := range historyLoads { + if allStoreHistoryLoadSum[i] == nil || len(allStoreHistoryLoadSum[i]) < len(loads) { + allStoreHistoryLoadSum[i] = make([]float64, len(loads)) + } + for j, load := range loads { + allStoreHistoryLoadSum[i][j] += load + } + } + storesHistoryLoads.Add(id, rwTy, kind, loads) + } + for i := range allStoreLoadSum { allStoreLoadSum[i] += loads[i] } @@ -184,8 +205,9 @@ func summaryStoresLoadByEngine( // Build store load prediction from current load and pending influence. stLoadPred := (&StoreLoad{ - Loads: loads, - Count: float64(len(hotPeers)), + Loads: loads, + Count: float64(len(hotPeers)), + HistoryLoads: historyLoads, }).ToLoadPred(rwTy, info.PendingSum) // Construct store load info. @@ -206,6 +228,14 @@ func summaryStoresLoadByEngine( expectLoads[i] = allStoreLoadSum[i] / float64(allStoreCount) } + // todo: remove some the max value or min value to avoid the effect of extreme value. 
+ expectHistoryLoads := make([][]float64, DimLen) + for i := range allStoreHistoryLoadSum { + expectHistoryLoads[i] = make([]float64, len(allStoreHistoryLoadSum[i])) + for j := range allStoreHistoryLoadSum[i] { + expectHistoryLoads[i][j] = allStoreHistoryLoadSum[i][j] / float64(allStoreCount) + } + } stddevLoads := make([]float64, len(allStoreLoadSum)) if allHotPeersCount != 0 { for _, detail := range loadDetail { @@ -237,8 +267,9 @@ func summaryStoresLoadByEngine( hotPeerSummary.WithLabelValues(ty, engine).Set(stddevLoads[QueryDim]) } expect := StoreLoad{ - Loads: expectLoads, - Count: expectCount, + Loads: expectLoads, + Count: expectCount, + HistoryLoads: expectHistoryLoads, } stddev := StoreLoad{ Loads: stddevLoads, diff --git a/server/statistics/store_load.go b/server/statistics/store_load.go index cbdac685401..c8075e9e694 100644 --- a/server/statistics/store_load.go +++ b/server/statistics/store_load.go @@ -16,8 +16,14 @@ package statistics import ( "math" + "time" +<<<<<<< HEAD:server/statistics/store_load.go "github.com/tikv/pd/server/core" +======= + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/core/constant" +>>>>>>> 4f87e9da8 (scheduler: cache history loads in hot region scheduler (#6314)):pkg/statistics/store_load.go ) // StoreLoadDetail records store load information. @@ -144,8 +150,9 @@ func (s *StoreSummaryInfo) SetEngineAsTiFlash() { // StoreLoad records the current load. type StoreLoad struct { - Loads []float64 - Count float64 + Loads []float64 + Count float64 + HistoryLoads [][]float64 } // ToLoadPred returns the current load and future predictive load. @@ -240,3 +247,91 @@ func MaxLoad(a, b *StoreLoad) *StoreLoad { Count: math.Max(a.Count, b.Count), } } + +var ( + // historySampleInterval is the sampling interval for history load. + historySampleInterval = 30 * time.Second + // HistorySampleDuration is the duration for saving history load. + HistorySampleDuration = 5 * time.Minute + defaultSize = 10 +) + +// StoreHistoryLoads records the history load of a store. +type StoreHistoryLoads struct { + // loads[read/write][leader/follower]-->[store id]-->history load + loads [RWTypeLen][constant.ResourceKindLen]map[uint64]*storeHistoryLoad + dim int +} + +// NewStoreHistoryLoads creates a StoreHistoryLoads. +func NewStoreHistoryLoads(dim int) *StoreHistoryLoads { + st := StoreHistoryLoads{dim: dim} + for i := RWType(0); i < RWTypeLen; i++ { + for j := constant.ResourceKind(0); j < constant.ResourceKindLen; j++ { + st.loads[i][j] = make(map[uint64]*storeHistoryLoad) + } + } + return &st +} + +// Add adds the store load to the history. +func (s *StoreHistoryLoads) Add(storeID uint64, rwTp RWType, kind constant.ResourceKind, loads []float64) { + load, ok := s.loads[rwTp][kind][storeID] + if !ok { + size := defaultSize + if historySampleInterval != 0 { + size = int(HistorySampleDuration / historySampleInterval) + } + load = newStoreHistoryLoad(size, s.dim) + s.loads[rwTp][kind][storeID] = load + } + load.add(loads) +} + +// Get returns the store loads from the history, not one time point. +func (s *StoreHistoryLoads) Get(storeID uint64, rwTp RWType, kind constant.ResourceKind) [][]float64 { + load, ok := s.loads[rwTp][kind][storeID] + if !ok { + return [][]float64{} + } + return load.get() +} + +type storeHistoryLoad struct { + update time.Time + // loads is a circular buffer. + // [dim] --> [1,2,3...] 
+	loads [][]float64
+	size  int
+	count int
+}
+
+func newStoreHistoryLoad(size int, dim int) *storeHistoryLoad {
+	return &storeHistoryLoad{
+		loads: make([][]float64, dim),
+		size:  size,
+	}
+}
+
+// add adds the store load to the history.
+// eg. add([1,2,3]) --> [][]float64{{1}, {2}, {3}}
+func (s *storeHistoryLoad) add(loads []float64) {
+	// reject if the loads length is not equal to the dimension.
+	if time.Since(s.update) < historySampleInterval || s.size == 0 || len(loads) != len(s.loads) {
+		return
+	}
+	if s.count == 0 {
+		for i := range s.loads {
+			s.loads[i] = make([]float64, s.size)
+		}
+	}
+	for i, v := range loads {
+		s.loads[i][s.count%s.size] = v
+	}
+	s.count++
+	s.update = time.Now()
+}
+
+func (s *storeHistoryLoad) get() [][]float64 {
+	return s.loads
+}
diff --git a/server/statistics/store_load_test.go b/server/statistics/store_load_test.go
new file mode 100644
index 00000000000..18441f00dbc
--- /dev/null
+++ b/server/statistics/store_load_test.go
@@ -0,0 +1,45 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/pd/pkg/core/constant"
+)
+
+func TestHistoryLoads(t *testing.T) {
+	re := require.New(t)
+	historySampleInterval = 0
+	historyLoads := NewStoreHistoryLoads(DimLen)
+	loads := []float64{1.0, 2.0, 3.0}
+	rwTp := Read
+	kind := constant.LeaderKind
+	historyLoads.Add(1, rwTp, kind, loads)
+	re.Len(historyLoads.Get(1, rwTp, kind)[0], 10)
+
+	expectLoads := make([][]float64, DimLen)
+	for i := 0; i < len(loads); i++ {
+		expectLoads[i] = make([]float64, 10)
+	}
+	for i := 0; i < 10; i++ {
+		historyLoads.Add(1, rwTp, kind, loads)
+		expectLoads[ByteDim][i] = 1.0
+		expectLoads[KeyDim][i] = 2.0
+		expectLoads[QueryDim][i] = 3.0
+	}
+	re.EqualValues(expectLoads, historyLoads.Get(1, rwTp, kind))
+}
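
The heart of the change is StoreHistoryLoads in store_load.go: for every (RWType, ResourceKind, store) combination it keeps a fixed-size ring of recent load samples per dimension, refreshed at most once per historySampleInterval (30s) and sized to cover HistorySampleDuration (5min), i.e. 10 slots. summaryStoresLoadByEngine feeds it on every summary pass and attaches the window to StoreLoad.HistoryLoads, with the per-dimension averages across stores exposed as Expect.HistoryLoads. Below is a minimal, standalone sketch of that ring-buffer bookkeeping; the names (historyLoad, newHistoryLoad, sampleInterval, sampleDuration) are illustrative rather than the patch's API, the per-RWType/ResourceKind map is collapsed into a single buffer, and slots are allocated eagerly instead of on the first add as the patch does.

// Standalone sketch of the history-load ring buffer introduced by this patch
// (simplified: no RWType/ResourceKind indexing, eager allocation).
package main

import (
	"fmt"
	"time"
)

const (
	sampleInterval = 30 * time.Second // patch: historySampleInterval
	sampleDuration = 5 * time.Minute  // patch: HistorySampleDuration
)

// historyLoad keeps, per dimension, a fixed-size ring of recent load samples.
type historyLoad struct {
	update time.Time
	loads  [][]float64 // [dim][sample]
	size   int
	count  int
}

func newHistoryLoad(dim int) *historyLoad {
	size := int(sampleDuration / sampleInterval) // 10 slots by default
	h := &historyLoad{size: size, loads: make([][]float64, dim)}
	for i := range h.loads {
		h.loads[i] = make([]float64, size)
	}
	return h
}

// add records one sample per dimension, overwriting the oldest slot once the
// ring is full and dropping samples that arrive faster than the interval.
func (h *historyLoad) add(loads []float64) {
	if time.Since(h.update) < sampleInterval || len(loads) != len(h.loads) {
		return
	}
	for i, v := range loads {
		h.loads[i][h.count%h.size] = v
	}
	h.count++
	h.update = time.Now()
}

func main() {
	h := newHistoryLoad(3)
	h.add([]float64{1, 2, 3}) // byte, key, query rates
	fmt.Println(h.loads)      // each dimension keeps a 10-sample window
}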
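
Those cached windows are consumed in filterSrcStores and pickDstStores: when the store engine is raft-kv2, a candidate source must additionally have every retained sample of each checked dimension above toleranceRatio times the expected sample, and a candidate destination must keep every sample times toleranceRatio below the expectation; an empty history never blocks scheduling. The sketch below is a hedged illustration of that gating logic, not the patch's code: it folds the priority policies (all-of, any-of, first-priority-only) into a plain all-of over an explicit list of checked dimensions, and the function names are made up for this example.

// Sketch of the history-load gates applied when picking src/dst stores.
package main

import "fmt"

// allOf reports whether pred holds for every index in [0, n).
func allOf(n int, pred func(int) bool) bool {
	for i := 0; i < n; i++ {
		if !pred(i) {
			return false
		}
	}
	return true
}

// srcPassesHistory mirrors checkSrcHistoryLoadsByPriorityAndTolerance:
// every sample of every checked dimension must exceed ratio*expected.
func srcPassesHistory(current, expect [][]float64, checkedDims []int, ratio float64) bool {
	if len(current) == 0 {
		return true // no history yet: do not block scheduling
	}
	return allOf(len(checkedDims), func(k int) bool {
		i := checkedDims[k]
		return allOf(len(current[i]), func(j int) bool {
			return current[i][j] > ratio*expect[i][j]
		})
	})
}

// dstPassesHistory mirrors checkDstHistoryLoadsByPriorityAndTolerance:
// every sample of every checked dimension must stay below expected/ratio.
func dstPassesHistory(current, expect [][]float64, checkedDims []int, ratio float64) bool {
	if len(current) == 0 {
		return true
	}
	return allOf(len(checkedDims), func(k int) bool {
		i := checkedDims[k]
		return allOf(len(current[i]), func(j int) bool {
			return current[i][j]*ratio < expect[i][j]
		})
	})
}

func main() {
	cur := [][]float64{{2.0, 2.0}, {2.0, 2.0}}
	exp := [][]float64{{1.0, 1.0}, {1.0, 1.0}}
	fmt.Println(srcPassesHistory(cur, exp, []int{0, 1}, 1.0)) // true: consistently hot
	fmt.Println(dstPassesHistory(cur, exp, []int{0, 1}, 1.0)) // false: too loaded to receive
}

As far as the patch shows, the intent is to avoid reacting to short spikes on raft-kv2 stores: a store only qualifies as a source if it has been hot across the whole sampled window, and as a destination only if it has stayed cool, which is why the check is gated on IsRaftKV2 and marked with a todo for further testing.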