Skip to content

Commit

Permalink
scheduler: introduce base class of hot region scheduler (#5708)
Browse files Browse the repository at this point in the history
ref #5691

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
lhy1024 and ti-chi-bot authored Nov 22, 2022
1 parent 36c1532 commit ff29b53
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 191 deletions.
51 changes: 11 additions & 40 deletions server/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package schedulers

import (
"math/rand"
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -157,27 +155,19 @@ func (conf *grantHotRegionSchedulerConfig) has(storeID uint64) bool {

// grantLeaderScheduler transfers all hot peers to peers and transfer leader to the fixed store
type grantHotRegionScheduler struct {
*BaseScheduler
r *rand.Rand
conf *grantHotRegionSchedulerConfig
handler http.Handler
types []statistics.RWType
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
*baseHotScheduler
conf *grantHotRegionSchedulerConfig
handler http.Handler
}

// newGrantHotRegionScheduler creates an admin scheduler that transfers hot region peer to fixed store and hot region leader to one store.
func newGrantHotRegionScheduler(opController *schedule.OperatorController, conf *grantHotRegionSchedulerConfig) *grantHotRegionScheduler {
base := NewBaseScheduler(opController)
base := newBaseHotScheduler(opController)
handler := newGrantHotRegionHandler(conf)
ret := &grantHotRegionScheduler{
BaseScheduler: base,
conf: conf,
handler: handler,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
types: []statistics.RWType{statistics.Read, statistics.Write},
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
baseHotScheduler: base,
conf: conf,
handler: handler,
}
return ret
}
Expand Down Expand Up @@ -272,32 +262,13 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle

func (s *grantHotRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
i := s.r.Int() % len(s.types)
return s.dispatch(s.types[i], cluster), nil
rw := s.randomRWType()
s.prepareForBalance(rw, cluster)
return s.dispatch(rw, cluster), nil
}

func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
storeInfos := statistics.SummaryStoreInfos(cluster.GetStores())
storesLoads := cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()

var stLoadInfos map[uint64]*statistics.StoreLoadDetail
switch typ {
case statistics.Read:
stLoadInfos = statistics.SummaryStoresLoad(
storeInfos,
storesLoads,
cluster.RegionReadStats(),
isTraceRegionFlow,
statistics.Read, core.RegionKind)
case statistics.Write:
stLoadInfos = statistics.SummaryStoresLoad(
storeInfos,
storesLoads,
cluster.RegionWriteStats(),
isTraceRegionFlow,
statistics.Write, core.RegionKind)
}
stLoadInfos := s.stLoadInfos[buildResourceType(typ, core.RegionKind)]
infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos))
index := 0
for _, info := range stLoadInfos {
Expand Down
236 changes: 128 additions & 108 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,106 @@ import (
"go.uber.org/zap"
)

type baseHotScheduler struct {
*BaseScheduler
// store information, including pending Influence by resource type
// Every time `Schedule()` will recalculate it.
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Every time `Schedule()` will recalculate it.
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail
// temporary states
// Every time `Schedule()` will recalculate it.
storesLoads map[uint64][]float64
// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence
types []statistics.RWType
r *rand.Rand
}

func newBaseHotScheduler(opController *schedule.OperatorController) *baseHotScheduler {
base := NewBaseScheduler(opController)
ret := &baseHotScheduler{
BaseScheduler: base,
types: []statistics.RWType{statistics.Write, statistics.Read},
regionPendings: make(map[uint64]*pendingInfluence),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
}
return ret
}

// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store, only update read or write load detail
func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster schedule.Cluster) {
h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence(cluster)
h.storesLoads = cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()

prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource core.ResourceKind) {
ty := buildResourceType(rw, resource)
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
regionStats,
isTraceRegionFlow,
rw, resource)
}
switch rw {
case statistics.Read:
// update read statistics
regionRead := cluster.RegionReadStats()
prepare(regionRead, core.LeaderKind)
prepare(regionRead, core.RegionKind)
case statistics.Write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
prepare(regionWrite, core.LeaderKind)
prepare(regionWrite, core.RegionKind)
}
}

// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *baseHotScheduler) summaryPendingInfluence(cluster schedule.Cluster) {
for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}

if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
}
}
for storeID, info := range h.stInfos {
storeLabel := strconv.FormatUint(storeID, 10)
if infl := info.PendingSum; infl != nil {
statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, kind statistics.RegionStatKind) {
cluster.SetHotPendingInfluenceMetrics(storeLabel, rwTy.String(), statistics.DimToString(dim), infl.Loads[kind])
})
}
}
}

func (h *baseHotScheduler) randomRWType() statistics.RWType {
return h.types[h.r.Int()%len(h.types)]
}

