From 7dc47e5da64c5fb5115591da842b7c838bfa6ff3 Mon Sep 17 00:00:00 2001 From: husharp Date: Thu, 22 Feb 2024 18:25:12 +0800 Subject: [PATCH] cp Signed-off-by: husharp --- pkg/core/region.go | 14 ++++++++++++- pkg/core/region_tree.go | 7 +++++++ server/cluster/cluster.go | 4 ---- server/cluster/coordinator_test.go | 18 +++++++--------- server/cluster/prepare_checker.go | 31 +++++++--------------------- tests/pdctl/helper.go | 1 + tests/server/cluster/cluster_test.go | 2 +- 7 files changed, 37 insertions(+), 40 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index cfe24ba0213..fa9f22936c4 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -1296,6 +1296,18 @@ func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int { return r.tree.notFromStorageRegionsCnt } +// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID. +func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int { + r.st.RLock() + defer r.st.RUnlock() + return r.getNotFromStorageRegionsCntByStoreLocked(storeID) +} + +// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID. +func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int { + return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount() +} + // GetMetaRegions gets a set of metapb.Region from regionMap func (r *RegionsInfo) GetMetaRegions() []*metapb.Region { r.t.RLock() @@ -1329,7 +1341,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int { return r.getStoreRegionCountLocked(storeID) } -// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID +// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int { return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length() } diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index ed3445de6b6..ecc988d97d8 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -82,6 +82,13 @@ func (t *regionTree) length() int { return t.tree.Len() } +func (t *regionTree) notFromStorageRegionsCount() int { + if t == nil { + return 0 + } + return t.notFromStorageRegionsCnt +} + // GetOverlaps returns the range items that has some intersections with the given items. func (t *regionTree) overlaps(item *regionItem) []*regionItem { // note that Find() gets the last item that is less or equal than the item. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e0a4e00be03..df0115826c8 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1069,10 +1069,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } - if !c.IsPrepared() && isNew { - c.coordinator.prepareChecker.collect(region) - } - if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 100f34caa59..e8a3b4a34fd 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -149,7 +149,7 @@ func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) er peer, _ := c.AllocPeer(id) region.Peers = append(region.Peers, peer) } - return c.putRegion(core.NewRegionInfo(region, nil)) + return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage))) } func TestBasic(t *testing.T) { @@ -233,7 +233,7 @@ func TestDispatch(t *testing.T) { func dispatchHeartbeat(co *coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { co.hbStreams.BindStream(region.GetLeader().GetStoreId(), stream) - if err := co.cluster.putRegion(region.Clone()); err != nil { + if err := co.cluster.putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil { return err } co.opController.Dispatch(region, schedule.DispatchFromHeartBeat) @@ -659,14 +659,14 @@ func TestShouldRun(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) - nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) re.NoError(tc.processRegionHeartbeat(nr)) re.Equal(testCase.shouldRun, co.shouldRun()) } nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} - newRegion := core.NewRegionInfo(nr, nil) + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) re.Error(tc.processRegionHeartbeat(newRegion)) - re.Equal(7, co.prepareChecker.sum) + re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) } func TestShouldRunWithNonLeaderRegions(t *testing.T) { @@ -702,14 +702,14 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) - nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) re.NoError(tc.processRegionHeartbeat(nr)) re.Equal(testCase.shouldRun, co.shouldRun()) } nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} - newRegion := core.NewRegionInfo(nr, nil) + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) re.Error(tc.processRegionHeartbeat(newRegion)) - re.Equal(9, co.prepareChecker.sum) + re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) // Now, after server is prepared, there exist some regions with no leader. re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) @@ -1011,7 +1011,6 @@ func TestRestart(t *testing.T) { re.NoError(tc.addRegionStore(3, 3)) re.NoError(tc.addLeaderRegion(1, 1)) region := tc.GetRegion(1) - co.prepareChecker.collect(region) // Add 1 replica on store 2. stream := mockhbstream.NewHeartbeatStream() @@ -1024,7 +1023,6 @@ func TestRestart(t *testing.T) { // Recreate coordinator then add another replica on store 3. co = newCoordinator(ctx, tc.RaftCluster, hbStreams) - co.prepareChecker.collect(region) co.run() re.NoError(dispatchHeartbeat(co, region, stream)) region = waitAddLearner(re, stream, region, 3) diff --git a/server/cluster/prepare_checker.go b/server/cluster/prepare_checker.go index 20652bb435f..21f6c919149 100644 --- a/server/cluster/prepare_checker.go +++ b/server/cluster/prepare_checker.go @@ -15,26 +15,23 @@ package cluster import ( + "go.uber.org/zap" "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/utils/syncutil" - "go.uber.org/zap" ) type prepareChecker struct { syncutil.RWMutex - reactiveRegions map[uint64]int - start time.Time - sum int - prepared bool + start time.Time + prepared bool } func newPrepareChecker() *prepareChecker { return &prepareChecker{ - start: time.Now(), - reactiveRegions: make(map[uint64]int), + start: time.Now(), } } @@ -51,14 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { } notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() totalRegionsCnt := c.GetRegionCount() - if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor { - log.Info("meta not loaded from region number is satisfied, finish prepare checker", - zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt)) - checker.prepared = true - return true - } // The number of active regions should be more than total region of all stores * collectFactor - if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) { + if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) { return false } for _, store := range c.GetStores() { @@ -67,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { } storeID := store.GetID() // For each store, the number of active regions should be more than total region of the store * collectFactor - if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) { + if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { return false } } + log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt)) checker.prepared = true return true } -func (checker *prepareChecker) collect(region *core.RegionInfo) { - checker.Lock() - defer checker.Unlock() - for _, p := range region.GetPeers() { - checker.reactiveRegions[p.GetStoreId()]++ - } - checker.sum++ -} - func (checker *prepareChecker) isPrepared() bool { checker.RLock() defer checker.RUnlock() diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 67729fa5ce2..f2e7980953a 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -125,6 +125,7 @@ func MustPutRegion(re *require.Assertions, cluster *tests.TestCluster, regionID, Peers: []*metapb.Peer{leader}, RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, } + opts = append(opts, core.SetSource(core.Heartbeat)) r := core.NewRegionInfo(metaRegion, leader, opts...) err := cluster.HandleRegionHeartbeat(r) re.NoError(err) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 9ef64ac91fb..6e79995008f 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -1409,7 +1409,7 @@ func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id. StartKey: []byte{byte(i)}, EndKey: []byte{byte(i + 1)}, } - rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0], core.SetSource(core.Heartbeat))) } time.Sleep(50 * time.Millisecond)