Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7217
Browse files Browse the repository at this point in the history
ref tikv#7016

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Feb 22, 2024
1 parent eddf85e commit 98974bf
Show file tree
Hide file tree
Showing 9 changed files with 1,986 additions and 13 deletions.
62 changes: 62 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
)

// Cluster provides an overview of a cluster's basic information.
type Cluster interface {
GetHotStat() *statistics.HotStat
GetRegionStats() *statistics.RegionStatistics
GetLabelStats() *statistics.LabelStatistics
GetCoordinator() *schedule.Coordinator
GetRuleManager() *placement.RuleManager
}

// HandleStatsAsync handles the flow asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

// HandleOverlaps handles the overlap regions.
func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
for _, item := range overlaps {
if c.GetRegionStats() != nil {
c.GetRegionStats().ClearDefunctRegion(item.GetID())
}
c.GetLabelStats().ClearDefunctRegion(item.GetID())
c.GetRuleManager().InvalidCache(item.GetID())
}
}

// Collect collects the cluster information.
func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
if hasRegionStats {
c.GetRegionStats().Observe(region, stores)
}
}
18 changes: 15 additions & 3 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,11 +1289,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
return
}

// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of regions that not loaded from storage anymore.
func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.notFromStorageRegionsCnt
return r.tree.notFromStorageRegionsCount()
}

// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
r.st.RLock()
defer r.st.RUnlock()
return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
}

// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
}

// GetMetaRegions gets a set of metapb.Region from regionMap
Expand Down Expand Up @@ -1329,7 +1341,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
return r.getStoreRegionCountLocked(storeID)
}

// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID
// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID
func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (t *regionTree) length() int {
return t.tree.Len()
}

func (t *regionTree) notFromStorageRegionsCount() int {
if t == nil {
return 0
}
return t.notFromStorageRegionsCnt
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// note that Find() gets the last item that is less or equal than the item.
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
// However, it can't solve the race condition of concurrent heartbeats from the same region.
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 98974bf

Please sign in to comment.