From 91fbb4022134f71f71d582225677a8c14858f291 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 10 May 2024 16:37:11 +0800 Subject: [PATCH] optimize memory usage Signed-off-by: Ryan Leung --- pkg/core/metrics.go | 18 ++++++- pkg/core/region.go | 23 ++++---- pkg/ratelimit/runner.go | 81 ++++++++++++++++++----------- pkg/statistics/region_collection.go | 39 ++++++++------ pkg/utils/grpcutil/grpcutil.go | 10 ++-- server/cluster/cluster_worker.go | 1 + 6 files changed, 105 insertions(+), 67 deletions(-) diff --git a/pkg/core/metrics.go b/pkg/core/metrics.go index d23cf9dfcaaa..94fd94013c4e 100644 --- a/pkg/core/metrics.go +++ b/pkg/core/metrics.go @@ -15,6 +15,7 @@ package core import ( + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -114,6 +115,7 @@ type RegionHeartbeatProcessTracer interface { OnCollectRegionStatsFinished() OnAllStageFinished() LogFields() []zap.Field + Release() } type noopHeartbeatProcessTracer struct{} @@ -138,6 +140,7 @@ func (*noopHeartbeatProcessTracer) OnAllStageFinished() {} func (*noopHeartbeatProcessTracer) LogFields() []zap.Field { return nil } +func (*noopHeartbeatProcessTracer) Release() {} type regionHeartbeatProcessTracer struct { startTime time.Time @@ -149,9 +152,22 @@ type regionHeartbeatProcessTracer struct { OtherDuration time.Duration } +var tracerPool = sync.Pool{ + New: func() any { + return ®ionHeartbeatProcessTracer{} + }, +} + // NewHeartbeatProcessTracer returns a heartbeat process tracer. func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer { - return ®ionHeartbeatProcessTracer{} + return tracerPool.Get().(*regionHeartbeatProcessTracer) +} + +// Release puts the tracer back into the pool. +func (h *regionHeartbeatProcessTracer) Release() { + // Reset the fields of h to their zero values. + *h = regionHeartbeatProcessTracer{} + tracerPool.Put(h) } func (h *regionHeartbeatProcessTracer) Begin() { diff --git a/pkg/core/region.go b/pkg/core/region.go index be8f392f05e2..8f22be29cf31 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -138,26 +138,23 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre // classifyVoterAndLearner sorts out voter and learner from peers into different slice. 
func classifyVoterAndLearner(region *RegionInfo) { - learners := make([]*metapb.Peer, 0, 1) - voters := make([]*metapb.Peer, 0, len(region.meta.Peers)) - witnesses := make([]*metapb.Peer, 0, 1) + // Reset slices + region.learners = region.learners[:0] + region.voters = region.voters[:0] + region.witnesses = region.witnesses[:0] for _, p := range region.meta.Peers { if IsLearner(p) { - learners = append(learners, p) + region.learners = append(region.learners, p) } else { - voters = append(voters, p) + region.voters = append(region.voters, p) } - // Whichever peer role can be a witness if IsWitness(p) { - witnesses = append(witnesses, p) + region.witnesses = append(region.witnesses, p) } } - sort.Sort(peerSlice(learners)) - sort.Sort(peerSlice(voters)) - sort.Sort(peerSlice(witnesses)) - region.learners = learners - region.voters = voters - region.witnesses = witnesses + sort.Sort(peerSlice(region.learners)) + sort.Sort(peerSlice(region.voters)) + sort.Sort(peerSlice(region.witnesses)) } // peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed, diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index bfa1bf1865f6..69a0a98d9a84 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -15,6 +15,7 @@ package ratelimit import ( + "container/list" "context" "errors" "sync" @@ -35,8 +36,6 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 - // Runner is the interface for running tasks. type Runner interface { RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error @@ -44,7 +43,14 @@ type Runner interface { Stop() } -// Task is a task to be run. +// TaskPool is a pool for tasks. +var TaskPool = &sync.Pool{ + New: func() any { + return &Task{} + }, +} + +// Task is a task to run. type Task struct { Ctx context.Context Opts *TaskOpts @@ -52,6 +58,26 @@ type Task struct { submittedAt time.Time } +func NewTask(ctx context.Context, f func(context.Context), opts ...TaskOption) *Task { + task := TaskPool.Get().(*Task) + task.Ctx = ctx + task.f = f + task.Opts = &TaskOpts{} + task.submittedAt = time.Now() + for _, opt := range opts { + opt(task.Opts) + } + return task +} + +// ReleaseTask releases the task. +func ReleaseTask(task *Task) { + task.Ctx = nil + task.Opts = nil + task.f = nil + TaskPool.Put(task) +} + // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum. 
var ErrMaxWaitingTasksExceeded = errors.New("max waiting tasks exceeded") @@ -61,7 +87,7 @@ type ConcurrentRunner struct { limiter *ConcurrencyLimiter maxPendingDuration time.Duration taskChan chan *Task - pendingTasks []*Task + pendingTasks *list.List pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup @@ -77,7 +103,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur limiter: limiter, maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), - pendingTasks: make([]*Task, 0, initialCapacity), + pendingTasks: list.New(), failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), @@ -114,21 +140,22 @@ func (cr *ConcurrentRunner) Start() { if err != nil { continue } - go cr.run(task.Ctx, task.f, token) + go cr.run(task, token) } else { - go cr.run(task.Ctx, task.f, nil) + go cr.run(task, nil) } case <-cr.stopChan: cr.pendingMu.Lock() - cr.pendingTasks = make([]*Task, 0, initialCapacity) + cr.pendingTasks = list.New() cr.pendingMu.Unlock() log.Info("stopping async task runner", zap.String("name", cr.name)) return case <-ticker.C: maxDuration := time.Duration(0) cr.pendingMu.Lock() - if len(cr.pendingTasks) > 0 { - maxDuration = time.Since(cr.pendingTasks[0].submittedAt) + if cr.pendingTasks.Len() > 0 { + first := cr.pendingTasks.Front() + maxDuration = time.Since(first.Value.(*Task).submittedAt) } for name, cnt := range cr.pendingTaskCount { RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) @@ -140,27 +167,28 @@ func (cr *ConcurrentRunner) Start() { }() } -func (cr *ConcurrentRunner) run(ctx context.Context, task func(context.Context), token *TaskToken) { - task(ctx) +func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { + task.f(task.Ctx) if token != nil { token.Release() cr.processPendingTasks() } + ReleaseTask(task) } func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - for len(cr.pendingTasks) > 0 { - task := cr.pendingTasks[0] + if cr.pendingTasks.Len() > 0 { + first := cr.pendingTasks.Front() + task := first.Value.(*Task) select { case cr.taskChan <- task: - cr.pendingTasks = cr.pendingTasks[1:] + cr.pendingTasks.Remove(first) cr.pendingTaskCount[task.Opts.TaskName]-- - return default: - return } + return } } @@ -172,15 +200,7 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context), opts ...TaskOption) error { - taskOpts := &TaskOpts{} - for _, opt := range opts { - opt(taskOpts) - } - task := &Task{ - Ctx: ctx, - f: f, - Opts: taskOpts, - } + task := NewTask(ctx, f, opts...) 
cr.processPendingTasks() select { @@ -188,16 +208,17 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, f func(context.Context) default: cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { - maxWait := time.Since(cr.pendingTasks[0].submittedAt) + if cr.pendingTasks.Len() > 0 { + first := cr.pendingTasks.Front() + maxWait := time.Since(first.Value.(*Task).submittedAt) if maxWait > cr.maxPendingDuration { cr.failedTaskCount.Inc() return ErrMaxWaitingTasksExceeded } } task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingTaskCount[taskOpts.TaskName]++ + cr.pendingTasks.PushBack(task) + cr.pendingTaskCount[task.Opts.TaskName]++ } return nil } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 565597b4efb3..ee30f33389c7 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -207,14 +207,27 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } } } + + peers := region.GetPeers() + downPeers := region.GetDownPeers() + pendingPeers := region.GetPendingPeers() + learners := region.GetLearners() + voters := region.GetVoters() + regionSize := region.GetApproximateSize() + regionMaxSize := int64(r.conf.GetRegionMaxSize()) + regionMaxKeys := int64(r.conf.GetRegionMaxKeys()) + maxMergeRegionSize := int64(r.conf.GetMaxMergeRegionSize()) + maxMergeRegionKeys := int64(r.conf.GetMaxMergeRegionKeys()) + leaderIsWitness := region.GetLeader().GetIsWitness() + // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. conditions := map[RegionStatisticType]bool{ - MissPeer: len(region.GetPeers()) < desiredReplicas, - ExtraPeer: len(region.GetPeers()) > desiredReplicas, - DownPeer: len(region.GetDownPeers()) > 0, - PendingPeer: len(region.GetPendingPeers()) > 0, + MissPeer: len(peers) < desiredReplicas, + ExtraPeer: len(peers) > desiredReplicas, + DownPeer: len(downPeers) > 0, + PendingPeer: len(pendingPeers) > 0, OfflinePeer: func() bool { for _, store := range stores { if store.IsRemoving() { @@ -226,17 +239,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } return false }(), - LearnerPeer: len(region.GetLearners()) > 0, - EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize, - OversizedRegion: region.IsOversized( - int64(r.conf.GetRegionMaxSize()), - int64(r.conf.GetRegionMaxKeys()), - ), - UndersizedRegion: region.NeedMerge( - int64(r.conf.GetMaxMergeRegionSize()), - int64(r.conf.GetMaxMergeRegionKeys()), - ), - WitnessLeader: region.GetLeader().GetIsWitness(), + LearnerPeer: len(learners) > 0, + EmptyRegion: regionSize <= core.EmptyRegionApproximateSize, + OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys), + UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys), + WitnessLeader: leaderIsWitness, } // Check if the region meets any of the conditions and update the corresponding info. 
regionID := region.GetID() @@ -253,7 +260,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store info.startDownPeerTS = time.Now().Unix() logDownPeerWithNoDisconnectedStore(region, stores) } - } else if typ == MissPeer && len(region.GetVoters()) < desiredVoters { + } else if typ == MissPeer && len(voters) < desiredVoters { if info.startMissVoterPeerTS != 0 { regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.startMissVoterPeerTS)) } else { diff --git a/pkg/utils/grpcutil/grpcutil.go b/pkg/utils/grpcutil/grpcutil.go index 9b8cc2feb493..5633533ae4a1 100644 --- a/pkg/utils/grpcutil/grpcutil.go +++ b/pkg/utils/grpcutil/grpcutil.go @@ -186,13 +186,9 @@ func ResetForwardContext(ctx context.Context) context.Context { // GetForwardedHost returns the forwarded host in metadata. func GetForwardedHost(ctx context.Context) string { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - log.Debug("failed to get gRPC incoming metadata when getting forwarded host") - return "" - } - if t, ok := md[ForwardMetadataKey]; ok { - return t[0] + s := metadata.ValueFromIncomingContext(ctx, ForwardMetadataKey) + if len(s) > 0 { + return s[0] } return "" } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index fcddea58b18b..3d20fbb5e2f3 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -59,6 +59,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return err } tracer.OnAllStageFinished() + tracer.Release() if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { return nil
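
For readers unfamiliar with the reuse pattern this patch applies to both regionHeartbeatProcessTracer and Task, the cycle is: Get an object from a sync.Pool, populate its fields, use it, then reset it to its zero value and Put it back. Below is a minimal, self-contained sketch of that cycle; the tracer type, its fields, and the newTracer/release helpers are simplified stand-ins for illustration only, not the actual pd types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// tracer is a simplified stand-in for regionHeartbeatProcessTracer.
type tracer struct {
	startTime     time.Time
	otherDuration time.Duration
}

// tracerPool reuses tracer objects to avoid one allocation per heartbeat.
var tracerPool = sync.Pool{
	New: func() any { return &tracer{} },
}

// newTracer gets a (possibly recycled) tracer from the pool.
func newTracer() *tracer {
	t := tracerPool.Get().(*tracer)
	t.startTime = time.Now()
	return t
}

// release resets the tracer to its zero value and returns it to the pool,
// so stale fields never leak into the next user.
func (t *tracer) release() {
	*t = tracer{}
	tracerPool.Put(t)
}

func main() {
	t := newTracer()
	// ... use the tracer while processing one heartbeat ...
	t.otherDuration = time.Since(t.startTime)
	fmt.Println("other duration:", t.otherDuration)
	t.release()
}

The constraint that makes this safe is that nothing may keep a reference to the object after it is released, which is why the change in cluster_worker.go calls tracer.Release() only after OnAllStageFinished(), and why ReleaseTask nils out Ctx, Opts, and f before returning the Task to the pool.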