Skip to content

Commit

Permalink
state
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo committed Sep 5, 2024
1 parent 9afa987 commit b9e3ca2
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 40 deletions.
58 changes: 30 additions & 28 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,40 +1376,13 @@ type MigrationResult struct {
Ops []*MigrationOp `json:"ops"`
}

// BalanceRegion checks if regions are imbalanced and rebalance them.
func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
c := h.GetCluster()
if c == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
regions := c.ScanRegions(startKey, endKey, -1)
regionIDMap := make(map[uint64]*core.RegionInfo)
for _, r := range regions {
regionIDMap[r.GetID()] = r
}

stores := c.GetStores()
func ComputeCandidateStores(requiredLabels []*metapb.StoreLabel, stores []*core.StoreInfo, regions []*core.RegionInfo) []*StoreRegionSet {
candidates := make([]*StoreRegionSet, 0)
for _, s := range stores {
storeLabelMap := make(map[string]*metapb.StoreLabel)
for _, l := range s.GetLabels() {
storeLabelMap[l.Key] = l
}
if len(requiredLabels) != len(storeLabelMap) {
continue
}
gotLabels := true
for _, larg := range requiredLabels {
if l, ok := storeLabelMap[larg.Key]; ok {
Expand Down Expand Up @@ -1442,6 +1415,35 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []
}
candidates = append(candidates, candidate)
}
return candidates
}

// RedistibuteRegions checks if regions are imbalanced and rebalance them.
func (h *Handler) RedistibuteRegions(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
c := h.GetCluster()
if c == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
regions := c.ScanRegions(startKey, endKey, -1)
regionIDMap := make(map[uint64]*core.RegionInfo)
for _, r := range regions {
regionIDMap[r.GetID()] = r
}

stores := c.GetStores()
candidates := ComputeCandidateStores(requiredLabels, stores, regions)

senders, receivers, ops := MigrationPlan(candidates)

Expand Down
4 changes: 2 additions & 2 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R
h.rd.JSON(w, http.StatusOK, state)
}

func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) {
func (h *regionsHandler) RedistibuteRegions(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
rawStartKey := vars["startKey"]
rawEndKey := vars["endKey"]
storeLabels := make([]*metapb.StoreLabel, 0)
result, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels)
result, err := h.Handler.RedistibuteRegions(rawStartKey, rawEndKey, storeLabels)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods(http.MethodGet), setQueries("startKey", "{startKey}", "endKey", "{endKey}"), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/balance", regionsHandler.BalanceRegion, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/balance", regionsHandler.RedistibuteRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))

registerFunc(apiRouter, "/version", newVersionHandler(rd).GetVersion, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/status", newStatusHandler(svr, rd).GetPDStatus, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down
95 changes: 86 additions & 9 deletions tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/handler"
Expand Down Expand Up @@ -661,7 +663,7 @@ type regionStoresPair struct {
StorePos []uint64
}

func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) {
func buildRedistributeRegionsTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) {
stores := []*metapb.Store{}
regions := []*core.RegionInfo{}
for _, i := range storeIDs {
Expand Down Expand Up @@ -732,10 +734,10 @@ func validateMigtationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64
return rl
}

func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) {
func (suite *operatorTestSuite) checkRedistributeRegions1(cluster *tests.TestCluster) {
re := suite.Require()

stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{
{10, []uint64{0}},
{20, []uint64{0}},
{30, []uint64{0}},
Expand All @@ -757,10 +759,10 @@ func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster)
re.Equal(2, len(result.Ops))
}

func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) {
func (suite *operatorTestSuite) checkRedistributeRegions2(cluster *tests.TestCluster) {
re := suite.Require()

stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{
stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{
{10, []uint64{0, 1}},
{20, []uint64{0, 2}},
{30, []uint64{0, 1}},
Expand All @@ -784,18 +786,93 @@ func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster)
validateMigtationIn(result.Ops, []uint64{4})
}

func (suite *operatorTestSuite) TestBalanceRegions() {
use a new environment to avoid being affected by other tests
func (suite *operatorTestSuite) TestRedistributeRegions() {
env := tests.NewSchedulingTestEnvironment(suite.T(),
func(conf *config.Config, _ string) {
conf.Replication.MaxReplicas = 1
})
env.RunTestBasedOnMode(suite.checkBalanceRegions1)
env.RunTestBasedOnMode(suite.checkRedistributeRegions1)
env.Cleanup()
env2 := tests.NewSchedulingTestEnvironment(suite.T(),
func(conf *config.Config, _ string) {
conf.Replication.MaxReplicas = 1
})
env2.RunTestBasedOnMode(suite.checkBalanceRegions2)
env2.RunTestBasedOnMode(suite.checkRedistributeRegions2)
env2.Cleanup()
}

func (suite *operatorTestSuite) checkRedistributeRegions3(cluster *tests.TestCluster) {
re := suite.Require()

stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{
{10, []uint64{0, 1}},
{20, []uint64{0, 2}},
{30, []uint64{0, 1}},
})

for _, store := range stores {
tests.MustPutStore(re, cluster, store)
}

pauseAllCheckers(re, cluster)
result := handler.MigrationResult{}
for _, r := range regions {
tests.MustPutRegionInfo(re, cluster, r)
}

urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr())
e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result))
re.NoError(e)
re.Equal(1, len(result.Ops))
validateMigtationOut(result.Ops, []uint64{1})
validateMigtationIn(result.Ops, []uint64{4})
}

func TestComputeCandidateStores(t *testing.T) {
re := require.New(t)
stores := []*core.StoreInfo{}

stats := &pdpb.StoreStats{
Capacity: 100,
Available: 100,
}
stores = append(stores, core.NewStoreInfo(
&metapb.Store{
Id: 1,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}},
LastHeartbeat: time.Now().UnixNano(),
},
core.SetStoreStats(stats),
core.SetLastHeartbeatTS(time.Now()),
))
stores = append(stores, core.NewStoreInfo(
&metapb.Store{
Id: 2,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}},
LastHeartbeat: time.Now().UnixNano(),
},
core.SetStoreStats(stats),
core.SetLastHeartbeatTS(time.Now()),
))
stores = append(stores, core.NewStoreInfo(
&metapb.Store{
Id: 3,
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z2"}},
LastHeartbeat: time.Now().UnixNano(),
},
core.SetStoreStats(stats),
core.SetLastHeartbeatTS(time.Now()),
))

regions := []*core.RegionInfo{}

re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, stores, regions), 2, "case 1")
re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, stores, regions), 1, "case 1")
re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}}, stores, regions), 1, "case 1")
}

0 comments on commit b9e3ca2

Please sign in to comment.