From 64abc3cc6b08889e2254e8ae9a74076da0758d91 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Wed, 25 Sep 2024 13:16:59 +0800
Subject: [PATCH] address comments: remove sleep in failpoint

Signed-off-by: lhy1024
---
 pkg/schedule/checker/checker_controller.go |  8 ++++-
 server/cluster/cluster_test.go             | 41 +++++++++++-----------
 2 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go
index 62dd12fa684..e0b2b14bd0c 100644
--- a/pkg/schedule/checker/checker_controller.go
+++ b/pkg/schedule/checker/checker_controller.go
@@ -169,7 +169,6 @@ func (c *Controller) PatrolRegions() {
 				start = time.Now()
 			}
 			failpoint.Inject("breakPatrol", func() {
-				time.Sleep(100 * time.Millisecond) // ensure the regions are handled by the workers
 				failpoint.Return()
 			})
 		case <-c.ctx.Done():
@@ -492,6 +491,13 @@ func (c *Controller) GetPauseController(name string) (*PauseController, error) {
 	}
 }
 
+func (c *Controller) IsPatrolRegionChanEmpty() bool {
+	if c.patrolRegionContext == nil {
+		return true
+	}
+	return len(c.patrolRegionContext.regionChan) == 0
+}
+
 // PatrolRegionContext is used to store the context of patrol regions.
 type PatrolRegionContext struct {
 	workersCtx context.Context
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 78a1e12c58c..4f4b2ec90e7 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -2841,6 +2841,7 @@ func TestCheckCache(t *testing.T) {
 	}, nil, nil, re)
 	defer cleanup()
 	oc := co.GetOperatorController()
+	checker := co.GetCheckerController()
 
 	re.NoError(tc.addRegionStore(1, 0))
 	re.NoError(tc.addRegionStore(2, 0))
@@ -2851,34 +2852,34 @@
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
 
 	// case 1: operator cannot be created due to replica-schedule-limit restriction
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	checker.PatrolRegions()
+	re.True(checker.IsPatrolRegionChanEmpty())
 	re.Empty(oc.GetOperators())
-	re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1)
+	re.Len(checker.GetPendingProcessedRegions(), 1)
 
 	// cancel the replica-schedule-limit restriction
 	cfg := tc.GetScheduleConfig()
 	cfg.ReplicaScheduleLimit = 10
 	tc.SetScheduleConfig(cfg)
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	checker.PatrolRegions()
+	re.True(checker.IsPatrolRegionChanEmpty())
 	re.Len(oc.GetOperators(), 1)
-	re.Empty(co.GetCheckerController().GetPendingProcessedRegions())
+	re.Empty(checker.GetPendingProcessedRegions())
 
 	// case 2: operator cannot be created due to store limit restriction
 	oc.RemoveOperator(oc.GetOperator(1))
 	tc.SetStoreLimit(1, storelimit.AddPeer, 0)
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
-	re.Len(co.GetCheckerController().GetPendingProcessedRegions(), 1)
+	checker.PatrolRegions()
+	re.True(checker.IsPatrolRegionChanEmpty())
+	re.Len(checker.GetPendingProcessedRegions(), 1)
 
 	// cancel the store limit restriction
 	tc.SetStoreLimit(1, storelimit.AddPeer, 10)
 	time.Sleep(time.Second)
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	checker.PatrolRegions()
+	re.True(checker.IsPatrolRegionChanEmpty())
 	re.Len(oc.GetOperators(), 1)
-	re.Empty(co.GetCheckerController().GetPendingProcessedRegions())
+	re.Empty(checker.GetPendingProcessedRegions())
 
 	co.GetSchedulersController().Wait()
 	co.GetWaitGroup().Wait()
@@ -2897,6 +2898,7 @@ func TestPatrolRegionConcurrency(t *testing.T) {
 	}, nil, nil, re)
 	defer cleanup()
 	oc := co.GetOperatorController()
+	checker := co.GetCheckerController()
 
 	tc.opt.SetSplitMergeInterval(time.Duration(0))
 	for i := 1; i < 4; i++ {
@@ -2912,8 +2914,7 @@ func TestPatrolRegionConcurrency(t *testing.T) {
 
 	// test patrol region concurrency
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	checker.PatrolRegions()
 	testutil.Eventually(re, func() bool {
 		return len(oc.GetOperators()) >= mergeScheduleLimit
 	})
@@ -2924,9 +2925,8 @@ func TestPatrolRegionConcurrency(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		suspectRegions = append(suspectRegions, uint64(i))
 	}
-	co.GetCheckerController().AddPendingProcessedRegions(false, suspectRegions...)
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	checker.AddPendingProcessedRegions(false, suspectRegions...)
+	checker.PatrolRegions()
 	testutil.Eventually(re, func() bool {
 		return len(oc.GetOperators()) >= mergeScheduleLimit
 	})
@@ -2980,8 +2980,7 @@ func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...
 		re.NoError(tc.putRegion(region))
 	}
 
-	co.GetWaitGroup().Add(1)
-	co.PatrolRegions()
+	co.GetCheckerController().PatrolRegions()
 	defer func() {
 		co.GetSchedulersController().Wait()
 		co.GetWaitGroup().Wait()
@@ -3472,9 +3471,9 @@ func BenchmarkPatrolRegion(b *testing.B) {
 	}()
 	<-listen
 
-	co.GetWaitGroup().Add(1)
 	b.ResetTimer()
-	co.PatrolRegions()
+	checker := co.GetCheckerController()
+	checker.PatrolRegions()
 }
 
 func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) {