From da33a00f1dde82428b05f8a7fbace15033e6fdf6 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 1 Aug 2024 18:55:05 +0800
Subject: [PATCH] add unit test for patrol region scan limit

Signed-off-by: lhy1024
---
 pkg/schedule/checker/checker_controller.go | 78 +++++++++++++---------
 server/cluster/cluster_test.go             | 57 +++++++++++++++-
 2 files changed, 99 insertions(+), 36 deletions(-)
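
Reviewer note (kept below the --- fold, so `git am` ignores it): the new
TestScanLimit pins down calculateScanLimit(), which divides the total region
count by patrolRegionPartition (1024) and clamps the result to
[MinPatrolRegionScanLimit, MaxPatrolScanRegionLimit] = [128, 8192]. The
newTestRegions() tweaks below support this: peer IDs are offset so they no
longer collide with region IDs, and the generated keys form one contiguous,
ordered keyspace (empty boundary keys) so a patrol scan can run end to end.
A minimal standalone sketch of the clamp arithmetic, not part of the patch
(constants copied from the hunk below; Go 1.21+ for the built-in min/max):

	package main

	import "fmt"

	const (
		minScanLimit = 128  // MinPatrolRegionScanLimit
		maxScanLimit = 8192 // MaxPatrolScanRegionLimit
		partition    = 1024 // patrolRegionPartition
	)

	// scanLimit mirrors calculateScanLimit, minus the failpoint hook.
	func scanLimit(regionCount int) int {
		return min(max(minScanLimit, regionCount/partition), maxScanLimit)
	}

	func main() {
		// The same region counts TestScanLimit injects via the
		// regionCount failpoint: the first three land on the 128 floor,
		// the last one on the 8192 cap.
		for _, n := range []int{1000, 10000, 100000, 1000000, 10000000} {
			fmt.Printf("%8d regions -> scan limit %d\n", n, scanLimit(n))
		}
	}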
diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go
index 40acd053013..e4baccee2bf 100644
--- a/pkg/schedule/checker/checker_controller.go
+++ b/pkg/schedule/checker/checker_controller.go
@@ -17,6 +17,7 @@ package checker
 import (
 	"bytes"
 	"context"
+	"strconv"
 	"time"
 
 	"github.com/pingcap/failpoint"
@@ -39,11 +40,11 @@ const (
 	checkSuspectRangesInterval = 100 * time.Millisecond
 	// DefaultPendingRegionCacheSize is the default length of waiting list.
 	DefaultPendingRegionCacheSize = 100000
-	// For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(minPatrolRegionScanLimit, 1,024,000/patrolRegionPartition)
+	// For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(MinPatrolRegionScanLimit, 1,024,000/patrolRegionPartition)
 	// In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192].
 	// It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered.
-	minPatrolRegionScanLimit = 128
-	maxPatrolScanRegionLimit = 8192
+	MinPatrolRegionScanLimit = 128
+	MaxPatrolScanRegionLimit = 8192
 	patrolRegionPartition    = 1024
 )
@@ -121,39 +122,39 @@ func (c *Controller) PatrolRegions() {
 		select {
 		case <-ticker.C:
 			c.updateTickerIfNeeded(ticker)
+			if c.cluster.IsSchedulingHalted() {
+				continue
+			}
+
+			// Check priority regions first.
+			c.checkPriorityRegions()
+			// Check pending processed regions first.
+			c.checkPendingProcessedRegions()
+
+			key, regions = c.checkRegions(key)
+			if len(regions) == 0 {
+				continue
+			}
+			// Updates the label level isolation statistics.
+			c.cluster.UpdateRegionsLabelLevelStats(regions)
+			// When the key is nil, it means that the scan is finished.
+			if len(key) == 0 {
+				// update the scan limit.
+				c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
+				// update the metrics.
+				dur := time.Since(start)
+				patrolCheckRegionsGauge.Set(dur.Seconds())
+				c.setPatrolRegionsDuration(dur)
+				start = time.Now()
+			}
+			failpoint.Inject("breakPatrol", func() {
+				failpoint.Return()
+			})
 		case <-c.ctx.Done():
 			patrolCheckRegionsGauge.Set(0)
 			c.setPatrolRegionsDuration(0)
 			return
 		}
-		if c.cluster.IsSchedulingHalted() {
-			continue
-		}
-
-		// Check priority regions first.
-		c.checkPriorityRegions()
-		// Check pending processed regions first.
-		c.checkPendingProcessedRegions()
-
-		key, regions = c.checkRegions(key)
-		if len(regions) == 0 {
-			continue
-		}
-		// Updates the label level isolation statistics.
-		c.cluster.UpdateRegionsLabelLevelStats(regions)
-		// When the key is nil, it means that the scan is finished.
-		if len(key) == 0 {
-			// update the scan limit.
-			c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
-			// update the metrics.
-			dur := time.Since(start)
-			patrolCheckRegionsGauge.Set(dur.Seconds())
-			c.setPatrolRegionsDuration(dur)
-			start = time.Now()
-		}
-		failpoint.Inject("breakPatrol", func() {
-			failpoint.Break()
-		})
 	}
 }
 
@@ -451,7 +452,18 @@ func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
 	}
 }
 
+// GetPatrolRegionScanLimit returns the limit of regions to scan.
+// It is only used for tests.
+func (c *Controller) GetPatrolRegionScanLimit() int {
+	return c.patrolRegionScanLimit
+}
+
 func calculateScanLimit(cluster sche.CheckerCluster) int {
-	scanlimit := max(minPatrolRegionScanLimit, cluster.GetTotalRegionCount()/patrolRegionPartition)
-	return min(scanlimit, maxPatrolScanRegionLimit)
+	regionCount := cluster.GetTotalRegionCount()
+	failpoint.Inject("regionCount", func(val failpoint.Value) {
+		c, _ := strconv.ParseInt(val.(string), 10, 64)
+		regionCount = int(c)
+	})
+	scanlimit := max(MinPatrolRegionScanLimit, regionCount/patrolRegionPartition)
+	return min(scanlimit, MaxPatrolScanRegionLimit)
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index cd7f94e001a..60c3a5c2126 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/pkg/progress"
 	"github.com/tikv/pd/pkg/schedule"
+	"github.com/tikv/pd/pkg/schedule/checker"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	sche "github.com/tikv/pd/pkg/schedule/core"
 	"github.com/tikv/pd/pkg/schedule/filter"
@@ -2183,7 +2184,7 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
 		peers := make([]*metapb.Peer, 0, np)
 		for j := uint64(0); j < np; j++ {
 			peer := &metapb.Peer{
-				Id: i*np + j,
+				Id: 100000000 + i*np + j,
 			}
 			peer.StoreId = (i + j) % m
 			peers = append(peers, peer)
@@ -2191,10 +2192,16 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
 		region := &metapb.Region{
 			Id:          i,
 			Peers:       peers,
-			StartKey:    []byte{byte(i)},
-			EndKey:      []byte{byte(i + 1)},
+			StartKey:    []byte(fmt.Sprintf("a%20d", i)),
+			EndKey:      []byte(fmt.Sprintf("a%20d", i+1)),
 			RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
 		}
+		if i == 0 {
+			region.StartKey = []byte("")
+		}
+		if i == n-1 {
+			region.EndKey = []byte("")
+		}
 		regions = append(regions, core.NewRegionInfo(region, peers[0], core.SetApproximateSize(100), core.SetApproximateKeys(1000)))
 	}
 	return regions
@@ -2880,6 +2887,50 @@ func TestCheckCache(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
 }
 
+func TestScanLimit(t *testing.T) {
+	re := require.New(t)
+
+	checkScanLimit(re, 1000, checker.MinPatrolRegionScanLimit)
+	checkScanLimit(re, 10000)
+	checkScanLimit(re, 100000)
+	checkScanLimit(re, 1000000)
+	checkScanLimit(re, 10000000, checker.MaxPatrolScanRegionLimit)
+}
+
+func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...int) {
+	tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) {
+	}, nil, nil, re)
+	defer cleanup()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/regionCount", fmt.Sprintf("return(\"%d\")", regionCount)))
+	defer func() {
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/regionCount"))
+	}()
+
+	re.NoError(tc.addRegionStore(1, 0))
+	re.NoError(tc.addRegionStore(2, 0))
+	re.NoError(tc.addRegionStore(3, 0))
+	regions := newTestRegions(10, 3, 3)
+	for _, region := range regions {
+		re.NoError(tc.putRegion(region))
+	}
+
+	co.GetWaitGroup().Add(1)
+	co.PatrolRegions()
+	defer func() {
+		co.GetSchedulersController().Wait()
+		co.GetWaitGroup().Wait()
+	}()
+
+	limit := co.GetCheckerController().GetPatrolRegionScanLimit()
+	re.LessOrEqual(checker.MinPatrolRegionScanLimit, limit)
+	re.GreaterOrEqual(checker.MaxPatrolScanRegionLimit, limit)
+	if len(expectScanLimit) > 0 {
+		re.Equal(expectScanLimit[0], limit)
+	}
+}
+
 func TestPeerState(t *testing.T) {
 	re := require.New(t)
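
To exercise the new test locally, something along these lines should work;
this assumes pd's failpoint make targets, which rewrite the failpoint.Inject
markers before the test binary is compiled:

	make failpoint-enable
	go test ./server/cluster -run TestScanLimit
	make failpoint-disable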