Skip to content

Commit

Permalink
Merge branch 'release-6.1' into cherry-pick-7252-to-release-6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Feb 28, 2024
2 parents 61253b9 + 63bf970 commit 91f5a45
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 37 deletions.
2 changes: 2 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ import (
var (
// DefaultMinResolvedTSPersistenceInterval is the default value of min resolved ts persistence interval.
DefaultMinResolvedTSPersistenceInterval = 10 * time.Second

denySchedulersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("schedulers", "deny")
)

// regionLabelGCInterval is the interval to run region-label's GC work.
Expand Down
17 changes: 16 additions & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,23 @@ func (s *scheduleController) Schedule() []*operator.Operator {
}
cacheCluster := newCacheCluster(s.cluster)
// If we have schedule, reset interval to the minimal interval.
if ops := s.Scheduler.Schedule(cacheCluster); len(ops) > 0 {
ops := s.Scheduler.Schedule(cacheCluster)
foundDisabled := false
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
if labelMgr.ScheduleDisabled(s.cluster.GetRegion(op.RegionID())) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
}
}
if len(ops) > 0 {
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
continue
}
return ops
}
}
Expand Down
52 changes: 50 additions & 2 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,22 @@ func (s *testCoordinatorSuite) TestCheckRegionWithScheduleDeny(c *C) {
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})

// should allow to do rule checker
c.Assert(labelerManager.ScheduleDisabled(region), IsTrue)
s.checkRegion(c, tc, co, 1, 0)
s.checkRegion(c, tc, co, 1, 1)
// should not allow to merge
tc.opt.SetSplitMergeInterval(time.Duration(0))

c.Assert(tc.addLeaderRegion(2, 2, 3, 4), IsNil)
c.Assert(tc.addLeaderRegion(3, 2, 3, 4), IsNil)
region = tc.GetRegion(2)
c.Assert(labelerManager.ScheduleDisabled(region), IsTrue)
s.checkRegion(c, tc, co, 2, 0)

// delete label rule, should allow to do merge
labelerManager.DeleteLabelRule("schedulelabel")
c.Assert(labelerManager.ScheduleDisabled(region), IsFalse)
s.checkRegion(c, tc, co, 1, 1)
s.checkRegion(c, tc, co, 2, 2)
}

func (s *testCoordinatorSuite) TestCheckerIsBusy(c *C) {
Expand Down Expand Up @@ -864,6 +875,43 @@ func (s *testCoordinatorSuite) TestRemoveScheduler(c *C) {
co.wg.Wait()
}

func (s *testCoordinatorSuite) TestDenyScheduler(c *C) {
tc, co, cleanup := prepare(nil, nil, func(co *coordinator) {
labelerManager := co.cluster.GetRegionLabeler()
labelerManager.SetLabelRule(&labeler.LabelRule{
ID: "schedulelabel",
Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}},
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})
co.run()
}, c)
defer cleanup()

c.Assert(len(co.schedulers), Equals, len(config.DefaultSchedulers))

// Transfer peer from store 4 to store 1 if not set deny.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
c.Assert(tc.addRegionStore(2, 20), IsNil)
c.Assert(tc.addRegionStore(1, 10), IsNil)
c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil)

// Transfer leader from store 4 to store 2 if not set deny.
c.Assert(tc.updateLeaderCount(4, 1000), IsNil)
c.Assert(tc.updateLeaderCount(3, 50), IsNil)
c.Assert(tc.updateLeaderCount(2, 20), IsNil)
c.Assert(tc.updateLeaderCount(1, 10), IsNil)
c.Assert(tc.addLeaderRegion(2, 4, 3, 2), IsNil)

// there should no balance leader/region operator
for i := 0; i < 10; i++ {
c.Assert(co.opController.GetOperator(1), IsNil)
c.Assert(co.opController.GetOperator(2), IsNil)
time.Sleep(10 * time.Millisecond)
}
}

func (s *testCoordinatorSuite) TestRestart(c *C) {
tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) {
// Turn off balance, we test add replica only.
Expand Down
18 changes: 11 additions & 7 deletions server/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
Expand Down Expand Up @@ -80,13 +82,6 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
return []*operator.Operator{op}
}

if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
return nil
}
}

if op := c.splitChecker.Check(region); op != nil {
return []*operator.Operator{op}
}
Expand All @@ -112,6 +107,15 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
c.regionWaitingList.Put(region.GetID(), nil)
}
}
// skip the joint checker, split checker and rule checker when region label is set to "schedule=deny".
// those checkers is help to make region health, it's necessary to skip them when region is set to deny.
if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
denyCheckersByLabelerCounter.Inc()
return nil
}
}

if c.mergeChecker != nil {
allowed := opController.OperatorCount(operator.OpMerge) < c.opts.GetMergeScheduleLimit()
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ var (
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})

// LabelerEventCounter is a counter of the scheduler labeler system.
LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})
)

func init() {
Expand All @@ -94,4 +103,5 @@ func init() {
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(LabelerEventCounter)
}
9 changes: 0 additions & 9 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -421,14 +420,6 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato
if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc()
return false
}
}
}
expired := false
for _, op := range ops {
Expand Down
20 changes: 2 additions & 18 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,23 +752,7 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "1a", "end_key": "1b"}},
})

c.Assert(labelerManager.ScheduleDisabled(source), IsTrue)
// add operator should be failed since it is labeled with `schedule=deny`.
c.Assert(controller.AddWaitingOperator(ops...), Equals, 0)

// add operator should be success without `schedule=deny`
labelerManager.DeleteLabelRule("schedulelabel")
labelerManager.ScheduleDisabled(source)
c.Assert(labelerManager.ScheduleDisabled(source), IsFalse)
// now there is one operator being allowed to add, if it is a merge operator
// both of the pair are allowed
ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
c.Assert(err, IsNil)
c.Assert(ops, HasLen, 2)
c.Assert(controller.AddWaitingOperator(ops...), Equals, 2)
c.Assert(controller.AddWaitingOperator(ops...), Equals, 0)

// no space left, new operator can not be added.
c.Assert(controller.AddWaitingOperator(addPeerOp(0)), Equals, 0)
// add operator should be success since it is not check in addWaitingOperator
c.Assert(2, Equals, controller.AddWaitingOperator(ops...))
}

0 comments on commit 91f5a45

Please sign in to comment.