make concurrent
Signed-off-by: husharp <[email protected]>
HuSharp committed Apr 19, 2024
1 parent 882d2e5 commit eedd799
Showing 6 changed files with 106 additions and 35 deletions.
41 changes: 28 additions & 13 deletions pkg/cache/priority_queue.go
@@ -16,19 +16,21 @@ package cache

import (
"github.com/tikv/pd/pkg/btree"
"github.com/tikv/pd/pkg/utils/syncutil"
)

// defaultDegree is the default btree degree; the resulting depth satisfies h <= log_degree((capacity+1)/2)
const defaultDegree = 4

// PriorityQueue queue has priority and preempt
// PriorityQueue is a queue that supports priorities and preemption, and is thread-safe.
type PriorityQueue struct {
items map[uint64]*Entry
btree *btree.BTreeG[*Entry]
capacity int
syncutil.RWMutex
}

// NewPriorityQueue construct of priority queue
// NewPriorityQueue constructs a new instance of a thread-safe priority queue.
func NewPriorityQueue(capacity int) *PriorityQueue {
return &PriorityQueue{
items: make(map[uint64]*Entry),
@@ -42,8 +44,10 @@ type PriorityQueueItem interface {
ID() uint64
}

// Put put value with priority into queue
// Put inserts a value with a given priority into the queue.
func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
pq.Lock()
defer pq.Unlock()
id := value.ID()
entry, ok := pq.items[id]
if !ok {
@@ -54,7 +58,9 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
if !found || !min.Less(entry) {
return false
}
// Evict the lowest-priority entry while still holding the lock;
// calling Remove here would deadlock because Remove takes the lock itself.
pq.btree.Delete(min)
delete(pq.items, min.Value.ID())
}
} else if entry.Priority != priority { // delete before update
pq.btree.Delete(entry)
@@ -66,29 +72,37 @@ func (pq *PriorityQueue) Put(priority int, value PriorityQueueItem) bool {
return true
}

// Get find entry by id from queue
// Get retrieves an entry by ID from the queue.
func (pq *PriorityQueue) Get(id uint64) *Entry {
pq.RLock()
defer pq.RUnlock()
return pq.items[id]
}

// Peek return the highest priority entry
// Peek returns the highest priority entry without removing it.
func (pq *PriorityQueue) Peek() *Entry {
pq.RLock()
defer pq.RUnlock()
if max, ok := pq.btree.Max(); ok {
return max
}
return nil
}

// Tail return the lowest priority entry
// Tail returns the lowest priority entry without removing it.
func (pq *PriorityQueue) Tail() *Entry {
pq.RLock()
defer pq.RUnlock()
if min, ok := pq.btree.Min(); ok {
return min
}
return nil
}

// Elems return all elements in queue
// Elems returns all elements in the queue.
func (pq *PriorityQueue) Elems() []*Entry {
pq.RLock()
defer pq.RUnlock()
rs := make([]*Entry, pq.Len())
count := 0
pq.btree.Descend(func(i *Entry) bool {
@@ -99,28 +113,29 @@ func (pq *PriorityQueue) Elems() []*Entry {
return rs
}

// Remove remove value from queue
// Remove deletes an entry from the queue.
func (pq *PriorityQueue) Remove(id uint64) {
pq.Lock()
defer pq.Unlock()
if v, ok := pq.items[id]; ok {
pq.btree.Delete(v)
delete(pq.items, id)
}
}

// Len return queue size
// Len returns the number of elements in the queue.
func (pq *PriorityQueue) Len() int {
// Lock is not necessary for calling Len() on pq.btree as it is assumed to be thread-safe.
return pq.btree.Len()
}

// Entry a pair of region and it's priority
// Entry represents a pair of a region and its priority.
type Entry struct {
Priority int
Value PriorityQueueItem
}

// Less returns true if the entry has a smaller priority.
func (r *Entry) Less(other *Entry) bool {
left := r.Priority
right := other.Priority
return left > right
return r.Priority > other.Priority
}
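
For reference, a minimal sketch of how the now lock-protected queue could be exercised from several goroutines. The `item` type and the priorities below are illustrative; only the `PriorityQueueItem` interface (`ID() uint64`) and the exported methods come from the code above.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/tikv/pd/pkg/cache"
)

// item is a hypothetical PriorityQueueItem used only for this example.
type item struct{ id uint64 }

func (i item) ID() uint64 { return i.id }

func main() {
	pq := cache.NewPriorityQueue(16)

	// With the embedded RWMutex, concurrent Put/Remove/Peek calls are safe.
	var wg sync.WaitGroup
	for n := uint64(0); n < 8; n++ {
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			pq.Put(int(n), item{id: n})
		}(n)
	}
	wg.Wait()

	if e := pq.Peek(); e != nil {
		fmt.Println("peeked entry id:", e.Value.ID(), "priority:", e.Priority)
	}
}
```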
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -398,6 +398,11 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
}

// GetPatrolRegionConcurrency returns the number of workers used to patrol regions.
func (o *PersistConfig) GetPatrolRegionConcurrency() int {
return int(o.GetScheduleConfig().PatrolRegionConcurrency)
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
return o.GetScheduleConfig().MaxMovableHotPeerSize
7 changes: 7 additions & 0 deletions pkg/schedule/config/config.go
@@ -63,6 +63,7 @@ const (
defaultRegionScoreFormulaVersion = "v2"
defaultLeaderSchedulePolicy = "count"
defaultStoreLimitVersion = "v1"
defaultPatrolRegionConcurrency = 8
// DefaultSplitMergeInterval is the default value of config split merge interval.
DefaultSplitMergeInterval = time.Hour
defaultSwitchWitnessInterval = time.Hour
@@ -305,6 +306,9 @@ type ScheduleConfig struct {
// HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
// and any other scheduling configs will be ignored.
HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`

// PatrolRegionConcurrency is the number of workers used to patrol regions.
PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"`
}

// Clone returns a cloned scheduling configuration.
@@ -374,6 +378,9 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
if !meta.IsDefined("store-limit-version") {
configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion)
}
if !meta.IsDefined("patrol-worker-count") {
configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency)
}

if !meta.IsDefined("enable-joint-consensus") {
c.EnableJointConsensus = defaultEnableJointConsensus
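
A small, self-contained sketch of how the new `patrol-worker-count` key maps onto `ScheduleConfig.PatrolRegionConcurrency`. It assumes the BurntSushi/toml decoder that PD already depends on; the value 16 is purely illustrative (the default added above is 8).

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
	sc "github.com/tikv/pd/pkg/schedule/config"
)

func main() {
	// Decode only the new key; every other ScheduleConfig field keeps its zero value here.
	var cfg sc.ScheduleConfig
	if _, err := toml.Decode(`patrol-worker-count = 16`, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.PatrolRegionConcurrency) // 16
}
```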
2 changes: 2 additions & 0 deletions pkg/schedule/config/config_provider.go
@@ -62,6 +62,7 @@ type SchedulerConfigProvider interface {
GetHotRegionCacheHitsThreshold() int
GetMaxMovableHotPeerSize() int64
IsTraceRegionFlow() bool
GetPatrolRegionConcurrency() int

GetTolerantSizeRatio() float64
GetLeaderSchedulePolicy() constant.SchedulePolicy
@@ -117,6 +118,7 @@ type SharedConfigProvider interface {
IsPlacementRulesCacheEnabled() bool
SetHaltScheduling(bool, string)
GetHotRegionCacheHitsThreshold() int
GetPatrolRegionConcurrency() int

// for test purpose
SetPlacementRuleEnabled(bool)
81 changes: 59 additions & 22 deletions pkg/schedule/coordinator.go
@@ -52,7 +52,7 @@ const (
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond

patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.
patrolScanRegionLimit = 1024 // It takes about 14 minutes to iterate 1 million regions.
// PluginLoad means action for load plugin
PluginLoad = "PluginLoad"
// PluginUnload means action for unload plugin
@@ -184,6 +184,7 @@ func (c *Coordinator) PatrolRegions() {

// Check priority regions first.
c.checkPriorityRegions()

// Then check suspect regions.
c.checkSuspectRegions()
// Check regions in the waiting list
@@ -211,6 +212,42 @@ func (c *Coordinator) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

var coordinatorPool = sync.Pool{
New: func() any {
return func(w *sync.WaitGroup, r *core.RegionInfo, f func(w *sync.WaitGroup, r *core.RegionInfo)) {
go f(w, r)
}
},
}

func (c *Coordinator) concurrentlyExecute(regionNum int,
getRegionFunc func(int) *core.RegionInfo, executeForRegion func(*core.RegionInfo),
) (key []byte, removeRegions []uint64) {
workers := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
if workers <= 0 {
workers = 1 // guard against a zero or negative worker count, which would stall the loop below
}
var wg sync.WaitGroup
for i := 0; i < regionNum; i += workers {
// Run up to `workers` goroutines for this batch, then wait for all of them below.
for j := i; j < i+workers && j < regionNum; j++ {
curRegion := getRegionFunc(j)
if curRegion == nil {
// The nil check matters only for the priority-region caller today: the region
// may have been removed from the cluster, so record its index and let that
// caller map it back to a region ID.
removeRegions = append(removeRegions, uint64(j))
continue
}
wg.Add(1)
coordinatorWorker := coordinatorPool.Get().(func(*sync.WaitGroup, *core.RegionInfo, func(*sync.WaitGroup, *core.RegionInfo)))
coordinatorWorker(&wg, curRegion, func(w *sync.WaitGroup, r *core.RegionInfo) {
executeForRegion(r)
w.Done()
})
coordinatorPool.Put(coordinatorWorker)
key = curRegion.GetEndKey()
}
wg.Wait()
}
return
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
@@ -219,49 +256,49 @@ func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core
return
}

for _, region := range regions {
c.tryAddOperators(region)
key = region.GetEndKey()
}
key, _ = c.concurrentlyExecute(len(regions), func(i int) *core.RegionInfo {
return regions[i]
}, c.tryAddOperators)

return
}

func (c *Coordinator) checkSuspectRegions() {
for _, id := range c.checkers.GetSuspectRegions() {
region := c.cluster.GetRegion(id)
c.tryAddOperators(region)
}
items := c.checkers.GetSuspectRegions()

c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
return c.cluster.GetRegion(items[i])
}, c.tryAddOperators)
}

func (c *Coordinator) checkWaitingRegions() {
items := c.checkers.GetWaitingRegions()
waitingListGauge.Set(float64(len(items)))
for _, item := range items {
region := c.cluster.GetRegion(item.Key)
c.tryAddOperators(region)
}

c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
return c.cluster.GetRegion(items[i].Key)
}, c.tryAddOperators)
}

// checkPriorityRegions checks priority regions
func (c *Coordinator) checkPriorityRegions() {
items := c.checkers.GetPriorityRegions()
removes := make([]uint64, 0)
priorityListGauge.Set(float64(len(items)))
for _, id := range items {
region := c.cluster.GetRegion(id)
if region == nil {
removes = append(removes, id)
continue
}

// Collect the indexes of regions that no longer exist so they can be removed from the priority list.
_, removes := c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
return c.cluster.GetRegion(items[i])
}, func(region *core.RegionInfo) {
ops := c.checkers.CheckRegion(region)
// it should skip if region needs to merge
if len(ops) == 0 || ops[0].Kind()&operator.OpMerge != 0 {
continue
return
}
if !c.opController.ExceedStoreLimit(ops...) {
c.opController.AddWaitingOperator(ops...)
}
}
})

for _, v := range removes {
c.checkers.RemovePriorityRegions(items[v]) // v is an index into items, not a region ID
}
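
To make the batching behavior of `concurrentlyExecute` above easier to follow, here is a stand-alone analogue of the same pattern, stripped of the coordinator and the sync.Pool. All names in it are hypothetical; it is a sketch of the technique, not the PD implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// concurrentlyDo processes n items in batches of `workers` goroutines,
// waiting for each batch to finish before starting the next one.
func concurrentlyDo(n, workers int, get func(int) int, do func(int)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i += workers {
		for j := i; j < i+workers && j < n; j++ {
			item := get(j)
			wg.Add(1)
			go func(v int) {
				defer wg.Done()
				do(v)
			}(item)
		}
		wg.Wait() // block until the current batch drains
	}
}

func main() {
	var mu sync.Mutex
	sum := 0
	concurrentlyDo(10, 4,
		func(i int) int { return i },
		func(v int) {
			mu.Lock()
			sum += v
			mu.Unlock()
		})
	fmt.Println(sum) // 45
}
```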
5 changes: 5 additions & 0 deletions server/config/persist_options.go
@@ -228,6 +228,11 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) {
o.SetReplicationConfig(v)
}

// GetPatrolRegionConcurrency returns the number of workers used to patrol regions.
func (o *PersistOptions) GetPatrolRegionConcurrency() int {
return int(o.GetScheduleConfig().PatrolRegionConcurrency)
}

var supportedTTLConfigs = []string{
sc.MaxSnapshotCountKey,
sc.MaxMergeRegionSizeKey,
