Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Feb 4, 2021
2 parents 5459c25 + fa168fe commit bbd259a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 11 deletions.
39 changes: 28 additions & 11 deletions consistence/data_placement_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,23 +1640,16 @@ func (dpm *DataPlacement) chooseNewLeaderFromISR(topicInfo *TopicPartitionMetaIn
if err == nil && dpm.isTopNTopic(topicInfo, sortedTopN) {
newLeader = dpm.chooseNewLeaderFromISRForTopN(topicInfo, sortedTopN, topicList, currentNodes, newestReplicas)
} else {
// choose another leader in ISR list, and add new node to ISR
// list.
// select the least load factor node
minLF := float64(math.MaxInt64)
loadFactors := make(map[string]float64, len(newestReplicas))
for _, replica := range newestReplicas {
stat, err := dpm.lookupCoord.getNsqdTopicStat(currentNodes[replica])
if err != nil {
coordLog.Infof("ignore node %v while choose new leader : %v", replica, topicInfo.GetTopicDesp())
coordLog.Infof("ignore node %v while choose new leader : %v, %v", replica, topicInfo.GetTopicDesp(), err)
continue
}
lf := stat.GetNodeLeaderLoadFactor()

coordLog.Infof("node %v load factor is : %v", replica, lf)
if newLeader == "" || lf < minLF {
newLeader = replica
}
loadFactors[replica] = stat.GetNodeLeaderLoadFactor()
}
newLeader = dpm.chooseNewLeaderByLeastLoadFactor(loadFactors)
}
}
if newLeader == "" {
Expand All @@ -1667,6 +1660,30 @@ func (dpm *DataPlacement) chooseNewLeaderFromISR(topicInfo *TopicPartitionMetaIn
return newLeader, newestLogID, nil
}

// chooseNewLeaderByLeastLoadFactor choose another leader in ISR list, and add new node to ISR
// list.
// select the least load factor node
func (dpm *DataPlacement) chooseNewLeaderByLeastLoadFactor(loadFactors map[string]float64) string {
if len(loadFactors) == 0 {
return ""
}

var (
newLeader = ""
minLF = float64(math.MaxInt64)
)

for replica, lf := range loadFactors {
coordLog.Infof("node %v load factor is : %v", replica, lf)
if newLeader == "" || lf < minLF {
newLeader = replica
minLF = lf
}
}

return newLeader
}

func (dpm *DataPlacement) chooseNewLeaderFromISRForTopN(topicInfo *TopicPartitionMetaInfo,
sortedTopics LFListT,
topicList []TopicPartitionMetaInfo,
Expand Down
44 changes: 44 additions & 0 deletions consistence/data_placement_mgr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package consistence

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestChooseNewLeaderByLeastLoadFactor(t *testing.T) {
dpm := &DataPlacement{}
testCases := []struct {
loadFactors map[string]float64
newLeader []string
}{
{
loadFactors: map[string]float64{"a": 1},
newLeader: []string{"a"},
},
{
loadFactors: map[string]float64{"a": 96, "b": 35},
newLeader: []string{"b"},
},
{
loadFactors: map[string]float64{"a": 35, "b": 35},
newLeader: []string{"a", "b"},
},
{
loadFactors: map[string]float64{"a": 98, "b": 56, "c": 63},
newLeader: []string{"b"},
},
{
loadFactors: map[string]float64{"a": 98, "b": 56, "c": 56},
newLeader: []string{"b", "c"},
},
{
loadFactors: map[string]float64{"a": 56, "b": 56, "c": 56},
newLeader: []string{"a", "b", "c"},
},
}
for _, tCase := range testCases {
newLeader := dpm.chooseNewLeaderByLeastLoadFactor(tCase.loadFactors)
assert.Containsf(t, tCase.newLeader, newLeader, "not the right leader %s", newLeader)
}
}

0 comments on commit bbd259a

Please sign in to comment.