Skip to content

Commit

Permalink
Individually check the scheduling halt for online unsafe recovery
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored and ti-chi-bot committed May 8, 2024
1 parent 27fbf10 commit dc697cd
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 34 deletions.
7 changes: 7 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,10 @@ func (c *Cluster) DropCacheAllRegion() {
func (c *Cluster) DropCacheRegion(id uint64) {
c.RemoveRegionIfExist(id)
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
func (c *Cluster) IsSchedulingHalted() bool {
return c.persistConfig.IsSchedulingHalted()
}
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration)
o.SetScheduleConfig(v)
}

// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
// TODO: support this metrics for the scheduling service in the future.
func (*PersistConfig) SetSchedulingAllowanceStatus(bool, string) {}

// SetHaltScheduling set HaltScheduling.
func (o *PersistConfig) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBa
}, nil
}

if c.persistConfig.IsSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.persistConfig.IsTikvRegionSplitEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func IsSchedulerRegistered(name string) bool {
type SchedulerConfigProvider interface {
SharedConfigProvider

IsSchedulingHalted() bool
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
Expand Down
6 changes: 1 addition & 5 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.isSchedulingHalted() {
if c.cluster.IsSchedulingHalted() {
continue
}

Expand Down Expand Up @@ -207,10 +207,6 @@ func (c *Coordinator) PatrolRegions() {
}
}

func (c *Coordinator) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type SchedulerCluster interface {
GetSchedulerConfig() sc.SchedulerConfigProvider
GetRegionLabeler() *labeler.RegionLabeler
GetStoreConfig() sc.StoreConfigProvider
IsSchedulingHalted() bool
}

// CheckerCluster is an aggregate interface that wraps multiple interfaces
Expand Down
12 changes: 2 additions & 10 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (c *Controller) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !c.isSchedulingHalted() {
if !s.IsPaused() && !c.cluster.IsSchedulingHalted() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
Expand All @@ -131,10 +131,6 @@ func (c *Controller) CollectSchedulerMetrics() {
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
Expand Down Expand Up @@ -526,7 +522,7 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.isSchedulingHalted() {
if s.cluster.IsSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.SetResultFromStatus(Halted)
}
Expand All @@ -541,10 +537,6 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
return true
}

func (s *ScheduleController) isSchedulingHalted() bool {
return s.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// IsPaused returns if a scheduler is paused.
func (s *ScheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
Expand Down
9 changes: 4 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,11 @@ func (u *Controller) GetStage() stage {
}

func (u *Controller) changeStage(stage stage) {
u.stage = stage
// Halt and resume the scheduling once the running state changed.
running := isRunning(stage)
if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running {
opt.SetHaltScheduling(running, "online-unsafe-recovery")
// If the running stage changes, update the scheduling allowance status to add or remove "online-unsafe-recovery" halt.
if running := isRunning(stage); running != isRunning(u.stage) {
u.cluster.GetSchedulerConfig().SetSchedulingAllowanceStatus(running, "online-unsafe-recovery")
}
u.stage = stage

var output StageOutput
output.Time = time.Now().Format("2006-01-02 15:04:05.000")
Expand Down
8 changes: 8 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,14 @@ func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) {
c.opt.SetPDServerConfig(cfg)
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the PD scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
// - Online unsafe recovery is running.
func (c *RaftCluster) IsSchedulingHalted() bool {
return c.opt.IsSchedulingHalted() || c.unsafeRecoveryController.IsRunning()
}

// GetUnsafeRecoveryController returns the unsafe recovery controller.
func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller {
return c.unsafeRecoveryController
Expand Down
8 changes: 2 additions & 6 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down Expand Up @@ -97,13 +97,9 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return split, nil
}

func (c *RaftCluster) isSchedulingHalted() bool {
return c.opt.IsSchedulingHalted()
}

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down
15 changes: 10 additions & 5 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,11 +987,8 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien

var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
func (*PersistOptions) SetSchedulingAllowanceStatus(halt bool, source string) {
if halt {
haltSchedulingStatus.Set(1)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
Expand All @@ -1001,6 +998,14 @@ func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
}
}

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
o.SetSchedulingAllowanceStatus(halt, source)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
if o == nil {
Expand Down
2 changes: 1 addition & 1 deletion server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func forwardRegionHeartbeatToScheduling(rc *cluster.RaftCluster, forwardStream s
return
}
// TODO: find a better way to halt scheduling immediately.
if rc.GetOpts().IsSchedulingHalted() {
if rc.IsSchedulingHalted() {
continue
}
// The error types defined for schedulingpb and pdpb are different, so we need to convert them.
Expand Down

0 comments on commit dc697cd

Please sign in to comment.