Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove task name opt #8168

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -751,19 +750,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
"DebugLog",
func(_ context.Context) {
d(msg, fields...)
},
ratelimit.WithTaskName("DebugLog"),
)
}
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
"InfoLog",
func(_ context.Context) {
i(msg, fields...)
},
ratelimit.WithTaskName("InfoLog"),
)
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,10 +594,10 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c

ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
ratelimit.WithTaskName(ratelimit.HandleStatsAsync),
)
tracer.OnAsyncHotStatsFinished()
hasRegionStats := c.regionStats != nil
Expand All @@ -611,22 +611,22 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
}
return nil
Expand All @@ -646,28 +646,28 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
)
tracer.OnCollectRegionStatsFinished()
return nil
Expand Down
36 changes: 15 additions & 21 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ const initialCapacity = 100

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
Ctx context.Context
Opts *TaskOpts
f func(context.Context)
ctx context.Context
submittedAt time.Time
opts *TaskOpts
f func(context.Context)
name string
}

// ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
Expand Down Expand Up @@ -86,19 +87,11 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
}

// TaskOpts is the options for RunTask.
type TaskOpts struct {
// TaskName is a human-readable name for the operation. TODO: metrics by name.
TaskName string
}
type TaskOpts struct{}

// TaskOption configures TaskOp
type TaskOption func(opts *TaskOpts)

// WithTaskName specify the task name.
func WithTaskName(name string) TaskOption {
return func(opts *TaskOpts) { opts.TaskName = name }
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
Expand All @@ -114,9 +107,9 @@ func (cr *ConcurrentRunner) Start() {
if err != nil {
continue
}
go cr.run(task.Ctx, task.f, token)
go cr.run(task.ctx, task.f, token)
} else {
go cr.run(task.Ctx, task.f, nil)
go cr.run(task.ctx, task.f, nil)
}
case <-cr.stopChan:
cr.pendingMu.Lock()
Expand Down Expand Up @@ -156,7 +149,7 @@ func (cr *ConcurrentRunner) processPendingTasks() {
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTaskCount[task.Opts.TaskName]--
cr.pendingTaskCount[task.name]--
return
default:
return
Expand All @@ -171,15 +164,16 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
taskOpts := &TaskOpts{}
for _, opt := range opts {
opt(taskOpts)
}
task := &Task{
Ctx: ctx,
ctx: ctx,
name: name,
f: f,
Opts: taskOpts,
opts: taskOpts,
}

cr.processPendingTasks()
Expand All @@ -197,7 +191,7 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context)
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
cr.pendingTaskCount[taskOpts.TaskName]++
cr.pendingTaskCount[task.name]++
}
return nil
}
Expand All @@ -211,7 +205,7 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, f func(context.Context), _ ...TaskOption) error {
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
"test1",
func(context.Context) {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
WithTaskName("test1"),
)
require.NoError(t, err)
}
Expand All @@ -55,11 +55,11 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
"test2",
func(context.Context) {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
},
WithTaskName("test2"),
)
if err != nil {
wg.Done()
Expand Down
14 changes: 7 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,10 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.HandleStatsAsync,
func(_ context.Context) {
cluster.HandleStatsAsync(c, region)
},
ratelimit.WithTaskName(ratelimit.HandleStatsAsync),
)
}
tracer.OnAsyncHotStatsFinished()
Expand All @@ -1047,22 +1047,22 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
cluster.Collect(c, region, hasRegionStats)
}
},
ratelimit.WithTaskName(ratelimit.ObserveRegionStatsAsync),
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
}
return nil
Expand All @@ -1086,20 +1086,20 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
ctx.TaskRunner.RunTask(
ctx,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
ratelimit.WithTaskName(ratelimit.UpdateSubTree),
)
tracer.OnUpdateSubTreeFinished()

if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
},
ratelimit.WithTaskName(ratelimit.HandleOverlaps),
)
}
regionUpdateCacheEventCounter.Inc()
Expand All @@ -1109,20 +1109,21 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// handle region stats
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(c, region, hasRegionStats)
},
ratelimit.WithTaskName(ratelimit.CollectRegionStatsAsync),
)

tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
if saveKV {
ctx.TaskRunner.RunTask(
ctx.Context,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
Expand All @@ -1144,7 +1145,6 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
regionUpdateKVEventCounter.Inc()
},
ratelimit.WithTaskName(ratelimit.SaveRegionToKV),
)
}
}
Expand Down