Skip to content

Commit

Permalink
*: fix region stats check (tikv#7748) (tikv#7840)
Browse files Browse the repository at this point in the history
close tikv#7728

Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: Yongbo Jiang <[email protected]>
Co-authored-by: Cabinfever_B <[email protected]>
  • Loading branch information
ti-chi-bot and CabinfeverB authored Feb 23, 2024
1 parent 8df5f5c commit 6eeb69d
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 20 deletions.
20 changes: 10 additions & 10 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
return r.source == Storage
}

// LoadedFromSync reports whether this region's meta info was loaded from the region syncer,
// i.e. whether the region's source is Sync.
// Only used for test.
func (r *RegionInfo) LoadedFromSync() bool {
return r.source == Sync
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
Expand Down Expand Up @@ -668,7 +674,7 @@ func (r *RegionInfo) isRegionRecreated() bool {

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output with enableLog:
// false means do not print the log.
Expand All @@ -681,19 +687,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, isNew = true, true, true
saveKV, saveCache = true, true
} else {
if origin.LoadedFromStorage() {
isNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
Expand All @@ -719,9 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else if log.GetLevel() <= zap.InfoLevel {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
_, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
Expand Down
25 changes: 21 additions & 4 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
// RegionStatisticType represents the type of the region's status.
type RegionStatisticType uint32

// emptyStatistic is the zero RegionStatisticType value, i.e. a status with no type bits set.
const emptyStatistic = RegionStatisticType(0)

// region status type
const (
MissPeer RegionStatisticType = 1 << iota
Expand Down Expand Up @@ -163,6 +165,9 @@ func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, r
// due to some special state types.
func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
regionID := region.GetID()
if !r.isObserved(regionID) {
return true
}
if r.IsRegionStatsType(regionID, OversizedRegion) !=
region.IsOversized(int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()), int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys())) {
return true
Expand All @@ -171,6 +176,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
}

// isObserved reports whether the region with the given ID has an entry in the statistics index.
// It also shows whether PD has received a heartbeat of this region.
func (r *RegionStatistics) isObserved(id uint64) bool {
	r.RLock()
	_, observed := r.index[id]
	r.RUnlock()
	return observed
}

// Observe records the current regions' status.
func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
r.Lock()
Expand Down Expand Up @@ -275,10 +288,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
r.deleteOfflineEntry(deleteIndex, regionID)
r.offlineIndex[regionID] = offlinePeerTypeIndex

if oldIndex, ok := r.index[regionID]; ok {
deleteIndex = oldIndex &^ peerTypeIndex
// Remove the info if any of the conditions are not met any more.
if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
deleteIndex := oldIndex &^ peerTypeIndex
r.deleteEntry(deleteIndex, regionID)
}
r.deleteEntry(deleteIndex, regionID)
r.index[regionID] = peerTypeIndex
}

Expand All @@ -287,7 +301,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
r.Lock()
defer r.Unlock()
if oldIndex, ok := r.index[regionID]; ok {
r.deleteEntry(oldIndex, regionID)
delete(r.index, regionID)
if oldIndex > emptyStatistic {
r.deleteEntry(oldIndex, regionID)
}
}
if oldIndex, ok := r.offlineIndex[regionID]; ok {
r.deleteOfflineEntry(oldIndex, regionID)
Expand Down
2 changes: 1 addition & 1 deletion server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (suite *regionTestSuite) TestRegionCheck() {
r7 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7))
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
suite.Equal(histKeys, r7)
re.Equal(histKeys, r7)
}

func (suite *regionTestSuite) TestRegions() {
Expand Down
5 changes: 2 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand Down
2 changes: 1 addition & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin)
saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
Expand Down
94 changes: 94 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -180,6 +181,99 @@ func TestDamagedRegion(t *testing.T) {
re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
}

// TestRegionStatistics checks that region statistics are rebuilt from heartbeats after
// PD leadership changes: a region that is only known through the region syncer or
// storage must not show up in the statistics until a heartbeat for it is handled.
func TestRegionStatistics(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	tc, err := tests.NewTestCluster(ctx, 2)
	// Check the error before registering the cleanup, so Destroy is never deferred
	// on a cluster that failed to be constructed.
	re.NoError(err)
	defer tc.Destroy()

	err = tc.RunInitialServers()
	re.NoError(err)

	leaderName := tc.WaitLeader()
	leaderServer := tc.GetServer(leaderName)
	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
	clusterID := leaderServer.GetClusterID()
	bootstrapCluster(re, clusterID, grpcPDClient)
	rc := leaderServer.GetRaftCluster()

	// A region with three voters and one learner, so that it is counted
	// under the LearnerPeer statistic once observed.
	region := &metapb.Region{
		Id:       10,
		StartKey: []byte("abc"),
		EndKey:   []byte("xyz"),
		Peers: []*metapb.Peer{
			{Id: 101, StoreId: 1},
			{Id: 102, StoreId: 2},
			{Id: 103, StoreId: 3},
			{Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
		},
	}

	// To put region.
	regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
	err = tc.HandleRegionHeartbeat(regionInfo)
	re.NoError(err)
	regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Len(regions, 1)

	// wait for sync region
	time.Sleep(1000 * time.Millisecond)

	// First leader change: the new leader knows the region from the syncer only,
	// so the statistics stay empty until the heartbeat is handled again.
	leaderServer.ResignLeader()
	newLeaderName := tc.WaitLeader()
	re.NotEqual(newLeaderName, leaderName)
	leaderServer = tc.GetServer(newLeaderName)
	rc = leaderServer.GetRaftCluster()
	r := rc.GetRegion(region.Id)
	re.NotNil(r)
	re.True(r.LoadedFromSync())
	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Empty(regions)
	err = tc.HandleRegionHeartbeat(regionInfo)
	re.NoError(err)
	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Len(regions, 1)

	// Second leader change: leadership goes back to the original leader.
	leaderServer.ResignLeader()
	newLeaderName = tc.WaitLeader()
	re.Equal(newLeaderName, leaderName)
	leaderServer = tc.GetServer(newLeaderName)
	rc = leaderServer.GetRaftCluster()
	// NOTE(review): r is still the value fetched from the previous leader's cluster;
	// it looks like a re-fetch via rc.GetRegion(region.Id) was intended here — confirm.
	re.NotNil(r)
	re.True(r.LoadedFromStorage() || r.LoadedFromSync())
	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Empty(regions)
	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
	err = tc.HandleRegionHeartbeat(regionInfo)
	re.NoError(err)
	rc = leaderServer.GetRaftCluster()
	r = rc.GetRegion(region.Id)
	re.NotNil(r)
	// NOTE(review): both predicates compare the same source field against different
	// values, so this conjunction presumably can never be true — confirm whether
	// `||` was intended.
	re.False(r.LoadedFromStorage() && r.LoadedFromSync())

	// Resign twice so that leadership ends up on the original leader again.
	leaderServer.ResignLeader()
	newLeaderName = tc.WaitLeader()
	re.NotEqual(newLeaderName, leaderName)
	leaderServer.ResignLeader()
	newLeaderName = tc.WaitLeader()
	re.Equal(newLeaderName, leaderName)
	leaderServer = tc.GetServer(newLeaderName)
	rc = leaderServer.GetRaftCluster()
	r = rc.GetRegion(region.Id)
	re.NotNil(r)
	// NOTE(review): same presumably-vacuous conjunction as above — confirm the intent.
	re.False(r.LoadedFromStorage() && r.LoadedFromSync())
	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Empty(regions)

	// A fresh heartbeat must make the region show up in the statistics again.
	regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
	err = tc.HandleRegionHeartbeat(regionInfo)
	re.NoError(err)
	regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
	re.Len(regions, 1)
}

func TestStaleRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 6eeb69d

Please sign in to comment.