schedule: Add leader filter for scatter (#5663)
close #5622

schedule: Add leader filter for scatter

Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
CabinfeverB and ti-chi-bot authored Nov 3, 2022
1 parent 6f0fb32 commit 6fcb528
Showing 2 changed files with 97 additions and 5 deletions.
16 changes: 11 additions & 5 deletions server/schedule/region_scatterer.go
@@ -337,7 +337,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
 	// special engine stores if the engine supports to become a leader. But now there is only
 	// one engine, tiflash, which does not support the leader, so don't consider it for now.
-	targetLeader := r.selectAvailableLeaderStore(group, targetPeers, r.ordinaryEngine)
+	targetLeader := r.selectAvailableLeaderStore(group, region, targetPeers, r.ordinaryEngine)
 	if targetLeader == 0 {
 		scatterCounter.WithLabelValues("no-leader", "").Inc()
 		return nil
@@ -457,16 +457,22 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
 }
 
 // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
-// the existed peers store depended on the leader counts in the group level.
-func (r *RegionScatterer) selectAvailableLeaderStore(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
+// the existed peers store depended on the leader counts in the group level. Please use this func before scattering special engines.
+func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
+	sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
+	if sourceStore == nil {
+		log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
+		return 0
+	}
 	leaderCandidateStores := make([]uint64, 0)
+	// use PlacementLeaderSafeguard for filtering follower and learner in rule
+	filter := filter.NewPlacementLeaderSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, true /*allowMoveLeader*/)
 	for storeID := range peers {
 		store := r.cluster.GetStore(storeID)
 		if store == nil {
 			return 0
 		}
-		engine := store.GetLabelValue(core.EngineKey)
-		if len(engine) < 1 {
+		if filter == nil || filter.Target(r.cluster.GetOpts(), store).IsOK() {
 			leaderCandidateStores = append(leaderCandidateStores, storeID)
 		}
 	}
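For readers skimming the diff: the new region argument lets selectAvailableLeaderStore build a placement-rule-aware filter, so a store whose peer is a learner under the placement rules is never picked as the leader target during scatter. The snippet below is only a minimal, self-contained sketch of that selection rule, using hypothetical Peer/Region types rather than PD's real API (the actual patch goes through filter.NewPlacementLeaderSafeguard and the cluster's rule manager).

package main

import "fmt"

// Hypothetical, simplified stand-ins for PD's types, for illustration only.
type Peer struct {
	StoreID   uint64
	IsLearner bool
}

type Region struct {
	Peers []Peer
}

// leaderCandidateStores keeps only the stores whose peer may become leader:
// learner peers are skipped, mirroring what the leader safeguard enforces.
func leaderCandidateStores(region Region) []uint64 {
	candidates := make([]uint64, 0, len(region.Peers))
	for _, p := range region.Peers {
		if p.IsLearner {
			continue // a learner peer cannot hold leadership
		}
		candidates = append(candidates, p.StoreID)
	}
	return candidates
}

func main() {
	region := Region{Peers: []Peer{
		{StoreID: 1, IsLearner: false},
		{StoreID: 2, IsLearner: false},
		{StoreID: 7, IsLearner: true}, // learner placed by a rule, e.g. in another zone
	}}
	fmt.Println(leaderCandidateStores(region)) // prints [1 2]
}

In the real code the decision also depends on the rule manager, since a peer might be barred from leadership by a follower rule as well; that is why the patch reuses the placement leader safeguard (see the comment in the diff above) instead of checking the learner flag directly.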
86 changes: 86 additions & 0 deletions server/schedule/region_scatterer_test.go
@@ -569,6 +569,92 @@ func TestRegionFromDifferentGroups(t *testing.T) {
	check(scatterer.ordinaryEngine.selectedPeer)
}

func TestRegionHasLearner(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	opt := config.NewTestOptions()
	tc := mockcluster.NewCluster(ctx, opt)
	stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
	oc := NewOperatorController(ctx, tc, stream)
	// Add 8 stores.
	voterCount := uint64(6)
	storeCount := uint64(8)
	for i := uint64(1); i <= voterCount; i++ {
		tc.AddLabelsStore(i, 0, map[string]string{"zone": "z1"})
	}
	for i := voterCount + 1; i <= 8; i++ {
		tc.AddLabelsStore(i, 0, map[string]string{"zone": "z2"})
	}
	tc.RuleManager.SetRule(&placement.Rule{
		GroupID: "pd",
		ID:      "default",
		Role:    placement.Voter,
		Count:   3,
		LabelConstraints: []placement.LabelConstraint{
			{
				Key:    "zone",
				Op:     placement.In,
				Values: []string{"z1"},
			},
		},
	})
	tc.RuleManager.SetRule(&placement.Rule{
		GroupID: "pd",
		ID:      "learner",
		Role:    placement.Learner,
		Count:   1,
		LabelConstraints: []placement.LabelConstraint{
			{
				Key:    "zone",
				Op:     placement.In,
				Values: []string{"z2"},
			},
		},
	})
	scatterer := NewRegionScatterer(ctx, tc, oc)
	regionCount := 50
	for i := 1; i <= regionCount; i++ {
		_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
		re.NoError(err)
	}
	check := func(ss *selectedStores) {
		max := uint64(0)
		min := uint64(math.MaxUint64)
		// check the peer count on every store
		for i := uint64(1); i <= storeCount; i++ {
			count := ss.TotalCountByStore(i)
			if count > max {
				max = count
			}
			if count < min {
				min = count
			}
		}
		re.LessOrEqual(max-min, uint64(2))
	}
	check(scatterer.ordinaryEngine.selectedPeer)
	checkLeader := func(ss *selectedStores) {
		max := uint64(0)
		min := uint64(math.MaxUint64)
		for i := uint64(1); i <= voterCount; i++ {
			count := ss.TotalCountByStore(i)
			if count > max {
				max = count
			}
			if count < min {
				min = count
			}
		}
		re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
		re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
		for i := voterCount + 1; i <= storeCount; i++ {
			count := ss.TotalCountByStore(i)
			re.LessOrEqual(count, uint64(0))
		}
	}
	checkLeader(scatterer.ordinaryEngine.selectedLeader)
}

// TestSelectedStores tests if the peer count has changed due to the picking strategy.
// Ref https://github.com/tikv/pd/issues/4565
func TestSelectedStores(t *testing.T) {
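As a rough sanity check on the numbers in TestRegionHasLearner: each of the 50 regions needs 3 voters in z1 (stores 1–6) and 1 learner in z2 (stores 7–8), so after scattering every store should hold roughly 50*3/6 = 25 voter peers (z1) or 50/2 = 25 learner peers (z2), while the 50 leaders spread over the 6 voter stores give about 50/6 ≈ 8–9 leaders each — which is what the max-2 and min-1 bounds against regionCount/voterCount allow. Stores 7 and 8 may carry learner peers but, with the new filter, must end up with zero leaders, which is what the final count check asserts.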