func init() {
schedule.RegisterSliceDecoderBuilder(HotRegionType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
Expand Down Expand Up @@ -91,40 +191,21 @@ var (

type hotScheduler struct {
name string
*BaseScheduler
*baseHotScheduler
syncutil.RWMutex
types []statistics.RWType
r *rand.Rand

// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence

// store information, including pending Influence by resource type
// Every time `Schedule()` will recalculate it.
stInfos map[uint64]*statistics.StoreSummaryInfo
// temporary states but exported to API or metrics
// Every time `Schedule()` will recalculate it.
stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail

// config of hot scheduler
conf *hotRegionSchedulerConfig
searchRevertRegions [resourceTypeLen]bool // Whether to search revert regions.
}

func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionSchedulerConfig) *hotScheduler {
base := NewBaseScheduler(opController)
base := newBaseHotScheduler(opController)
ret := &hotScheduler{
name: HotRegionName,
BaseScheduler: base,
types: []statistics.RWType{statistics.Write, statistics.Read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64]*pendingInfluence),
conf: conf,
name: HotRegionName,
baseHotScheduler: base,
conf: conf,
}
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{}
ret.searchRevertRegions[ty] = false
}
return ret
Expand Down Expand Up @@ -164,13 +245,13 @@ func (h *hotScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {

func (h *hotScheduler) Schedule(cluster schedule.Cluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
schedulerCounter.WithLabelValues(h.GetName(), "schedule").Inc()
return h.dispatch(h.types[h.r.Int()%len(h.types)], cluster), nil
rw := h.randomRWType()
return h.dispatch(rw, cluster), nil
}

func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster) []*operator.Operator {
h.Lock()
defer h.Unlock()

h.prepareForBalance(typ, cluster)
// it can not move earlier to support to use api and metrics.
if h.conf.IsForbidRWType(typ) {
Expand All @@ -186,86 +267,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
return nil
}

// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store
func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster schedule.Cluster) {
h.stInfos = statistics.SummaryStoreInfos(cluster.GetStores())
h.summaryPendingInfluence(cluster)
storesLoads := cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()

switch typ {
case statistics.Read:
// update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = statistics.SummaryStoresLoad(
h.stInfos,
storesLoads,
regionRead,
isTraceRegionFlow,
statistics.Read, core.LeaderKind)
h.stLoadInfos[readPeer] = statistics.SummaryStoresLoad(
h.stInfos,
storesLoads,
regionRead,
isTraceRegionFlow,
statistics.Read, core.RegionKind)
case statistics.Write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = statistics.SummaryStoresLoad(
h.stInfos,
storesLoads,
regionWrite,
isTraceRegionFlow,
statistics.Write, core.LeaderKind)
h.stLoadInfos[writePeer] = statistics.SummaryStoresLoad(
h.stInfos,
storesLoads,
regionWrite,
isTraceRegionFlow,
statistics.Write, core.RegionKind)
}
}

// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence(cluster schedule.Cluster) {
for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
log.Debug("gc pending influence in hot region scheduler",
zap.Uint64("region-id", id),
zap.Time("create", p.op.GetCreateTime()),
zap.Time("now", time.Now()),
zap.Duration("zombie", maxZombieDur))
continue
}

if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
}
}
for storeID, info := range h.stInfos {
storeLabel := strconv.FormatUint(storeID, 10)
if infl := info.PendingSum; infl != nil {
statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, kind statistics.RegionStatKind) {
cluster.SetHotPendingInfluenceMetrics(storeLabel, rwTy.String(), statistics.DimToString(dim), infl.Loads[kind])
})
}
}
}

func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
Expand All @@ -277,7 +278,6 @@ func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, d
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
statistics.ForeachRegionStats(func(rwTy statistics.RWType, dim int, kind statistics.RegionStatKind) {
hotPeerHist.WithLabelValues(h.GetName(), rwTy.String(), statistics.DimToString(dim)).Observe(infl.Loads[kind])
})
Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (bs *balanceSolver) logBestSolution() {
}

// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1]
func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
func calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
status := op.CheckAndGetStatus()
if !operator.IsEndStatus(status) {
return 1, false
Expand Down Expand Up @@ -1517,6 +1517,26 @@ func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy))
}

func buildResourceType(rwTy statistics.RWType, ty core.ResourceKind) resourceType {
switch rwTy {
case statistics.Write:
switch ty {
case core.RegionKind:
return writePeer
case core.LeaderKind:
return writeLeader
}
case statistics.Read:
switch ty {
case core.RegionKind:
return readPeer
case core.LeaderKind:
return readLeader
}
}
panic(fmt.Sprintf("invalid arguments for buildResourceType: rwTy = %v, ty = %v", rwTy, ty))
}

func stringToDim(name string) int {
switch name {
case statistics.BytePriority:
Expand Down
Loading

0 comments on commit ff29b53

Please sign in to comment.