diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index c92cd2c67ea..cbd88bb66b0 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1376,40 +1376,13 @@ type MigrationResult struct { Ops []*MigrationOp `json:"ops"` } -// BalanceRegion checks if regions are imbalanced and rebalance them. -func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) { - startKey, err := hex.DecodeString(rawStartKey) - if err != nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, err - } - endKey, err := hex.DecodeString(rawEndKey) - if err != nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, err - } - c := h.GetCluster() - if c == nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - co := h.GetCoordinator() - if co == nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - regions := c.ScanRegions(startKey, endKey, -1) - regionIDMap := make(map[uint64]*core.RegionInfo) - for _, r := range regions { - regionIDMap[r.GetID()] = r - } - - stores := c.GetStores() +func ComputeCandidateStores(requiredLabels []*metapb.StoreLabel, stores []*core.StoreInfo, regions []*core.RegionInfo) []*StoreRegionSet { candidates := make([]*StoreRegionSet, 0) for _, s := range stores { storeLabelMap := make(map[string]*metapb.StoreLabel) for _, l := range s.GetLabels() { storeLabelMap[l.Key] = l } - if len(requiredLabels) != len(storeLabelMap) { - continue - } gotLabels := true for _, larg := range requiredLabels { if l, ok := storeLabelMap[larg.Key]; ok { @@ -1442,6 +1415,35 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels [] } candidates = append(candidates, candidate) } + return candidates +} + +// RedistibuteRegions checks if regions are imbalanced and rebalance them. +func (h *Handler) RedistibuteRegions(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, err + } + c := h.GetCluster() + if c == nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, -1) + regionIDMap := make(map[uint64]*core.RegionInfo) + for _, r := range regions { + regionIDMap[r.GetID()] = r + } + + stores := c.GetStores() + candidates := ComputeCandidateStores(requiredLabels, stores, regions) senders, receivers, ops := MigrationPlan(candidates) diff --git a/server/api/region.go b/server/api/region.go index 6c60d7fcd7b..d04bfb00c20 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -129,12 +129,12 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R h.rd.JSON(w, http.StatusOK, state) } -func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) { +func (h *regionsHandler) RedistibuteRegions(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) rawStartKey := vars["startKey"] rawEndKey := vars["endKey"] storeLabels := make([]*metapb.StoreLabel, 0) - result, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels) + result, err := h.Handler.RedistibuteRegions(rawStartKey, rawEndKey, storeLabels) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return diff --git a/server/api/router.go b/server/api/router.go index e8b12e88cca..9f0d07bdfb5 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -284,7 +284,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods(http.MethodGet), setQueries("startKey", "{startKey}", "endKey", "{endKey}"), setAuditBackend(prometheus)) - registerFunc(clusterRouter, "/regions/balance", regionsHandler.BalanceRegion, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(clusterRouter, "/regions/balance", regionsHandler.RedistibuteRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/version", newVersionHandler(rd).GetVersion, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/status", newStatusHandler(svr, rd).GetPDStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 7a8105f401e..9a63dd55345 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -26,6 +26,8 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/handler" @@ -661,7 +663,7 @@ type regionStoresPair struct { StorePos []uint64 } -func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) { +func buildRedistributeRegionsTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) { stores := []*metapb.Store{} regions := []*core.RegionInfo{} for _, i := range storeIDs { @@ -732,10 +734,10 @@ func validateMigtationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 return rl } -func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) { +func (suite *operatorTestSuite) checkRedistributeRegions1(cluster *tests.TestCluster) { re := suite.Require() - stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ {10, []uint64{0}}, {20, []uint64{0}}, {30, []uint64{0}}, @@ -757,10 +759,10 @@ func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) re.Equal(2, len(result.Ops)) } -func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) { +func (suite *operatorTestSuite) checkRedistributeRegions2(cluster *tests.TestCluster) { re := suite.Require() - stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ {10, []uint64{0, 1}}, {20, []uint64{0, 2}}, {30, []uint64{0, 1}}, @@ -784,18 +786,93 @@ func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) validateMigtationIn(result.Ops, []uint64{4}) } -func (suite *operatorTestSuite) TestBalanceRegions() { - use a new environment to avoid being affected by other tests +func (suite *operatorTestSuite) TestRedistributeRegions() { env := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, _ string) { conf.Replication.MaxReplicas = 1 }) - env.RunTestBasedOnMode(suite.checkBalanceRegions1) + env.RunTestBasedOnMode(suite.checkRedistributeRegions1) env.Cleanup() env2 := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, _ string) { conf.Replication.MaxReplicas = 1 }) - env2.RunTestBasedOnMode(suite.checkBalanceRegions2) + env2.RunTestBasedOnMode(suite.checkRedistributeRegions2) env2.Cleanup() } + +func (suite *operatorTestSuite) checkRedistributeRegions3(cluster *tests.TestCluster) { + re := suite.Require() + + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + {10, []uint64{0, 1}}, + {20, []uint64{0, 2}}, + {30, []uint64{0, 1}}, + }) + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + pauseAllCheckers(re, cluster) + result := handler.MigrationResult{} + for _, r := range regions { + tests.MustPutRegionInfo(re, cluster, r) + } + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result)) + re.NoError(e) + re.Equal(1, len(result.Ops)) + validateMigtationOut(result.Ops, []uint64{1}) + validateMigtationIn(result.Ops, []uint64{4}) +} + +func TestComputeCandidateStores(t *testing.T) { + re := require.New(t) + stores := []*core.StoreInfo{} + + stats := &pdpb.StoreStats{ + Capacity: 100, + Available: 100, + } + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + + regions := []*core.RegionInfo{} + + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, stores, regions), 2, "case 1") + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, stores, regions), 1, "case 1") + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}}, stores, regions), 1, "case 1") +}