Skip to content

Commit

Permalink
more fix
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo committed Aug 21, 2024
1 parent 9bfe8b6 commit 9afa987
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 41 deletions.
15 changes: 3 additions & 12 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,19 +1297,14 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig
// If toStore doesn't has this region, then create a move op.
o.Regions[r] = false
o.OriginalPeer = fromStore.OriginalPeer[r]
log.Info("!!!! Pick S", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore), zap.Any("OriginalPeer", fromStore.OriginalPeer[r]))
fromStore.RegionIDSet[r] = true
n--
} else {
log.Info("!!!! Pick", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore))
}
}
return &o
}

func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) {
log.Info("!!! MigrationPlan",
zap.Any("store-id", stores))
totalRegionCount := 0
for _, store := range stores {
totalRegionCount += len(store.RegionIDSet)
Expand All @@ -1335,7 +1330,6 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) {
expectedCount = append(expectedCount, avr)
}

log.Info("!!! expectedCount", zap.Any("expectedCount", expectedCount))
senders := []int{}
receivers := []int{}
sendersVolume := []int{}
Expand Down Expand Up @@ -1383,7 +1377,7 @@ type MigrationResult struct {
}

// BalanceRegion checks if regions are imbalanced and rebalance them.
func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (MigrationResult, error) {
func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
Expand Down Expand Up @@ -1413,12 +1407,11 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
for _, l := range s.GetLabels() {
storeLabelMap[l.Key] = l
}
if len(storeLabels) != len(storeLabelMap) {
if len(requiredLabels) != len(storeLabelMap) {
continue
}
log.Info("!!!! store pass 1", zap.Any("s", s), zap.Any("l", s.GetLabels()), zap.Any("id", s.GetID()))
gotLabels := true
for _, larg := range storeLabels {
for _, larg := range requiredLabels {
if l, ok := storeLabelMap[larg.Key]; ok {
if larg.Value != l.Value {
gotLabels = false
Expand All @@ -1433,7 +1426,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
if !gotLabels {
continue
}
log.Info("!!!! store pass 2", zap.Any("s", s))
candidate := &StoreRegionSet{
ID: s.GetID(),
Info: s,
Expand All @@ -1448,7 +1440,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me
}
}
}
log.Info("!!!! store pass 3", zap.Any("s", s))
candidates = append(candidates, candidate)
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
"io"
"net/http"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/apiutil"
"go.uber.org/zap"
)

// Status is used to check whether http response code is equal given code.
Expand All @@ -47,7 +45,6 @@ func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data any) func([]byte, int, http.Header) {
return func(resp []byte, _ int, _ http.Header) {
log.Info("!!!!! ffdfdfdfd", zap.Any("a", string(resp)))
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}
Expand Down
145 changes: 119 additions & 26 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -655,54 +656,146 @@ func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster)
re.NoError(err)
}

func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) {
re := suite.Require()
stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
type regionStoresPair struct {
RegionId uint64
StorePos []uint64
}

func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) {
stores := []*metapb.Store{}
regions := []*core.RegionInfo{}
for _, i := range storeIDs {
stores = append(stores, &metapb.Store{
Id: i,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
LastHeartbeat: time.Now().UnixNano(),
},
})
}

var peerIdAllocator uint64
peerIdAllocator = 10000
for _, p := range regionDist {
regionId := p.RegionId
holdingStores := p.StorePos
var peers []*metapb.Peer
for _, storePos := range holdingStores {
s := stores[storePos]
peerIdAllocator += 1
peers = append(peers, &metapb.Peer{
StoreId: s.GetId(),
Id: peerIdAllocator,
})
}
region := core.NewTestRegionInfo(regionId, stores[holdingStores[0]].GetId(), []byte(fmt.Sprintf("r%v", regionId)), []byte(fmt.Sprintf("r%v", regionId+1)), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1), core.SetPeers(peers))
regions = append(regions, region)
}

return stores, regions
}

func validateMigtationOut(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 {
r := make(map[uint64]interface{})
for _, op := range ops {
for _, sid := range storeIDs {
if op.FromStore == sid {
for k, _ := range op.Regions {
r[k] = nil
}
}
}
}
rl := []uint64{}
for k, _ := range r {
rl = append(rl, k)
}
slices.Sort(rl)
return rl
}

func validateMigtationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 {
r := make(map[uint64]interface{})
for _, op := range ops {
for _, sid := range storeIDs {
if op.ToStore == sid {
for k, _ := range op.Regions {
r[k] = nil
}
}
}
}
rl := []uint64{}
for k, _ := range r {
rl = append(rl, k)
}
slices.Sort(rl)
return rl
}

func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) {
re := suite.Require()

stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
{10, []uint64{0}},
{20, []uint64{0}},
{30, []uint64{0}},
})

for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}

pauseAllCheckers(re, cluster)
result := handler.MigrationResult{}
r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
tests.MustPutRegionInfo(re, cluster, r1)
r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
tests.MustPutRegionInfo(re, cluster, r2)
r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
tests.MustPutRegionInfo(re, cluster, r3)
for _, r := range regions {
tests.MustPutRegionInfo(re, cluster, r)
}

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result))
re.NoError(e)
re.Equal(2, len(result.Ops))
}

func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) {
re := suite.Require()

stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
{10, []uint64{0, 1}},
{20, []uint64{0, 2}},
{30, []uint64{0, 1}},
})

for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}

pauseAllCheckers(re, cluster)
result := handler.MigrationResult{}
for _, r := range regions {
tests.MustPutRegionInfo(re, cluster, r)
}

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result))
re.NoError(e)
re.Equal(1, len(result.Ops))
validateMigtationOut(result.Ops, []uint64{1})
validateMigtationIn(result.Ops, []uint64{4})
}

func (suite *operatorTestSuite) TestBalanceRegions() {
// use a new environment to avoid being affected by other tests
use a new environment to avoid being affected by other tests

Check failure on line 788 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected ';', found a

Check failure on line 788 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected ';', found a

Check failure on line 788 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected ';', found a

Check failure on line 788 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / chunks (5, Tests(2))

expected ';', found a
env := tests.NewSchedulingTestEnvironment(suite.T(),
func(conf *config.Config, _ string) {
conf.Replication.MaxReplicas = 1
})
env.RunTestBasedOnMode(suite.checkBalanceRegions)
env.RunTestBasedOnMode(suite.checkBalanceRegions1)
env.Cleanup()
env2 := tests.NewSchedulingTestEnvironment(suite.T(),
func(conf *config.Config, _ string) {
conf.Replication.MaxReplicas = 1
})
env2.RunTestBasedOnMode(suite.checkBalanceRegions2)
env2.Cleanup()
}

Check failure on line 801 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected '}', found 'EOF'

Check failure on line 801 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected '}', found 'EOF'

Check failure on line 801 in tests/server/api/operator_test.go

View workflow job for this annotation

GitHub Actions / statics

expected '}', found 'EOF'

0 comments on commit 9afa987

Please sign in to comment.