Skip to content

Commit

Permalink
make test happy
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 9, 2024
1 parent 7fa19d3 commit 1f13fa2
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 19 deletions.
29 changes: 21 additions & 8 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ type Leadership struct {
leaderKey string
leaderValue string

LeaderWatch bool
leaderWatch struct {
syncutil.RWMutex
val bool
}

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
Expand All @@ -74,10 +77,6 @@ type Leadership struct {
campaignTimes []time.Time
}

func (ls *Leadership) SetLeaderWatch(val bool) {
ls.LeaderWatch = val
}

func (ls *Leadership) GetLeaderValue() string {
return ls.leaderValue
}
Expand Down Expand Up @@ -123,6 +122,20 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// SetLeaderWatch sets the leader watch flag.
func (ls *Leadership) SetLeaderWatch(val bool) {
ls.leaderWatch.Lock()
ls.leaderWatch.val = val
ls.leaderWatch.Unlock()
}

// GetLeaderWatch gets the leader watch flag.
func (ls *Leadership) GetLeaderWatch() bool {
ls.leaderWatch.RLock()
defer ls.leaderWatch.RUnlock()
return ls.leaderWatch.val
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
func (ls *Leadership) GetCampaignTimesNum() int {
if ls == nil {
Expand Down Expand Up @@ -386,8 +399,8 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}
// only API update the leader key to transfer the leader will meet
if ev.Type == mvccpb.PUT && ls.LeaderWatch {
log.Info("[LeaderWatch] current leadership is updated", zap.Int64("watchRevision", revision),
if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() {
log.Info("[LeaderWatch] current leadership is updated",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
Expand All @@ -409,5 +422,5 @@ func (ls *Leadership) Reset() {
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.LeaderWatch = false
ls.SetLeaderWatch(false)
}
27 changes: 24 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -306,7 +307,7 @@ func (s *Server) campaignLeader() {
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

exitPrimary := make(chan struct{})
go s.primaryWatch(exitPrimary)
go s.primaryWatch(ctx, exitPrimary)

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()
Expand All @@ -329,7 +330,7 @@ func (s *Server) campaignLeader() {
}
}

func (s *Server) primaryWatch(exitPrimary chan struct{}) {
func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
Expand All @@ -341,10 +342,30 @@ func (s *Server) primaryWatch(exitPrimary chan struct{}) {
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1)
s.participant.GetLeadership().SetLeaderWatch(false)

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
return
}

utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

s.participant.UnsetLeader()
exitPrimary <- struct{}{}
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
}
}
}

// Close closes the server.
Expand Down
8 changes: 3 additions & 5 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package utils

import (
"context"
"github.com/tikv/pd/pkg/storage/kv"
"net"
"net/http"
"os"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -91,29 +91,27 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Commit()
if err != nil && !resp.Succeeded {
if err != nil || !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
}

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
// write a flag to indicate the current primary has exited
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}

// write a flag to indicate the current primary has exited
resp, err := kv.NewSlowLogTxn(client).
Then(
clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"), string(leaderRaw)),
// indicate the current primary has exited
clientv3.OpDelete(leaderPath)).
Commit()
if err != nil && !resp.Succeeded {
if err != nil || !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -647,7 +648,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
zap.String("tso-primary-name", gta.member.Name()))

exitPrimary := make(chan struct{})
go gta.primaryWatch(exitPrimary)
go gta.primaryWatch(ctx, exitPrimary)

leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval)
defer leaderTicker.Stop()
Expand All @@ -672,7 +673,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
}
}

func (gta *GlobalTSOAllocator) primaryWatch(exitPrimary chan struct{}) {
func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := gta.member.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
Expand All @@ -686,10 +687,30 @@ func (gta *GlobalTSOAllocator) primaryWatch(exitPrimary chan struct{}) {
gta.member.GetLeadership().Watch(gta.ctx, revision+1)
gta.member.GetLeadership().SetLeaderWatch(false)

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath())
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
return
}

mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())

gta.member.UnsetLeader()
exitPrimary <- struct{}{}
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
}
}
}

func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {
Expand Down

0 comments on commit 1f13fa2

Please sign in to comment.