diff --git a/pkg/cache/priority_queue.go b/pkg/cache/priority_queue.go
index a7ac79090b0..3040fa76747 100644
--- a/pkg/cache/priority_queue.go
+++ b/pkg/cache/priority_queue.go
@@ -16,19 +16,21 @@ package cache

 import (
     "github.com/tikv/pd/pkg/btree"
+    "github.com/tikv/pd/pkg/utils/syncutil"
 )

 // defaultDegree default btree degree, the depth is h
+    return r.Priority > other.Priority
 }
diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go
index 148a7015d11..cfb6d689b42 100644
--- a/pkg/mcs/scheduling/server/config/config.go
+++ b/pkg/mcs/scheduling/server/config/config.go
@@ -398,6 +398,11 @@ func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
     return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
 }

+// GetPatrolRegionConcurrency returns the number of workers used by the region patrol.
+func (o *PersistConfig) GetPatrolRegionConcurrency() int {
+    return int(o.GetScheduleConfig().PatrolRegionConcurrency)
+}
+
 // GetMaxMovableHotPeerSize returns the max movable hot peer size.
 func (o *PersistConfig) GetMaxMovableHotPeerSize() int64 {
     return o.GetScheduleConfig().MaxMovableHotPeerSize
diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go
index 1f370176383..a0cf6dee627 100644
--- a/pkg/schedule/config/config.go
+++ b/pkg/schedule/config/config.go
@@ -63,6 +63,7 @@ const (
     defaultRegionScoreFormulaVersion = "v2"
     defaultLeaderSchedulePolicy = "count"
     defaultStoreLimitVersion = "v1"
+    defaultPatrolRegionConcurrency = 8
     // DefaultSplitMergeInterval is the default value of config split merge interval.
     DefaultSplitMergeInterval = time.Hour
     defaultSwitchWitnessInterval = time.Hour
@@ -305,6 +306,9 @@ type ScheduleConfig struct {
     // HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling,
     // and any other scheduling configs will be ignored.
     HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"`
+
+    // PatrolRegionConcurrency is the number of workers used to patrol regions.
+    PatrolRegionConcurrency uint64 `toml:"patrol-worker-count" json:"patrol-worker-count"`
 }

 // Clone returns a cloned scheduling configuration.
@@ -374,6 +378,9 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
     if !meta.IsDefined("store-limit-version") {
         configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion)
     }
+    if !meta.IsDefined("patrol-worker-count") {
+        configutil.AdjustUint64(&c.PatrolRegionConcurrency, defaultPatrolRegionConcurrency)
+    }

     if !meta.IsDefined("enable-joint-consensus") {
         c.EnableJointConsensus = defaultEnableJointConsensus
diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go
index 20c7f0dc2cf..ca9b352722a 100644
--- a/pkg/schedule/config/config_provider.go
+++ b/pkg/schedule/config/config_provider.go
@@ -62,6 +62,7 @@ type SchedulerConfigProvider interface {
     GetHotRegionCacheHitsThreshold() int
     GetMaxMovableHotPeerSize() int64
     IsTraceRegionFlow() bool
+    GetPatrolRegionConcurrency() int

     GetTolerantSizeRatio() float64
     GetLeaderSchedulePolicy() constant.SchedulePolicy
@@ -117,6 +118,7 @@ type SharedConfigProvider interface {
     IsPlacementRulesCacheEnabled() bool
     SetHaltScheduling(bool, string)
     GetHotRegionCacheHitsThreshold() int
+    GetPatrolRegionConcurrency() int

     // for test purpose
     SetPlacementRuleEnabled(bool)
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 35d9c2029a1..f3f357f25a3 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -52,7 +52,7 @@ const (
     // pushOperatorTickInterval is the interval try to push the operator.
     pushOperatorTickInterval = 500 * time.Millisecond

-    patrolScanRegionLimit = 128 // It takes about 14 minutes to iterate 1 million regions.
+    patrolScanRegionLimit = 1024 // It takes about 14 minutes to iterate 1 million regions.
     // PluginLoad means action for load plugin
     PluginLoad = "PluginLoad"
     // PluginUnload means action for unload plugin
@@ -184,6 +184,7 @@ func (c *Coordinator) PatrolRegions() {
         // Check priority regions first.
         c.checkPriorityRegions()
+        // Then check the suspect regions.
         c.checkSuspectRegions()
         // Check regions in the waiting list
@@ -211,6 +212,42 @@ func (c *Coordinator) isSchedulingHalted() bool {
     return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
 }

+// coordinatorPool reuses the closure that runs a single patrol task in its own goroutine.
+var coordinatorPool = sync.Pool{
+    New: func() any {
+        return func(w *sync.WaitGroup, r *core.RegionInfo, f func(w *sync.WaitGroup, r *core.RegionInfo)) {
+            go f(w, r)
+        }
+    },
+}
+
+// concurrentlyExecute runs executeForRegion for regionNum regions in batches of at most
+// GetPatrolRegionConcurrency() goroutines. It returns the end key of the last region
+// processed and the indexes for which getRegionFunc returned nil.
+func (c *Coordinator) concurrentlyExecute(regionNum int,
+    getRegionFunc func(int) *core.RegionInfo, executeForRegion func(*core.RegionInfo),
+) (key []byte, removedIndexes []uint64) {
+    workers := c.cluster.GetCheckerConfig().GetPatrolRegionConcurrency()
+    if workers <= 0 {
+        // Guard against a zero concurrency setting, which would stall the batching loop below.
+        workers = 1
+    }
+    var wg sync.WaitGroup
+    for i := 0; i < regionNum; i += workers {
+        // Launch at most `workers` goroutines per batch, then wait for the batch to finish.
+        for j := i; j < i+workers && j < regionNum; j++ {
+            curRegion := getRegionFunc(j)
+            if curRegion == nil {
+                // A nil region currently only happens for priority regions: it may have been
+                // removed from the cluster. Record its index so the caller can drop it.
+                removedIndexes = append(removedIndexes, uint64(j))
+                continue
+            }
+            wg.Add(1)
+            coordinatorWorker := coordinatorPool.Get().(func(*sync.WaitGroup, *core.RegionInfo, func(*sync.WaitGroup, *core.RegionInfo)))
+            coordinatorWorker(&wg, curRegion, func(w *sync.WaitGroup, r *core.RegionInfo) {
+                executeForRegion(r)
+                w.Done()
+            })
+            coordinatorPool.Put(coordinatorWorker)
+            key = curRegion.GetEndKey()
+        }
+        wg.Wait()
+    }
+    return
+}
+
 func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
     regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
     if len(regions) == 0 {
@@ -219,49 +256,49 @@
         return
     }

-    for _, region := range regions {
-        c.tryAddOperators(region)
-        key = region.GetEndKey()
-    }
+    key, _ = c.concurrentlyExecute(len(regions), func(i int) *core.RegionInfo {
+        return regions[i]
+    }, c.tryAddOperators)
+
     return
 }

 func (c *Coordinator) checkSuspectRegions() {
-    for _, id := range c.checkers.GetSuspectRegions() {
-        region := c.cluster.GetRegion(id)
-        c.tryAddOperators(region)
-    }
+    items := c.checkers.GetSuspectRegions()
+
+    c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
+        return c.cluster.GetRegion(items[i])
+    }, c.tryAddOperators)
 }

 func (c *Coordinator) checkWaitingRegions() {
     items := c.checkers.GetWaitingRegions()
     waitingListGauge.Set(float64(len(items)))
-    for _, item := range items {
-        region := c.cluster.GetRegion(item.Key)
-        c.tryAddOperators(region)
-    }
+
+    c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
+        return c.cluster.GetRegion(items[i].Key)
+    }, c.tryAddOperators)
 }

 // checkPriorityRegions checks priority regions
 func (c *Coordinator) checkPriorityRegions() {
     items := c.checkers.GetPriorityRegions()
-    removes := make([]uint64, 0)
     priorityListGauge.Set(float64(len(items)))
-    for _, id := range items {
-        region := c.cluster.GetRegion(id)
-        if region == nil {
-            removes = append(removes, id)
-            continue
-        }
+
+    // Regions that no longer exist need to be removed from the priority list;
+    // concurrentlyExecute reports their indexes within items.
+    _, removedIndexes := c.concurrentlyExecute(len(items), func(i int) *core.RegionInfo {
+        return c.cluster.GetRegion(items[i])
+    }, func(region *core.RegionInfo) {
         ops := c.checkers.CheckRegion(region)
         // it should skip if region needs to merge
         if len(ops) == 0 || ops[0].Kind()&operator.OpMerge != 0 {
-            continue
+            return
         }
         if !c.opController.ExceedStoreLimit(ops...) {
             c.opController.AddWaitingOperator(ops...)
         }
-    }
+    })
+    // Map the reported indexes back to region IDs before removing them.
+    removes := make([]uint64, 0, len(removedIndexes))
+    for _, idx := range removedIndexes {
+        removes = append(removes, items[idx])
+    }
+
     for _, v := range removes {
         c.checkers.RemovePriorityRegions(v)
     }
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 62118dde593..e2abf4cc45a 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -228,6 +228,11 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) {
     o.SetReplicationConfig(v)
 }

+// GetPatrolRegionConcurrency returns the number of workers used by the region patrol.
+func (o *PersistOptions) GetPatrolRegionConcurrency() int {
+    return int(o.GetScheduleConfig().PatrolRegionConcurrency)
+}
+
 var supportedTTLConfigs = []string{
     sc.MaxSnapshotCountKey,
     sc.MaxMergeRegionSizeKey,
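For reference, the bounded-concurrency pattern that concurrentlyExecute is built around can be reduced to the short standalone Go sketch below. The runBatched helper and the main driver are illustrative names only and are not part of the patch; the sketch just shows how launching at most `workers` goroutines per batch and joining them with a sync.WaitGroup bounds the number of in-flight checks.

// Illustrative sketch of the bounded-concurrency loop used by concurrentlyExecute (not part of the patch).
package main

import (
    "fmt"
    "sync"
)

// runBatched processes total items, launching at most workers goroutines at a time.
func runBatched(total, workers int, execute func(i int)) {
    var wg sync.WaitGroup
    for i := 0; i < total; i += workers {
        for j := i; j < i+workers && j < total; j++ {
            wg.Add(1)
            go func(idx int) {
                defer wg.Done()
                execute(idx)
            }(j)
        }
        // Wait for the whole batch before starting the next one,
        // which keeps at most `workers` goroutines in flight.
        wg.Wait()
    }
}

func main() {
    runBatched(10, 4, func(i int) {
        fmt.Printf("checking region #%d\n", i)
    })
}

With the default patrol-worker-count of 8, each 1024-region scan batch is checked by up to 8 goroutines before the patrol moves on to the next batch.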