From 1fe976d675f19a4331195f6e6b7174cac166dab1 Mon Sep 17 00:00:00 2001 From: husharp Date: Fri, 10 May 2024 17:01:22 +0800 Subject: [PATCH] change log Signed-off-by: husharp --- pkg/election/leadership.go | 4 ++-- pkg/mcs/discovery/discover.go | 15 ++++++++------- pkg/mcs/scheduling/server/server.go | 12 +++++------- pkg/mcs/utils/util.go | 7 +++---- pkg/tso/global_allocator.go | 16 +++++++--------- server/apiv2/handlers/micro_service.go | 15 +++++++++++++-- 6 files changed, 38 insertions(+), 31 deletions(-) diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index b9a5d8ac1c22..a85bcb5c6375 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -391,9 +391,9 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } - // only API update the leader key to transfer the leader will meet + // only API update the leader key to transfer the primary will meet if ev.Type == mvccpb.PUT && ls.IsLeader() { - log.Info("[LeaderWatch] current leadership is updated", + log.Info("[PrimaryWatch] current leadership is updated", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 4635ec373ba0..f4ad2ef0c098 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -81,7 +81,8 @@ func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistr return nil, errors.Errorf("unknown service name %s", serviceName) } -func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string) error { +// TransferPrimary transfers the primary of the specified service. +func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string, keyspaceGroupID uint32) error { log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary)) entries, err := GetMSMembers(serviceName, client) if err != nil { @@ -90,22 +91,22 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar // Do nothing when I am the only member of cluster. if len(entries) == 1 && newPrimary == "" { - return errors.New("no valid follower to transfer primary") + return errors.New("no valid secondary to transfer primary") } var primaryIDs []string - var memberValues []string + var secondaryValues []string for _, member := range entries { if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.ServiceAddr == newPrimary) { primaryIDs = append(primaryIDs, member.ServiceAddr) if string(member.MemberValue) == "" { return errors.New("member value is empty") } - memberValues = append(memberValues, string(member.MemberValue)) + secondaryValues = append(secondaryValues, string(member.MemberValue)) } } if len(primaryIDs) == 0 { - return errors.New("no valid follower to transfer primary") + return errors.New("no valid secondary to transfer primary") } r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -122,12 +123,12 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar primaryKey = endpoint.SchedulingPrimaryPath(clusterID) case utils.TSOServiceName: tsoRootPath := endpoint.TSOSvcRootPath(clusterID) - primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID) + primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) } // update primary key to notify old primary server. putResp, err := kv.NewSlowLogTxn(client). - Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID])). + Then(clientv3.OpPut(primaryKey, secondaryValues[nextPrimaryID])). Commit() if err != nil || !putResp.Succeeded { return errors.Errorf("failed to write primary flag for %s", serviceName) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index c17c1e5fe173..de7d03766183 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -355,13 +355,11 @@ func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { s.participant.UnsetLeader() defer log.Info("scheduling primary exit the primary watch loop") - for { - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return - } + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return } } } diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index ccdf926a7935..913a5669cd74 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -76,18 +76,17 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err // GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API. func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string { - leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/")) + primary, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/")) if err != nil { log.Error("get expected primary key error", errs.ZapError(err)) return "" } - return string(leader) + return string(primary) } // RemoveExpectedPrimary removes the expected primary key. -// - removed when campaign new primary success -// - removed when old primary server is closed +// - removed when campaign new primary successfully func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { log.Info("remove expected primary key", zap.String("leaderPath", leaderPath)) // remove expected leader key diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 9fb159bd3ad0..8e5cf04c923d 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -668,7 +668,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return case <-exitPrimary: - log.Info("no longer a primary because primary have been updated, the TSO primary/leader will step down") + log.Info("no longer a primary because primary have been updated, the TSO primary will step down") return } } @@ -694,19 +694,17 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha log.Error("tso primary getting the leader meets error", errs.ZapError(err)) return } - + // only trigger by updating primary if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) { mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) gta.member.UnsetLeader() defer log.Info("tso primary exit the primary watch loop") - for { - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return - } + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return } } } diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 954313dbd758..b27594790b38 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -16,9 +16,11 @@ package handlers import ( "net/http" + "strconv" "github.com/gin-gonic/gin" "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/server" "github.com/tikv/pd/server/apiv2/middlewares" ) @@ -101,16 +103,25 @@ func TransferPrimary(c *gin.Context) { return } - newPrimary := "" + newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID if v, ok := input["new_primary"]; ok { newPrimary = v } + + if v, ok := input["keyspace_group_id"]; ok { + keyspaceGroupIDRaw, err := strconv.ParseUint(v, 10, 32) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + keyspaceGroupID = uint32(keyspaceGroupIDRaw) + } oldPrimary, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service) if oldPrimary == newPrimary { c.AbortWithStatusJSON(http.StatusInternalServerError, "new primary is the same as the old one") return } - if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary); err != nil { + if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary, keyspaceGroupID); err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return }