From fc6f6f6346019c2b65596d86c1fd42300d600b8b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 16 Jul 2024 16:24:57 +0800 Subject: [PATCH] fix Signed-off-by: lhy1024 --- pkg/schedule/operator/operator_controller.go | 13 +++------- pkg/schedule/operator/operator_test.go | 14 ++++------- pkg/schedule/operator/waiting_operator.go | 26 +++++++++++++++++++- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 0cc9da263dad..39aae616675e 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -309,7 +309,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { oc.wop.PutOperator(ops[i]) } operatorCounter.WithLabelValues(desc, "put").Inc() - oc.wopStatus.ops[desc]++ + oc.wopStatus.incCount(desc) added++ needPromoted++ } @@ -371,7 +371,7 @@ func (oc *Controller) PromoteWaitingOperator() { _ = op.Cancel(ExceedStoreLimit) oc.buryOperator(op) } - oc.wopStatus.ops[ops[0].Desc()]-- + oc.wopStatus.decCount(ops[0].Desc()) continue } @@ -381,10 +381,10 @@ func (oc *Controller) PromoteWaitingOperator() { _ = op.Cancel(reason) oc.buryOperator(op) } - oc.wopStatus.ops[ops[0].Desc()]-- + oc.wopStatus.decCount(ops[0].Desc()) continue } - oc.wopStatus.ops[ops[0].Desc()]-- + oc.wopStatus.decCount(ops[0].Desc()) break } @@ -438,13 +438,8 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool operatorCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc() return false, NotInCreateStatus } -<<<<<<< HEAD - if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() { - log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) -======= if !isPromoting && oc.wopStatus.getCount(op.Desc()) >= oc.config.GetSchedulerMaxWaitingOperator() { log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.getCount(op.Desc())), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) ->>>>>>> e767c012f (schedule: fix datarace in `operator.check` (#8264)) operatorCounter.WithLabelValues(op.Desc(), "exceed-max-waiting").Inc() return false, ExceedWaitLimit } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 04d94644f572..828ab4be27d3 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -17,11 +17,8 @@ package operator import ( "context" "encoding/json" -<<<<<<< HEAD "fmt" -======= "sync" ->>>>>>> e767c012f (schedule: fix datarace in `operator.check` (#8264)) "sync/atomic" "testing" "time" @@ -534,9 +531,8 @@ func (suite *operatorTestSuite) TestRecord() { suite.Greater(ob.duration.Seconds(), time.Second.Seconds()) } -func TestOperatorCheckConcurrently(t *testing.T) { - re := require.New(t) - region := newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) +func (suite *operatorTestSuite) TestOperatorCheckConcurrently() { + region := suite.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) // addPeer1, transferLeader1, removePeer3 steps := []OpStep{ AddPeer{ToStore: 1, PeerID: 1}, @@ -544,15 +540,15 @@ func TestOperatorCheckConcurrently(t *testing.T) { RemovePeer{FromStore: 3}, } op := NewTestOperator(1, &metapb.RegionEpoch{}, OpAdmin|OpLeader|OpRegion, steps...) - re.Equal(constant.Urgent, op.GetPriorityLevel()) - checkSteps(re, op, steps) + suite.Equal(constant.Urgent, op.GetPriorityLevel()) + suite.checkSteps(op, steps) op.Start() var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() - re.Nil(op.Check(region)) + suite.Nil(op.Check(region)) }() } wg.Wait() diff --git a/pkg/schedule/operator/waiting_operator.go b/pkg/schedule/operator/waiting_operator.go index 8f5c72b053b3..8b964933be59 100644 --- a/pkg/schedule/operator/waiting_operator.go +++ b/pkg/schedule/operator/waiting_operator.go @@ -16,6 +16,8 @@ package operator import ( "math/rand" + + "github.com/tikv/pd/pkg/utils/syncutil" ) // priorityWeight is used to represent the weight of different priorities of operators. @@ -106,12 +108,34 @@ func (b *randBuckets) GetOperator() []*Operator { // waitingOperatorStatus is used to limit the count of each kind of operators. type waitingOperatorStatus struct { + mu syncutil.Mutex ops map[string]uint64 } // newWaitingOperatorStatus creates a new waitingOperatorStatus. func newWaitingOperatorStatus() *waitingOperatorStatus { return &waitingOperatorStatus{ - make(map[string]uint64), + ops: make(map[string]uint64), } } + +// incCount increments the count of the given operator kind. +func (s *waitingOperatorStatus) incCount(kind string) { + s.mu.Lock() + defer s.mu.Unlock() + s.ops[kind]++ +} + +// decCount decrements the count of the given operator kind. +func (s *waitingOperatorStatus) decCount(kind string) { + s.mu.Lock() + defer s.mu.Unlock() + s.ops[kind]-- +} + +// getCount returns the count of the given operator kind. +func (s *waitingOperatorStatus) getCount(kind string) uint64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.ops[kind] +}