Skip to content

Commit

Permalink
Fix the bug causing incorrect reference count after a cache reset
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed May 20, 2024
1 parent 6ded6f8 commit 9659be7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
31 changes: 10 additions & 21 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ func NewRegionsInfo() *RegionsInfo {
learners: make(map[uint64]*regionTree),
witnesses: make(map[uint64]*regionTree),
pendingPeers: make(map[uint64]*regionTree),
overlapTree: newRegionTree(),
overlapTree: newRegionTreeWithCountRef(),
}
}

Expand Down Expand Up @@ -1131,42 +1131,31 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
item := &regionItem{region}
r.subRegions[region.GetID()] = item
r.overlapTree.update(item, false)
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, countRef bool) {
// Add leaders and followers.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64) {
store, ok := peersMap[storeID]
if !ok {
if !countRef {
store = newRegionTree()
} else {
store = newRegionTreeWithCountRef()
}
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item, true)
setPeer(r.leaders, storeID)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item, false)
setPeer(r.followers, storeID)
}
}

// Add other peers.
setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item, false)
setPeer(peersMap, peer.GetStoreId())
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}

Expand Down Expand Up @@ -1335,7 +1324,7 @@ func (r *RegionsInfo) RemoveRegion(region *RegionInfo) {
// ResetRegionCache resets the regions info.
func (r *RegionsInfo) ResetRegionCache() {
r.t.Lock()
r.tree = newRegionTree()
r.tree = newRegionTreeWithCountRef()
r.regions = make(map[uint64]*regionItem)
r.t.Unlock()
r.st.Lock()
Expand All @@ -1345,7 +1334,7 @@ func (r *RegionsInfo) ResetRegionCache() {
r.learners = make(map[uint64]*regionTree)
r.witnesses = make(map[uint64]*regionTree)
r.pendingPeers = make(map[uint64]*regionTree)
r.overlapTree = newRegionTree()
r.overlapTree = newRegionTreeWithCountRef()
}

// RemoveRegionFromSubTree removes RegionInfo from regionSubTrees
Expand Down
15 changes: 15 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,3 +1081,18 @@ func TestCheckAndPutSubTree(t *testing.T) {
// should failed to put because the root tree is missing
re.Equal(0, regions.tree.length())
}

func TestCntRefAfterResetRegionCache(t *testing.T) {
re := require.New(t)
regions := NewRegionsInfo()
// Put the region first.
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
regions.CheckAndPutRegion(region)
re.Equal(int32(2), region.GetRef())
regions.ResetRegionCache()
// Put the region after reset.
region = NewTestRegionInfo(1, 1, []byte("a"), []byte("b"))
re.Zero(region.GetRef())
regions.CheckAndPutRegion(region)
re.Equal(int32(2), region.GetRef())
}

0 comments on commit 9659be7

Please sign in to comment.