diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 8b98e718de2..e3d199128d2 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -792,6 +792,7 @@ "intervalFactor": 2, "legendFormat": "{{type}}", "refId": "B" +<<<<<<< HEAD }, { "expr": "pd_regions_offline_status{tidb_cluster=\"$tidb_cluster\", type=\"offline-peer-region-count\", instance=\"$instance\"}", @@ -799,6 +800,8 @@ "intervalFactor": 1, "legendFormat": "{{type}}", "refId": "C" +======= +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } ], "thresholds": [ diff --git a/server/api/region.go b/server/api/region.go index b4acdee0dae..719772e3071 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -374,6 +374,7 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) h.rd.JSON(w, http.StatusOK, regionsInfo) } +<<<<<<< HEAD // @Tags region // @Summary List all regions that miss peer. // @Produce json @@ -381,8 +382,74 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request) // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /regions/check/miss-peer [get] func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, r *http.Request) { +======= +// @Tags region +// @Summary List regions belongs to the given keyspace ID. +// @Param keyspace_id query string true "Keyspace ID" +// @Param limit query integer false "Limit count" default(16) +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/keyspace/id/{id} [get] +func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + vars := mux.Vars(r) + keyspaceIDStr := vars["id"] + if keyspaceIDStr == "" { + h.rd.JSON(w, http.StatusBadRequest, "keyspace id is empty") + return + } + + keyspaceID64, err := strconv.ParseUint(keyspaceIDStr, 10, 32) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + keyspaceID := uint32(keyspaceID64) + keyspaceManager := h.svr.GetKeyspaceManager() + if _, err := keyspaceManager.LoadKeyspaceByID(keyspaceID); err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + + limit := defaultRegionLimit + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + limit, err = strconv.Atoi(limitStr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + } + if limit > maxRegionLimit { + limit = maxRegionLimit + } + regionBound := keyspace.MakeRegionBound(keyspaceID) + regions := rc.ScanRegions(regionBound.RawLeftBound, regionBound.RawRightBound, limit) + if limit <= 0 || limit > len(regions) { + txnRegion := rc.ScanRegions(regionBound.TxnLeftBound, regionBound.TxnRightBound, limit-len(regions)) + regions = append(regions, txnRegion...) + } + regionsInfo := convertToAPIRegions(regions) + h.rd.JSON(w, http.StatusOK, regionsInfo) +} + +// @Tags region +// @Summary List all regions that miss peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /regions/check/miss-peer [get] +func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.MissPeer) +} + +func (h *regionsHandler) getRegionsByType( + w http.ResponseWriter, + typ statistics.RegionStatisticType, +) { +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) handler := h.svr.GetHandler() - regions, err := handler.GetRegionsByType(statistics.MissPeer) + regions, err := handler.GetRegionsByType(typ) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return @@ -391,6 +458,7 @@ func (h *regionsHandler) GetMissPeerRegions(w http.ResponseWriter, r *http.Reque h.rd.JSON(w, http.StatusOK, regionsInfo) } +<<<<<<< HEAD // @Tags region // @Summary List all regions that has extra peer. // @Produce json @@ -491,6 +559,86 @@ func (h *regionsHandler) GetEmptyRegion(w http.ResponseWriter, r *http.Request) } regionsInfo := convertToAPIRegions(regions) h.rd.JSON(w, http.StatusOK, regionsInfo) +======= +// @Tags region +// @Summary List all regions that has extra peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/extra-peer [get] +func (h *regionsHandler) GetExtraPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.ExtraPeer) +} + +// @Tags region +// @Summary List all regions that has pending peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/pending-peer [get] +func (h *regionsHandler) GetPendingPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.PendingPeer) +} + +// @Tags region +// @Summary List all regions that has down peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/down-peer [get] +func (h *regionsHandler) GetDownPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.DownPeer) +} + +// @Tags region +// @Summary List all regions that has learner peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/learner-peer [get] +func (h *regionsHandler) GetLearnerPeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.LearnerPeer) +} + +// @Tags region +// @Summary List all regions that has offline peer. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/offline-peer [get] +func (h *regionsHandler) GetOfflinePeerRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.OfflinePeer) +} + +// @Tags region +// @Summary List all regions that are oversized. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/oversized-region [get] +func (h *regionsHandler) GetOverSizedRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.OversizedRegion) +} + +// @Tags region +// @Summary List all regions that are undersized. 
+// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/undersized-region [get] +func (h *regionsHandler) GetUndersizedRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.UndersizedRegion) +} + +// @Tags region +// @Summary List all empty regions. +// @Produce json +// @Success 200 {object} RegionsInfo +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /regions/check/empty-region [get] +func (h *regionsHandler) GetEmptyRegions(w http.ResponseWriter, _ *http.Request) { + h.getRegionsByType(w, statistics.EmptyRegion) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } type histItem struct { diff --git a/server/api/region_test.go b/server/api/region_test.go index 7f3af26f085..72d50c0412c 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -153,9 +153,19 @@ func (s *testRegionSuite) TestRegion(c *C) { func (s *testRegionSuite) TestRegionCheck(c *C) { r := newTestRegionInfo(2, 1, []byte("a"), []byte("b")) downPeer := &metapb.Peer{Id: 13, StoreId: 2} +<<<<<<< HEAD r = r.Clone(core.WithAddPeer(downPeer), core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), core.WithPendingPeers([]*metapb.Peer{downPeer})) mustRegionHeartbeat(c, s.svr, r) url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID()) +======= + r = r.Clone( + core.WithAddPeer(downPeer), + core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}), + core.WithPendingPeers([]*metapb.Peer{downPeer})) + re := suite.Require() + mustRegionHeartbeat(re, suite.svr, r) + url := fmt.Sprintf("%s/region/id/%d", suite.urlPrefix, r.GetID()) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) r1 := &RegionInfo{} c.Assert(readJSON(testDialClient, url, r1), IsNil) r1.Adjust() @@ -201,7 +211,20 @@ func (s *testRegionSuite) TestRegionCheck(c *C) { r7 := make([]*histItem, 1) c.Assert(readJSON(testDialClient, url, &r7), IsNil) histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}} +<<<<<<< HEAD c.Assert(r7, DeepEquals, histKeys) +======= + suite.Equal(histKeys, r7) + + mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{}) + mustRegionHeartbeat(re, suite.svr, r) + url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer") + r8 := &RegionsInfo{} + suite.NoError(tu.ReadGetJSON(re, testDialClient, url, r8)) + r4.Adjust() + suite.Equal(1, r8.Count) + suite.Equal(r.GetID(), r8.Regions[0].ID) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } func (s *testRegionSuite) TestRegions(c *C) { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index f81bf9387c2..372baf972e2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -254,6 +254,7 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } +<<<<<<< HEAD c.replicationMode, err = replication.NewReplicationModeManager(s.GetConfig().ReplicationMode, s.GetStorage(), cluster, s) if err != nil { @@ -262,6 +263,11 @@ func (c *RaftCluster) Start(s Server) error { c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager) +======= + c.storeConfigManager = config.NewStoreConfigManager(c.httpClient) + c.coordinator = schedule.NewCoordinator(c.ctx, 
cluster, s.GetHBStreams()) + c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager, c.storeConfigManager) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) @@ -1347,6 +1353,7 @@ func (c *RaftCluster) GetRegionStatsByType(typ statistics.RegionStatisticType) [ return c.regionStats.GetRegionStatsByType(typ) } +<<<<<<< HEAD // GetOfflineRegionStatsByType gets the status of the offline region by types. func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatisticType) []*core.RegionInfo { c.RLock() @@ -1360,6 +1367,10 @@ func (c *RaftCluster) GetOfflineRegionStatsByType(typ statistics.RegionStatistic func (c *RaftCluster) updateRegionsLabelLevelStats(regions []*core.RegionInfo) { c.Lock() defer c.Unlock() +======= +// UpdateRegionsLabelLevelStats updates the status of the region label level by types. +func (c *RaftCluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) for _, region := range regions { c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels()) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b3ce608ab37..60e4c12a750 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -646,8 +646,51 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) { func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() +<<<<<<< HEAD c.Assert(err, IsNil) cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) +======= + re.NoError(err) + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) + region := newTestRegions(1, 3, 3)[0] + cluster.opt.GetMaxMergeRegionKeys() + curMaxMergeSize := int64(cluster.opt.GetMaxMergeRegionSize()) + curMaxMergeKeys := int64(cluster.opt.GetMaxMergeRegionKeys()) + region = region.Clone( + core.WithLeader(region.GetPeers()[2]), + core.SetApproximateSize(curMaxMergeSize-1), + core.SetApproximateKeys(curMaxMergeKeys-1), + core.SetFromHeartbeat(true), + ) + cluster.processRegionHeartbeat(region) + regionID := region.GetID() + re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + // Test ApproximateSize and ApproximateKeys change. + region = region.Clone( + core.WithLeader(region.GetPeers()[2]), + core.SetApproximateSize(curMaxMergeSize+1), + core.SetApproximateKeys(curMaxMergeKeys+1), + core.SetFromHeartbeat(true), + ) + cluster.processRegionHeartbeat(region) + re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + // Test MaxMergeRegionSize and MaxMergeRegionKeys change. 
+ cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize + 2)) + cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys + 2)) + cluster.processRegionHeartbeat(region) + re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) + cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize)) + cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys)) + cluster.processRegionHeartbeat(region) + re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) +} +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})} regions = core.SplitRegions(regions) @@ -840,7 +883,16 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { panic(err) } } +<<<<<<< HEAD cluster.regionStats = statistics.NewRegionStatistics(cluster.GetOpts(), cluster.ruleManager) +======= + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Put 3 stores. for _, store := range newTestStores(4, "5.0.0") { @@ -878,14 +930,24 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { for i := 0; i < n; i++ { regions = core.SplitRegions(regions) } +<<<<<<< HEAD heartbeatRegions(c, cluster, regions) c.Assert(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), HasLen, len(regions)) +======= + heartbeatRegions(re, cluster, regions) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) // Merge. 
for i := 0; i < n; i++ { regions = core.MergeRegions(regions) +<<<<<<< HEAD heartbeatRegions(c, cluster, regions) c.Assert(cluster.GetOfflineRegionStatsByType(statistics.OfflinePeer), HasLen, len(regions)) +======= + heartbeatRegions(re, cluster, regions) + re.Len(cluster.GetRegionStatsByType(statistics.OfflinePeer), len(regions)) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) } } @@ -927,7 +989,273 @@ var _ = Suite(&testStoresInfoSuite{}) type testStoresInfoSuite struct{} +<<<<<<< HEAD func (s *testStoresInfoSuite) TestStores(c *C) { +======= + var stores []*core.StoreInfo + var testStore *core.StoreInfo + for i, zone := range zones { + for j, rack := range racks { + for k, host := range hosts { + storeID := uint64(i*len(racks)*len(hosts) + j*len(hosts) + k) + storeLabels := map[string]string{ + "zone": zone, + "rack": rack, + "host": host, + } + store := core.NewStoreInfoWithLabel(storeID, storeLabels) + if i == 0 && j == 0 && k == 0 { + testStore = store + } + stores = append(stores, store) + } + } + } + + re.Equal(1.0/3/3/4, getStoreTopoWeight(testStore, stores, labels, 3)) +} + +func TestTopologyWeight1(t *testing.T) { + re := require.New(t) + + labels := []string{"dc", "zone", "host"} + store1 := core.NewStoreInfoWithLabel(1, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host1"}) + store2 := core.NewStoreInfoWithLabel(2, map[string]string{"dc": "dc2", "zone": "zone2", "host": "host2"}) + store3 := core.NewStoreInfoWithLabel(3, map[string]string{"dc": "dc3", "zone": "zone3", "host": "host3"}) + store4 := core.NewStoreInfoWithLabel(4, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host1"}) + store5 := core.NewStoreInfoWithLabel(5, map[string]string{"dc": "dc1", "zone": "zone2", "host": "host2"}) + store6 := core.NewStoreInfoWithLabel(6, map[string]string{"dc": "dc1", "zone": "zone3", "host": "host3"}) + stores := []*core.StoreInfo{store1, store2, store3, store4, store5, store6} + + re.Equal(1.0/3, getStoreTopoWeight(store2, stores, labels, 3)) + re.Equal(1.0/3/4, getStoreTopoWeight(store1, stores, labels, 3)) + re.Equal(1.0/3/4, getStoreTopoWeight(store6, stores, labels, 3)) +} + +func TestTopologyWeight2(t *testing.T) { + re := require.New(t) + + labels := []string{"dc", "zone", "host"} + store1 := core.NewStoreInfoWithLabel(1, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host1"}) + store2 := core.NewStoreInfoWithLabel(2, map[string]string{"dc": "dc2"}) + store3 := core.NewStoreInfoWithLabel(3, map[string]string{"dc": "dc3"}) + store4 := core.NewStoreInfoWithLabel(4, map[string]string{"dc": "dc1", "zone": "zone2", "host": "host1"}) + store5 := core.NewStoreInfoWithLabel(5, map[string]string{"dc": "dc1", "zone": "zone3", "host": "host1"}) + stores := []*core.StoreInfo{store1, store2, store3, store4, store5} + + re.Equal(1.0/3, getStoreTopoWeight(store2, stores, labels, 3)) + re.Equal(1.0/3/3, getStoreTopoWeight(store1, stores, labels, 3)) +} + +func TestTopologyWeight3(t *testing.T) { + re := require.New(t) + + labels := []string{"dc", "zone", "host"} + store1 := core.NewStoreInfoWithLabel(1, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host1"}) + store2 := core.NewStoreInfoWithLabel(2, map[string]string{"dc": "dc1", "zone": "zone2", "host": "host2"}) + store3 := core.NewStoreInfoWithLabel(3, map[string]string{"dc": "dc1", "zone": "zone3", "host": "host3"}) + store4 := core.NewStoreInfoWithLabel(4, map[string]string{"dc": "dc2", "zone": "zone4", "host": "host4"}) + store5 := 
core.NewStoreInfoWithLabel(5, map[string]string{"dc": "dc2", "zone": "zone4", "host": "host5"}) + store6 := core.NewStoreInfoWithLabel(6, map[string]string{"dc": "dc2", "zone": "zone5", "host": "host6"}) + + store7 := core.NewStoreInfoWithLabel(7, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host7"}) + store8 := core.NewStoreInfoWithLabel(8, map[string]string{"dc": "dc2", "zone": "zone4", "host": "host8"}) + store9 := core.NewStoreInfoWithLabel(9, map[string]string{"dc": "dc2", "zone": "zone4", "host": "host9"}) + store10 := core.NewStoreInfoWithLabel(10, map[string]string{"dc": "dc2", "zone": "zone5", "host": "host10"}) + stores := []*core.StoreInfo{store1, store2, store3, store4, store5, store6, store7, store8, store9, store10} + + re.Equal(1.0/5/2, getStoreTopoWeight(store7, stores, labels, 5)) + re.Equal(1.0/5/4, getStoreTopoWeight(store8, stores, labels, 5)) + re.Equal(1.0/5/4, getStoreTopoWeight(store9, stores, labels, 5)) + re.Equal(1.0/5/2, getStoreTopoWeight(store10, stores, labels, 5)) +} + +func TestTopologyWeight4(t *testing.T) { + re := require.New(t) + + labels := []string{"dc", "zone", "host"} + store1 := core.NewStoreInfoWithLabel(1, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host1"}) + store2 := core.NewStoreInfoWithLabel(2, map[string]string{"dc": "dc1", "zone": "zone1", "host": "host2"}) + store3 := core.NewStoreInfoWithLabel(3, map[string]string{"dc": "dc1", "zone": "zone2", "host": "host3"}) + store4 := core.NewStoreInfoWithLabel(4, map[string]string{"dc": "dc2", "zone": "zone1", "host": "host4"}) + + stores := []*core.StoreInfo{store1, store2, store3, store4} + + re.Equal(1.0/3/2, getStoreTopoWeight(store1, stores, labels, 3)) + re.Equal(1.0/3, getStoreTopoWeight(store3, stores, labels, 3)) + re.Equal(1.0/3, getStoreTopoWeight(store4, stores, labels, 3)) +} + +func TestCalculateStoreSize1(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, err := newTestScheduleConfig() + re.NoError(err) + cfg := opt.GetReplicationConfig() + cfg.EnablePlacementRules = true + opt.SetReplicationConfig(cfg) + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) + + // Put 10 stores. + for i, store := range newTestStores(10, "6.0.0") { + var labels []*metapb.StoreLabel + if i%3 == 0 { + // zone 1 has 1, 4, 7, 10 + labels = append(labels, &metapb.StoreLabel{Key: "zone", Value: "zone1"}) + } else if i%3 == 1 { + // zone 2 has 2, 5, 8 + labels = append(labels, &metapb.StoreLabel{Key: "zone", Value: "zone2"}) + } else { + // zone 3 has 3, 6, 9 + labels = append(labels, &metapb.StoreLabel{Key: "zone", Value: "zone3"}) + } + labels = append(labels, []*metapb.StoreLabel{ + { + Key: "rack", + Value: fmt.Sprintf("rack-%d", i%2+1), + }, + { + Key: "host", + Value: fmt.Sprintf("host-%d", i), + }, + }...) 
+ s := store.Clone(core.SetStoreLabels(labels)) + re.NoError(cluster.PutStore(s.GetMeta())) + } + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "zone1", StartKey: []byte(""), EndKey: []byte(""), Role: "voter", Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"zone1"}}, + }, + LocationLabels: []string{"rack", "host"}}, + ) + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "zone2", StartKey: []byte(""), EndKey: []byte(""), Role: "voter", Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"zone2"}}, + }, + LocationLabels: []string{"rack", "host"}}, + ) + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "zone3", StartKey: []byte(""), EndKey: []byte(""), Role: "follower", Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"zone3"}}, + }, + LocationLabels: []string{"rack", "host"}}, + ) + cluster.ruleManager.DeleteRule("pd", "default") + + regions := newTestRegions(100, 10, 5) + for _, region := range regions { + re.NoError(cluster.putRegion(region)) + } + + stores := cluster.GetStores() + store := cluster.GetStore(1) + // 100 * 100 * 2 (placement rule) / 4 (host) * 0.9 = 4500 + re.Equal(4500.0, cluster.getThreshold(stores, store)) + + cluster.opt.SetPlacementRuleEnabled(false) + cluster.opt.SetLocationLabels([]string{"zone", "rack", "host"}) + // 30000 (total region size) / 3 (zone) / 4 (host) * 0.9 = 2250 + re.Equal(2250.0, cluster.getThreshold(stores, store)) +} + +func TestCalculateStoreSize2(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, err := newTestScheduleConfig() + re.NoError(err) + cfg := opt.GetReplicationConfig() + cfg.EnablePlacementRules = true + opt.SetReplicationConfig(cfg) + opt.SetMaxReplicas(3) + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + cluster.regionStats = statistics.NewRegionStatistics( + cluster.GetBasicCluster(), + cluster.GetOpts(), + cluster.ruleManager, + cluster.storeConfigManager) + + // Put 10 stores. + for i, store := range newTestStores(10, "6.0.0") { + var labels []*metapb.StoreLabel + if i%2 == 0 { + // dc 1 has 1, 3, 5, 7, 9 + labels = append(labels, &metapb.StoreLabel{Key: "dc", Value: "dc1"}) + if i%4 == 0 { + labels = append(labels, &metapb.StoreLabel{Key: "logic", Value: "logic1"}) + } else { + labels = append(labels, &metapb.StoreLabel{Key: "logic", Value: "logic2"}) + } + } else { + // dc 2 has 2, 4, 6, 8, 10 + labels = append(labels, &metapb.StoreLabel{Key: "dc", Value: "dc2"}) + if i%3 == 0 { + labels = append(labels, &metapb.StoreLabel{Key: "logic", Value: "logic3"}) + } else { + labels = append(labels, &metapb.StoreLabel{Key: "logic", Value: "logic4"}) + } + } + labels = append(labels, []*metapb.StoreLabel{{Key: "rack", Value: "r1"}, {Key: "host", Value: "h1"}}...) 
+ s := store.Clone(core.SetStoreLabels(labels)) + re.NoError(cluster.PutStore(s.GetMeta())) + } + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "dc1", StartKey: []byte(""), EndKey: []byte(""), Role: "voter", Count: 2, + LabelConstraints: []placement.LabelConstraint{ + {Key: "dc", Op: "in", Values: []string{"dc1"}}, + }, + LocationLabels: []string{"dc", "logic", "rack", "host"}}, + ) + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "logic3", StartKey: []byte(""), EndKey: []byte(""), Role: "voter", Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "logic", Op: "in", Values: []string{"logic3"}}, + }, + LocationLabels: []string{"dc", "logic", "rack", "host"}}, + ) + + cluster.ruleManager.SetRule( + &placement.Rule{GroupID: "pd", ID: "logic4", StartKey: []byte(""), EndKey: []byte(""), Role: "learner", Count: 1, + LabelConstraints: []placement.LabelConstraint{ + {Key: "logic", Op: "in", Values: []string{"logic4"}}, + }, + LocationLabels: []string{"dc", "logic", "rack", "host"}}, + ) + cluster.ruleManager.DeleteRule("pd", "default") + + regions := newTestRegions(100, 10, 5) + for _, region := range regions { + re.NoError(cluster.putRegion(region)) + } + + stores := cluster.GetStores() + store := cluster.GetStore(1) + + // 100 * 100 * 4 (total region size) / 2 (dc) / 2 (logic) / 3 (host) * 0.9 = 3000 + re.Equal(3000.0, cluster.getThreshold(stores, store)) +} + +func TestStores(t *testing.T) { + re := require.New(t) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) n := uint64(10) cache := core.NewStoresInfo() stores := newTestStores(n, "2.0.0") @@ -1275,3 +1603,1442 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { return nil } +<<<<<<< HEAD +======= + +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator { + return operator.NewTestOperator(regionID, regionEpoch, kind, steps...) 
+} + +func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + id, err := c.AllocID() + if err != nil { + return nil, err + } + return &metapb.Peer{Id: id, StoreId: storeID}, nil +} + +func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) error { + var regionSize uint64 + if len(regionSizes) == 0 { + regionSize = uint64(regionCount) * 10 + } else { + regionSize = regionSizes[0] + } + + stats := &pdpb.StoreStats{} + stats.Capacity = 100 * units.GiB + stats.UsedSize = regionSize * units.MiB + stats.Available = stats.Capacity - stats.UsedSize + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionSize)), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error { + region := newTestRegionMeta(regionID) + leader, _ := c.AllocPeer(leaderStoreID) + region.Peers = []*metapb.Peer{leader} + for _, followerStoreID := range followerStoreIDs { + peer, _ := c.AllocPeer(followerStoreID) + region.Peers = append(region.Peers, peer) + } + regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + return c.putRegion(regionInfo) +} + +func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { + stats := &pdpb.StoreStats{} + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreDown(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.UpStore(), + core.SetLastHeartbeatTS(typeutil.ZeroTime), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreOffline(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone(core.OfflineStore(false)) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error { + // regions load from etcd will have no leader + region := newTestRegionMeta(regionID) + region.Peers = []*metapb.Peer{} + for _, id := range followerStoreIDs { + peer, _ := c.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + return c.putRegion(core.NewRegionInfo(region, nil)) +} + +func TestBasic(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + 
re.Equal(op1.RegionID(), oc.GetOperator(1).RegionID()) + + // Region 1 already has an operator, cannot add another one. + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op2) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + // Remove the operator manually, then we can add a new operator. + re.True(oc.RemoveOperator(op1)) + op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op3) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) + re.Equal(op3.RegionID(), oc.GetOperator(1).RegionID()) +} + +func TestDispatch(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + co.GetPrepareChecker().SetPrepared() + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2. + re.NoError(tc.updateLeaderCount(4, 50)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + go co.RunUntilStop() + + // Wait for schedule and turn off balance. + waitOperator(re, co, 1) + sc := co.GetSchedulersController() + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + waitOperator(re, co, 2) + operatorutil.CheckTransferLeader(re, co.GetOperatorController().GetOperator(2), operator.OpKind(0), 4, 2) + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + + stream := mockhbstream.NewHeartbeatStream() + + // Transfer peer. + region := tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Transfer leader. 
+ region = tc.GetRegion(2).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitTransferLeader(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { + co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream) + if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone()); err != nil { + return err + } + co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil) + return nil +} + +func TestCollectMetricsConcurrent(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil, + tc.storeConfigManager) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Make sure there are no problem when concurrent write and read + var wg sync.WaitGroup + count := 10 + wg.Add(count + 1) + for i := 0; i <= count; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < 1000; j++ { + re.NoError(tc.addRegionStore(uint64(i%5), rand.Intn(200))) + } + }(i) + } + sc := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + sc.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + co.ResetHotSpotMetrics() + sc.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() + wg.Wait() +} + +func TestCollectMetrics(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil, + tc.storeConfigManager) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + count := 10 + for i := 0; i <= count; i++ { + for k := 0; k < 200; k++ { + item := &statistics.HotPeerStat{ + StoreID: uint64(i % 5), + RegionID: uint64(i*1000 + k), + Loads: []float64{10, 20, 30}, + HotDegree: 10, + AntiCount: statistics.HotRegionAntiCount, // for write + } + tc.hotStat.HotCache.Update(item, statistics.Write) + } + } + sc := co.GetSchedulersController() + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + sc.CollectSchedulerMetrics() + co.GetCluster().(*RaftCluster).collectClusterMetrics() + } + stores := co.GetCluster().GetStores() + regionStats := co.GetCluster().RegionWriteStats() + status1 := statistics.CollectHotPeerInfos(stores, regionStats) + status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, statistics.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) + for _, s := range status2.AsLeader { + s.Stats = nil + } + for _, s := range status2.AsPeer { + s.Stats = nil + } + re.Equal(status1, status2) + co.ResetHotSpotMetrics() + sc.ResetSchedulerMetrics() + co.GetCluster().(*RaftCluster).resetClusterMetrics() +} + +func prepare(setCfg func(*config.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { + ctx, cancel := context.WithCancel(context.Background()) + cfg, opt, err := newTestScheduleConfig() + re.NoError(err) + if setCfg != nil { + setCfg(cfg) + } + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + if setTc != nil { + setTc(tc) + } + co := schedule.NewCoordinator(ctx, tc.RaftCluster, 
hbStreams) + if run != nil { + run(co) + } + return tc, co, func() { + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + hbStreams.Close() + cancel() + } +} + +func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *schedule.Coordinator, regionID uint64, expectAddOperator int) { + ops := co.GetCheckerController().CheckRegion(tc.GetRegion(regionID)) + if ops == nil { + re.Equal(0, expectAddOperator) + } else { + re.Equal(expectAddOperator, co.GetOperatorController().AddWaitingOperator(ops...)) + } +} + +func TestCheckRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, nil, re) + hbStreams, opt := co.GetHeartbeatStreams(), tc.opt + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + checkRegionAndOperator(re, tc, co, 1, 1) + operatorutil.CheckAddPeer(re, co.GetOperatorController().GetOperator(1), operator.OpReplica, 1) + checkRegionAndOperator(re, tc, co, 1, 0) + + r := tc.GetRegion(1) + p := &metapb.Peer{Id: 1, StoreId: 1, Role: metapb.PeerRole_Learner} + r = r.Clone( + core.WithAddPeer(p), + core.WithPendingPeers(append(r.GetPendingPeers(), p)), + ) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + + tc = newTestCluster(ctx, opt) + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + r = r.Clone(core.WithPendingPeers(nil)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 1) + op := co.GetOperatorController().GetOperator(1) + re.Equal(1, op.Len()) + re.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) + checkRegionAndOperator(re, tc, co, 1, 0) +} + +func TestCheckRegionWithScheduleDeny(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NotNil(region) + // test with label schedule=deny + labelerManager := tc.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + + // should allow to do rule checker + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 0) + // delete label rule, should allow to do merge + labelerManager.DeleteLabelRule("schedulelabel") + re.False(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 2) +} + +func TestCheckerIsBusy(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg 
*config.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy + cfg.MergeScheduleLimit = 10 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + num := 1 + typeutil.MaxUint64(tc.opt.GetReplicaScheduleLimit(), tc.opt.GetMergeScheduleLimit()) + var operatorKinds = []operator.OpKind{ + operator.OpReplica, operator.OpRegion | operator.OpMerge, + } + for i, operatorKind := range operatorKinds { + for j := uint64(0); j < num; j++ { + regionID := j + uint64(i+1)*num + re.NoError(tc.addLeaderRegion(regionID, 1)) + switch operatorKind { + case operator.OpReplica: + op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind) + re.Equal(1, co.GetOperatorController().AddWaitingOperator(op)) + case operator.OpRegion | operator.OpMerge: + if regionID%2 == 1 { + ops, err := operator.CreateMergeRegionOperator("merge-region", co.GetCluster(), tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + } + } + } + } + checkRegionAndOperator(re, tc, co, num, 0) +} + +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Len(co.GetOperatorController().GetOperators(), 0) +} + +func TestReplica(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off balance. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(4, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Add peer to store 1. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. 
+ re.NoError(tc.setStoreDown(3)) + downPeer := &pdpb.PeerStats{ + Peer: region.GetStorePeer(3), + DownSeconds: 24 * 60 * 60, + } + region = region.Clone( + core.WithDownPeers(append(region.GetDownPeers(), downPeer)), + ) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove peer from store 4. + re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) + region = tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove offline peer directly when it's pending. + re.NoError(tc.addLeaderRegion(3, 1, 2, 3)) + re.NoError(tc.setStoreOffline(3)) + region = tc.GetRegion(3) + region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestCheckCache(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off replica scheduling. + cfg.ReplicaScheduleLimit = 0 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + re.NoError(tc.addRegionStore(2, 0)) + re.NoError(tc.addRegionStore(3, 0)) + + // Add a peer with two replicas. + re.NoError(tc.addLeaderRegion(1, 2, 3)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + + // case 1: operator cannot be created due to replica-schedule-limit restriction + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the replica-schedule-limit restriction + cfg := tc.GetScheduleConfig() + cfg.ReplicaScheduleLimit = 10 + tc.SetScheduleConfig(cfg) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + oc := co.GetOperatorController() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + // case 2: operator cannot be created due to store limit restriction + oc.RemoveOperator(oc.GetOperator(1)) + tc.SetStoreLimit(1, storelimit.AddPeer, 0) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the store limit restriction + tc.SetStoreLimit(1, storelimit.AddPeer, 10) + time.Sleep(time.Second) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func TestPeerState(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addRegionStore(2, 10)) + re.NoError(tc.addRegionStore(3, 10)) + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Wait for schedule. + waitOperator(re, co, 1) + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + + region := tc.GetRegion(1).Clone() + + // Add new peer. 
+ re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + + // If the new peer is pending, the operator will not finish. + region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + re.NotNil(co.GetOperatorController().GetOperator(region.GetID())) + + // The new peer is not pending now, the operator will finish. + // And we will proceed to remove peer in store 4. + region = region.Clone(core.WithPendingPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitRemovePeer(re, stream, region, 4) + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + region = tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(7, co.GetPrepareChecker().GetSum()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := 0; i < 10; i++ { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0])) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(9, co.GetPrepareChecker().GetSum()) + + // Now, after server is prepared, there exist some regions with no leader. 
+ re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + +func TestAddScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), len(config.DefaultSchedulers)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Empty(sc.GetSchedulerNames()) + + stream := mockhbstream.NewHeartbeatStream() + + // Add stores 1,2,3 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) + // Add regions 1 with leader in store 1 and followers in stores 2,3 + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + // Add regions 2 with leader in store 2 and followers in stores 1,3 + re.NoError(tc.addLeaderRegion(2, 2, 1, 3)) + // Add regions 3 with leader in store 3 and followers in stores 1,2 + re.NoError(tc.addLeaderRegion(3, 3, 1, 2)) + + oc := co.GetOperatorController() + + // test ConfigJSONDecoder create + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err := bl.EncodeConfig() + re.NoError(err) + data := make(map[string]interface{}) + err = json.Unmarshal(conf, &data) + re.NoError(err) + batch := data["batch"].(float64) + re.Equal(4, int(batch)) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), sc.RemoveScheduler) + re.NoError(err) + re.NotNil(sc.AddScheduler(gls)) + re.NotNil(sc.RemoveScheduler(gls.GetName())) + + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls)) + + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err = hb.EncodeConfig() + re.NoError(err) + data = make(map[string]interface{}) + re.NoError(json.Unmarshal(conf, &data)) + re.Contains(data, "enable-for-tiflash") + re.Equal("true", data["enable-for-tiflash"].(string)) + + // Transfer all leaders to store 1. 
+ waitOperator(re, co, 2) + region2 := tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region2, stream)) + region2 = waitTransferLeader(re, stream, region2, 1) + re.NoError(dispatchHeartbeat(co, region2, stream)) + waitNoResponse(re, stream) + + waitOperator(re, co, 3) + region3 := tc.GetRegion(3) + re.NoError(dispatchHeartbeat(co, region3, stream)) + region3 = waitTransferLeader(re, stream, region3, 1) + re.NoError(dispatchHeartbeat(co, region3, stream)) + waitNoResponse(re, stream) +} + +func TestPersistScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + defaultCount := len(config.DefaultSchedulers) + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls1, "1")) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(evict, "2")) + re.Len(sc.GetSchedulerNames(), defaultCount+2) + sches, _, err := storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, defaultCount+2) + + // remove 5 schedulers + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + re.Len(sc.GetSchedulerNames(), defaultCount-3) + re.NoError(co.GetCluster().GetPersistOptions().Persist(storage)) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // make a new coordinator for testing + // whether the schedulers added or removed in dynamic way are recorded in opt + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + _, err = schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + re.NoError(err) + // suppose we add a new default enable scheduler + config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"}) + defer func() { + config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1] + }() + re.Len(newOpt.GetSchedulers(), defaultCount) + re.NoError(newOpt.Reload(storage)) + // only remains 3 items with independent config. + sches, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, 3) + + // option have 6 items because the default scheduler do not remove. 
+ re.Len(newOpt.GetSchedulers(), defaultCount+3) + re.NoError(newOpt.Persist(storage)) + tc.RaftCluster.opt = newOpt + + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), 3) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // suppose restart PD again + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), 3) + bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + re.NoError(sc.AddScheduler(bls)) + brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + re.NoError(sc.AddScheduler(brs)) + re.Len(sc.GetSchedulerNames(), defaultCount) + + // the scheduler option should contain 6 items + // the `hot scheduler` are disabled + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+3) + re.NoError(sc.RemoveScheduler(schedulers.GrantLeaderName)) + // the scheduler that is not enable by default will be completely deleted + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount+2) + re.Len(sc.GetSchedulerNames(), 4) + re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + _, newOpt, err = newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(co.GetCluster().GetStorage())) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + co.Run() + sc = co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount-1) + re.NoError(sc.RemoveScheduler(schedulers.EvictLeaderName)) + re.Len(sc.GetSchedulerNames(), defaultCount-2) +} + +func TestRemoveScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + defaultCount := len(config.DefaultSchedulers) + sc := co.GetSchedulersController() + re.Len(sc.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), sc.RemoveScheduler) + re.NoError(err) + re.NoError(sc.AddScheduler(gls1, "1")) + re.Len(sc.GetSchedulerNames(), defaultCount+1) + sches, _, err := storage.LoadAllScheduleConfig() + re.NoError(err) + re.Len(sches, defaultCount+1) + + // remove all schedulers + re.NoError(sc.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(sc.RemoveScheduler(schedulers.GrantLeaderName)) + re.NoError(sc.RemoveScheduler(schedulers.BalanceWitnessName)) + 
re.NoError(sc.RemoveScheduler(schedulers.TransferWitnessLeaderName)) + // all removed + sches, _, err = storage.LoadAllScheduleConfig() + re.NoError(err) + re.Empty(sches) + re.Empty(sc.GetSchedulerNames()) + re.NoError(co.GetCluster().GetPersistOptions().Persist(co.GetCluster().GetStorage())) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // suppose we restart PD again + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + re.NoError(newOpt.Reload(tc.storage)) + tc.RaftCluster.opt = newOpt + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.Empty(sc.GetSchedulerNames()) + // the option still keeps the default schedulers + re.Len(co.GetCluster().GetPersistOptions().GetSchedulers(), defaultCount) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() +} + +func TestRestart(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + // Turn off balance; we test adding replicas only. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + + // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1. + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addLeaderRegion(1, 1)) + region := tc.GetRegion(1) + co.GetPrepareChecker().Collect(region) + + // Add 1 replica on store 2. + stream := mockhbstream.NewHeartbeatStream() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 2) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + + // Recreate coordinator then add another replica on store 3.
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.GetPrepareChecker().Collect(region) + co.Run() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 3) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitPromoteLearner(re, stream, region, 3) +} + +func TestPauseScheduler(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + sc := co.GetSchedulersController() + _, err := sc.IsSchedulerAllowed("test") + re.Error(err) + sc.PauseOrResumeScheduler(schedulers.BalanceLeaderName, 60) + paused, _ := sc.IsSchedulerPaused(schedulers.BalanceLeaderName) + re.True(paused) + pausedAt, err := sc.GetPausedSchedulerDelayAt(schedulers.BalanceLeaderName) + re.NoError(err) + resumeAt, err := sc.GetPausedSchedulerDelayUntil(schedulers.BalanceLeaderName) + re.NoError(err) + re.Equal(int64(60), resumeAt-pausedAt) + allowed, _ := sc.IsSchedulerAllowed(schedulers.BalanceLeaderName) + re.False(allowed) +} + +func BenchmarkPatrolRegion(b *testing.B) { + re := require.New(b) + + mergeLimit := uint64(4100) + regionNum := 10000 + + tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { + cfg.MergeScheduleLimit = mergeLimit + }, nil, nil, re) + defer cleanup() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum, 96); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + listen := make(chan int) + go func() { + oc := co.GetOperatorController() + listen <- 0 + for { + if oc.OperatorCount(operator.OpMerge) == mergeLimit { + co.Stop() + return + } + } + }() + <-listen + + co.GetWaitGroup().Add(1) + b.ResetTimer() + co.PatrolRegions() +} + +func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { + testutil.Eventually(re, func() bool { + return co.GetOperatorController().GetOperator(regionID) != nil + }) +} + +func TestOperatorCount(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 1:leader + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpLeader)) // 1:leader, 2:leader + re.True(oc.RemoveOperator(op1)) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 2:leader + } + + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion) + op2.SetPriorityLevel(constant.High) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + } +} + +func TestStoreOverloaded(t *testing.T) { + re := require.New(t) + + tc, co, 
cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + opt := tc.GetOpts() + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + start := time.Now() + { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op1 := ops[0] + re.NotNil(op1) + re.True(oc.AddOperator(op1)) + re.True(oc.RemoveOperator(op1)) + } + for { + time.Sleep(time.Millisecond * 10) + ops, _ := lb.Schedule(tc, false /* dryRun */) + if time.Since(start) > time.Second { + break + } + re.Empty(ops) + } + + // reset all stores' limit + // scheduling one time needs 1/10 seconds + opt.SetAllStoresLimit(storelimit.AddPeer, 600) + opt.SetAllStoresLimit(storelimit.RemovePeer, 600) + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op := ops[0] + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + } + // sleep 1 seconds to make sure that the token is filled up + time.Sleep(time.Second) + for i := 0; i < 100; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) + } +} + +func TestStoreOverloadedWithReplace(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(2, 1, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + region = tc.GetRegion(2).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1}) + re.True(oc.AddOperator(op1)) + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2}) + op2.SetPriorityLevel(constant.High) + re.True(oc.AddOperator(op2)) + op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3}) + re.False(oc.AddOperator(op3)) + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Empty(ops) + // sleep 2 seconds to make sure that token is filled up + time.Sleep(2 * time.Second) + ops, _ = lb.Schedule(tc, false /* dryRun */) + re.Greater(len(ops), 0) +} + +func TestDownStoreLimit(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + rc := co.GetCheckerController().GetRuleChecker() + + tc.addRegionStore(1, 100) + tc.addRegionStore(2, 100) + tc.addRegionStore(3, 100) + tc.addLeaderRegion(1, 1, 2, 3) + + region := tc.GetRegion(1) + tc.setStoreDown(1) + tc.SetStoreLimit(1, storelimit.RemovePeer, 1) + + region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: 
region.GetStorePeer(1), + DownSeconds: 24 * 60 * 60, + }, + }), core.SetApproximateSize(1)) + tc.putRegion(region) + for i := uint64(1); i < 20; i++ { + tc.addRegionStore(i+3, 100) + op := rc.Check(region) + re.NotNil(op) + re.True(oc.AddOperator(op)) + oc.RemoveOperator(op) + } + + region = region.Clone(core.SetApproximateSize(100)) + tc.putRegion(region) + for i := uint64(20); i < 25; i++ { + tc.addRegionStore(i+3, 100) + op := rc.Check(region) + re.NotNil(op) + re.True(oc.AddOperator(op)) + oc.RemoveOperator(op) + } +} + +// FIXME: remove after moving into the schedulers package +type mockLimitScheduler struct { + schedulers.Scheduler + limit uint64 + counter *operator.Controller + kind operator.OpKind +} + +func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { + return s.counter.OperatorCount(s.kind) < s.limit +} + +func TestController(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + lb := &mockLimitScheduler{ + Scheduler: scheduler, + counter: oc, + kind: operator.OpLeader, + } + + sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb) + + for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) { + re.Equal(i, sc.GetInterval()) + re.Empty(sc.Schedule(false)) + } + // limit = 2 + lb.limit = 2 + // count = 0 + { + re.True(sc.AllowSchedule(false)) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op1)) + // count = 1 + re.True(sc.AllowSchedule(false)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op2)) + // count = 2 + re.False(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op1)) + // count = 1 + re.True(sc.AllowSchedule(false)) + } + + op11 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + // adding a PriorityKind operator will remove the old operator + { + op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion) + op3.SetPriorityLevel(constant.High) + re.Equal(1, oc.AddWaitingOperator(op11)) + re.False(sc.AllowSchedule(false)) + re.Equal(1, oc.AddWaitingOperator(op3)) + re.True(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op3)) + } + + // adding an admin operator will remove the old operator + { + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op2)) + re.False(sc.AllowSchedule(false)) + op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin) + op4.SetPriorityLevel(constant.High) + re.Equal(1, oc.AddWaitingOperator(op4)) + re.True(sc.AllowSchedule(false)) + re.True(oc.RemoveOperator(op4)) + } + + // test wrong region id. + { + op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion) + re.Equal(0, oc.AddWaitingOperator(op5)) + } + + // test wrong region epoch.
+ re.True(oc.RemoveOperator(op11)) + epoch := &metapb.RegionEpoch{ + Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1, + ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(), + } + { + op6 := newTestOperator(1, epoch, operator.OpLeader) + re.Equal(0, oc.AddWaitingOperator(op6)) + } + epoch.Version-- + { + op6 := newTestOperator(1, epoch, operator.OpLeader) + re.Equal(1, oc.AddWaitingOperator(op6)) + re.True(oc.RemoveOperator(op6)) + } +} + +func TestInterval(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""})) + re.NoError(err) + sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb) + + // If no operator for x seconds, the next check should be in x/2 seconds. + idleSeconds := []int{5, 10, 20, 30, 60} + for _, n := range idleSeconds { + sc.SetInterval(schedulers.MinScheduleInterval) + for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() { + re.Empty(sc.Schedule(false)) + } + re.Less(sc.GetInterval(), time.Second*time.Duration(n/2)) + } +} + +func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithAddPeer(res.GetChangePeer().GetPeer()), + core.WithIncConfVer(), + ) +} + +func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + // Remove learner then add voter.
+ return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithAddPeer(res.GetChangePeer().GetPeer()), + ) +} + +func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithIncConfVer(), + ) +} + +func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv(); res != nil { + if res.GetRegionId() == region.GetID() { + for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { + if peer.GetStoreId() == storeID { + return true + } + } + } + } + return false + }) + return region.Clone( + core.WithLeader(region.GetStorePeer(storeID)), + ) +} + +func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) { + testutil.Eventually(re, func() bool { + res := stream.Recv() + return res == nil + }) +} +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)) diff --git a/server/handler.go b/server/handler.go index 39303f4fc45..1ba3e63aa86 100644 --- a/server/handler.go +++ b/server/handler.go @@ -861,15 +861,6 @@ func (h *Handler) GetSchedulerConfigHandler() http.Handler { return mux } -// GetOfflinePeer gets the region with offline peer. -func (h *Handler) GetOfflinePeer(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) { - c := h.s.GetRaftCluster() - if c == nil { - return nil, errs.ErrNotBootstrapped.FastGenByArgs() - } - return c.GetOfflineRegionStatsByType(typ), nil -} - // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64) error { tsoAllocator, err := h.s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) diff --git a/server/statistics/metrics.go b/server/statistics/metrics.go index 7a152190615..7f6bd45b386 100644 --- a/server/statistics/metrics.go +++ b/server/statistics/metrics.go @@ -41,14 +41,6 @@ var ( Help: "Status of the regions.", }, []string{"type"}) - offlineRegionStatusGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "regions", - Name: "offline_status", - Help: "Status of the offline regions.", - }, []string{"type"}) - clusterStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -182,7 +174,6 @@ func init() { prometheus.MustRegister(hotCacheStatusGauge) prometheus.MustRegister(storeStatusGauge) prometheus.MustRegister(regionStatusGauge) - prometheus.MustRegister(offlineRegionStatusGauge) prometheus.MustRegister(clusterStatusGauge) prometheus.MustRegister(placementStatusGauge) prometheus.MustRegister(configStatusGauge) diff --git a/server/statistics/region_collection.go b/server/statistics/region_collection.go index 8a6322942c0..90938eae4b7 100644 --- a/server/statistics/region_collection.go +++ b/server/statistics/region_collection.go @@ -23,6 +23,12 @@ import ( "github.com/tikv/pd/server/schedule/placement" ) +// RegionInfoProvider is an interface to provide the region information. 
+type RegionInfoProvider interface { + // GetRegion returns the region information according to the given region ID. + GetRegion(regionID uint64) *core.RegionInfo +} + // RegionStatisticType represents the type of the region's status. type RegionStatisticType uint32 @@ -37,17 +43,51 @@ const ( EmptyRegion ) +var regionStatisticTypes = []RegionStatisticType{ + MissPeer, + ExtraPeer, + DownPeer, + PendingPeer, + OfflinePeer, + LearnerPeer, + EmptyRegion, + OversizedRegion, + UndersizedRegion, + WitnessLeader, +} + const nonIsolation = "none" +<<<<<<< HEAD:server/statistics/region_collection.go // RegionInfo is used to record the status of region. type RegionInfo struct { *core.RegionInfo +======= +var ( + // WithLabelValues is a heavy operation, so define variables to avoid calling it every time. + regionMissPeerRegionCounter = regionStatusGauge.WithLabelValues("miss-peer-region-count") + regionExtraPeerRegionCounter = regionStatusGauge.WithLabelValues("extra-peer-region-count") + regionDownPeerRegionCounter = regionStatusGauge.WithLabelValues("down-peer-region-count") + regionPendingPeerRegionCounter = regionStatusGauge.WithLabelValues("pending-peer-region-count") + regionOfflinePeerRegionCounter = regionStatusGauge.WithLabelValues("offline-peer-region-count") + regionLearnerPeerRegionCounter = regionStatusGauge.WithLabelValues("learner-peer-region-count") + regionEmptyRegionCounter = regionStatusGauge.WithLabelValues("empty-region-count") + regionOversizedRegionCounter = regionStatusGauge.WithLabelValues("oversized-region-count") + regionUndersizedRegionCounter = regionStatusGauge.WithLabelValues("undersized-region-count") + regionWitnessLeaderRegionCounter = regionStatusGauge.WithLabelValues("witness-leader-region-count") +) + +// RegionInfoWithTS is used to record the extra timestamp status of a region. +type RegionInfoWithTS struct { + id uint64 +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go startMissVoterPeerTS int64 startDownPeerTS int64 } // RegionStatistics is used to record the status of regions. type RegionStatistics struct { +<<<<<<< HEAD:server/statistics/region_collection.go opt *config.PersistOptions stats map[RegionStatisticType]map[uint64]*RegionInfo offlineStats map[RegionStatisticType]map[uint64]*core.RegionInfo @@ -80,18 +120,57 @@ func NewRegionStatistics(opt *config.PersistOptions, ruleManager *placement.Rule r.offlineStats[EmptyRegion] = make(map[uint64]*core.RegionInfo) r.offlineStats[OfflinePeer] = make(map[uint64]*core.RegionInfo) r.ruleManager = ruleManager +======= + sync.RWMutex + rip RegionInfoProvider + conf sc.CheckerConfig + stats map[RegionStatisticType]map[uint64]*RegionInfoWithTS + index map[uint64]RegionStatisticType + ruleManager *placement.RuleManager + storeConfigManager *config.StoreConfigManager +} + +// NewRegionStatistics creates a new RegionStatistics.
+func NewRegionStatistics( + rip RegionInfoProvider, + conf sc.CheckerConfig, + ruleManager *placement.RuleManager, + storeConfigManager *config.StoreConfigManager, +) *RegionStatistics { + r := &RegionStatistics{ + rip: rip, + conf: conf, + ruleManager: ruleManager, + storeConfigManager: storeConfigManager, + stats: make(map[RegionStatisticType]map[uint64]*RegionInfoWithTS), + index: make(map[uint64]RegionStatisticType), + } + for _, typ := range regionStatisticTypes { + r.stats[typ] = make(map[uint64]*RegionInfoWithTS) + } +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go return r } // GetRegionStatsByType gets the status of the region by types. +<<<<<<< HEAD:server/statistics/region_collection.go +======= +// The regions here need to be cloned, otherwise, it may cause data race problems. +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { res := make([]*core.RegionInfo, 0, len(r.stats[typ])) +<<<<<<< HEAD:server/statistics/region_collection.go for _, r := range r.stats[typ] { res = append(res, r.RegionInfo) +======= + for regionID := range r.stats[typ] { + res = append(res, r.rip.GetRegion(regionID).Clone()) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } return res } +<<<<<<< HEAD:server/statistics/region_collection.go // GetOfflineRegionStatsByType gets the status of the offline region by types. func (r *RegionStatistics) GetOfflineRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo { res := make([]*core.RegionInfo, 0, len(r.stats[typ])) @@ -99,6 +178,14 @@ func (r *RegionStatistics) GetOfflineRegionStatsByType(typ RegionStatisticType) res = append(res, r) } return res +======= +// IsRegionStatsType returns whether the status of the region is the given type. +func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisticType) bool { + r.RLock() + defer r.RUnlock() + _, exist := r.stats[typ][regionID] + return exist +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID uint64) { @@ -109,26 +196,50 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID } } +<<<<<<< HEAD:server/statistics/region_collection.go func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, regionID uint64) { for typ := RegionStatisticType(1); typ <= deleteIndex; typ <<= 1 { if deleteIndex&typ != 0 { delete(r.offlineStats[typ], regionID) } } +======= +// RegionStatsNeedUpdate checks whether the region's status need to be updated +// due to some special state types. 
+func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool { + regionID := region.GetID() + if r.IsRegionStatsType(regionID, OversizedRegion) != + region.IsOversized(int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()), int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys())) { + return true + } + return r.IsRegionStatsType(regionID, UndersizedRegion) != + region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys())) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } // Observe records the current regions' status. func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) { +<<<<<<< HEAD:server/statistics/region_collection.go // Region state. regionID := region.GetID() +======= + r.Lock() + defer r.Unlock() +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go var ( - peerTypeIndex RegionStatisticType - offlinePeerTypeIndex RegionStatisticType - deleteIndex RegionStatisticType + desiredReplicas = r.conf.GetMaxReplicas() + desiredVoters = desiredReplicas + peerTypeIndex RegionStatisticType + deleteIndex RegionStatisticType ) +<<<<<<< HEAD:server/statistics/region_collection.go desiredReplicas := r.opt.GetMaxReplicas() desiredVoters := desiredReplicas if r.opt.IsPlacementRulesEnabled() { +======= + // Check if the region meets count requirements of its rules. + if r.conf.IsPlacementRulesEnabled() { +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go if !r.ruleManager.IsInitialized() { log.Warn("ruleManager haven't been initialized") return @@ -143,6 +254,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } } } +<<<<<<< HEAD:server/statistics/region_collection.go var isOffline bool @@ -156,26 +268,44 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store } } +======= + // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. + // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. + // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go conditions := map[RegionStatisticType]bool{ MissPeer: len(region.GetPeers()) < desiredReplicas, ExtraPeer: len(region.GetPeers()) > desiredReplicas, DownPeer: len(region.GetDownPeers()) > 0, PendingPeer: len(region.GetPendingPeers()) > 0, + OfflinePeer: func() bool { + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + return true + } + } + } + return false + }(), LearnerPeer: len(region.GetLearners()) > 0, EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize, } - + // Check if the region meets any of the conditions and update the corresponding info. 
+ regionID := region.GetID() for typ, c := range conditions { if c { +<<<<<<< HEAD:server/statistics/region_collection.go if isOffline { r.offlineStats[typ][regionID] = region offlinePeerTypeIndex |= typ } +======= +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go info := r.stats[typ][regionID] if info == nil { - info = &RegionInfo{ - RegionInfo: region, - } + info = &RegionInfoWithTS{id: regionID} } if typ == DownPeer { if info.startDownPeerTS != 0 { @@ -195,6 +325,7 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store peerTypeIndex |= typ } } +<<<<<<< HEAD:server/statistics/region_collection.go if isOffline { r.offlineStats[OfflinePeer][regionID] = region @@ -207,6 +338,9 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store r.deleteOfflineEntry(deleteIndex, regionID) r.offlineIndex[regionID] = offlinePeerTypeIndex +======= + // Remove the info if any of the conditions are not met any more. +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go if oldIndex, ok := r.index[regionID]; ok { deleteIndex = oldIndex &^ peerTypeIndex } @@ -219,13 +353,11 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) { if oldIndex, ok := r.index[regionID]; ok { r.deleteEntry(oldIndex, regionID) } - if oldIndex, ok := r.offlineIndex[regionID]; ok { - r.deleteOfflineEntry(oldIndex, regionID) - } } // Collect collects the metrics of the regions' status. func (r *RegionStatistics) Collect() { +<<<<<<< HEAD:server/statistics/region_collection.go regionStatusGauge.WithLabelValues("miss-peer-region-count").Set(float64(len(r.stats[MissPeer]))) regionStatusGauge.WithLabelValues("extra-peer-region-count").Set(float64(len(r.stats[ExtraPeer]))) regionStatusGauge.WithLabelValues("down-peer-region-count").Set(float64(len(r.stats[DownPeer]))) @@ -240,12 +372,39 @@ func (r *RegionStatistics) Collect() { offlineRegionStatusGauge.WithLabelValues("learner-peer-region-count").Set(float64(len(r.offlineStats[LearnerPeer]))) offlineRegionStatusGauge.WithLabelValues("empty-region-count").Set(float64(len(r.offlineStats[EmptyRegion]))) offlineRegionStatusGauge.WithLabelValues("offline-peer-region-count").Set(float64(len(r.offlineStats[OfflinePeer]))) +======= + r.RLock() + defer r.RUnlock() + regionMissPeerRegionCounter.Set(float64(len(r.stats[MissPeer]))) + regionExtraPeerRegionCounter.Set(float64(len(r.stats[ExtraPeer]))) + regionDownPeerRegionCounter.Set(float64(len(r.stats[DownPeer]))) + regionPendingPeerRegionCounter.Set(float64(len(r.stats[PendingPeer]))) + regionOfflinePeerRegionCounter.Set(float64(len(r.stats[OfflinePeer]))) + regionLearnerPeerRegionCounter.Set(float64(len(r.stats[LearnerPeer]))) + regionEmptyRegionCounter.Set(float64(len(r.stats[EmptyRegion]))) + regionOversizedRegionCounter.Set(float64(len(r.stats[OversizedRegion]))) + regionUndersizedRegionCounter.Set(float64(len(r.stats[UndersizedRegion]))) + regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader]))) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } // Reset resets the metrics of the regions' status. 
func (r *RegionStatistics) Reset() { +<<<<<<< HEAD:server/statistics/region_collection.go regionStatusGauge.Reset() offlineRegionStatusGauge.Reset() +======= + regionMissPeerRegionCounter.Set(0) + regionExtraPeerRegionCounter.Set(0) + regionDownPeerRegionCounter.Set(0) + regionPendingPeerRegionCounter.Set(0) + regionOfflinePeerRegionCounter.Set(0) + regionLearnerPeerRegionCounter.Set(0) + regionEmptyRegionCounter.Set(0) + regionOversizedRegionCounter.Set(0) + regionUndersizedRegionCounter.Set(0) + regionWitnessLeaderRegionCounter.Set(0) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection.go } // LabelStatistics is the statistics of the level of labels. diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index 7ec069e810d..7ca6f4d34f3 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -79,6 +79,7 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { r2 := &metapb.Region{Id: 2, Peers: peers[0:2], StartKey: []byte("cc"), EndKey: []byte("dd")} region1 := core.NewRegionInfo(r1, peers[0]) region2 := core.NewRegionInfo(r2, peers[0]) +<<<<<<< HEAD:server/statistics/region_collection_test.go regionStats := NewRegionStatistics(opt, t.manager) regionStats.Observe(region1, stores) c.Assert(regionStats.stats[ExtraPeer], HasLen, 1) @@ -88,6 +89,15 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { c.Assert(regionStats.offlineStats[LearnerPeer], HasLen, 1) c.Assert(regionStats.offlineStats[EmptyRegion], HasLen, 1) c.Assert(regionStats.offlineStats[OfflinePeer], HasLen, 1) +======= + regionStats := NewRegionStatistics(nil, opt, manager, nil) + regionStats.Observe(region1, stores) + re.Len(regionStats.stats[ExtraPeer], 1) + re.Len(regionStats.stats[LearnerPeer], 1) + re.Len(regionStats.stats[EmptyRegion], 1) + re.Len(regionStats.stats[UndersizedRegion], 1) + re.Len(regionStats.stats[OfflinePeer], 1) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go region1 = region1.Clone( core.WithDownPeers(downPeers), @@ -95,6 +105,7 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { core.SetApproximateSize(144), ) regionStats.Observe(region1, stores) +<<<<<<< HEAD:server/statistics/region_collection_test.go c.Assert(regionStats.stats[ExtraPeer], HasLen, 1) c.Assert(regionStats.stats[MissPeer], HasLen, 0) c.Assert(regionStats.stats[DownPeer], HasLen, 1) @@ -136,6 +147,37 @@ func (t *testRegionStatisticsSuite) TestRegionStatistics(c *C) { c.Assert(regionStats.offlineStats[PendingPeer], HasLen, 0) c.Assert(regionStats.offlineStats[LearnerPeer], HasLen, 0) c.Assert(regionStats.offlineStats[OfflinePeer], HasLen, 0) +======= + re.Len(regionStats.stats[ExtraPeer], 1) + re.Empty(regionStats.stats[MissPeer]) + re.Len(regionStats.stats[DownPeer], 1) + re.Len(regionStats.stats[PendingPeer], 1) + re.Len(regionStats.stats[LearnerPeer], 1) + re.Empty(regionStats.stats[EmptyRegion]) + re.Len(regionStats.stats[OversizedRegion], 1) + re.Empty(regionStats.stats[UndersizedRegion]) + re.Len(regionStats.stats[OfflinePeer], 1) + + region2 = region2.Clone(core.WithDownPeers(downPeers[0:1])) + regionStats.Observe(region2, stores[0:2]) + re.Len(regionStats.stats[ExtraPeer], 1) + re.Len(regionStats.stats[MissPeer], 1) + re.Len(regionStats.stats[DownPeer], 2) + re.Len(regionStats.stats[PendingPeer], 1) + 
re.Len(regionStats.stats[LearnerPeer], 1) + re.Len(regionStats.stats[OversizedRegion], 1) + re.Len(regionStats.stats[UndersizedRegion], 1) + re.Len(regionStats.stats[OfflinePeer], 1) + + region1 = region1.Clone(core.WithRemoveStorePeer(7)) + regionStats.Observe(region1, stores[0:3]) + re.Empty(regionStats.stats[ExtraPeer]) + re.Len(regionStats.stats[MissPeer], 1) + re.Len(regionStats.stats[DownPeer], 2) + re.Len(regionStats.stats[PendingPeer], 1) + re.Empty(regionStats.stats[LearnerPeer]) + re.Empty(regionStats.stats[OfflinePeer]) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go store3 = stores[3].Clone(core.UpStore()) stores[3] = store3 @@ -170,7 +212,12 @@ func (t *testRegionStatisticsSuite) TestRegionStatisticsWithPlacementRule(c *C) region2 := core.NewRegionInfo(r2, peers[0]) region3 := core.NewRegionInfo(r3, peers[0]) region4 := core.NewRegionInfo(r4, peers[0]) +<<<<<<< HEAD:server/statistics/region_collection_test.go regionStats := NewRegionStatistics(opt, t.manager) +======= + region5 := core.NewRegionInfo(r5, peers[4]) + regionStats := NewRegionStatistics(nil, opt, manager, nil) +>>>>>>> 40eaa35f2 (statistics: get region info via core cluster inside RegionStatistics (#6804)):pkg/statistics/region_collection_test.go // r2 didn't match the rules regionStats.Observe(region2, stores) c.Assert(regionStats.stats[MissPeer], HasLen, 1)