Commit 04a02a7

Merge remote-tracking branch 'upstream/release-7.5' into cherry-pick-7748-to-release-7.5

CabinfeverB committed Feb 10, 2024
2 parents: 3705a9a + b8feb2b
Showing 17 changed files with 176 additions and 82 deletions.
18 changes: 15 additions & 3 deletions pkg/core/region.go
@@ -1340,11 +1340,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate float64) {
     return
 }
 
-// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
+// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of regions that are not loaded from storage anymore.
 func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
     r.st.RLock()
     defer r.st.RUnlock()
-    return r.tree.notFromStorageRegionsCnt
+    return r.tree.notFromStorageRegionsCount()
 }
+
+// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner regions by storeID.
+func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
+    r.st.RLock()
+    defer r.st.RUnlock()
+    return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
+}
+
+// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner regions by storeID.
+func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
+    return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
+}
 
 // GetMetaRegions gets a set of metapb.Region from regionMap
@@ -1380,7 +1392,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
     return r.getStoreRegionCountLocked(storeID)
 }
 
-// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID
+// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID
 func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
     return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
 }
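
Both new accessors follow the locking convention RegionsInfo already uses above: an exported method takes the read lock, then delegates to an unexported *Locked helper that assumes the caller holds it. A minimal standalone sketch of that convention (illustrative names, not code from this commit):

package main

import (
    "fmt"
    "sync"
)

type regionCounter struct {
    mu  sync.RWMutex
    cnt int
}

// Count is the exported entry point; it owns the locking.
func (c *regionCounter) Count() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.countLocked()
}

// countLocked must only be called with c.mu held; keeping it separate
// lets other already-locked methods reuse it without double-locking.
func (c *regionCounter) countLocked() int { return c.cnt }

func main() {
    c := &regionCounter{cnt: 7}
    fmt.Println(c.Count()) // 7
}
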
7 changes: 7 additions & 0 deletions pkg/core/region_tree.go
@@ -82,6 +82,13 @@ func (t *regionTree) length() int {
     return t.tree.Len()
 }
 
+func (t *regionTree) notFromStorageRegionsCount() int {
+    if t == nil {
+        return 0
+    }
+    return t.notFromStorageRegionsCnt
+}
+
 // GetOverlaps returns the range items that have some intersections with the given items.
 func (t *regionTree) overlaps(item *regionItem) []*regionItem {
     // note that Find() gets the last item that is less than or equal to the item.
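
The t == nil guard is what makes the new region.go helper safe: indexing r.leaders[storeID] for an unknown store yields the map's zero value, a nil *regionTree, and Go permits calling a pointer-receiver method on a nil pointer as long as the method never dereferences it. A standalone illustration of that semantics (not code from this commit):

package main

import "fmt"

type tree struct{ cnt int }

// count tolerates a nil receiver, mirroring notFromStorageRegionsCount.
func (t *tree) count() int {
    if t == nil {
        return 0 // no tree tracked for this key
    }
    return t.cnt
}

func main() {
    trees := map[uint64]*tree{1: {cnt: 3}}
    fmt.Println(trees[1].count())  // 3
    fmt.Println(trees[42].count()) // 0: trees[42] is nil, yet the call is safe
}
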
4 changes: 3 additions & 1 deletion pkg/mcs/metastorage/server/grpc_service.go
@@ -86,7 +86,7 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.MetaStorage_WatchServer) error {
     if err := s.checkServing(); err != nil {
         return err
     }
-    ctx, cancel := context.WithCancel(s.ctx)
+    ctx, cancel := context.WithCancel(server.Context())
     defer cancel()
     options := []clientv3.OpOption{}
     key := string(req.GetKey())
@@ -106,6 +106,8 @@ func (s *Service) Watch(req *meta_storagepb.WatchRequest, server meta_storagepb.MetaStorage_WatchServer) error {
         select {
         case <-ctx.Done():
             return nil
+        case <-s.ctx.Done():
+            return nil
         case res := <-watchChan:
             if res.Err() != nil {
                 var resp meta_storagepb.WatchResponse
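
Deriving the watch context from server.Context() ties each watch loop to its own gRPC stream, so a disconnected client cancels only its loop, while the added s.ctx.Done() case still stops every loop when the whole service shuts down. A sketch of selecting on both lifetimes (assumed helper, not the actual service code):

package main

import "context"

// watchLoop exits when either the per-stream ctx or the service-wide
// svcCtx ends, whichever comes first.
func watchLoop(ctx, svcCtx context.Context, events <-chan string) error {
    for {
        select {
        case <-ctx.Done(): // this client's stream ended or was canceled
            return nil
        case <-svcCtx.Done(): // the whole service is shutting down
            return nil
        case ev, ok := <-events:
            if !ok {
                return nil
            }
            _ = ev // handle the event here
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // simulate the client going away immediately
    _ = watchLoop(ctx, context.Background(), make(chan string))
}
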
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -478,8 +478,8 @@ func (c *Cluster) collectClusterMetrics() {
 func (c *Cluster) resetMetrics() {
     statistics.Reset()
 
-    c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
-    c.coordinator.ResetHotSpotMetrics()
+    schedulers.ResetSchedulerMetrics()
+    schedule.ResetHotSpotMetrics()
     c.resetClusterMetrics()
 }

5 changes: 5 additions & 0 deletions pkg/schedule/checker/rule_checker.go
@@ -132,6 +132,11 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.RegionFit)
         return
     }
 
+    // the placement rules feature is disabled, so there is no fit to check
+    if fit == nil {
+        return
+    }
+
     // If the fit is calculated by FitRegion, which means we get a new fit result, we should
     // invalidate the cache if it exists
     c.ruleManager.InvalidCache(region.GetID())
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
@@ -705,7 +705,7 @@ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, t
 }
 
 // ResetHotSpotMetrics resets hot spot metrics.
-func (c *Coordinator) ResetHotSpotMetrics() {
+func ResetHotSpotMetrics() {
     hotSpotStatusGauge.Reset()
     schedulers.HotPendingSum.Reset()
 }
35 changes: 6 additions & 29 deletions pkg/schedule/prepare_checker.go
@@ -25,16 +25,13 @@ import (

 type prepareChecker struct {
     syncutil.RWMutex
-    reactiveRegions map[uint64]int
-    start           time.Time
-    sum             int
-    prepared        bool
+    start    time.Time
+    prepared bool
 }
 
 func newPrepareChecker() *prepareChecker {
     return &prepareChecker{
-        start:           time.Now(),
-        reactiveRegions: make(map[uint64]int),
+        start: time.Now(),
     }
 }

@@ -51,13 +48,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
     }
     notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
     totalRegionsCnt := c.GetTotalRegionCount()
-    if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor {
-        log.Info("meta not loaded from region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
-        checker.prepared = true
-        return true
-    }
     // The number of active regions should be more than the total region count of all stores * collectFactor
-    if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) {
+    if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) {
         return false
     }
     for _, store := range c.GetStores() {
@@ -66,23 +58,15 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool {
         }
         storeID := store.GetID()
         // For each store, the number of active regions should be more than the total region count of the store * collectFactor
-        if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
+        if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
             return false
         }
     }
+    log.Info("not loaded from storage region number is satisfied, finish prepare checker", zap.Int("not-from-storage-region", notLoadedFromRegionsCnt), zap.Int("total-region", totalRegionsCnt))
     checker.prepared = true
     return true
 }
 
-func (checker *prepareChecker) Collect(region *core.RegionInfo) {
-    checker.Lock()
-    defer checker.Unlock()
-    for _, p := range region.GetPeers() {
-        checker.reactiveRegions[p.GetStoreId()]++
-    }
-    checker.sum++
-}
-
 func (checker *prepareChecker) IsPrepared() bool {
     checker.RLock()
     defer checker.RUnlock()
@@ -95,10 +79,3 @@ func (checker *prepareChecker) SetPrepared() {
     defer checker.Unlock()
     checker.prepared = true
 }
-
-// for test purpose
-func (checker *prepareChecker) GetSum() int {
-    checker.RLock()
-    defer checker.RUnlock()
-    return checker.sum
-}
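
With Collect and the reactiveRegions map gone, readiness is computed purely from the not-from-storage counters that region.go now exposes: enough regions must have been refreshed by heartbeats, cluster-wide and per store. A hedged sketch of the predicate's shape (collectFactor is defined outside this diff; 0.8 below is only a stand-in):

package main

import "fmt"

// Assumed placeholder; the real collectFactor constant lives elsewhere
// in pkg/schedule and may differ.
const collectFactor = 0.8

// clusterReady mirrors the structure of prepareChecker.check:
// heartbeat-refreshed ("not from storage") regions must reach
// collectFactor of the total, both cluster-wide and per store.
func clusterReady(totalRegions, freshRegions int, perStoreTotal, perStoreFresh map[uint64]int) bool {
    if float64(totalRegions)*collectFactor > float64(freshRegions) {
        return false
    }
    for storeID, total := range perStoreTotal {
        if float64(total)*collectFactor > float64(perStoreFresh[storeID]) {
            return false
        }
    }
    return true
}

func main() {
    total := map[uint64]int{1: 100, 2: 100}
    fresh := map[uint64]int{1: 90, 2: 70} // store 2 is still catching up
    fmt.Println(clusterReady(200, 160, total, fresh)) // false: store 2 is below 80%
}
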
11 changes: 3 additions & 8 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -38,8 +38,6 @@ const maxScheduleRetries = 10

 var (
     denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
-    rulesCntStatusGauge            = ruleStatusGauge.WithLabelValues("rule_count")
-    groupsCntStatusGauge           = ruleStatusGauge.WithLabelValues("group_count")
 )
 
 // Controller is used to manage all schedulers.
@@ -128,21 +126,18 @@ func (c *Controller) CollectSchedulerMetrics() {
     }
     ruleCnt := ruleMgr.GetRulesCount()
     groupCnt := ruleMgr.GetGroupsCount()
-    rulesCntStatusGauge.Set(float64(ruleCnt))
-    groupsCntStatusGauge.Set(float64(groupCnt))
+    ruleStatusGauge.WithLabelValues("rule_count").Set(float64(ruleCnt))
+    ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
 }
 
 func (c *Controller) isSchedulingHalted() bool {
     return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
 }
 
 // ResetSchedulerMetrics resets metrics of all schedulers.
-func (c *Controller) ResetSchedulerMetrics() {
+func ResetSchedulerMetrics() {
     schedulerStatusGauge.Reset()
     ruleStatusGauge.Reset()
-    // create in map again
-    rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count")
-    groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count")
 }
 
 // AddSchedulerHandler adds the HTTP handler for a scheduler.
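
The deleted rulesCntStatusGauge and groupsCntStatusGauge were child handles cached from the ruleStatusGauge vector, and that caching is fragile: GaugeVec.Reset() removes every child from the vector, so a handle obtained before the Reset keeps accepting writes but is never exported again, which is why the old ResetSchedulerMetrics had to re-create the handles. Resolving the child with WithLabelValues at each use sidesteps the problem. A standalone demonstration of the pitfall:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    vec := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{Name: "rule_status", Help: "demo gauge vector"},
        []string{"type"},
    )

    cached := vec.WithLabelValues("rule_count") // handle into the vector's child map
    cached.Set(1)

    vec.Reset()   // drops every child; `cached` is now detached
    cached.Set(2) // updates a metric that will never be scraped again

    // Safe pattern: resolve the child at each use so it is re-created after Reset.
    vec.WithLabelValues("rule_count").Set(2)

    ch := make(chan prometheus.Metric, 10)
    vec.Collect(ch)
    close(ch)
    fmt.Println("exported children:", len(ch)) // 1: only the re-resolved child
}
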
66 changes: 50 additions & 16 deletions server/api/region.go
@@ -259,7 +259,13 @@ func (h *regionHandler) GetRegionByID(w http.ResponseWriter, r *http.Request) {
     }
 
     regionInfo := rc.GetRegion(regionID)
-    h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo))
+    b, err := marshalRegionInfoJSON(r.Context(), regionInfo)
+    if err != nil {
+        h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+        return
+    }
+
+    h.rd.Data(w, http.StatusOK, b)
 }
 
 // @Tags region
@@ -289,7 +295,13 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) {
     }
 
     regionInfo := rc.GetRegionByKey([]byte(key))
-    h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo))
+    b, err := marshalRegionInfoJSON(r.Context(), regionInfo)
+    if err != nil {
+        h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+        return
+    }
+
+    h.rd.Data(w, http.StatusOK, b)
 }
 
 // @Tags region
@@ -349,6 +361,24 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler {
     }
 }
 
+// marshalRegionInfoJSON marshals region to bytes in `RegionInfo`'s JSON format.
+// It is used to reduce the cost of JSON serialization.
+func marshalRegionInfoJSON(ctx context.Context, r *core.RegionInfo) ([]byte, error) {
+    out := &jwriter.Writer{}
+
+    region := &RegionInfo{}
+    select {
+    case <-ctx.Done():
+        // Return early to avoid unnecessary computation.
+        // See https://github.com/tikv/pd/issues/6835 for details.
+        return nil, ctx.Err()
+    default:
+    }
+
+    covertAPIRegionInfo(r, region, out)
+    return out.Buffer.BuildBytes(), out.Error
+}
+
 // marshalRegionsInfoJSON marshals regions to bytes in `RegionsInfo`'s JSON format.
 // It is used to reduce the cost of JSON serialization.
 func marshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) {
@@ -372,27 +402,31 @@ func marshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) {
         if i > 0 {
             out.RawByte(',')
         }
-        InitRegion(r, region)
-        // EasyJSON will not check anonymous struct pointer fields and will panic if such a field is nil.
-        // So we need to set the field to its default value explicitly when the anonymous struct pointer is nil.
-        region.Leader.setDefaultIfNil()
-        for i := range region.Peers {
-            region.Peers[i].setDefaultIfNil()
-        }
-        for i := range region.PendingPeers {
-            region.PendingPeers[i].setDefaultIfNil()
-        }
-        for i := range region.DownPeers {
-            region.DownPeers[i].setDefaultIfNil()
-        }
-        region.MarshalEasyJSON(out)
+        covertAPIRegionInfo(r, region, out)
     }
     out.RawByte(']')
 
     out.RawByte('}')
     return out.Buffer.BuildBytes(), out.Error
 }
 
+func covertAPIRegionInfo(r *core.RegionInfo, region *RegionInfo, out *jwriter.Writer) {
+    InitRegion(r, region)
+    // EasyJSON will not check anonymous struct pointer fields and will panic if such a field is nil.
+    // So we need to set the field to its default value explicitly when the anonymous struct pointer is nil.
+    region.Leader.setDefaultIfNil()
+    for i := range region.Peers {
+        region.Peers[i].setDefaultIfNil()
+    }
+    for i := range region.PendingPeers {
+        region.PendingPeers[i].setDefaultIfNil()
+    }
+    for i := range region.DownPeers {
+        region.DownPeers[i].setDefaultIfNil()
+    }
+    region.MarshalEasyJSON(out)
+}
+
 // @Tags region
 // @Summary List all regions in the cluster.
 // @Produce json
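
marshalRegionInfoJSON polls for cancellation with a select whose empty default arm makes the check non-blocking: if the request context is already done, the handler returns ctx.Err() instead of paying for EasyJSON serialization; otherwise it falls straight through. A minimal sketch of that pattern:

package main

import (
    "context"
    "fmt"
)

// renderExpensive gives up before doing any work if ctx is already
// canceled; the empty default arm keeps the check non-blocking.
func renderExpensive(ctx context.Context) ([]byte, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    return []byte(`{"region":"..."}`), nil // stand-in for the real marshaling
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // the client has already disconnected
    _, err := renderExpensive(ctx)
    fmt.Println(err) // context canceled
}
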
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
@@ -1142,7 +1142,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
     // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
     // validate it again here.
     //
-    // However it can't solve the race condition of concurrent heartbeats from the same region.
+    // However, it can't solve the race condition of concurrent heartbeats from the same region.
     if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
         return err
     }
@@ -2176,8 +2176,8 @@ func (c *RaftCluster) resetMetrics() {
     statistics.Reset()
 
     if !c.isAPIServiceMode {
-        c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
-        c.coordinator.ResetHotSpotMetrics()
+        schedulers.ResetSchedulerMetrics()
+        schedule.ResetHotSpotMetrics()
         c.resetClusterMetrics()
     }
     c.resetHealthStatus()
[The remaining 7 of the 17 changed files are not shown in this capture.]