Commit da33a00
add unit test
Signed-off-by: lhy1024 <[email protected]>
lhy1024 committed Aug 1, 2024
1 parent c25a1dd commit da33a00
Showing 2 changed files with 99 additions and 36 deletions.
pkg/schedule/checker/checker_controller.go (45 additions & 33 deletions)
@@ -17,6 +17,7 @@ package checker
 import (
     "bytes"
     "context"
+    "strconv"
     "time"
 
     "github.com/pingcap/failpoint"
@@ -39,11 +40,11 @@ const (
     checkSuspectRangesInterval = 100 * time.Millisecond
     // DefaultPendingRegionCacheSize is the default length of waiting list.
     DefaultPendingRegionCacheSize = 100000
-    // For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(minPatrolRegionScanLimit, 1,024,000/patrolRegionPartition)
+    // For 1,024,000 regions, patrolRegionScanLimit is 1000, which is max(MinPatrolRegionScanLimit, 1,024,000/patrolRegionPartition)
     // In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192].
     // It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered.
-    minPatrolRegionScanLimit = 128
-    maxPatrolScanRegionLimit = 8192
+    MinPatrolRegionScanLimit = 128
+    MaxPatrolScanRegionLimit = 8192
     patrolRegionPartition = 1024
 )
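
The renamed constants feed the scan-limit sizing rule spelled out in the comment above. As a rough illustration (a self-contained sketch, not PD code: sketchScanLimit is a hypothetical helper, the literals are the constants from this diff, and max/min are the Go 1.21 builtins):

// Sketch of the sizing rule encoded by the constants above.
func sketchScanLimit(regionCount int) int {
    limit := max(128, regionCount/1024) // MinPatrolRegionScanLimit, patrolRegionPartition
    return min(limit, 8192)             // MaxPatrolScanRegionLimit
}

// sketchScanLimit(1_000)      == 128  (clamped to the floor)
// sketchScanLimit(1_024_000)  == 1000 (proportional: 1,024,000/1,024)
// sketchScanLimit(10_000_000) == 8192 (clamped to the ceiling)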

@@ -121,39 +122,39 @@ func (c *Controller) PatrolRegions() {
         select {
         case <-ticker.C:
             c.updateTickerIfNeeded(ticker)
-            if c.cluster.IsSchedulingHalted() {
-                continue
-            }
-
-            // Check priority regions first.
-            c.checkPriorityRegions()
-            // Check pending processed regions first.
-            c.checkPendingProcessedRegions()
-
-            key, regions = c.checkRegions(key)
-            if len(regions) == 0 {
-                continue
-            }
-            // Updates the label level isolation statistics.
-            c.cluster.UpdateRegionsLabelLevelStats(regions)
-            // When the key is nil, it means that the scan is finished.
-            if len(key) == 0 {
-                // update the scan limit.
-                c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
-                // update the metrics.
-                dur := time.Since(start)
-                patrolCheckRegionsGauge.Set(dur.Seconds())
-                c.setPatrolRegionsDuration(dur)
-                start = time.Now()
-            }
-            failpoint.Inject("breakPatrol", func() {
-                failpoint.Return()
-            })
         case <-c.ctx.Done():
             patrolCheckRegionsGauge.Set(0)
             c.setPatrolRegionsDuration(0)
             return
         }
+        if c.cluster.IsSchedulingHalted() {
+            continue
+        }
+
+        // Check priority regions first.
+        c.checkPriorityRegions()
+        // Check pending processed regions first.
+        c.checkPendingProcessedRegions()
+
+        key, regions = c.checkRegions(key)
+        if len(regions) == 0 {
+            continue
+        }
+        // Updates the label level isolation statistics.
+        c.cluster.UpdateRegionsLabelLevelStats(regions)
+        // When the key is nil, it means that the scan is finished.
+        if len(key) == 0 {
+            // update the scan limit.
+            c.patrolRegionScanLimit = calculateScanLimit(c.cluster)
+            // update the metrics.
+            dur := time.Since(start)
+            patrolCheckRegionsGauge.Set(dur.Seconds())
+            c.setPatrolRegionsDuration(dur)
+            start = time.Now()
+        }
+        failpoint.Inject("breakPatrol", func() {
+            failpoint.Break()
+        })
     }
 }
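
Note the marker change from failpoint.Return() to failpoint.Break(): the checking logic now lives in the loop body rather than inside the select's ticker arm, where a plain break would only have exited the select. For context, pingcap/failpoint markers are rewritten before compilation by failpoint-ctl; the activated form of the new marker is roughly the following (a sketch of the rewrite, not the tool's literal output):

// Marker form, as written in the diff:
failpoint.Inject("breakPatrol", func() {
    failpoint.Break()
})

// Roughly what failpoint-ctl turns it into:
if _, _err_ := failpoint.Eval(_curpkg_("breakPatrol")); _err_ == nil {
    break // exits the patrol loop; the old failpoint.Return() left PatrolRegions entirely
}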

@@ -451,7 +452,18 @@ func (c *Controller) updateTickerIfNeeded(ticker *time.Ticker) {
     }
 }
 
+// GetPatrolRegionScanLimit returns the limit of regions to scan.
+// It is only used for tests.
+func (c *Controller) GetPatrolRegionScanLimit() int {
+    return c.patrolRegionScanLimit
+}
+
 func calculateScanLimit(cluster sche.CheckerCluster) int {
-    scanlimit := max(minPatrolRegionScanLimit, cluster.GetTotalRegionCount()/patrolRegionPartition)
-    return min(scanlimit, maxPatrolScanRegionLimit)
+    regionCount := cluster.GetTotalRegionCount()
+    failpoint.Inject("regionCount", func(val failpoint.Value) {
+        c, _ := strconv.ParseInt(val.(string), 10, 64)
+        regionCount = int(c)
+    })
+    scanlimit := max(MinPatrolRegionScanLimit, regionCount/patrolRegionPartition)
+    return min(scanlimit, MaxPatrolScanRegionLimit)
 }
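
The new regionCount failpoint lets a test pretend the cluster holds an arbitrary number of regions without creating them. A minimal sketch of how the two sides meet (the failpoint path matches the test below; error handling elided):

// Test side: enable the failpoint with a quoted-string payload.
_ = failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/regionCount", `return("1024000")`)
defer failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/regionCount")

// Production side, as in the diff: the payload arrives as a failpoint.Value,
// is asserted to string, and parsed back into an int.
failpoint.Inject("regionCount", func(val failpoint.Value) {
    c, _ := strconv.ParseInt(val.(string), 10, 64)
    regionCount = int(c)
})

Failpoint terms can also carry an integer directly (return(1024000), read as val.(int)), so the quoted string plus strconv.ParseInt here is a stylistic choice rather than a requirement.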
server/cluster/cluster_test.go (54 additions & 3 deletions)
@@ -44,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/checker"
sc "github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
@@ -2183,18 +2184,24 @@ func newTestRegions(n, m, np uint64) []*core.RegionInfo {
         peers := make([]*metapb.Peer, 0, np)
         for j := uint64(0); j < np; j++ {
             peer := &metapb.Peer{
-                Id: i*np + j,
+                Id: 100000000 + i*np + j,
             }
             peer.StoreId = (i + j) % m
             peers = append(peers, peer)
         }
         region := &metapb.Region{
             Id:          i,
             Peers:       peers,
-            StartKey:    []byte{byte(i)},
-            EndKey:      []byte{byte(i + 1)},
+            StartKey:    []byte(fmt.Sprintf("a%20d", i)),
+            EndKey:      []byte(fmt.Sprintf("a%20d", i+1)),
             RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
         }
+        if i == 0 {
+            region.StartKey = []byte("")
+        }
+        if i == n-1 {
+            region.EndKey = []byte("")
+        }
         regions = append(regions, core.NewRegionInfo(region, peers[0], core.SetApproximateSize(100), core.SetApproximateKeys(1000)))
     }
     return regions
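
The key-format change matters once a test creates more than 255 regions: byte(i) wraps around at 256 and produces duplicate keys, while a space-padded fixed-width decimal keeps lexicographic order identical to numeric order. A quick illustration (a standalone sketch, not test code):

package main

import "fmt"

func main() {
    fmt.Printf("%q\n", fmt.Sprintf("a%20d", 9))  // "a                   9"
    fmt.Printf("%q\n", fmt.Sprintf("a%20d", 10)) // "a                  10"
    // ' ' (0x20) sorts before every digit, so the padded 9 precedes the
    // padded 10 byte-wise, and region i's EndKey equals region i+1's StartKey.
    // The two new if blocks pin the first StartKey and the last EndKey to "",
    // so the generated regions tile the whole keyspace.
}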
@@ -2880,6 +2887,50 @@ func TestCheckCache(t *testing.T) {
     re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
 }
 
+func TestScanLimit(t *testing.T) {
+    re := require.New(t)
+
+    checkScanLimit(re, 1000, checker.MinPatrolRegionScanLimit)
+    checkScanLimit(re, 10000)
+    checkScanLimit(re, 100000)
+    checkScanLimit(re, 1000000)
+    checkScanLimit(re, 10000000, checker.MaxPatrolScanRegionLimit)
+}
+
+func checkScanLimit(re *require.Assertions, regionCount int, expectScanLimit ...int) {
+    tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) {
[CI annotation, check failure on line 2901 in server/cluster/cluster_test.go, GitHub Actions / statics: unused-parameter: parameter 'cfg' seems to be unused, consider removing or renaming it to match ^_ (revive)]
+    }, nil, nil, re)
+    defer cleanup()
+    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol", `return`))
+    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/checker/regionCount", fmt.Sprintf("return(\"%d\")", regionCount)))
+    defer func() {
+        re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/breakPatrol"))
+        re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/checker/regionCount"))
+    }()
+
+    re.NoError(tc.addRegionStore(1, 0))
+    re.NoError(tc.addRegionStore(2, 0))
+    re.NoError(tc.addRegionStore(3, 0))
+    regions := newTestRegions(10, 3, 3)
+    for _, region := range regions {
+        re.NoError(tc.putRegion(region))
+    }
+
+    co.GetWaitGroup().Add(1)
+    co.PatrolRegions()
+    defer func() {
+        co.GetSchedulersController().Wait()
+        co.GetWaitGroup().Wait()
+    }()
+
+    limit := co.GetCheckerController().GetPatrolRegionScanLimit()
+    re.LessOrEqual(checker.MinPatrolRegionScanLimit, limit)
+    re.GreaterOrEqual(checker.MaxPatrolScanRegionLimit, limit)
+    if len(expectScanLimit) > 0 {
+        re.Equal(expectScanLimit[0], limit)
+    }
+}
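
For reference, the limits these five calls should observe under calculateScanLimit, worked out from the constants in checker_controller.go (only the first and last are asserted exactly):

1,000      regions: max(128, 0)     = 128  (== checker.MinPatrolRegionScanLimit)
10,000     regions: max(128, 9)     = 128
100,000    regions: max(128, 97)    = 128
1,000,000  regions: max(128, 976)   = 976
10,000,000 regions: min(9765, 8192) = 8192 (== checker.MaxPatrolScanRegionLimit)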

 func TestPeerState(t *testing.T) {
     re := require.New(t)
 

0 comments on commit da33a00
