Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/member: Fixing residual counts in campaign times #8226

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (
)

const (
defaultCampaignTimesSlot = 10
watchLoopUnhealthyTimeout = 60 * time.Second
campaignTimesRecordTimeout = 5 * time.Minute
defaultCampaignTimesSlot = 10
watchLoopUnhealthyTimeout = 60 * time.Second
)

var campaignTimesRecordTimeout = 5 * time.Minute

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
Expand Down Expand Up @@ -114,6 +115,7 @@ func (ls *Leadership) GetLeaderKey() string {
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
// Need to make sure `AddCampaignTimes` is called before this function.
func (ls *Leadership) GetCampaignTimesNum() int {
if ls == nil {
return 0
Expand All @@ -129,16 +131,16 @@ func (ls *Leadership) ResetCampaignTimes() {
ls.campaignTimes = make([]time.Time, 0, defaultCampaignTimesSlot)
}

// addCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) addCampaignTimes() {
// AddCampaignTimes is used to add the campaign times of the leader.
func (ls *Leadership) AddCampaignTimes() {
if ls == nil {
return
}
for i := len(ls.campaignTimes) - 1; i >= 0; i-- {
if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout {
// remove the time which is more than `campaignTimesRecordTimeout`
// array is sorted by time
ls.campaignTimes = ls.campaignTimes[i:]
ls.campaignTimes = ls.campaignTimes[i+1:]
break
}
}
Expand All @@ -148,7 +150,6 @@ func (ls *Leadership) addCampaignTimes() {

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand Down
33 changes: 33 additions & 0 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,36 @@ func TestRequestProgress(t *testing.T) {
checkWatcherRequestProgress(false)
checkWatcherRequestProgress(true)
}

func TestCampaignTimes(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
leadership := NewLeadership(client, "test_leader", "test_leader")

// all the campaign times are within the timeout.
campaignTimesRecordTimeout = 10 * time.Second
defer func() {
campaignTimesRecordTimeout = 5 * time.Minute
}()
for i := 0; i < 3; i++ {
leadership.AddCampaignTimes()
time.Sleep(100 * time.Millisecond)
}
re.Equal(3, leadership.GetCampaignTimesNum())

// only the last 2 records are valid.
campaignTimesRecordTimeout = 200 * time.Millisecond
for i := 0; i < 3; i++ {
leadership.AddCampaignTimes()
time.Sleep(100 * time.Millisecond)
}
re.Equal(2, leadership.GetCampaignTimesNum())

time.Sleep(200 * time.Millisecond)
// need to wait for the next addCampaignTimes to update the campaign time.
re.Equal(2, leadership.GetCampaignTimesNum())
// check campaign leader frequency.
leadership.AddCampaignTimes()
re.Equal(1, leadership.GetCampaignTimesNum())
}
3 changes: 2 additions & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
// and make it become a PD leader.
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
m.leadership.AddCampaignTimes()
failpoint.Inject("skipCampaignLeaderCheck", func() {
failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue()))
})

if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes {
if m.leadership.GetCampaignTimesNum() > campaignLeaderFrequencyTimes {
if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,20 +328,26 @@ func TestCampaignLeaderFrequently(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 5)
cluster, err := tests.NewTestCluster(ctx, 3)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)
// the 1st time campaign leader.
cluster.WaitLeader()
leader := cluster.GetLeader()
re.NotEmpty(cluster.GetLeader())

for i := 0; i < 3; i++ {
// need to prevent 3 times(including the above 1st time) campaign leader in 5 min.
for i := 0; i < 2; i++ {
cluster.GetLeaderServer().ResetPDLeader()
cluster.WaitLeader()
re.Equal(leader, cluster.GetLeader())
}
// check for the 4th time.
cluster.GetLeaderServer().ResetPDLeader()
cluster.WaitLeader()
// PD leader should be different from before because etcd leader changed.
re.NotEmpty(cluster.GetLeader())
re.NotEqual(leader, cluster.GetLeader())
Expand Down