rule_checker: can replace unhealthPeer with orphanPeer (#6831) #6844

Merged
2 changes: 1 addition & 1 deletion Makefile
@@ -149,7 +149,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

6 changes: 4 additions & 2 deletions client/Makefile
@@ -32,8 +32,10 @@ install-tools:

static: install-tools
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ golangci-lint run -c ../.golangci.yml ./...
@ revive -formatter friendly -config ../revive.toml .
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

tidy:
@ go mod tidy
54 changes: 50 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -76,6 +76,7 @@ var (
ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location")
ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer")
ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer")
ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer")
)

// RuleChecker fixes/improves the region by placement rules.
@@ -426,14 +427,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer
isUnhealthyPeer := func(id uint64) bool {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
return true
}
}
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return true
}
}
@@ -450,16 +452,56 @@ loopFits:
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure it is a down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
}
}

// If hasUnhealthyFit is false, it is safe to delete the OrphanPeer.
if !hasUnhealthyFit {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
}

// try to use orphan peers to replace unhealthy down peers.
for _, orphanPeer := range fit.OrphanPeers {
if pinDownPeer != nil {
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
continue
}
// witness peers are not considered in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// the down peer's store should have been down for longer than max-store-down-time.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
continue
}
// check whether the down peer can be replaced with the orphan peer.
dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
if fit.Replace(pinDownPeer.GetStoreId(), dstStore) {
destRole := pinDownPeer.GetRole()
orphanPeerRole := orphanPeer.GetRole()
ruleCheckerReplaceOrphanPeerCounter.Inc()
switch {
case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
default:
// destRole should not be the same as orphanPeerRole; if the roles were the same, the fit with the orphan peer would already be better than the current one.
// destRole is never leader, so we do not consider it.
}
}
}
}

// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if the number of OrphanPeers is >= 2.
// Ref https://github.com/tikv/pd/issues/4045
if len(fit.OrphanPeers) >= 2 {
@@ -498,6 +540,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo

func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
return store.DownTime() >= c.cluster.GetOpts().GetMaxStoreDownTime()
}

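For orientation, here is a simplified, self-contained sketch of the decision the new fixOrphanPeers branch makes. It is not the PD implementation: Peer, PeerRole and the three booleans are stand-ins for metapb.Peer, isUnhealthyPeer, isStoreDownTimeHitMaxDownTime and RegionFit.Replace.

package main

import "fmt"

// PeerRole is a minimal stand-in for metapb.PeerRole.
type PeerRole int

const (
	Voter PeerRole = iota
	Learner
)

// Peer is a minimal stand-in for *metapb.Peer.
type Peer struct {
	ID      uint64
	StoreID uint64
	Role    PeerRole
	Witness bool
}

// chooseReplacement mirrors the shape of the new branch in fixOrphanPeers:
// given the pinned-down unhealthy peer and a candidate orphan peer, decide
// whether the orphan can take over and which kind of operator would be built.
func chooseReplacement(down, orphan Peer, orphanUnhealthy, storeDownTooLong, fitsPlacement bool) string {
	if orphanUnhealthy {
		return "skip: the orphan peer itself is unhealthy"
	}
	if down.Witness || orphan.Witness {
		return "skip: witness peers are not handled in this path"
	}
	if !storeDownTooLong {
		return "skip: the down peer's store has not exceeded max-store-down-time"
	}
	if !fitsPlacement {
		return "skip: the orphan peer's store does not satisfy the placement rule"
	}
	switch {
	case orphan.Role == Learner && down.Role == Voter:
		return "promote the orphan learner, then remove the down voter"
	case orphan.Role == Voter && down.Role == Learner:
		return "demote the orphan voter, then remove the down learner"
	default:
		return "same role: nothing to replace in this path"
	}
}

func main() {
	down := Peer{ID: 3, StoreID: 3, Role: Voter}
	orphan := Peer{ID: 4, StoreID: 4, Role: Learner}
	fmt.Println(chooseReplacement(down, orphan, false, true, true))
	// Prints: promote the orphan learner, then remove the down voter
}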
127 changes: 126 additions & 1 deletion pkg/schedule/checker/rule_checker_test.go
@@ -362,7 +362,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("add-rule-peer", op.Desc())
fmt.Println(op)
suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
suite.True(op.Step(0).(operator.AddLearner).IsWitness)
}
@@ -686,6 +685,132 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.Equal("remove-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
r1 := suite.cluster.GetRegion(1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())

// set peer3 only pending
r1 = r1.Clone(core.WithDownPeers(nil))
suite.cluster.PutRegion(r1)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
r1 := suite.cluster.GetRegion(1)

// set peer 3 to pending and down, make it a learner, and mark store 3 as down
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(
core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}),
)
suite.cluster.PutRegion(r1)

// default and test group => 3 voter + 1 learner
err := suite.ruleManager.SetRule(&placement.Rule{
GroupID: "test",
ID: "10",
Role: placement.Learner,
Count: 1,
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
rule := &placement.Rule{
GroupID: "pd",
ID: "test",
Role: placement.Voter,
Count: 3,
}
rule2 := &placement.Rule{
GroupID: "pd",
ID: "test2",
Role: placement.Learner,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: "engine",
Op: placement.In,
Values: []string{"tiflash"},
},
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.SetRule(rule2)
suite.ruleManager.DeleteRule("pd", "default")

r1 := suite.cluster.GetRegion(1)
// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

op := suite.rc.Check(suite.cluster.GetRegion(1))
// should not promote tiflash peer
suite.Nil(op)

// scale out a node so the down peer can be replaced
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("fast-replace-rule-down-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestIssue3293() {
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
5 changes: 4 additions & 1 deletion pkg/schedule/operator/builder.go
@@ -403,7 +403,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}

if b.useJointConsensus {
kind, b.err = b.buildStepsWithJointConsensus(kind)
} else {
@@ -549,6 +548,10 @@ func (b *Builder) brief() string {
return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
case len(b.toAdd) > 0:
return fmt.Sprintf("add peer: store %s", b.toAdd)
case len(b.toRemove) > 0 && len(b.toPromote) > 0:
return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toRemove, b.toPromote)
case len(b.toRemove) > 0 && len(b.toDemote) > 0:
return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
case len(b.toRemove) > 0:
return fmt.Sprintf("rm peer: store %s", b.toRemove)
case len(b.toPromote) > 0:
21 changes: 20 additions & 1 deletion pkg/schedule/operator/create_operator.go
@@ -50,6 +50,25 @@ func CreatePromoteLearnerOperator(desc string, ci ClusterInformer, region *core.
Build(0)
}

// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
PromoteLearner(toPromote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a learner and removes a peer.
func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
if !ci.GetOpts().IsUseJointConsensus() {
return nil, errors.Errorf("cannot build demote learner operator due to disabling using joint state")
}
return NewBuilder(desc, ci, region).
DemoteVoter(toDemote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}
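
For reference, a sketch of how code outside the checker might call the two new creators, mirroring the switch in fixOrphanPeers. The import paths, the operator.ClusterInformer qualifier, and the replaceWithOrphan helper are assumptions for illustration only, not part of this PR.

package example

import (
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/schedule/operator"
)

// replaceWithOrphan (hypothetical) dispatches to the new creators based on the
// role difference between the orphan peer and the down peer.
func replaceWithOrphan(ci operator.ClusterInformer, region *core.RegionInfo, orphan, down *metapb.Peer) (*operator.Operator, error) {
	switch {
	case orphan.GetRole() == metapb.PeerRole_Learner && down.GetRole() == metapb.PeerRole_Voter:
		return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", ci, region, orphan, down)
	case orphan.GetRole() == metapb.PeerRole_Voter && down.GetRole() == metapb.PeerRole_Learner:
		// Requires joint consensus to be enabled, otherwise an error is returned.
		return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", ci, region, orphan, down)
	default:
		return nil, nil // same role: nothing to replace in this path
	}
}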

// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, ci ClusterInformer, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
@@ -246,7 +265,7 @@ func CreateLeaveJointStateOperator(desc string, ci ClusterInformer, origin *core
b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)

if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
b.err = errors.Errorf("cannot build leave joint state operator for region which is not in joint state")
b.err = errors.Errorf("cannot build leave joint state operator due to disabling using joint state")
}

if b.err != nil {
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator.go
@@ -89,7 +89,7 @@ func (o *Operator) Sync(other *Operator) {
func (o *Operator) String() string {
stepStrs := make([]string, len(o.steps))
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
stepStrs[i] = fmt.Sprintf("%d:{%s}", i, o.steps[i].String())
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s],timeout:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
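For illustration, a standalone sketch (not PD code) of the indexed step formatting introduced above; the step descriptions are made up, and the real OpStep strings and join separator may differ.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical step descriptions, stand-ins for OpStep.String().
	steps := []string{
		"add learner peer 5 on store 4",
		"promote learner peer 5 on store 4 to voter",
		"remove peer on store 3",
	}
	stepStrs := make([]string, len(steps))
	for i, s := range steps {
		// Same format as the change above: each step is prefixed with its index.
		stepStrs[i] = fmt.Sprintf("%d:{%s}", i, s)
	}
	fmt.Println(strings.Join(stepStrs, ", "))
	// 0:{add learner peer 5 on store 4}, 1:{promote learner peer 5 on store 4 to voter}, 2:{remove peer on store 3}
}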
3 changes: 3 additions & 0 deletions pkg/schedule/placement/fit.go
@@ -37,6 +37,9 @@ type RegionFit struct {

// Replace returns true if the replacement store fits all constraints and its isolation score is not less than the original one.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
if dstStore == nil {
return false
}
fit := f.getRuleFitByStoreID(srcStoreID)
// check the target store is fit all constraints.
if fit == nil {
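
The nil guard matters because a store lookup can come back empty, for example for an unknown store while the PD cluster is being recovered, and Replace is now safe to call with that result directly. A minimal standalone sketch of the guard, using toy types instead of core.StoreInfo:

package main

import "fmt"

// store is a toy stand-in for *core.StoreInfo.
type store struct{ id uint64 }

// getStore stands in for cluster.GetStore, which may return nil when the
// store is unknown.
func getStore(stores map[uint64]*store, id uint64) *store {
	return stores[id]
}

// replace stands in for RegionFit.Replace with the new nil guard: a missing
// destination store can never be a valid replacement.
func replace(srcStoreID uint64, dst *store) bool {
	if dst == nil {
		return false
	}
	// ...constraint and isolation-score checks would follow here...
	return true
}

func main() {
	stores := map[uint64]*store{1: {id: 1}}
	fmt.Println(replace(3, getStore(stores, 1))) // true: the destination store exists
	fmt.Println(replace(3, getStore(stores, 9))) // false: unknown store, guarded
}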