From 98974bfae1dca3a501976cf68638fb064ef1b672 Mon Sep 17 00:00:00 2001
From: Hu#
Date: Tue, 14 Nov 2023 12:11:14 +0800
Subject: [PATCH] This is an automated cherry-pick of #7217

ref tikv/pd#7016

Signed-off-by: ti-chi-bot
---
 pkg/cluster/cluster.go                  |   62 +
 pkg/core/region.go                      |   18 +-
 pkg/core/region_tree.go                 |    7 +
 server/cluster/cluster.go               |    2 +-
 server/cluster/cluster_test.go          | 1436 +++++++++++++++++++++++
 server/cluster/prepare_checker.go       |   32 +-
 tests/pdctl/scheduler/scheduler_test.go |  124 ++
 tests/server/cluster/cluster_test.go    |    2 +-
 tests/testutil.go                       |  316 +++++
 9 files changed, 1986 insertions(+), 13 deletions(-)
 create mode 100644 pkg/cluster/cluster.go
 create mode 100644 tests/testutil.go

diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
new file mode 100644
index 000000000000..8809a7069362
--- /dev/null
+++ b/pkg/cluster/cluster.go
@@ -0,0 +1,62 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.

+package cluster
+
+import (
+	"github.com/tikv/pd/pkg/core"
+	"github.com/tikv/pd/pkg/schedule"
+	"github.com/tikv/pd/pkg/schedule/placement"
+	"github.com/tikv/pd/pkg/statistics"
+)
+
+// Cluster provides an overview of a cluster's basic information.
+type Cluster interface {
+	GetHotStat() *statistics.HotStat
+	GetRegionStats() *statistics.RegionStatistics
+	GetLabelStats() *statistics.LabelStatistics
+	GetCoordinator() *schedule.Coordinator
+	GetRuleManager() *placement.RuleManager
+}
+
+// HandleStatsAsync handles the flow asynchronously.
+func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
+	c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
+	c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
+	reportInterval := region.GetInterval()
+	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+	for _, peer := range region.GetPeers() {
+		peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
+		c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
+	}
+	c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
+}
+
+// HandleOverlaps handles the overlapped regions.
+func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
+	for _, item := range overlaps {
+		if c.GetRegionStats() != nil {
+			c.GetRegionStats().ClearDefunctRegion(item.GetID())
+		}
+		c.GetLabelStats().ClearDefunctRegion(item.GetID())
+		c.GetRuleManager().InvalidCache(item.GetID())
+	}
+}
+
+// Collect collects the cluster information.
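+// Only the region statistics are observed here; the `isNew` and `isPrepared`
+// flags are unused in this cherry-picked version.
+//
+// A minimal sketch of how these helpers are intended to be wired together
+// while processing a region heartbeat (`put` stands in for the cluster's
+// atomic check-and-put and is only illustrative):
+//
+//	cluster.HandleStatsAsync(c, region)
+//	if overlaps, err := put(region); err == nil {
+//		cluster.HandleOverlaps(c, overlaps)
+//		cluster.Collect(c, region, stores, hasRegionStats, isNew, isPrepared)
+//	}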
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
+	if hasRegionStats {
+		c.GetRegionStats().Observe(region, stores)
+	}
+}
diff --git a/pkg/core/region.go b/pkg/core/region.go
index cfe24ba02137..0b042c3d4a02 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -1289,11 +1289,23 @@ func (r *RegionsInfo) GetStoreWriteRate(storeID uint64) (bytesRate, keysRate flo
 	return
 }

-// GetClusterNotFromStorageRegionsCnt gets the total count of regions that not loaded from storage anymore
+// GetClusterNotFromStorageRegionsCnt gets the `NotFromStorageRegionsCnt` count of regions that are not loaded from storage anymore.
 func (r *RegionsInfo) GetClusterNotFromStorageRegionsCnt() int {
 	r.t.RLock()
 	defer r.t.RUnlock()
-	return r.tree.notFromStorageRegionsCnt
+	return r.tree.notFromStorageRegionsCount()
+}
+
+// GetNotFromStorageRegionsCntByStore gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
+func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int {
+	r.st.RLock()
+	defer r.st.RUnlock()
+	return r.getNotFromStorageRegionsCntByStoreLocked(storeID)
+}
+
+// getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID.
+func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int {
+	return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount()
 }

 // GetMetaRegions gets a set of metapb.Region from regionMap
@@ -1329,7 +1341,7 @@ func (r *RegionsInfo) GetStoreRegionCount(storeID uint64) int {
 	return r.getStoreRegionCountLocked(storeID)
 }

-// GetStoreRegionCount gets the total count of a store's leader, follower and learner RegionInfo by storeID
+// getStoreRegionCountLocked gets the total count of a store's leader, follower and learner RegionInfo by storeID
 func (r *RegionsInfo) getStoreRegionCountLocked(storeID uint64) int {
 	return r.leaders[storeID].length() + r.followers[storeID].length() + r.learners[storeID].length()
 }
diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go
index ed3445de6b64..ecc988d97d8b 100644
--- a/pkg/core/region_tree.go
+++ b/pkg/core/region_tree.go
@@ -82,6 +82,13 @@ func (t *regionTree) length() int {
 	return t.tree.Len()
 }

+func (t *regionTree) notFromStorageRegionsCount() int {
+	if t == nil {
+		return 0
+	}
+	return t.notFromStorageRegionsCnt
+}
+
 // GetOverlaps returns the range items that has some intersections with the given items.
 func (t *regionTree) overlaps(item *regionItem) []*regionItem {
 	// note that Find() gets the last item that is less or equal than the item.
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index e0a4e00be030..1a6c1e7e0c70 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1050,7 +1050,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
 	// check its validation again here.
 	//
-	// However it can't solve the race condition of concurrent heartbeats from the same region.
+	// However, it can't solve the race condition of concurrent heartbeats from the same region.
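+	// The validation check and the write below are performed as one atomic
+	// step, which is what closes the cross-region race described above.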
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index f7ff71b888d9..d53c2b365080 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2189,3 +2189,1439 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { return nil } +<<<<<<< HEAD +======= + +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator { + return operator.NewTestOperator(regionID, regionEpoch, kind, steps...) +} + +func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + id, err := c.AllocID() + if err != nil { + return nil, err + } + return &metapb.Peer{Id: id, StoreId: storeID}, nil +} + +func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) error { + var regionSize uint64 + if len(regionSizes) == 0 { + regionSize = uint64(regionCount) * 10 + } else { + regionSize = regionSizes[0] + } + + stats := &pdpb.StoreStats{} + stats.Capacity = 100 * units.GiB + stats.UsedSize = regionSize * units.MiB + stats.Available = stats.Capacity - stats.UsedSize + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionSize)), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error { + region := newTestRegionMeta(regionID) + leader, _ := c.AllocPeer(leaderStoreID) + region.Peers = []*metapb.Peer{leader} + for _, followerStoreID := range followerStoreIDs { + peer, _ := c.AllocPeer(followerStoreID) + region.Peers = append(region.Peers, peer) + } + regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + return c.putRegion(regionInfo) +} + +func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { + stats := &pdpb.StoreStats{} + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreDown(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(typeutil.ZeroTime), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreOffline(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error { 
+ // regions load from etcd will have no leader + region := newTestRegionMeta(regionID) + region.Peers = []*metapb.Peer{} + for _, id := range followerStoreIDs { + peer, _ := c.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage))) +} + +func TestBasic(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + re.Equal(op1.RegionID(), oc.GetOperator(1).RegionID()) + + // Region 1 already has an operator, cannot add another one. + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op2) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + // Remove the operator manually, then we can add a new operator. + re.True(oc.RemoveOperator(op1)) + op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op3) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) + re.Equal(op3.RegionID(), oc.GetOperator(1).RegionID()) +} + +func TestDispatch(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + co.GetPrepareChecker().SetPrepared() + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2. + re.NoError(tc.updateLeaderCount(4, 50)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + go co.RunUntilStop() + + // Wait for schedule and turn off balance. + waitOperator(re, co, 1) + controller := co.GetSchedulersController() + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + waitOperator(re, co, 2) + operatorutil.CheckTransferLeader(re, co.GetOperatorController().GetOperator(2), operator.OpKind(0), 4, 2) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + + stream := mockhbstream.NewHeartbeatStream() + + // Transfer peer. + region := tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Transfer leader. 
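+	// Region 2's leader should move from store 4 to store 2, matching the
+	// transfer-leader operator checked above.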
+ region = tc.GetRegion(2).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitTransferLeader(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { + co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream) + if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil { + return err + } + co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil) + return nil +} + +func TestCollectMetricsConcurrent(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Make sure there are no problem when concurrent write and read + var wg sync.WaitGroup + count := 10 + wg.Add(count + 1) + for i := 0; i <= count; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < 1000; j++ { + re.NoError(tc.addRegionStore(uint64(i%5), rand.Intn(200))) + } + }(i) + } + controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + } + co.ResetHotSpotMetrics() + controller.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetStatisticsMetrics() + wg.Wait() +} + +func TestCollectMetrics(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + count := 10 + for i := 0; i <= count; i++ { + for k := 0; k < 200; k++ { + item := &statistics.HotPeerStat{ + StoreID: uint64(i % 5), + RegionID: uint64(i*1000 + k), + Loads: []float64{10, 20, 30}, + HotDegree: 10, + AntiCount: utils.HotRegionAntiCount, // for write + } + tc.hotStat.HotCache.Update(item, utils.Write) + } + } + controller := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectStatisticsMetrics() + } + stores := co.GetCluster().GetStores() + regionStats := co.GetCluster().RegionWriteStats() + status1 := statistics.CollectHotPeerInfos(stores, regionStats) + status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) + for _, s := range status2.AsLeader { + s.Stats = nil + } + for _, s := range status2.AsPeer { + s.Stats = nil + } + re.Equal(status1, status2) + co.ResetHotSpotMetrics() + controller.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetStatisticsMetrics() +} + +func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { + ctx, cancel := context.WithCancel(context.Background()) + cfg, opt, err := newTestScheduleConfig() + re.NoError(err) + if setCfg != nil { + setCfg(cfg) + } + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + if setTc != nil { + setTc(tc) + } + co := 
schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + if run != nil { + run(co) + } + return tc, co, func() { + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + hbStreams.Close() + cancel() + } +} + +func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *schedule.Coordinator, regionID uint64, expectAddOperator int) { + ops := co.GetCheckerController().CheckRegion(tc.GetRegion(regionID)) + if ops == nil { + re.Equal(0, expectAddOperator) + } else { + re.Equal(expectAddOperator, co.GetOperatorController().AddWaitingOperator(ops...)) + } +} + +func TestCheckRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, nil, re) + hbStreams, opt := co.GetHeartbeatStreams(), tc.opt + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + checkRegionAndOperator(re, tc, co, 1, 1) + operatorutil.CheckAddPeer(re, co.GetOperatorController().GetOperator(1), operator.OpReplica, 1) + checkRegionAndOperator(re, tc, co, 1, 0) + + r := tc.GetRegion(1) + p := &metapb.Peer{Id: 1, StoreId: 1, Role: metapb.PeerRole_Learner} + r = r.Clone( + core.WithAddPeer(p), + core.WithPendingPeers(append(r.GetPendingPeers(), p)), + ) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + + tc = newTestCluster(ctx, opt) + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + r = r.Clone(core.WithPendingPeers(nil)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 1) + op := co.GetOperatorController().GetOperator(1) + re.Equal(1, op.Len()) + re.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) + checkRegionAndOperator(re, tc, co, 1, 0) +} + +func TestCheckRegionWithScheduleDeny(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NotNil(region) + // test with label schedule=deny + labelerManager := tc.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + + // should allow to do rule checker + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 0) + // delete label rule, should allow to do merge + labelerManager.DeleteLabelRule("schedulelabel") + re.False(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 2) +} + +func TestCheckerIsBusy(t *testing.T) { + re := require.New(t) + + tc, co, 
cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy + cfg.MergeScheduleLimit = 10 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + num := 1 + typeutil.MaxUint64(tc.opt.GetReplicaScheduleLimit(), tc.opt.GetMergeScheduleLimit()) + var operatorKinds = []operator.OpKind{ + operator.OpReplica, operator.OpRegion | operator.OpMerge, + } + for i, operatorKind := range operatorKinds { + for j := uint64(0); j < num; j++ { + regionID := j + uint64(i+1)*num + re.NoError(tc.addLeaderRegion(regionID, 1)) + switch operatorKind { + case operator.OpReplica: + op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind) + re.Equal(1, co.GetOperatorController().AddWaitingOperator(op)) + case operator.OpRegion | operator.OpMerge: + if regionID%2 == 1 { + ops, err := operator.CreateMergeRegionOperator("merge-region", co.GetCluster(), tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + } + } + } + } + checkRegionAndOperator(re, tc, co, num, 0) +} + +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) +} + +func TestReplica(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off balance. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(4, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Add peer to store 1. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. 
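+	// setStoreDown only marks the store as down (no heartbeat); the region
+	// must also report the peer in DownPeers below before the checker reacts.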
+ re.NoError(tc.setStoreDown(3)) + downPeer := &pdpb.PeerStats{ + Peer: region.GetStorePeer(3), + DownSeconds: 24 * 60 * 60, + } + region = region.Clone( + core.WithDownPeers(append(region.GetDownPeers(), downPeer)), + ) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove peer from store 3. + re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) + region = tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 3) // store3 is down, we should remove it firstly. + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove offline peer directly when it's pending. + re.NoError(tc.addLeaderRegion(3, 1, 2, 3)) + re.NoError(tc.setStoreOffline(3)) + region = tc.GetRegion(3) + region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestCheckCache(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off replica scheduling. + cfg.ReplicaScheduleLimit = 0 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + re.NoError(tc.addRegionStore(2, 0)) + re.NoError(tc.addRegionStore(3, 0)) + + // Add a peer with two replicas. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + + // case 1: operator cannot be created due to replica-schedule-limit restriction + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the replica-schedule-limit restriction + cfg := tc.GetScheduleConfig() + cfg.ReplicaScheduleLimit = 10 + tc.SetScheduleConfig(cfg) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + oc := co.GetOperatorController() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + // case 2: operator cannot be created due to store limit restriction + oc.RemoveOperator(oc.GetOperator(1)) + tc.SetStoreLimit(1, storelimit.AddPeer, 0) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the store limit restriction + tc.SetStoreLimit(1, storelimit.AddPeer, 10) + time.Sleep(time.Second) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func TestPeerState(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addRegionStore(2, 10)) + re.NoError(tc.addRegionStore(3, 10)) + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Wait for schedule. 
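+	// The balance-region scheduler should create an operator that moves the
+	// peer of region 1 from the crowded store 4 to store 1.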
+ waitOperator(re, co, 1) + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + + region := tc.GetRegion(1).Clone() + + // Add new peer. + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + + // If the new peer is pending, the operator will not finish. + region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + re.NotNil(co.GetOperatorController().GetOperator(region.GetID())) + + // The new peer is not pending now, the operator will finish. + // And we will proceed to remove peer in store 4. + region = region.Clone(core.WithPendingPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitRemovePeer(re, stream, region, 4) + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + region = tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := 0; i < 10; i++ { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + 
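+	// A heartbeat carrying a region with no peers (and hence no leader) is
+	// invalid and must be rejected.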
re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) + + // Now, after server is prepared, there exist some regions with no leader. + re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + +func TestAddScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), len(sc.DefaultSchedulers)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Empty(controller.GetSchedulerNames()) + + stream := mockhbstream.NewHeartbeatStream() + + // Add stores 1,2,3 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) + // Add regions 1 with leader in store 1 and followers in stores 2,3 + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + // Add regions 2 with leader in store 2 and followers in stores 1,3 + re.NoError(tc.addLeaderRegion(2, 2, 1, 3)) + // Add regions 3 with leader in store 3 and followers in stores 1,2 + re.NoError(tc.addLeaderRegion(3, 3, 1, 2)) + + oc := co.GetOperatorController() + + // test ConfigJSONDecoder create + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err := bl.EncodeConfig() + re.NoError(err) + data := make(map[string]interface{}) + err = json.Unmarshal(conf, &data) + re.NoError(err) + batch := data["batch"].(float64) + re.Equal(4, int(batch)) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler) + re.NoError(err) + re.NotNil(controller.AddScheduler(gls)) + re.NotNil(controller.RemoveScheduler(gls.GetName())) + + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err = hb.EncodeConfig() + re.NoError(err) + data = make(map[string]interface{}) + re.NoError(json.Unmarshal(conf, &data)) + re.Contains(data, "enable-for-tiflash") + re.Equal("true", data["enable-for-tiflash"].(string)) + + // Transfer all leaders to store 1. 
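+	// The grant-leader scheduler added above targets store 1, so regions 2
+	// and 3 should each produce a transfer-leader operator.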
+ waitOperator(re, co, 2) + region2 := tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region2, stream)) + region2 = waitTransferLeader(re, stream, region2, 1) + re.NoError(dispatchHeartbeat(co, region2, stream)) + waitNoResponse(re, stream) + + waitOperator(re, co, 3) + region3 := tc.GetRegion(3) + re.NoError(dispatchHeartbeat(co, region3, stream)) + region3 = waitTransferLeader(re, stream, region3, 1) + re.NoError(dispatchHeartbeat(co, region3, stream)) + waitNoResponse(re, stream) +} + +func TestPersistScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + defaultCount := len(sc.DefaultSchedulers) + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls1, "1")) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(evict, "2")) + re.Len(controller.GetSchedulerNames(), defaultCount+2) + sches, _, err := storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, defaultCount+2) + + // remove 5 schedulers + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Len(controller.GetSchedulerNames(), defaultCount-3) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(storage)) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // make a new coordinator for testing + // whether the schedulers added or removed in dynamic way are recorded in opt + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + re.NoError(err) + re.NoError(controller.AddScheduler(shuffle)) + // suppose we add a new default enable scheduler + sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) + defer func() { + sc.DefaultSchedulers = sc.DefaultSchedulers[:len(sc.DefaultSchedulers)-1] + }() + re.Len(newOpt.GetSchedulers(), defaultCount) + re.NoError(newOpt.Reload(storage)) + // only remains 3 items with independent config. + sches, _, err = storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, 3) + + // option have 6 items because the default scheduler do not remove. 
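+	// (the defaultCount built-in schedulers plus grant-leader, evict-leader
+	// and shuffle-region, which were added at runtime)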
+ re.Len(newOpt.GetSchedulers(), defaultCount+3) + re.NoError(newOpt.Persist(storage)) + tc.RaftCluster.opt = newOpt + + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), 3) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // suppose restart PD again + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), 3) + bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + re.NoError(controller.AddScheduler(bls)) + brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + re.NoError(controller.AddScheduler(brs)) + re.Len(controller.GetSchedulerNames(), defaultCount) + + // the scheduler option should contain 6 items + // the `hot scheduler` are disabled + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+3) + re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName)) + // the scheduler that is not enable by default will be completely deleted + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+2) + re.Len(controller.GetSchedulerNames(), 4) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(co.GetCluster().GetStorage())) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + co.Run() + controller = co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount-1) + re.NoError(controller.RemoveScheduler(schedulers.EvictLeaderName)) + re.Len(controller.GetSchedulerNames(), defaultCount-2) +} + +func TestRemoveScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + defaultCount := len(sc.DefaultSchedulers) + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls1, "1")) + re.Len(controller.GetSchedulerNames(), defaultCount+1) + sches, _, err := storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, defaultCount+1) + + // remove all schedulers + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + 
re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(controller.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + // all removed + sches, _, err = storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Empty(sches) + re.Empty(controller.GetSchedulerNames()) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // suppose restart PD again + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(tc.storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.Empty(controller.GetSchedulerNames()) + // the option remains default scheduler + re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() +} + +func TestRestart(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off balance, we test add replica only. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addLeaderRegion(1, 1)) + region := tc.GetRegion(1) + + // Add 1 replica on store 2. + stream := mockhbstream.NewHeartbeatStream() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 2) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // Recreate coordinator then add another replica on store 3. 
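+	// The region state survives in tc.RaftCluster, so the rebuilt coordinator
+	// can pick up where the old one left off.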
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 3) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitPromoteLearner(re, stream, region, 3) +} + +func TestPauseScheduler(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + _, err := controller.IsSchedulerAllowed("test") + re.Error(err) + controller.PauseOrResumeScheduler(schedulers.BalanceLeaderName, 60) + paused, _ := controller.IsSchedulerPaused(schedulers.BalanceLeaderName) + re.True(paused) + pausedAt, err := controller.GetPausedSchedulerDelayAt(schedulers.BalanceLeaderName) + re.NoError(err) + resumeAt, err := controller.GetPausedSchedulerDelayUntil(schedulers.BalanceLeaderName) + re.NoError(err) + re.Equal(int64(60), resumeAt-pausedAt) + allowed, _ := controller.IsSchedulerAllowed(schedulers.BalanceLeaderName) + re.False(allowed) +} + +func BenchmarkPatrolRegion(b *testing.B) { + re := require.New(b) + + mergeLimit := uint64(4100) + regionNum := 10000 + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.MergeScheduleLimit = mergeLimit + }, nil, nil, re) + defer cleanup() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum, 96); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + listen := make(chan int) + go func() { + oc := co.GetOperatorController() + listen <- 0 + for { + if oc.OperatorCount(operator.OpMerge) == mergeLimit { + co.Stop() + return + } + } + }() + <-listen + + co.GetWaitGroup().Add(1) + b.ResetTimer() + co.PatrolRegions() +} + +func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { + testutil.Eventually(re, func() bool { + return co.GetOperatorController().GetOperator(regionID) != nil + }) +} + +func TestOperatorCount(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 1:leader + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpLeader)) // 1:leader, 2:leader + re.True(oc.RemoveOperator(op1)) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 2:leader + } + + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion) + op2.SetPriorityLevel(constant.High) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + } +} + +func TestStoreOverloaded(t *testing.T) { + re := require.New(t) + 
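+	// This test exercises store-limit throttling: once store 1's add-peer
+	// token bucket is drained, the balance scheduler must stop producing
+	// operators until the limits are reset.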
+ tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + opt := tc.GetOpts() + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + start := time.Now() + { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op1 := ops[0] + re.NotNil(op1) + re.True(oc.AddOperator(op1)) + re.True(oc.RemoveOperator(op1)) + } + for { + time.Sleep(time.Millisecond * 10) + ops, _ := lb.Schedule(tc, false /* dryRun */) + if time.Since(start) > time.Second { + break + } + re.Empty(ops) + } + + // reset all stores' limit + // scheduling one time needs 1/10 seconds + opt.SetAllStoresLimit(storelimit.AddPeer, 600) + opt.SetAllStoresLimit(storelimit.RemovePeer, 600) + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op := ops[0] + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + } + // sleep 1 seconds to make sure that the token is filled up + time.Sleep(time.Second) + for i := 0; i < 100; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) + } +} + +func TestStoreOverloadedWithReplace(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(2, 1, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + region = tc.GetRegion(2).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1}) + re.True(oc.AddOperator(op1)) + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2}) + op2.SetPriorityLevel(constant.High) + re.True(oc.AddOperator(op2)) + op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3}) + re.False(oc.AddOperator(op3)) + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Empty(ops) + // sleep 2 seconds to make sure that token is filled up + time.Sleep(2 * time.Second) + ops, _ = lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) +} + +func TestDownStoreLimit(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + rc := co.GetCheckerController().GetRuleChecker() + + tc.addRegionStore(1, 100) + tc.addRegionStore(2, 100) + tc.addRegionStore(3, 100) + tc.addLeaderRegion(1, 1, 2, 3) + + region := tc.GetRegion(1) + tc.setStoreDown(1) + tc.SetStoreLimit(1, storelimit.RemovePeer, 1) + + region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: 
region.GetStorePeer(1), + DownSeconds: 24 * 60 * 60, + }, + }), core.SetApproximateSize(1)) + tc.putRegion(region) + for i := uint64(1); i < 20; i++ { + tc.addRegionStore(i+3, 100) + op := rc.Check(region) + re.NotNil(op) + re.True(oc.AddOperator(op)) + oc.RemoveOperator(op) + } + + region = region.Clone(core.SetApproximateSize(100)) + tc.putRegion(region) + for i := uint64(20); i < 25; i++ { + tc.addRegionStore(i+3, 100) + op := rc.Check(region) + re.NotNil(op) + re.True(oc.AddOperator(op)) + oc.RemoveOperator(op) + } +} + +// FIXME: remove after move into schedulers package +type mockLimitScheduler struct { + schedulers.Scheduler + limit uint64 + counter *operator.Controller + kind operator.OpKind +} + +func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + return s.counter.OperatorCount(s.kind) < s.limit +} + +func TestController(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + lb := &mockLimitScheduler{ + Scheduler: scheduler, + counter: oc, + kind: operator.OpLeader, + } + + sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb) + + for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) { + re.Equal(i, sc.GetInterval()) + re.Empty(sc.Schedule(false)) + } + // limit = 2 + lb.limit = 2 + // count = 0 + { + re.True(sc.AllowSchedule(false)) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op1)) + // count = 1 + re.True(sc.AllowSchedule(false)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op2)) + // count = 2 + re.False(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op1)) + // count = 1 + re.True(sc.AllowSchedule(false)) + } + + op11 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + // add a PriorityKind operator will remove old operator + { + op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion) + op3.SetPriorityLevel(constant.High) + re.Equal(1, oc.AddWaitingOperator(op11)) + re.False(sc.AllowSchedule(false)) + re.Equal(1, oc.AddWaitingOperator(op3)) + re.True(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op3)) + } + + // add a admin operator will remove old operator + { + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op2)) + re.False(sc.AllowSchedule(false)) + op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin) + op4.SetPriorityLevel(constant.High) + re.Equal(1, oc.AddWaitingOperator(op4)) + re.True(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op4)) + } + + // test wrong region id. + { + op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion) + re.Equal(0, oc.AddWaitingOperator(op5)) + } + + // test wrong region epoch. 
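+	// An operator whose epoch does not match the region's current epoch must
+	// be rejected; once the version matches again, it is accepted.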
+	re.True(oc.RemoveOperator(op11))
+	epoch := &metapb.RegionEpoch{
+		Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
+		ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
+	}
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(0, oc.AddWaitingOperator(op6))
+	}
+	epoch.Version--
+	{
+		op6 := newTestOperator(1, epoch, operator.OpLeader)
+		re.Equal(1, oc.AddWaitingOperator(op6))
+		re.True(oc.RemoveOperator(op6))
+	}
+}
+
+func TestInterval(t *testing.T) {
+	re := require.New(t)
+
+	tc, co, cleanup := prepare(nil, nil, nil, re)
+	defer cleanup()
+
+	lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+	re.NoError(err)
+	sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+	// If no operator for x seconds, the next check should be in x/2 seconds.
+	idleSeconds := []int{5, 10, 20, 30, 60}
+	for _, n := range idleSeconds {
+		sc.SetInterval(schedulers.MinScheduleInterval)
+		for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
+			re.Empty(sc.Schedule(false))
+		}
+		re.Less(sc.GetInterval(), time.Second*time.Duration(n/2))
+	}
+}
+
+func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	return region.Clone(
+		core.WithAddPeer(res.GetChangePeer().GetPeer()),
+		core.WithIncConfVer(),
+	)
+}
+
+func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+	var res *pdpb.RegionHeartbeatResponse
+	testutil.Eventually(re, func() bool {
+		if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+			return res.GetRegionId() == region.GetID() &&
+				res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
+				res.GetChangePeer().GetPeer().GetStoreId() == storeID
+		}
+		return false
+	})
+	// Remove learner then add voter.
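+	// (the helper swaps the store's learner for the voter peer carried in the
+	// response; unlike waitAddLearner, it does not bump ConfVer)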
+ return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithAddPeer(res.GetChangePeer().GetPeer()), + ) +} + +func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithIncConfVer(), + ) +} + +func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if res.GetRegionId() == region.GetID() { + for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { + if peer.GetStoreId() == storeID { + return true + } + } + } + } + return false + }) + return region.Clone( + core.WithLeader(region.GetStorePeer(storeID)), + ) +} + +func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) { + testutil.Eventually(re, func() bool { + res := stream.Recv() + return res == nil + }) +} +>>>>>>> 86831ce71 (prepare_check: remove redundant check (#7217)) diff --git a/server/cluster/prepare_checker.go b/server/cluster/prepare_checker.go index 20652bb435fb..8832afeabd06 100644 --- a/server/cluster/prepare_checker.go +++ b/server/cluster/prepare_checker.go @@ -25,16 +25,13 @@ import ( type prepareChecker struct { syncutil.RWMutex - reactiveRegions map[uint64]int - start time.Time - sum int - prepared bool + start time.Time + prepared bool } func newPrepareChecker() *prepareChecker { return &prepareChecker{ - start: time.Now(), - reactiveRegions: make(map[uint64]int), + start: time.Now(), } } @@ -50,6 +47,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { return true } notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() +<<<<<<< HEAD:server/cluster/prepare_checker.go totalRegionsCnt := c.GetRegionCount() if float64(notLoadedFromRegionsCnt) > float64(totalRegionsCnt)*collectFactor { log.Info("meta not loaded from region number is satisfied, finish prepare checker", @@ -57,8 +55,11 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { checker.prepared = true return true } +======= + totalRegionsCnt := c.GetTotalRegionCount() +>>>>>>> 86831ce71 (prepare_check: remove redundant check (#7217)):pkg/schedule/prepare_checker.go // The number of active regions should be more than total region of all stores * collectFactor - if float64(totalRegionsCnt)*collectFactor > float64(checker.sum) { + if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) { return false } for _, store := range c.GetStores() { @@ -67,14 +68,16 @@ func (checker *prepareChecker) check(c *core.BasicCluster) bool { } storeID := store.GetID() // For each store, the number of active regions should be more than total region of the store * collectFactor - if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) { + if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { return 
 
 func (checker *prepareChecker) collect(region *core.RegionInfo) {
 	checker.Lock()
 	defer checker.Unlock()
@@ -85,7 +88,20 @@ func (checker *prepareChecker) collect(region *core.RegionInfo) {
 }
 
 func (checker *prepareChecker) isPrepared() bool {
 	checker.RLock()
 	defer checker.RUnlock()
 	return checker.prepared
 }
+
+// SetPrepared is for test purpose.
+func (checker *prepareChecker) SetPrepared() {
+	checker.Lock()
+	defer checker.Unlock()
+	checker.prepared = true
+}
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index aced6750a11f..39ae672a9f31 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -480,5 +480,129 @@ func TestScheduler(t *testing.T) {
 	cfg.Schedulers = origin
 	err = leaderServer.GetServer().SetScheduleConfig(*cfg)
 	re.NoError(err)
 	checkSchedulerWithStatusCommand(nil, "disabled", nil)
+}
+
+func (suite *schedulerTestSuite) TestSchedulerDiagnostic() {
+	env := tests.NewSchedulingTestEnvironment(suite.T())
+	env.RunTestInTwoModes(suite.checkSchedulerDiagnostic)
+}
+
+func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestCluster) {
+	re := suite.Require()
+	pdAddr := cluster.GetConfig().GetClientURL()
+	cmd := pdctlCmd.GetRootCmd()
+
+	checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) {
+		result := make(map[string]interface{})
+		testutil.Eventually(re, func() bool {
+			mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
+			return len(result) != 0 && expectedStatus == result["status"] && expectedSummary == result["summary"]
+		}, testutil.WithTickInterval(50*time.Millisecond))
+	}
+
+	stores := []*metapb.Store{
+		{
+			Id:            1,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            2,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            3,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+		{
+			Id:            4,
+			State:         metapb.StoreState_Up,
+			LastHeartbeat: time.Now().UnixNano(),
+		},
+	}
+	for _, store := range stores {
+		tests.MustPutStore(re, cluster, store)
+	}
+
+	// Note: because pdqsort is an unstable sort algorithm, set ApproximateSize for this region.
+	tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
+
+	echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
+	re.Contains(echo, "Success!")
+	checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ")
+
+	// scheduler delete command
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
+	checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")
+
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
+	checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
+}
+
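+// mustExec runs a pdctl command and asserts it succeeds; with v == nil it
+// returns the raw output, otherwise it requires the output to unmarshal into v.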
+func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) string {
+	output, err := pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return string(output)
+	}
+	re.NoError(json.Unmarshal(output, v), string(output))
+	return ""
+}
+
+// mightExec is the best-effort variant of mustExec: the unmarshal error is
+// deliberately ignored so that callers can poll until the output becomes
+// valid JSON.
+func mightExec(re *require.Assertions, cmd *cobra.Command, args []string, v interface{}) {
+	output, err := pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	if v == nil {
+		return
+	}
+	json.Unmarshal(output, v)
+}
+
+func TestForwardSchedulerRequest(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestAPICluster(ctx, 1)
+	re.NoError(err)
+	re.NoError(cluster.RunInitialServers())
+	re.NotEmpty(cluster.WaitLeader())
+	server := cluster.GetLeaderServer()
+	re.NoError(server.BootstrapCluster())
+	backendEndpoints := server.GetAddr()
+	tc, err := tests.NewTestSchedulingCluster(ctx, 2, backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+
+	cmd := pdctlCmd.GetRootCmd()
+	args := []string{"-u", backendEndpoints, "scheduler", "show"}
+	var slice []string
+	output, err := pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	re.NoError(json.Unmarshal(output, &slice))
+	re.Contains(slice, "balance-leader-scheduler")
+
+	mustUsage := func(args []string) {
+		output, err := pdctl.ExecuteCommand(cmd, args...)
+		re.NoError(err)
+		re.Contains(string(output), "Usage")
+	}
+	mustUsage([]string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler"})
+	mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
+	checkSchedulerWithStatusCommand := func(status string, expected []string) {
+		var schedulers []string
+		mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "show", "--status", status}, &schedulers)
+		re.Equal(expected, schedulers)
+	}
+	checkSchedulerWithStatusCommand("paused", []string{
+		"balance-leader-scheduler",
+	})
 }
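The next hunk is easy to miss but load-bearing for the reworked prepare logic: regions injected by tests must be tagged as heartbeat-sourced, otherwise they still count as loaded from storage and the prepare checker above never reports ready. A minimal sketch of the construction (the region metadata here is illustrative, not part of the patch):

	meta := &metapb.Region{
		Id:       1,
		Peers:    []*metapb.Peer{{Id: 1, StoreId: 1}},
		StartKey: []byte("a"),
		EndKey:   []byte("b"),
	}
	// SetSource(core.Heartbeat) makes the region count toward the
	// "not from storage" totals consumed by prepareChecker.check.
	region := core.NewRegionInfo(meta, meta.Peers[0], core.SetSource(core.Heartbeat))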
 		StartKey: []byte{byte(i)},
 		EndKey:   []byte{byte(i + 1)},
 	}
-	rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+	rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0], core.SetSource(core.Heartbeat)))
 }
 
 time.Sleep(50 * time.Millisecond)
diff --git a/tests/testutil.go b/tests/testutil.go
new file mode 100644
index 000000000000..059a152f06f5
--- /dev/null
+++ b/tests/testutil.go
@@ -0,0 +1,316 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tests
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/docker/go-units"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/log"
+	"github.com/stretchr/testify/require"
+	bs "github.com/tikv/pd/pkg/basicserver"
+	"github.com/tikv/pd/pkg/core"
+	rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
+	scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server"
+	sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config"
+	tso "github.com/tikv/pd/pkg/mcs/tso/server"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/testutil"
+	"github.com/tikv/pd/pkg/versioninfo"
+	"github.com/tikv/pd/server"
+	"go.uber.org/zap"
+)
+
+var once sync.Once
+
+// InitLogger initializes the logger for testing. Because of once.Do, only the
+// first call in a process takes effect; later calls are no-ops.
+func InitLogger(logConfig log.Config, logger *zap.Logger, logProps *log.ZapProperties, isRedactInfoLogEnabled bool) (err error) {
+	once.Do(func() {
+		// Set up the logger.
+		err = logutil.SetupLogger(logConfig, &logger, &logProps, isRedactInfoLogEnabled)
+		if err != nil {
+			return
+		}
+		log.ReplaceGlobals(logger, logProps)
+		// Flush any buffered log entries.
+		log.Sync()
+	})
+	return err
+}
+
+// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
+func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
+	cfg := rm.NewConfig()
+	cfg.BackendEndpoints = backendEndpoints
+	cfg.ListenAddr = listenAddrs
+	cfg, err := rm.GenerateConfig(cfg)
+	re.NoError(err)
+
+	s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
+	re.NoError(err)
+	testutil.Eventually(re, func() bool {
+		return !s.IsClosed()
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	return s, cleanup
+}
+
+// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing.
+func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) {
+	cfg := tso.NewConfig()
+	cfg.BackendEndpoints = backendEndpoints
+	cfg.ListenAddr = listenAddrs
+	cfg, err := tso.GenerateConfig(cfg)
+	re.NoError(err)
+	// Set up the logger.
+	err = InitLogger(cfg.Log, cfg.Logger, cfg.LogProps, cfg.Security.RedactInfoLog)
+	re.NoError(err)
+	return NewTSOTestServer(ctx, cfg)
+}
+
+// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
+func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
+	s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs)
+	re.NoError(err)
+	testutil.Eventually(re, func() bool {
+		return !s.IsClosed()
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	return s, cleanup
+}
+
+// NewTSOTestServer creates a tso server with given config for testing.
+func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
+	s := tso.CreateServer(ctx, cfg)
+	if err := s.Run(); err != nil {
+		return nil, nil, err
+	}
+	cleanup := func() {
+		s.Close()
+		os.RemoveAll(cfg.DataDir)
+	}
+	return s, cleanup, nil
+}
+
+// StartSingleSchedulingTestServer creates and starts a scheduling server with default config for testing.
+func StartSingleSchedulingTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*scheduling.Server, func()) {
+	cfg := sc.NewConfig()
+	cfg.BackendEndpoints = backendEndpoints
+	cfg.ListenAddr = listenAddrs
+	cfg, err := scheduling.GenerateConfig(cfg)
+	re.NoError(err)
+
+	s, cleanup, err := scheduling.NewTestServer(ctx, re, cfg)
+	re.NoError(err)
+	testutil.Eventually(re, func() bool {
+		return !s.IsClosed()
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	return s, cleanup
+}
+
+// NewSchedulingTestServer creates a scheduling server with given config for testing.
+func NewSchedulingTestServer(ctx context.Context, cfg *sc.Config) (*scheduling.Server, testutil.CleanupFunc, error) {
+	s := scheduling.CreateServer(ctx, cfg)
+	if err := s.Run(); err != nil {
+		return nil, nil, err
+	}
+	cleanup := func() {
+		s.Close()
+		os.RemoveAll(cfg.DataDir)
+	}
+	return s, cleanup, nil
+}
+
+// WaitForPrimaryServing waits for one of the servers to be elected as the primary/leader.
+func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string {
+	var primary string
+	testutil.Eventually(re, func() bool {
+		for name, s := range serverMap {
+			if s.IsServing() {
+				primary = name
+				return true
+			}
+		}
+		return false
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	return primary
+}
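+
+// A hypothetical call site (the server map and names are illustrative):
+//
+//	primary := WaitForPrimaryServing(re, map[string]bs.Server{"tso1": s1, "tso2": s2})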
+
+// MustPutStore is used for test purpose.
+func MustPutStore(re *require.Assertions, cluster *TestCluster, store *metapb.Store) {
+	store.Address = fmt.Sprintf("tikv%d", store.GetId())
+	if len(store.Version) == 0 {
+		store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String()
+	}
+	svr := cluster.GetLeaderServer().GetServer()
+	grpcServer := &server.GrpcServer{Server: svr}
+	_, err := grpcServer.PutStore(context.Background(), &pdpb.PutStoreRequest{
+		Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()},
+		Store:  store,
+	})
+	re.NoError(err)
+
+	storeInfo := grpcServer.GetRaftCluster().GetStore(store.GetId())
+	newStore := storeInfo.Clone(core.SetStoreStats(&pdpb.StoreStats{
+		Capacity:  uint64(10 * units.GiB),
+		UsedSize:  uint64(9 * units.GiB),
+		Available: uint64(1 * units.GiB),
+	}))
+	grpcServer.GetRaftCluster().GetBasicCluster().PutStore(newStore)
+	if cluster.GetSchedulingPrimaryServer() != nil {
+		cluster.GetSchedulingPrimaryServer().GetCluster().PutStore(newStore)
+	}
+}
+
+// MustPutRegion is used for test purpose.
+func MustPutRegion(re *require.Assertions, cluster *TestCluster, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo {
+	leader := &metapb.Peer{
+		Id:      regionID,
+		StoreId: storeID,
+	}
+	metaRegion := &metapb.Region{
+		Id:          regionID,
+		StartKey:    start,
+		EndKey:      end,
+		Peers:       []*metapb.Peer{leader},
+		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
+	}
+	opts = append(opts, core.SetSource(core.Heartbeat))
+	r := core.NewRegionInfo(metaRegion, leader, opts...)
+	MustPutRegionInfo(re, cluster, r)
+	return r
+}
+
+// MustPutRegionInfo is used for test purpose.
+func MustPutRegionInfo(re *require.Assertions, cluster *TestCluster, regionInfo *core.RegionInfo) {
+	err := cluster.HandleRegionHeartbeat(regionInfo)
+	re.NoError(err)
+	if cluster.GetSchedulingPrimaryServer() != nil {
+		err = cluster.GetSchedulingPrimaryServer().GetCluster().HandleRegionHeartbeat(regionInfo)
+		re.NoError(err)
+	}
+}
+
+// MustReportBuckets is used for test purpose.
+func MustReportBuckets(re *require.Assertions, cluster *TestCluster, regionID uint64, start, end []byte, stats *metapb.BucketStats) *metapb.Buckets {
+	buckets := &metapb.Buckets{
+		RegionId: regionID,
+		Version:  1,
+		Keys:     [][]byte{start, end},
+		Stats:    stats,
+		// The report-buckets interval is 10s.
+		PeriodInMs: 10000,
+	}
+	err := cluster.HandleReportBuckets(buckets)
+	re.NoError(err)
+	// TODO: forward to the scheduling server once it supports buckets.
+	return buckets
+}
+
+type mode int
+
+const (
+	pdMode mode = iota
+	apiMode
+)
+
+// SchedulingTestEnvironment is used for test purpose.
+type SchedulingTestEnvironment struct {
+	t       *testing.T
+	ctx     context.Context
+	cancel  context.CancelFunc
+	cluster *TestCluster
+	opts    []ConfigOption
+}
+
+// NewSchedulingTestEnvironment creates a new SchedulingTestEnvironment.
+func NewSchedulingTestEnvironment(t *testing.T, opts ...ConfigOption) *SchedulingTestEnvironment {
+	return &SchedulingTestEnvironment{
+		t:    t,
+		opts: opts,
+	}
+}
+
+// RunTestInTwoModes runs the test in both PD mode and API mode.
+func (s *SchedulingTestEnvironment) RunTestInTwoModes(test func(*TestCluster)) {
+	s.RunTestInPDMode(test)
+	s.RunTestInAPIMode(test)
+}
+
+// RunTestInPDMode runs the test in PD mode.
+func (s *SchedulingTestEnvironment) RunTestInPDMode(test func(*TestCluster)) {
+	s.t.Log("start to run test in pd mode")
+	s.startCluster(pdMode)
+	test(s.cluster)
+	s.cleanup()
+	s.t.Log("finish to run test in pd mode")
+}
+
+// RunTestInAPIMode runs the test in API mode.
+func (s *SchedulingTestEnvironment) RunTestInAPIMode(test func(*TestCluster)) {
+	s.t.Log("start to run test in api mode")
+	re := require.New(s.t)
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
+	s.startCluster(apiMode)
+	test(s.cluster)
+	s.cleanup()
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
+	s.t.Log("finish to run test in api mode")
+}
+
+func (s *SchedulingTestEnvironment) cleanup() {
+	s.cluster.Destroy()
+	s.cancel()
+}
+
+func (s *SchedulingTestEnvironment) startCluster(m mode) {
+	var err error
+	re := require.New(s.t)
+	s.ctx, s.cancel = context.WithCancel(context.Background())
+	switch m {
+	case pdMode:
+		s.cluster, err = NewTestCluster(s.ctx, 1, s.opts...)
+		re.NoError(err)
+		err = s.cluster.RunInitialServers()
+		re.NoError(err)
+		re.NotEmpty(s.cluster.WaitLeader())
+		leaderServer := s.cluster.GetServer(s.cluster.GetLeader())
+		re.NoError(leaderServer.BootstrapCluster())
+	case apiMode:
+		s.cluster, err = NewTestAPICluster(s.ctx, 1, s.opts...)
+		re.NoError(err)
+		err = s.cluster.RunInitialServers()
+		re.NoError(err)
+		re.NotEmpty(s.cluster.WaitLeader())
+		leaderServer := s.cluster.GetServer(s.cluster.GetLeader())
+		re.NoError(leaderServer.BootstrapCluster())
+		// Start the scheduling cluster.
+		tc, err := NewTestSchedulingCluster(s.ctx, 1, leaderServer.GetAddr())
+		re.NoError(err)
+		tc.WaitForPrimaryServing(re)
+		s.cluster.SetSchedulingCluster(tc)
+		time.Sleep(200 * time.Millisecond) // wait for the scheduling cluster to update its member
+	}
+}
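For orientation, a test adopting this harness end to end would look roughly like the sketch below; the test name and assertions are illustrative, not part of this patch, and the imports mirror those of tests/testutil.go:

	func TestWithBothModes(t *testing.T) {
		re := require.New(t)
		env := tests.NewSchedulingTestEnvironment(t)
		// The closure runs twice: once against a plain PD cluster and once
		// against an API cluster backed by a separate scheduling service.
		env.RunTestInTwoModes(func(cluster *tests.TestCluster) {
			tests.MustPutStore(re, cluster, &metapb.Store{
				Id:            1,
				State:         metapb.StoreState_Up,
				LastHeartbeat: time.Now().UnixNano(),
			})
			// MustPutRegion tags the region as heartbeat-sourced, so it
			// also counts toward the prepare checker's readiness totals.
			region := tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"))
			re.Equal(uint64(1), region.GetID())
		})
	}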