diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go
index bd2afc2973c..c92cd2c67ea 100644
--- a/pkg/schedule/handler/handler.go
+++ b/pkg/schedule/handler/handler.go
@@ -1297,19 +1297,14 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig
 			// If toStore doesn't has this region, then create a move op.
 			o.Regions[r] = false
 			o.OriginalPeer = fromStore.OriginalPeer[r]
-			log.Info("!!!! Pick S", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore), zap.Any("OriginalPeer", fromStore.OriginalPeer[r]))
 			fromStore.RegionIDSet[r] = true
 			n--
-		} else {
-			log.Info("!!!! Pick", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore))
 		}
 	}
 	return &o
 }
 
 func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) {
-	log.Info("!!! MigrationPlan",
-		zap.Any("store-id", stores))
 	totalRegionCount := 0
 	for _, store := range stores {
 		totalRegionCount += len(store.RegionIDSet)
@@ -1335,7 +1330,6 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) {
 		expectedCount = append(expectedCount, avr)
 	}
 
-	log.Info("!!! expectedCount", zap.Any("expectedCount", expectedCount))
 	senders := []int{}
 	receivers := []int{}
 	sendersVolume := []int{}
@@ -1383,7 +1377,7 @@ type MigrationResult struct {
 }
 
 // BalanceRegion checks if regions are imbalanced and rebalance them.
-func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (MigrationResult, error) {
+func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) {
 	startKey, err := hex.DecodeString(rawStartKey)
 	if err != nil {
 		return MigrationResult{ErrorCode: 1, Ops: nil}, err
@@ -1413,12 +1407,11 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
 		for _, l := range s.GetLabels() {
 			storeLabelMap[l.Key] = l
 		}
-		if len(storeLabels) != len(storeLabelMap) {
+		if len(requiredLabels) != len(storeLabelMap) {
 			continue
 		}
-		log.Info("!!!! store pass 1", zap.Any("s", s), zap.Any("l", s.GetLabels()), zap.Any("id", s.GetID()))
 		gotLabels := true
-		for _, larg := range storeLabels {
+		for _, larg := range requiredLabels {
 			if l, ok := storeLabelMap[larg.Key]; ok {
 				if larg.Value != l.Value {
 					gotLabels = false
@@ -1433,7 +1426,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
 		if !gotLabels {
 			continue
 		}
-		log.Info("!!!! store pass 2", zap.Any("s", s))
 		candidate := &StoreRegionSet{
 			ID:   s.GetID(),
 			Info: s,
@@ -1448,7 +1440,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
 				}
 			}
 		}
-		log.Info("!!!! store pass 3", zap.Any("s", s))
 		candidates = append(candidates, candidate)
 	}
 
diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go
index 5e44d661c1c..0b714204500 100644
--- a/pkg/utils/testutil/api_check.go
+++ b/pkg/utils/testutil/api_check.go
@@ -19,10 +19,8 @@ import (
 	"io"
 	"net/http"
 
-	"github.com/pingcap/log"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/pd/pkg/utils/apiutil"
-	"go.uber.org/zap"
 )
 
 // Status is used to check whether http response code is equal given code.
@@ -47,7 +45,6 @@ func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
 // ExtractJSON is used to check whether given data can be extracted successfully.
 func ExtractJSON(re *require.Assertions, data any) func([]byte, int, http.Header) {
 	return func(resp []byte, _ int, _ http.Header) {
-		log.Info("!!!!! ffdfdfdfd", zap.Any("a", string(resp)))
 		re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
 	}
 }
diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go
index 2b4f020badb..7a8105f401e 100644
--- a/tests/server/api/operator_test.go
+++ b/tests/server/api/operator_test.go
@@ -18,6 +18,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"slices"
 	"sort"
 	"strconv"
 	"strings"
@@ -655,28 +656,90 @@ func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster)
 	re.NoError(err)
 }
 
-func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) {
-	re := suite.Require()
-	stores := []*metapb.Store{
-		{
-			Id:            1,
-			State:         metapb.StoreState_Up,
-			NodeState:     metapb.NodeState_Serving,
-			LastHeartbeat: time.Now().UnixNano(),
-		},
-		{
-			Id:            2,
-			State:         metapb.StoreState_Up,
-			NodeState:     metapb.NodeState_Serving,
-			LastHeartbeat: time.Now().UnixNano(),
-		},
-		{
-			Id:            4,
+type regionStoresPair struct {
+	RegionID uint64
+	StorePos []uint64
+}
+
+func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) {
+	stores := []*metapb.Store{}
+	regions := []*core.RegionInfo{}
+	for _, i := range storeIDs {
+		stores = append(stores, &metapb.Store{
+			Id:            i,
 			State:         metapb.StoreState_Up,
 			NodeState:     metapb.NodeState_Serving,
 			LastHeartbeat: time.Now().UnixNano(),
-		},
+		})
+	}
+
+	// Peer IDs of the synthetic regions are allocated from a fixed base.
+	peerIDAllocator := uint64(10000)
+	for _, p := range regionDist {
+		regionID := p.RegionID
+		holdingStores := p.StorePos
+		var peers []*metapb.Peer
+		for _, storePos := range holdingStores {
+			s := stores[storePos]
+			peerIDAllocator++
+			peers = append(peers, &metapb.Peer{
+				StoreId: s.GetId(),
+				Id:      peerIDAllocator,
+			})
+		}
+		region := core.NewTestRegionInfo(regionID, stores[holdingStores[0]].GetId(), []byte(fmt.Sprintf("r%v", regionID)), []byte(fmt.Sprintf("r%v", regionID+1)), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1), core.SetPeers(peers))
+		regions = append(regions, region)
+	}
+
+	return stores, regions
+}
+
+func validateMigrationOut(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 {
+	r := make(map[uint64]struct{})
+	for _, op := range ops {
+		for _, sid := range storeIDs {
+			if op.FromStore == sid {
+				for k := range op.Regions {
+					r[k] = struct{}{}
+				}
+			}
+		}
+	}
+	rl := []uint64{}
+	for k := range r {
+		rl = append(rl, k)
+	}
+	slices.Sort(rl)
+	return rl
+}
+
+func validateMigrationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 {
+	r := make(map[uint64]struct{})
+	for _, op := range ops {
+		for _, sid := range storeIDs {
+			if op.ToStore == sid {
+				for k := range op.Regions {
+					r[k] = struct{}{}
+				}
+			}
+		}
 	}
+	rl := []uint64{}
+	for k := range r {
+		rl = append(rl, k)
+	}
+	slices.Sort(rl)
+	return rl
+}
+
+func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) {
+	re := suite.Require()
+
+	stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
+		{10, []uint64{0}},
+		{20, []uint64{0}},
+		{30, []uint64{0}},
+	})
 
 	for _, store := range stores {
 		tests.MustPutStore(re, cluster, store)
@@ -684,12 +747,9 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster)
 
 	pauseAllCheckers(re, cluster)
 	result := handler.MigrationResult{}
-	r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
-	tests.MustPutRegionInfo(re, cluster, r1)
-	r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
-	tests.MustPutRegionInfo(re, cluster, r2)
-	r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
-	tests.MustPutRegionInfo(re, cluster, r3)
+	for _, r := range regions {
+		tests.MustPutRegionInfo(re, cluster, r)
+	}
 
 	urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
 	e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result))
@@ -697,12 +757,45 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster)
 	re.Equal(2, len(result.Ops))
 }
 
+func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) {
+	re := suite.Require()
+
+	stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
+		{10, []uint64{0, 1}},
+		{20, []uint64{0, 2}},
+		{30, []uint64{0, 1}},
+	})
+
+	for _, store := range stores {
+		tests.MustPutStore(re, cluster, store)
+	}
+
+	pauseAllCheckers(re, cluster)
+	result := handler.MigrationResult{}
+	for _, r := range regions {
+		tests.MustPutRegionInfo(re, cluster, r)
+	}
+
+	urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
+	e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result))
+	re.NoError(e)
+	re.Equal(1, len(result.Ops))
+	re.NotEmpty(validateMigrationOut(result.Ops, []uint64{1}))
+	re.NotEmpty(validateMigrationIn(result.Ops, []uint64{4}))
+}
+
 func (suite *operatorTestSuite) TestBalanceRegions() {
 	// use a new environment to avoid being affected by other tests
 	env := tests.NewSchedulingTestEnvironment(suite.T(),
 		func(conf *config.Config, _ string) {
 			conf.Replication.MaxReplicas = 1
 		})
-	env.RunTestBasedOnMode(suite.checkBalanceRegions)
+	env.RunTestBasedOnMode(suite.checkBalanceRegions1)
 	env.Cleanup()
+	env2 := tests.NewSchedulingTestEnvironment(suite.T(),
+		func(conf *config.Config, _ string) {
+			conf.Replication.MaxReplicas = 1
+		})
+	env2.RunTestBasedOnMode(suite.checkBalanceRegions2)
+	env2.Cleanup()
 }
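
Note for reviewers: the expectation in checkBalanceRegions2 follows from the even-split arithmetic suggested by MigrationPlan's expectedCount/senders/receivers bookkeeping. The standalone sketch below is illustration only, not the handler code, and every name in it is made up; it just reproduces the test's peer layout and shows why exactly one op, from store 1 to store 4, is expected.

// balance_sketch.go - hypothetical, self-contained arithmetic check (assumption:
// an even split of peers across matching stores, as the tests rely on).
package main

import "fmt"

func main() {
	// Peer counts per store, mirroring checkBalanceRegions2:
	// store 1 holds regions 10, 20, 30; store 2 holds 10, 30; store 4 holds 20.
	peerCount := map[uint64]int{1: 3, 2: 2, 4: 1}

	total := 0
	for _, c := range peerCount {
		total += c
	}
	avg := total / len(peerCount) // 6 / 3 = 2 expected peers per store

	senders := []uint64{}
	receivers := []uint64{}
	moves := 0
	for id, c := range peerCount {
		switch {
		case c > avg:
			senders = append(senders, id)
			moves += c - avg
		case c < avg:
			receivers = append(receivers, id)
		}
	}

	fmt.Println("expected peers per store:", avg) // 2
	fmt.Println("senders:", senders)              // [1]
	fmt.Println("receivers:", receivers)          // [4]
	fmt.Println("moves needed:", moves)           // 1, matching re.Equal(1, len(result.Ops))
}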