address comments: remove sleep in failpoint
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed Sep 25, 2024
1 parent 457da3d commit 64abc3c
Showing 2 changed files with 27 additions and 22 deletions.
pkg/schedule/checker/checker_controller.go (8 changes: 7 additions & 1 deletion)
@@ -169,7 +169,6 @@ func (c *Controller) PatrolRegions() {
start = time.Now()
}
failpoint.Inject("breakPatrol", func() {
-time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers
failpoint.Return()
})
case <-c.ctx.Done():
@@ -492,6 +491,13 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) {
}
}

+func (c *Controller) IsPatrolRegionChanEmpty() bool {
[CI annotation, GitHub Actions / statics, checker_controller.go line 494: exported method Controller.IsPatrolRegionChanEmpty should have comment or be unexported (revive)]
+if c.patrolRegionContext == nil {
+return true
[CI annotation, Codecov / codecov/patch, checker_controller.go line 496: added line was not covered by tests]
+}
+return len(c.patrolRegionContext.regionChan) == 0
+}

// PatrolRegionContext is used to store the context of patrol regions.
type PatrolRegionContext struct {
workersCtx context.Context
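The helper added above reduces to a length check on a buffered channel, with a nil guard for the case where patrolling has not started. A minimal, self-contained sketch of that shape, assuming nothing beyond the standard library (the names regionQueue and isEmpty are illustrative, not the PD types):

package main

import "fmt"

// regionQueue is a stand-in for the patrolRegionContext in the diff above;
// only the buffered channel matters for the emptiness check.
type regionQueue struct {
	regionChan chan uint64
}

// isEmpty mirrors the shape of IsPatrolRegionChanEmpty: treat a missing
// context as "nothing pending", otherwise report whether the buffer is drained.
func isEmpty(q *regionQueue) bool {
	if q == nil {
		return true
	}
	return len(q.regionChan) == 0
}

func main() {
	var q *regionQueue
	fmt.Println(isEmpty(q)) // true: patrolling has not started

	q = &regionQueue{regionChan: make(chan uint64, 4)}
	q.regionChan <- 42
	fmt.Println(isEmpty(q)) // false: a region is still queued

	<-q.regionChan
	fmt.Println(isEmpty(q)) // true: the region has been picked up
}

In the real controller the channel is fed by PatrolRegions and drained by the patrol workers; the length check only reports that the queue is empty, which is what the tests below assert after each patrol round.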
server/cluster/cluster_test.go (41 changes: 20 additions & 21 deletions)
@@ -2841,6 +2841,7 @@ func TestCheckCache(t *testing.T) {
}, nil, nil, re)
defer cleanup()
oc := co.GetOperatorController()
+checker := co.GetCheckerController()

re.NoError(tc.addRegionStore(1, 0))
re.NoError(tc.addRegionStore(2, 0))
@@ -2851,34 +2852,34 @@
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))

// case 1: operator cannot be created due to replica-schedule-limit restriction
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+checker.PatrolRegions()
+re.True(checker.IsPatrolRegionChanEmpty())
re.Empty(oc.GetOperators())
-re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1)
+re.Len(checker.GetPendingProcessedRegions(), 1)

// cancel the replica-schedule-limit restriction
cfg := tc.GetScheduleConfig()
cfg.ReplicaScheduleLimit = 10
tc.SetScheduleConfig(cfg)
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+checker.PatrolRegions()
+re.True(checker.IsPatrolRegionChanEmpty())
re.Len(oc.GetOperators(), 1)
-re.Empty(co.GetCheckerController().GetPendingProcessedRegions())
+re.Empty(checker.GetPendingProcessedRegions())

// case 2: operator cannot be created due to store limit restriction
oc.RemoveOperator(oc.GetOperator(1))
tc.SetStoreLimit(1, storelimit.AddPeer, 0)
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
-re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1)
+checker.PatrolRegions()
+re.True(checker.IsPatrolRegionChanEmpty())
+re.Len(checker.GetPendingProcessedRegions(), 1)

// cancel the store limit restriction
tc.SetStoreLimit(1, storelimit.AddPeer, 10)
time.Sleep(time.Second)
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+checker.PatrolRegions()
+re.True(checker.IsPatrolRegionChanEmpty())
re.Len(oc.GetOperators(), 1)
-re.Empty(co.GetCheckerController().GetPendingProcessedRegions())
+re.Empty(checker.GetPendingProcessedRegions())

co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
@@ -2897,6 +2898,7 @@ func TestPatrolRegionConcurrency(t *testing.T) {
}, nil, nil, re)
defer cleanup()
oc := co.GetOperatorController()
+checker := co.GetCheckerController()

tc.opt.SetSplitMergeInterval(time.Duration(0))
for i := 1; i < 4; i++ {
@@ -2912,8 +2914,7 @@

// test patrol region concurrency
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+checker.PatrolRegions()
testutil.Eventually(re, func() bool {
return len(oc.GetOperators()) >= mergeScheduleLimit
})
@@ -2924,9 +2925,8 @@
for i := 0; i < 10; i++ {
suspectRegions = append(suspectRegions, uint64(i))
}
-co.GetCheckerController().AddPendingProcessedRegions(false, suspectRegions...)
-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+checker.AddPendingProcessedRegions(false, suspectRegions...)
+checker.PatrolRegions()
testutil.Eventually(re, func() bool {
return len(oc.GetOperators()) >= mergeScheduleLimit
})
@@ -2980,8 +2980,7 @@ func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...
re.NoError(tc.putRegion(region))
}

-co.GetWaitGroup().Add(1)
-co.PatrolRegions()
+co.GetCheckerController().PatrolRegions()
defer func() {
co.GetSchedulersController().Wait()
co.GetWaitGroup().Wait()
@@ -3472,9 +3471,9 @@ func BenchmarkPatrolRegion(b *testing.B) {
}()
<-listen

-co.GetWaitGroup().Add(1)
b.ResetTimer()
-co.PatrolRegions()
+checker := co.GetCheckerController()
+checker.PatrolRegions()
}

func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) {
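The test changes above all follow the same sequence: drive a patrol round, confirm the region channel has been drained, then inspect the resulting operators, instead of relying on the fixed 100 ms sleep that previously lived inside the breakPatrol failpoint. A rough, self-contained sketch of that sequence with toy names (the worker goroutine and the bounded polling loop are illustrative, not the PD test harness):

package main

import (
	"fmt"
	"time"
)

func main() {
	regionChan := make(chan uint64, 8)

	// A background worker drains the channel, standing in for the patrol workers.
	go func() {
		for range regionChan {
			// handle the region
		}
	}()

	// "One patrol round": enqueue regions and return immediately, with no
	// sleep before returning (the behaviour this commit moves the tests to).
	for i := uint64(1); i <= 8; i++ {
		regionChan <- i
	}

	// The caller then waits on an explicit emptiness check instead of a fixed
	// sleep; the loop is bounded so this example always terminates.
	for i := 0; i < 1000 && len(regionChan) != 0; i++ {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("region channel drained:", len(regionChan) == 0)
}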
