Skip to content

Commit

Permalink
*: refine code, decrease indent, and rename (#8276)
Browse files Browse the repository at this point in the history
ref #4399

- decrease indent for code readability
- use `continue loop` to instead with `foundDisabled` variable, this is more concise
- no other logic updates
- rename `process` to `progress`

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Jun 14, 2024
1 parent 9dff6e6 commit f1e85de
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 100 deletions.
122 changes: 63 additions & 59 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,47 +130,50 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is
m.Lock()
defer m.Unlock()

if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}
p, exist := m.progresses[progress]
if !exist {
return
}

p.history.PushBack(current)
p.currentWindowLength++
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

// try to move `front` into correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}
p.history.PushBack(current)
p.currentWindowLength++

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
// try to move `front` into correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
}
for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
}
}

Expand Down Expand Up @@ -201,39 +204,40 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
m.RLock()
defer m.RUnlock()

processes := []string{}
progresses := make([]string, 0, len(m.progresses))
for p := range m.progresses {
if filter(p) {
processes = append(processes, p)
progresses = append(progresses, p)
}
}
return processes
return progresses
}

// Status returns the current progress status of a give name.
func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64, err error) {
func (m *Manager) Status(progressName string) (progress, leftSeconds, currentSpeed float64, err error) {
m.RLock()
defer m.RUnlock()

if p, exist := m.progresses[progress]; exist {
process = 1 - p.remaining/p.total
if process < 0 {
process = 0
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
currentSpeed = p.lastSpeed
// When the progress is newly added, there is no last speed.
if p.lastSpeed == 0 && p.history.Len() <= 1 {
currentSpeed = 0
}

leftSeconds = p.remaining / currentSpeed
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
}
p, exist := m.progresses[progressName]
if !exist {
err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progressName))
return
}
progress = 1 - p.remaining/p.total
if progress < 0 {
progress = 0
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progress))
currentSpeed = p.lastSpeed
// When the progress is newly added, there is no last speed.
if p.lastSpeed == 0 && p.history.Len() <= 1 {
currentSpeed = 0
}

leftSeconds = p.remaining / currentSpeed
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
}
return
}
9 changes: 4 additions & 5 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste

func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
evictSlowStoreCounter.Inc()
var ops []*operator.Operator

if s.conf.evictStore() != 0 {
store := cluster.GetStore(s.conf.evictStore())
Expand All @@ -298,7 +297,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
return s.schedulerEvictLeader(cluster), nil
}
s.cleanupEvictLeader(cluster)
return ops, nil
return nil, nil
}

var slowStore *core.StoreInfo
Expand All @@ -311,14 +310,14 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
if (store.IsPreparing() || store.IsServing()) && store.IsSlow() {
// Do nothing if there is more than one slow store.
if slowStore != nil {
return ops, nil
return nil, nil
}
slowStore = store
}
}

if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold {
return ops, nil
return nil, nil
}

// If there is only one slow store, evict leaders from that store.
Expand All @@ -327,7 +326,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
err := s.prepareEvictLeader(cluster, slowStore.GetID())
if err != nil {
log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID()))
return ops, nil
return nil, nil
}
return s.schedulerEvictLeader(cluster), nil
}
Expand Down
35 changes: 17 additions & 18 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ func (s *ScheduleController) Stop() {

// Schedule tries to create some operators.
func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator {
retry:
for i := 0; i < maxScheduleRetries; i++ {
// no need to retry if schedule should stop to speed exit
select {
Expand All @@ -470,29 +471,27 @@ func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator {
if diagnosable {
s.diagnosticRecorder.SetResultFromPlans(ops, plans)
}
foundDisabled := false
if len(ops) == 0 {
continue
}

// If we have schedule, reset interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue
}
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue retry
}
}
if len(ops) > 0 {
// If we have schedule, reset interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
labelMgr := s.cluster.GetRegionLabeler()
if labelMgr == nil {
continue
}
return ops
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
continue retry
}
}
return ops
}
s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
return nil
Expand Down
37 changes: 19 additions & 18 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,21 +1345,22 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()),
zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed()))
err := c.setStore(newStore)
if err == nil {
regionSize := float64(c.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
// record the current store limit in memory
c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer),
storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer),
}
// TODO: if the persist operation encounters error, the "Unlimited" will be rollback.
// And considering the store state has changed, RemoveStore is actually successful.
_ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited)

if err := c.setStore(newStore); err != nil {
return err
}
return err
regionSize := float64(c.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
// record the current store limit in memory
c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer),
storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer),
}
// TODO: if the persist operation encounters error, the "Unlimited" will be rollback.
// And considering the store state has changed, RemoveStore is actually successful.
_ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited)
return nil
}

func (c *RaftCluster) checkReplicaBeforeOfflineStore(storeID uint64) error {
Expand Down Expand Up @@ -1846,14 +1847,14 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string
return
}
c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...)
process, ls, cs, err := c.progressManager.Status(progressName)
progress, leftSeconds, currentSpeed, err := c.progressManager.Status(progressName)
if err != nil {
log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err))
return
}
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(cs)
storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls)
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(progress)
storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(currentSpeed)
storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(leftSeconds)
}

func (c *RaftCluster) resetProgress(storeID uint64, storeAddress string) {
Expand Down

0 comments on commit f1e85de

Please sign in to comment.