
Commit

*: concurrently update the subtree
Signed-off-by: nolouch <[email protected]>
nolouch committed Apr 15, 2024
1 parent 0214778 commit 16e1028
Showing 6 changed files with 246 additions and 3 deletions.
155 changes: 155 additions & 0 deletions pkg/core/region.go
@@ -1004,6 +1004,161 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
return overlaps, nil
}

// CheckAndPutRootTree checks whether the region is valid to put into the root tree and returns an error if it is not.
// It is usually used together with CheckAndPutSubTree.
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
return nil, err
}
tracer.OnValidateRegionFinished()
_, overlaps, _ := r.setRegionLocked(region, true, ols...)
r.t.Unlock()
tracer.OnSetRegionFinished()
return overlaps, nil
}

// CheckAndPutSubTree checks whether the region is valid to put into the sub tree and returns an error if it is not.
// It is usually used together with CheckAndPutRootTree.
func (r *RegionsInfo) CheckAndPutSubTree(ctx context.Context, region *RegionInfo) error {

Check failure on line 1033 in pkg/core/region.go (GitHub Actions / statics): unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it to match ^_ (revive)
// Fetch the region from the root tree again to pick up the latest version.
newRegion := r.GetRegion(region.GetID())
if newRegion == nil {
newRegion = region
}
r.UpdateSubTreeOrderInsensitive(newRegion)
return nil
}
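
The intended division of labor is: validate and write the region into the root tree synchronously, then let the subtree catch up asynchronously (the cluster.go hunks later in this commit queue CheckAndPutSubTree on the heartbeat task runner). Below is a minimal, self-contained sketch of that split using generic placeholder types rather than PD's own; `store`, its fields, and the channel-based worker are illustrative assumptions, not code from this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for RegionsInfo: a primary index (the root tree) that is
// written synchronously, and a secondary index (the subtree) that is brought
// up to date by asynchronous tasks.
type store struct {
	mu        sync.Mutex
	primary   map[uint64]string
	secondary map[uint64]string
	tasks     chan func() // stand-in for the rate-limited task runner
}

// putPrimary mirrors CheckAndPutRootTree: validate, then write under the lock.
func (s *store) putPrimary(id uint64, meta string) error {
	if meta == "" {
		return fmt.Errorf("invalid meta for region %d", id)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.primary[id] = meta
	return nil
}

// putSecondaryAsync mirrors queueing CheckAndPutSubTree: the task re-reads the
// latest primary entry, so it stays correct even if it runs late.
func (s *store) putSecondaryAsync(id uint64) {
	s.tasks <- func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		if meta, ok := s.primary[id]; ok {
			s.secondary[id] = meta
		}
	}
}

func main() {
	s := &store{primary: map[uint64]string{}, secondary: map[uint64]string{}, tasks: make(chan func(), 16)}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // a single worker draining the task queue
		defer wg.Done()
		for task := range s.tasks {
			task()
		}
	}()
	if err := s.putPrimary(1, "meta-v1"); err == nil {
		s.putSecondaryAsync(1)
	}
	close(s.tasks)
	wg.Wait()
	fmt.Println(s.primary[1], s.secondary[1]) // meta-v1 meta-v1
}
```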

// UpdateSubTreeOrderInsensitive updates the subtree.
// It can be called concurrently, so the region version is checked to keep updates in order:
//  1. If the version is stale, drop this update.
//  2. If the version is the same, only some statistics need to be updated and the update order does not matter.
//
// On the other hand, the overlapping regions need to be re-checked, because the region tree and the subtree are not updated atomically.
func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
var origin *RegionInfo
r.st.Lock()
defer r.st.Unlock()
originItem, ok := r.subRegions[region.GetID()]
if ok {
origin = originItem.RegionInfo
}
rangeChanged := true

if origin != nil {
re := region.GetRegionEpoch()
oe := origin.GetRegionEpoch()
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
// Region meta is stale, skip.
return
}
rangeChanged = !origin.rangeEqualsTo(region)

if rangeChanged || !origin.peersEqualTo(region) {
// If the range or peers have changed, the sub regionTree needs to be cleaned up.
// TODO: Improve performance by deleting only the different peers.
r.removeRegionFromSubTreeLocked(origin)
} else {
// The region tree and the subtree are not updated atomically, and the region tree is updated first.
// If two threads update the region tree:
// t1: thread-A updates the region tree
// t2: thread-B updates the region tree again
// t3: thread-B updates the subtree
// t4: thread-A updates the subtree
// then, to keep the region tree consistent with the subtree, this update needs to be dropped.
if tree, ok := r.subRegions[region.GetID()]; ok {
r.updateSubTreeStat(origin, region)
tree.RegionInfo = region
}
return
}
}

if rangeChanged {
overlaps := r.getOverlapRegionFromSubTreeLocked(region)
for _, re := range overlaps {
r.removeRegionFromSubTreeLocked(re)
}
}

item := &regionItem{region}
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
peersMap[storeID] = store
}
store.update(item, false)
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
}
}
// Add to learners.
setPeers(r.learners, region.GetLearners())
// Add to witnesses.
setPeers(r.witnesses, region.GetWitnesses())
// Add to PendingPeers
setPeers(r.pendingPeers, region.GetPendingPeers())
}
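
The ordering rule described in the doc comment above reduces to a pure comparison on (version, conf_ver, term). Here is a self-contained sketch of that predicate, with an illustrative `epoch` type standing in for the region epoch plus the leader term; the names are assumptions for illustration, not PD types.

```go
package main

import "fmt"

// epoch stands in for the region epoch fields plus the leader term.
type epoch struct {
	version, confVer, term uint64
}

// isStale reports whether an incoming update must be dropped, following the
// same comparison as UpdateSubTreeOrderInsensitive: a smaller version or
// conf_ver, or a non-zero but smaller term, means the update is behind,
// unless the region was recreated.
func isStale(incoming, current epoch, recreated bool) bool {
	termBehind := incoming.term > 0 && incoming.term < current.term
	behind := termBehind || incoming.version < current.version || incoming.confVer < current.confVer
	return behind && !recreated
}

func main() {
	current := epoch{version: 5, confVer: 3, term: 10}
	fmt.Println(isStale(epoch{version: 4, confVer: 3, term: 10}, current, false)) // true: stale version, dropped
	fmt.Println(isStale(epoch{version: 5, confVer: 3, term: 10}, current, false)) // false: same epoch, stats-only update
	fmt.Println(isStale(epoch{version: 4, confVer: 2, term: 0}, current, true))   // false: recreated regions bypass the check
}
```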

func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
it := &regionItem{RegionInfo: region}
overlaps := make([]*RegionInfo, 0)
overlapsMap := make(map[uint64]struct{})
collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
if tree, ok := peersMap[storeID]; ok {
items := tree.overlaps(it)
for _, item := range items {
if _, ok := overlapsMap[item.GetID()]; !ok {
overlapsMap[item.GetID()] = struct{}{}
overlaps = append(overlaps, item.RegionInfo)
}
}
}
}
for _, peer := range region.GetMeta().GetPeers() {
storeID := peer.GetStoreId()
collectFromItemSlice(r.leaders, storeID)
collectFromItemSlice(r.followers, storeID)
collectFromItemSlice(r.learners, storeID)
collectFromItemSlice(r.witnesses, storeID)
}
return overlaps
}

// GetRelevantRegions returns the relevant regions for a given region.
func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
r.t.RLock()
13 changes: 12 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -632,10 +632,21 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
// However, it can't solve the race condition of concurrent heartbeats from the same region.

// Async task in next PR.
- if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
+ if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "UpdateSubTree",
Limit: ctx.Limiter,
},
func(ctx context.Context) {
c.CheckAndPutSubTree(ctx, region)
},
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
50 changes: 50 additions & 0 deletions pkg/ratelimit/metrics.go
@@ -0,0 +1,50 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ratelimit

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_max_waiting_duration_seconds",
Help: "The duration of tasks waiting in the runner.",
}, []string{"name"})

RunnerTaskPendingTasks = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{"name"})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "ratelimit",
Name: "runner_task_failed_tasks_total",
Help: "The number of failed tasks in the runner.",
}, []string{"name"})
)

func init() {
prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
prometheus.MustRegister(RunnerTaskPendingTasks)
prometheus.MustRegister(RunnerTaskFailedTasks)
}
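
For context, here is a minimal, self-contained example of how a labeled gauge from such a GaugeVec is typically consumed with client_golang: resolve the child once with WithLabelValues (as NewConcurrentRunner does in the runner.go hunks below), update it cheaply afterwards, and expose it over /metrics. The metric definition and the runner name "example-runner" are illustrative, not wiring from this repository.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var waiting = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "pd",
	Subsystem: "ratelimit",
	Name:      "runner_task_max_waiting_duration_seconds",
	Help:      "The duration of tasks waiting in the runner.",
}, []string{"name"})

func main() {
	prometheus.MustRegister(waiting)

	// Resolve the labeled child once, then update it cheaply on the hot path.
	g := waiting.WithLabelValues("example-runner")
	g.Set(0.25) // e.g. the oldest pending task has been waiting for 250ms

	// Expose the metrics endpoint for scraping.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```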
15 changes: 15 additions & 0 deletions pkg/ratelimit/runner.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

@@ -53,6 +54,8 @@ type ConcurrentRunner struct {
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
failedTaskCount prometheus.Counter
maxWaitingDuration prometheus.Gauge
}

// NewConcurrentRunner creates a new ConcurrentRunner.
@@ -62,6 +65,8 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *ConcurrentRunner {
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
}
@@ -77,6 +82,7 @@ type TaskOpts struct {
func (s *ConcurrentRunner) Start() {
s.stopChan = make(chan struct{})
s.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
go func() {
defer s.wg.Done()
for {
@@ -94,6 +100,14 @@ func (s *ConcurrentRunner) Start() {
case <-s.stopChan:
log.Info("stopping async task runner", zap.String("name", s.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
s.pendingMu.Lock()
if len(s.pendingTasks) > 0 {
maxDuration = time.Since(s.pendingTasks[0].submittedAt)
}
s.pendingMu.Unlock()
s.maxWaitingDuration.Set(float64(maxDuration.Seconds()))

Check failure on line 110 in pkg/ratelimit/runner.go (GitHub Actions / statics): unnecessary conversion (unconvert)
}
}
}()
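
On the unconvert finding above: time.Duration.Seconds() already returns a float64, so the explicit conversion is redundant. A likely follow-up cleanup (an assumption, not part of this commit) would be:

```go
// Seconds() already yields float64; no conversion needed.
s.maxWaitingDuration.Set(maxDuration.Seconds())
```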
@@ -144,6 +158,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error {
if len(s.pendingTasks) > 0 {
maxWait := time.Since(s.pendingTasks[0].submittedAt)
if maxWait > s.maxPendingDuration {
s.failedTaskCount.Inc()
return ErrMaxWaitingTasksExceeded
}
}
2 changes: 1 addition & 1 deletion pkg/schedule/config/config.go
@@ -52,7 +52,7 @@ const (
defaultEnableJointConsensus = true
defaultEnableTiKVSplitRegion = true
defaultEnableHeartbeatBreakdownMetrics = true
- defaultEnableHeartbeatConcurrentRunner = false
+ defaultEnableHeartbeatConcurrentRunner = true
defaultEnableCrossTableMerge = true
defaultEnableDiagnostic = true
defaultStrictlyMatchLabel = false
14 changes: 13 additions & 1 deletion server/cluster/cluster.go
@@ -1065,10 +1065,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *core.RegionInfo) error {
// check its validation again here.
//
// However, it can't solve the race condition of concurrent heartbeats from the same region.
- if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil {
+ if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil {
tracer.OnSaveCacheFinished()
return err
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.TaskOpts{
TaskName: "UpdateSubTree",
Limit: ctx.Limiter,
},
func(ctx context.Context) {
c.CheckAndPutSubTree(ctx, region)
},
)
tracer.OnUpdateSubTreeFinished()

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,