diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 855aa793a83..8319a395ac8 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -130,47 +130,50 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is m.Lock() defer m.Unlock() - if p, exist := m.progresses[progress]; exist { - for _, op := range opts { - op(p) - } - p.remaining = remaining - if p.total < remaining { - p.total = remaining - } + p, exist := m.progresses[progress] + if !exist { + return + } - p.history.PushBack(current) - p.currentWindowLength++ + for _, op := range opts { + op(p) + } + p.remaining = remaining + if p.total < remaining { + p.total = remaining + } - // try to move `front` into correct place. - for p.currentWindowLength > p.windowLength { - p.front = p.front.Next() - p.currentWindowLength-- - } - for p.currentWindowLength < p.windowLength && p.front.Prev() != nil { - p.front = p.front.Prev() - p.currentWindowLength++ - } + p.history.PushBack(current) + p.currentWindowLength++ - for p.history.Len() > p.windowCapacity { - p.history.Remove(p.history.Front()) - } + // try to move `front` into correct place. + for p.currentWindowLength > p.windowLength { + p.front = p.front.Next() + p.currentWindowLength-- + } + for p.currentWindowLength < p.windowLength && p.front.Prev() != nil { + p.front = p.front.Prev() + p.currentWindowLength++ + } - // It means it just init and we haven't update the progress - if p.history.Len() <= 1 { - p.lastSpeed = 0 - } else if isInc { - // the value increases, e.g., [1, 2, 3] - p.lastSpeed = (current - p.front.Value.(float64)) / - (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) - } else { - // the value decreases, e.g., [3, 2, 1] - p.lastSpeed = (p.front.Value.(float64) - current) / - (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) - } - if p.lastSpeed < 0 { - p.lastSpeed = 0 - } + for p.history.Len() > p.windowCapacity { + p.history.Remove(p.history.Front()) + } + + // It means it just init and we haven't update the progress + if p.history.Len() <= 1 { + p.lastSpeed = 0 + } else if isInc { + // the value increases, e.g., [1, 2, 3] + p.lastSpeed = (current - p.front.Value.(float64)) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) + } else { + // the value decreases, e.g., [3, 2, 1] + p.lastSpeed = (p.front.Value.(float64) - current) / + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) + } + if p.lastSpeed < 0 { + p.lastSpeed = 0 } } @@ -201,39 +204,40 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string { m.RLock() defer m.RUnlock() - processes := []string{} + progresses := make([]string, 0, len(m.progresses)) for p := range m.progresses { if filter(p) { - processes = append(processes, p) + progresses = append(progresses, p) } } - return processes + return progresses } // Status returns the current progress status of a give name. -func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64, err error) { +func (m *Manager) Status(progressName string) (progress, leftSeconds, currentSpeed float64, err error) { m.RLock() defer m.RUnlock() - if p, exist := m.progresses[progress]; exist { - process = 1 - p.remaining/p.total - if process < 0 { - process = 0 - err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total)) - return - } - currentSpeed = p.lastSpeed - // When the progress is newly added, there is no last speed. - if p.lastSpeed == 0 && p.history.Len() <= 1 { - currentSpeed = 0 - } - - leftSeconds = p.remaining / currentSpeed - if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) { - leftSeconds = math.MaxFloat64 - } + p, exist := m.progresses[progressName] + if !exist { + err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progressName)) + return + } + progress = 1 - p.remaining/p.total + if progress < 0 { + progress = 0 + err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total)) return } - err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progress)) + currentSpeed = p.lastSpeed + // When the progress is newly added, there is no last speed. + if p.lastSpeed == 0 && p.history.Len() <= 1 { + currentSpeed = 0 + } + + leftSeconds = p.remaining / currentSpeed + if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) { + leftSeconds = math.MaxFloat64 + } return } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index ab30b256823..9b13e292c87 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -282,7 +282,6 @@ func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictSlowStoreCounter.Inc() - var ops []*operator.Operator if s.conf.evictStore() != 0 { store := cluster.GetStore(s.conf.evictStore()) @@ -298,7 +297,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool return s.schedulerEvictLeader(cluster), nil } s.cleanupEvictLeader(cluster) - return ops, nil + return nil, nil } var slowStore *core.StoreInfo @@ -311,14 +310,14 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool if (store.IsPreparing() || store.IsServing()) && store.IsSlow() { // Do nothing if there is more than one slow store. if slowStore != nil { - return ops, nil + return nil, nil } slowStore = store } } if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold { - return ops, nil + return nil, nil } // If there is only one slow store, evict leaders from that store. @@ -327,7 +326,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool err := s.prepareEvictLeader(cluster, slowStore.GetID()) if err != nil { log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID())) - return ops, nil + return nil, nil } return s.schedulerEvictLeader(cluster), nil } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 334a2f1199a..ea480a06845 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -456,6 +456,7 @@ func (s *ScheduleController) Stop() { // Schedule tries to create some operators. func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator { +retry: for i := 0; i < maxScheduleRetries; i++ { // no need to retry if schedule should stop to speed exit select { @@ -470,29 +471,27 @@ func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator { if diagnosable { s.diagnosticRecorder.SetResultFromPlans(ops, plans) } - foundDisabled := false + if len(ops) == 0 { + continue + } + + // If we have schedule, reset interval to the minimal interval. + s.nextInterval = s.Scheduler.GetMinInterval() for _, op := range ops { - if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil { - region := s.cluster.GetRegion(op.RegionID()) - if region == nil { - continue - } - if labelMgr.ScheduleDisabled(region) { - denySchedulersByLabelerCounter.Inc() - foundDisabled = true - break - } + region := s.cluster.GetRegion(op.RegionID()) + if region == nil { + continue retry } - } - if len(ops) > 0 { - // If we have schedule, reset interval to the minimal interval. - s.nextInterval = s.Scheduler.GetMinInterval() - // try regenerating operators - if foundDisabled { + labelMgr := s.cluster.GetRegionLabeler() + if labelMgr == nil { continue } - return ops + if labelMgr.ScheduleDisabled(region) { + denySchedulersByLabelerCounter.Inc() + continue retry + } } + return ops } s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval) return nil diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 851632bd61a..747cf2dc538 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1345,21 +1345,22 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro zap.Uint64("store-id", storeID), zap.String("store-address", newStore.GetAddress()), zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed())) - err := c.setStore(newStore) - if err == nil { - regionSize := float64(c.GetStoreRegionSize(storeID)) - c.resetProgress(storeID, store.GetAddress()) - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) - // record the current store limit in memory - c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ - storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), - storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer), - } - // TODO: if the persist operation encounters error, the "Unlimited" will be rollback. - // And considering the store state has changed, RemoveStore is actually successful. - _ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) + + if err := c.setStore(newStore); err != nil { + return err } - return err + regionSize := float64(c.GetStoreRegionSize(storeID)) + c.resetProgress(storeID, store.GetAddress()) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) + // record the current store limit in memory + c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ + storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), + storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer), + } + // TODO: if the persist operation encounters error, the "Unlimited" will be rollback. + // And considering the store state has changed, RemoveStore is actually successful. + _ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) + return nil } func (c *RaftCluster) checkReplicaBeforeOfflineStore(storeID uint64) error { @@ -1846,14 +1847,14 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string return } c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...) - process, ls, cs, err := c.progressManager.Status(progressName) + progress, leftSeconds, currentSpeed, err := c.progressManager.Status(progressName) if err != nil { log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err)) return } - storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process) - storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(cs) - storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls) + storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(progress) + storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(currentSpeed) + storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(leftSeconds) } func (c *RaftCluster) resetProgress(storeID uint64, storeAddress string) {