pkg: remove old duplicated task (#8234)
ref #7897

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Jun 13, 2024
1 parent 9348164 commit bf02fb5
Showing 5 changed files with 91 additions and 51 deletions.
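For orientation before the per-file diffs: the commit swaps the context-based task submission API for an ID-based one. Below is a condensed restatement of the new Runner interface from the pkg/ratelimit/runner.go diff, with the old signature kept in a comment for contrast; the doc comments are paraphrased and the sketch assumes the package's existing TaskOption type.

```go
package ratelimit

// Runner is the interface for running tasks.
type Runner interface {
	// Old form: RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
	//
	// New form: a task is identified by (id, name) — at the call sites in this
	// commit the id is the region ID — so a newer submission for the same region
	// can replace a still-pending duplicate instead of queueing another task.
	RunTask(id uint64, name string, f func(), opts ...TaskOption) error
	Start()
	Stop()
}
```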
pkg/core/region.go (10 changes: 5 additions & 5 deletions)
@@ -16,7 +16,6 @@ package core

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
@@ -751,21 +750,22 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
logRunner := ctx.LogRunner
// print log asynchronously
debug, info := d, i
regionID := region.GetID()
if logRunner != nil {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
regionID,
"DebugLog",
func(_ context.Context) {
func() {
d(msg, fields...)
},
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
regionID,
"InfoLog",
func(_ context.Context) {
func() {
i(msg, fields...)
},
)
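Both wrappers in the region.go hunk above follow the same shape: capture the region ID once, then hand the actual log call to the runner under a fixed task name. A small illustrative helper that factors out that shape (asyncLog is not part of the commit; logRunner, zap, and the ratelimit package are the ones used in the diff):

```go
package core

import (
	"github.com/tikv/pd/pkg/ratelimit"
	"go.uber.org/zap"
)

// asyncLog wraps a logging function so it is executed through the runner,
// keyed by the region ID and a fixed task name ("DebugLog"/"InfoLog" above).
// When no runner is configured it falls back to logging synchronously,
// mirroring the nil check on ctx.LogRunner in the diff.
func asyncLog(logRunner ratelimit.Runner, regionID uint64, taskName string,
	log func(msg string, fields ...zap.Field)) func(msg string, fields ...zap.Field) {
	if logRunner == nil {
		return log
	}
	return func(msg string, fields ...zap.Field) {
		logRunner.RunTask(
			regionID,
			taskName,
			func() {
				log(msg, fields...)
			},
		)
	}
}
```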
pkg/mcs/scheduling/server/cluster.go (22 changes: 11 additions & 11 deletions)
@@ -607,15 +607,15 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -625,9 +625,9 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -649,28 +649,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
cluster.Collect(c, region, hasRegionStats)
},
)
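The scheduling-server cluster above and the PD server cluster later in this diff submit the same kinds of heartbeat work; the call sites only differ in which runner they use. A condensed sketch of the call-site shape, including the retained flag passed for subtree updates (submitSubTreeUpdate and the package name are illustrative; the ratelimit identifiers are the ones in the diff):

```go
package cluster

import "github.com/tikv/pd/pkg/ratelimit"

// submitSubTreeUpdate queues a subtree update keyed by the region ID. Marking
// the task as retained means it is not dropped when the pending queue has been
// waiting longer than the runner's maxPendingDuration.
func submitSubTreeUpdate(runner ratelimit.Runner, regionID uint64, update func(), retained bool) {
	runner.RunTask(
		regionID,
		ratelimit.UpdateSubTree,
		update,
		ratelimit.WithRetained(retained),
	)
}
```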
pkg/ratelimit/runner.go (51 changes: 33 additions & 18 deletions)
@@ -42,16 +42,16 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(id uint64, name string, f func(), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
ctx context.Context
id uint64
submittedAt time.Time
f func(context.Context)
f func()
name string
// retained indicates whether the task should be dropped if the task queue exceeds maxPendingDuration.
retained bool
@@ -60,17 +60,22 @@ type Task struct {
// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded")

// ConcurrentRunner is a simple task runner that limits the number of concurrent tasks.
type taskID struct {
id uint64
name string
}

type ConcurrentRunner struct {
name string
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingTaskCount map[string]int
pendingTasks []*Task
existTasks map[taskID]*Task
maxWaitingDuration prometheus.Gauge
}

@@ -82,7 +87,8 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTaskCount: make(map[string]int64),
pendingTaskCount: make(map[string]int),
existTasks: make(map[taskID]*Task),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +107,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -139,7 +146,7 @@

func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
start := time.Now()
task.f(task.ctx)
task.f()
if token != nil {
cr.limiter.ReleaseToken(token)
cr.processPendingTasks()
@@ -157,6 +164,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.name]--
delete(cr.existTasks, taskID{id: task.id, name: task.name})
default:
}
return
@@ -170,11 +178,12 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
id: id,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -187,23 +196,29 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
}()

pendingTaskNum := len(cr.pendingTasks)
tid := taskID{task.id, task.name}
if pendingTaskNum > 0 {
// Here we use a map to find the task with the same ID.
// Then replace the old task with the new one.
if t, ok := cr.existTasks[tid]; ok {
t.f = f
t.submittedAt = time.Now()
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there are 20000000 pending tasks.
if len(cr.pendingTasks) > maxPendingTaskNum {
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.existTasks[tid] = task
cr.pendingTaskCount[task.name]++
return nil
}
@@ -217,8 +232,8 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error {
f()
return nil
}

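The heart of the change sits in ConcurrentRunner.RunTask above: pending tasks are indexed by an (id, name) pair, and resubmitting the same pair while the task is still queued only swaps in the newer closure and submission time. A stripped-down model of just that bookkeeping (miniRunner is illustrative; Task and taskID are the types from the diff; the wait limits, metrics, retained option, and dispatch path are omitted):

```go
package ratelimit

import (
	"sync"
	"time"
)

// miniRunner models only the duplicate-detection path of ConcurrentRunner.RunTask.
type miniRunner struct {
	pendingMu    sync.Mutex
	pendingTasks []*Task
	existTasks   map[taskID]*Task
}

func newMiniRunner() *miniRunner {
	return &miniRunner{existTasks: make(map[taskID]*Task)}
}

func (r *miniRunner) runTask(id uint64, name string, f func()) {
	r.pendingMu.Lock()
	defer r.pendingMu.Unlock()

	key := taskID{id: id, name: name}
	if t, ok := r.existTasks[key]; ok {
		// Same region and task name already pending: replace the work and
		// refresh the submission time instead of appending a duplicate.
		t.f = f
		t.submittedAt = time.Now()
		return
	}
	task := &Task{id: id, name: name, f: f, submittedAt: time.Now()}
	r.pendingTasks = append(r.pendingTasks, task)
	r.existTasks[key] = task
}
```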
pkg/ratelimit/runner_test.go (34 changes: 29 additions & 5 deletions)
@@ -15,7 +15,6 @@
package ratelimit

import (
"context"
"sync"
"testing"
"time"
@@ -34,9 +33,9 @@ func TestConcurrentRunner(t *testing.T) {
time.Sleep(50 * time.Millisecond)
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test1",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -54,9 +53,9 @@ func TestConcurrentRunner(t *testing.T) {
for i := 0; i < 10; i++ {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test2",
func(context.Context) {
func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
@@ -74,4 +73,29 @@ func TestConcurrentRunner(t *testing.T) {
}
wg.Wait()
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
if i == 10 {
regionID = 4
}
err := runner.RunTask(
regionID,
"test3",
func() {
time.Sleep(time.Second)
},
)
require.NoError(t, err)
time.Sleep(1 * time.Millisecond)
}

updatedSubmitted := runner.pendingTasks[1].submittedAt
lastSubmitted := runner.pendingTasks[len(runner.pendingTasks)-1].submittedAt
require.Greater(t, updatedSubmitted, lastSubmitted)
})
}
server/cluster/cluster.go (25 changes: 13 additions & 12 deletions)
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1046,9 +1047,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
func() {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
@@ -1058,9 +1059,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(true),
@@ -1086,9 +1087,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
return err
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
func() {
c.CheckAndPutSubTree(region)
},
ratelimit.WithRetained(retained),
@@ -1097,9 +1098,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
func() {
cluster.HandleOverlaps(c, overlaps)
},
)
@@ -1110,9 +1111,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
func() {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
@@ -1124,9 +1125,9 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if c.storage != nil {
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
func() {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
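To close, a minimal end-to-end usage sketch of the new API outside the heartbeat path, in the same style as the tests above (the constructor arguments mirror those in runner_test.go; the task name and printed output are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// One concurrency token, generous pending duration: work runs one task at a
	// time and everything else queues.
	runner := ratelimit.NewConcurrentRunner("example", ratelimit.NewConcurrencyLimiter(1), time.Minute)
	runner.Start()
	defer runner.Stop()

	regionID := uint64(4)
	// Submitting the same (id, name) pair again while the first submission is
	// still pending replaces the queued closure instead of adding a second task.
	for attempt := 0; attempt < 2; attempt++ {
		a := attempt
		if err := runner.RunTask(regionID, "ExampleTask", func() {
			fmt.Println("ran attempt", a)
		}); err != nil {
			fmt.Println("submit failed:", err)
		}
	}
	time.Sleep(200 * time.Millisecond) // give the asynchronous task time to finish
}
```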
