From 8d36be5914a30eb8126d3a9ee54678befa5b72c8 Mon Sep 17 00:00:00 2001 From: husharp Date: Thu, 9 May 2024 21:19:08 +0800 Subject: [PATCH] only trigger by updating Signed-off-by: husharp --- pkg/election/leadership.go | 6 ++-- pkg/mcs/scheduling/server/server.go | 39 ++++++++++++-------------- pkg/mcs/utils/util.go | 8 ++++-- pkg/member/member.go | 10 +++---- pkg/member/participant.go | 9 +++--- pkg/tso/allocator_manager.go | 2 -- pkg/tso/global_allocator.go | 39 ++++++++++++-------------- server/apiv2/handlers/micro_service.go | 4 +-- 8 files changed, 56 insertions(+), 61 deletions(-) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 63f8e8ea54f..b9a5d8ac1c2 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -124,8 +124,8 @@ func (ls *Leadership) SetLeaderWatch(val bool) { ls.leaderWatch.Store(val) } -// GetLeaderWatch gets the leader watch flag. -func (ls *Leadership) GetLeaderWatch() bool { +// IsLeader gets the leader watch flag. +func (ls *Leadership) IsLeader() bool { return ls.leaderWatch.Load() } @@ -392,7 +392,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { return } // only API update the leader key to transfer the leader will meet - if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() { + if ev.Type == mvccpb.PUT && ls.IsLeader() { log.Info("[LeaderWatch] current leadership is updated", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index a6180ca4c3a..bf0077395bf 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -249,7 +249,7 @@ func (s *Server) primaryElectionLoop() { } // To make sure the expected leader(if exist) and primary are on the same server. - expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient()) + expectedPrimary := utils.GetExpectedPrimary(s.GetClient(), s.participant.GetLeaderPath()) if expectedPrimary != "" && expectedPrimary != s.participant.MemberValue() { log.Info("skip campaigning of scheduling primary and check later", zap.String("server-name", s.Name()), @@ -332,39 +332,36 @@ func (s *Server) campaignLeader() { } func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { - _, revision, err := s.participant.GetPersistentLeader() - if err != nil { + resp, err := etcdutil.EtcdKVGet(s.participant.GetLeadership().GetClient(), s.participant.GetLeaderPath()) + if err != nil || resp == nil || len(resp.Kvs) == 0 { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return } log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader())) // Watch will keep looping and never return unless the primary has changed. s.participant.GetLeadership().SetLeaderWatch(true) - s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1) + s.participant.GetLeadership().Watch(s.serverLoopCtx, resp.Kvs[0].ModRevision+1) s.participant.GetLeadership().SetLeaderWatch(false) // only API update primary will set the expected leader - // check leader key whether deleted - leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath()) + curPrimary, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath()) if err != nil { - log.Error("[primary] get primary key error", zap.Error(err)) - return - } - if leaderRaw == nil { - log.Info("[primary] leader key is deleted, the primary will step down") + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return } + // only trigger by updating primary + if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) { + utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) - utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) - - s.participant.UnsetLeader() - defer log.Info("[primary] exit the primary watch loop") - for { - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return + s.participant.UnsetLeader() + defer log.Info("[primary] exit the primary watch loop") + for { + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return + } } } } diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index b4d737c99e3..62bf8b320ad 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -75,8 +75,8 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err } // GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API. -func GetExpectedPrimary(keyPath string, client *clientv3.Client) string { - leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/")) +func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string { + leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/")) if err != nil { log.Error("get expected primary key error", errs.ZapError(err)) return "" @@ -86,7 +86,10 @@ func GetExpectedPrimary(keyPath string, client *clientv3.Client) string { } // RemoveExpectedPrimary removes the expected primary key. +// - removed when campaign success +// - removed when server is closed func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { + log.Info("remove expected primary key", zap.String("leaderPath", leaderPath)) // remove expected leader key resp, err := kv.NewSlowLogTxn(client). Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))). @@ -99,6 +102,7 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { // SetExpectedPrimary sets the expected primary key when the current primary has exited. func SetExpectedPrimary(client *clientv3.Client, leaderPath string) { + log.Info("set expected primary key", zap.String("leaderPath", leaderPath)) leaderRaw, err := etcdutil.GetValue(client, leaderPath) if err != nil { log.Error("[primary] get primary key error", zap.Error(err)) diff --git a/pkg/member/member.go b/pkg/member/member.go index 4522eb7ae33..e1c8994cb9f 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error { return nil } -// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) { +// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). +func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) { leader := &pdpb.Member{} ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { return nil, true } - leaderRaw, revision, err := m.GetPersistentLeader() + leader, revision, err := m.getPersistentLeader() if err != nil { log.Error("getting pd leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leaderRaw == nil { + if leader == nil { // no leader yet return nil, false } - leader := leaderRaw.(*pdpb.Member) + if m.IsSameLeader(leader) { // oh, we are already a PD leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index f74d17aee22..bd2ec4d0cf6 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -206,8 +206,8 @@ func (*Participant) PreCheckLeader() error { return nil } -// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *Participant) GetPersistentLeader() (any, int64, error) { +// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). +func (m *Participant) getPersistentLeader() (participant, int64, error) { leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -229,18 +229,17 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { return nil, true } - leaderRaw, revision, err := m.GetPersistentLeader() + leader, revision, err := m.getPersistentLeader() if err != nil { log.Error("getting the leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leaderRaw == nil { + if leader == nil { // no leader yet return nil, false } - leader := leaderRaw.(participant) if m.IsSameLeader(leader) { // oh, we are already the leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index e3a0a8302f3..1e67646dd17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -145,8 +145,6 @@ type ElectionMember interface { GetDCLocationPath(id uint64) string // PreCheckLeader does some pre-check before checking whether it's the leader. PreCheckLeader() error - // GetPersistentLeader returns the persistent leader. - GetPersistentLeader() (any, int64, error) // UnsetLeader unsets the member's leader. UnsetLeader() } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 495c5ae6744..e17ec3f74ec 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "github.com/tikv/pd/pkg/utils/etcdutil" "runtime/trace" "sync" "sync/atomic" @@ -33,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -562,7 +562,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { } // To make sure the expected leader(if exist) and primary are on the same server. - targetPrimary := mcsutils.GetExpectedPrimary(gta.member.GetLeaderPath(), gta.member.Client()) + targetPrimary := mcsutils.GetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) if targetPrimary != "" && targetPrimary != gta.member.MemberValue() { log.Info("skip campaigning of scheduling primary and check later", zap.String("server-name", gta.member.Name()), @@ -675,8 +675,8 @@ func (gta *GlobalTSOAllocator) campaignLeader() { } func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { - _, revision, err := gta.member.GetPersistentLeader() - if err != nil { + resp, err := etcdutil.EtcdKVGet(gta.member.GetLeadership().GetClient(), gta.member.GetLeaderPath()) + if err != nil || resp == nil || len(resp.Kvs) == 0 { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return } @@ -685,31 +685,28 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha zap.String("campaign-tso-primary-name", gta.member.Name())) // Watch will keep looping and never return unless the primary has changed. gta.member.GetLeadership().SetLeaderWatch(true) - gta.member.GetLeadership().Watch(gta.ctx, revision+1) + gta.member.GetLeadership().Watch(gta.ctx, resp.Kvs[0].ModRevision+1) gta.member.GetLeadership().SetLeaderWatch(false) // only API update primary will set the expected leader - // check leader key whether deleted - leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath()) + curPrimary, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath()) if err != nil { - log.Error("[primary] get primary key error", zap.Error(err)) - return - } - if leaderRaw == nil { - log.Info("[primary] leader key is deleted, the primary will step down") + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return } - mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) + if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) { + mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) - gta.member.UnsetLeader() - for { - select { - case <-ctx.Done(): - log.Info("[primary] exit the primary watch loop") - return - case exitPrimary <- struct{}{}: - return + gta.member.UnsetLeader() + for { + select { + case <-ctx.Done(): + log.Info("[primary] exit the primary watch loop") + return + case exitPrimary <- struct{}{}: + return + } } } } diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 96179c41b0a..954313dbd75 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -84,8 +84,8 @@ func GetPrimary(c *gin.Context) { // @Summary Transfer the primary member of the specified service. // @Produce json // @Param service path string true "service name" -// @Param new_primary query string false "new primary address" -// @Success 200 {object} string +// @Param new_primary body string false "new primary address" +// @Success 200 string string // @Router /ms/primary/transfer/{service} [post] func TransferPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)