From 66fc7c48ca8f9eb395489549b22855f99e81d81e Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 31 Jul 2024 17:07:21 +0800 Subject: [PATCH 1/8] schedule: fix the filter metrics flush (#8097) (#8463) close tikv/pd#8098 schedule: fix the filter metrics flush Signed-off-by: nolouch Co-authored-by: nolouch --- pkg/schedule/schedulers/balance_leader.go | 2 +- pkg/schedule/schedulers/balance_region.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index a9b63b659ea..692256dc7d4 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -361,6 +361,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun if dryRun { collector = plan.NewCollector(basePlan) } + defer l.filterCounter.Flush() batch := l.conf.getBatch() balanceLeaderScheduleCounter.Inc() @@ -402,7 +403,6 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } } } - l.filterCounter.Flush() l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...)) return result, collector.GetPlans() } diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index 36e8becf3fb..dca22475808 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -121,6 +121,7 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { basePlan := plan.NewBalanceSchedulerPlan() + defer s.filterCounter.Flush() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) @@ -224,7 +225,6 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun } s.retryQuota.Attenuate(solver.Source) } - s.filterCounter.Flush() s.retryQuota.GC(stores) return nil, collector.GetPlans() } From 1f5e9b60fcd3c3e80c7751efcac44ed7ff25ea5b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 31 Jul 2024 17:44:51 +0800 Subject: [PATCH 2/8] *: fix redact log (#8415) (#8472) close tikv/pd#8419 Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung --- pkg/keyspace/keyspace.go | 5 +++-- pkg/schedule/labeler/rules.go | 36 ++++++++++++++++++++++++++++++++ server/cluster/cluster_worker.go | 4 ++-- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index d84b3698f69..71390f80e64 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) @@ -328,7 +329,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er if waitRegionSplit { ranges := keyspaceRule.Data.([]*labeler.KeyRangeRule) if len(ranges) < 2 { - log.Warn("[keyspace] failed to split keyspace region with insufficient range", zap.Any("label-rule", keyspaceRule)) + log.Warn("[keyspace] failed to split keyspace region with insufficient range", logutil.ZapRedactString("label-rule", keyspaceRule.String())) return ErrRegionSplitFailed } rawLeftBound, rawRightBound := ranges[0].StartKey, ranges[0].EndKey @@ -377,7 +378,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er log.Info("[keyspace] added region label for keyspace", zap.Uint32("keyspace-id", id), - zap.Any("label-rule", keyspaceRule), + logutil.ZapRedactString("label-rule", keyspaceRule.String()), zap.Duration("takes", time.Since(start)), ) return diff --git a/pkg/schedule/labeler/rules.go b/pkg/schedule/labeler/rules.go index 3462cb7c459..39a420032d8 100644 --- a/pkg/schedule/labeler/rules.go +++ b/pkg/schedule/labeler/rules.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "time" "github.com/pingcap/failpoint" @@ -38,6 +39,10 @@ type RegionLabel struct { expire *time.Time } +func (l *RegionLabel) String() string { + return fmt.Sprintf("key: %s, value: %s", l.Key, l.Value) +} + // LabelRule is the rule to assign labels to a region. // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type LabelRule struct { @@ -49,6 +54,37 @@ type LabelRule struct { minExpire *time.Time } +func (rule *LabelRule) String() string { + var b strings.Builder + b.WriteString(fmt.Sprintf("id: %s, index: %d, type: %s", rule.ID, rule.Index, rule.RuleType)) + b.WriteString(", labels: ") + for i, l := range rule.Labels { + if i == 0 { + b.WriteString("[") + } + b.WriteString(l.String()) + if i == len(rule.Labels)-1 { + b.WriteString("]") + } else { + b.WriteString(", ") + } + } + b.WriteString(", data: ") + ranges := rule.Data.([]*KeyRangeRule) + for i, r := range ranges { + if i == 0 { + b.WriteString("[") + } + b.WriteString(fmt.Sprintf("startKey: {%s}, endKey: {%s}", r.StartKeyHex, r.EndKeyHex)) + if i == len(ranges)-1 { + b.WriteString("]") + } else { + b.WriteString(", ") + } + } + return b.String() +} + // NewLabelRuleFromJSON creates a label rule from the JSON data. func NewLabelRuleFromJSON(data []byte) (*LabelRule, error) { lr := &LabelRule{} diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index ecf2f8dc01b..14a73d2fe62 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -221,7 +221,7 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque err := c.checkSplitRegions(regions) if err != nil { log.Warn("report batch split region is invalid", - zap.Stringer("region-meta", hrm), + logutil.ZapRedactStringer("region-meta", hrm), errs.ZapError(err)) return nil, err } @@ -230,7 +230,7 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque hrm = core.RegionsToHexMeta(regions[:last]) log.Info("region batch split, generate new regions", zap.Uint64("region-id", originRegion.GetId()), - zap.Stringer("origin", hrm), + logutil.ZapRedactStringer("origin", hrm), zap.Int("total", last)) return &pdpb.ReportBatchSplitResponse{}, nil } From 9fe738a05c2a9d0ac6d79af66b9211221235bd6c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 31 Jul 2024 22:29:52 +0800 Subject: [PATCH 3/8] schedule: fix progress cannot display when enabling scheduling service (#8334) (#8471) close tikv/pd#8331 Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/region.go | 8 +++ pkg/schedule/coordinator.go | 1 - pkg/schedule/prepare_checker.go | 9 ++- server/cluster/cluster.go | 15 +++- .../mcs/scheduling/server_test.go | 69 +++++++++++++++++++ tests/testutil.go | 5 ++ 6 files changed, 100 insertions(+), 7 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index f7a4ef5f0fd..50fcfbd4b9a 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -44,6 +44,8 @@ import ( const ( randomRegionMaxRetry = 10 scanRegionLimit = 1000 + // CollectFactor is the factor of collected region heartbeat. + CollectFactor = 0.9 ) // errRegionIsStale is error info for region is stale. @@ -1396,6 +1398,12 @@ func (r *RegionsInfo) GetNotFromStorageRegionsCntByStore(storeID uint64) int { return r.getNotFromStorageRegionsCntByStoreLocked(storeID) } +// IsStorePrepared checks if a store is prepared. +// For each store, the number of active regions should be more than total region of the store * CollectFactor +func (r *RegionsInfo) IsStorePrepared(storeID uint64) bool { + return float64(r.GetNotFromStorageRegionsCntByStore(storeID)) >= float64(r.GetStoreRegionCount(storeID))*CollectFactor +} + // getNotFromStorageRegionsCntByStoreLocked gets the `NotFromStorageRegionsCnt` count of a store's leader, follower and learner by storeID. func (r *RegionsInfo) getNotFromStorageRegionsCntByStoreLocked(storeID uint64) int { return r.leaders[storeID].notFromStorageRegionsCount() + r.followers[storeID].notFromStorageRegionsCount() + r.learners[storeID].notFromStorageRegionsCount() diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 5ab38aad81d..1eb220b39aa 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -46,7 +46,6 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second checkSuspectRangesInterval = 100 * time.Millisecond - collectFactor = 0.9 collectTimeout = 5 * time.Minute maxLoadConfigRetries = 10 // pushOperatorTickInterval is the interval try to push the operator. diff --git a/pkg/schedule/prepare_checker.go b/pkg/schedule/prepare_checker.go index 126e3bba41d..df7074b9073 100644 --- a/pkg/schedule/prepare_checker.go +++ b/pkg/schedule/prepare_checker.go @@ -51,8 +51,8 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt() totalRegionsCnt := c.GetTotalRegionCount() - // The number of active regions should be more than total region of all stores * collectFactor - if float64(totalRegionsCnt)*collectFactor > float64(notLoadedFromRegionsCnt) { + // The number of active regions should be more than total region of all stores * core.CollectFactor + if float64(totalRegionsCnt)*core.CollectFactor > float64(notLoadedFromRegionsCnt) { return false } for _, store := range c.GetStores() { @@ -61,11 +61,10 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti } storeID := store.GetID() // It is used to avoid sudden scheduling when scheduling service is just started. - if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { + if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) { return false } - // For each store, the number of active regions should be more than total region of the store * collectFactor - if float64(c.GetStoreRegionCount(storeID))*collectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) { + if !c.IsStorePrepared(storeID) { return false } } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 2269301f255..15a990f11ba 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1697,6 +1697,19 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { return nil } +func (c *RaftCluster) isStorePrepared() bool { + for _, store := range c.GetStores() { + if !store.IsPreparing() && !store.IsServing() { + continue + } + storeID := store.GetID() + if !c.IsStorePrepared(storeID) { + return false + } + } + return true +} + func (c *RaftCluster) checkStores() { var offlineStores []*metapb.Store var upStoreCount int @@ -1728,7 +1741,7 @@ func (c *RaftCluster) checkStores() { zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } - } else if c.IsPrepared() { + } else if c.IsPrepared() || (c.IsServiceIndependent(mcsutils.SchedulingServiceName) && c.isStorePrepared()) { threshold := c.getThreshold(stores, store) regionSize := float64(store.GetRegionSize()) log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 38c1cc6a41b..b317dd01f90 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -675,3 +675,72 @@ func (suite *multipleServerTestSuite) TestReElectLeader() { rc = suite.pdLeader.GetServer().GetRaftCluster() rc.IsPrepared() } + +func (suite *serverTestSuite) TestOnlineProgress() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + rc := suite.pdLeader.GetServer().GetRaftCluster() + re.NotNil(rc) + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + regionLen := 1000 + regions := tests.InitRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + re.NoError(err) + } + time.Sleep(2 * time.Second) + + // add a new store + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: 4, + Address: fmt.Sprintf("mock://%d", 4), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + + time.Sleep(2 * time.Second) + for i, r := range regions { + if i < 50 { + r.GetMeta().Peers[2].StoreId = 4 + r.GetMeta().RegionEpoch.ConfVer = 2 + r.GetMeta().RegionEpoch.Version = 2 + err = rc.HandleRegionHeartbeat(r) + re.NoError(err) + } + } + time.Sleep(2 * time.Second) + action, progress, ls, cs, err := rc.GetProgressByID("4") + re.Equal("preparing", action) + re.NotEmpty(progress) + re.NotEmpty(cs) + re.NotEmpty(ls) + re.NoError(err) + suite.TearDownSuite() + suite.SetupSuite() +} diff --git a/tests/testutil.go b/tests/testutil.go index 106cddc9dfb..77514a662c9 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -395,6 +395,11 @@ func InitRegions(regionLen int) []*core.RegionInfo { {Id: allocator.alloc(), StoreId: uint64(3)}, }, } + if i == 0 { + r.StartKey = []byte{} + } else if i == regionLen-1 { + r.EndKey = []byte{} + } region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat)) // Here is used to simulate the upgrade process. if i < regionLen/2 { From ae905fa2b734a4affd6b966c7b13e3788d608206 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Aug 2024 10:56:50 +0800 Subject: [PATCH 4/8] mcs: update node every restart (#8302) (#8468) close tikv/pd#8154 Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/keyspace/tso_keyspace_group.go | 62 ++++++------ pkg/utils/typeutil/comparison.go | 11 +++ pkg/utils/typeutil/comparison_test.go | 8 ++ server/apiv2/handlers/tso_keyspace_group.go | 12 ++- .../mcs/keyspace/tso_keyspace_group_test.go | 96 +++++++++++++++++++ 5 files changed, 157 insertions(+), 32 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 5ed9747e923..372356d716c 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -181,10 +182,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { return case <-ticker.C: } - countOfNodes := m.GetNodesCount() - if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount { - continue - } groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) if err != nil { log.Error("failed to load all keyspace groups", zap.Error(err)) @@ -194,23 +191,26 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { if len(groups) == 0 { continue } - withError := false for _, group := range groups { - if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount { - nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount) + existMembers := make(map[string]struct{}) + for _, member := range group.Members { + if exist, addr := m.IsExistNode(member.Address); exist { + existMembers[addr] = struct{}{} + } + } + numExistMembers := len(existMembers) + if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() { + continue + } + if numExistMembers < utils.DefaultKeyspaceGroupReplicaCount { + nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, utils.DefaultKeyspaceGroupReplicaCount) if err != nil { - withError = true log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err)) continue } group.Members = nodes } } - if !withError { - // all keyspace groups have equal or more than default replica count - log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node") - return - } } } @@ -745,7 +745,7 @@ func (m *GroupManager) GetNodesCount() int { } // AllocNodesForKeyspaceGroup allocates nodes for the keyspace group. -func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) { +func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[string]struct{}, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) { m.Lock() defer m.Unlock() ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout) @@ -770,32 +770,34 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if kg.IsMerging() { return ErrKeyspaceGroupInMerging(id) } - exists := make(map[string]struct{}) - for _, member := range kg.Members { - exists[member.Address] = struct{}{} - nodes = append(nodes, member) - } - if len(exists) >= desiredReplicaCount { - return nil + + for addr := range existMembers { + nodes = append(nodes, endpoint.KeyspaceGroupMember{ + Address: addr, + Priority: utils.DefaultKeyspaceGroupReplicaPriority, + }) } - for len(exists) < desiredReplicaCount { + + for len(existMembers) < desiredReplicaCount { select { case <-ctx.Done(): return nil case <-ticker.C: } - countOfNodes := m.GetNodesCount() - if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check + if m.GetNodesCount() == 0 { // double check return ErrNoAvailableNode } + if len(existMembers) == m.GetNodesCount() { + break + } addr := m.nodesBalancer.Next() if addr == "" { return ErrNoAvailableNode } - if _, ok := exists[addr]; ok { + if _, ok := existMembers[addr]; ok { continue } - exists[addr] = struct{}{} + existMembers[addr] = struct{}{} nodes = append(nodes, endpoint.KeyspaceGroupMember{ Address: addr, Priority: utils.DefaultKeyspaceGroupReplicaPriority, @@ -894,14 +896,14 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior } // IsExistNode checks if the node exists. -func (m *GroupManager) IsExistNode(addr string) bool { +func (m *GroupManager) IsExistNode(addr string) (bool, string) { nodes := m.nodesBalancer.GetAll() for _, node := range nodes { - if node == addr { - return true + if typeutil.EqualBaseURLs(node, addr) { + return true, node } } - return false + return false, "" } // MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group. diff --git a/pkg/utils/typeutil/comparison.go b/pkg/utils/typeutil/comparison.go index c976ac47102..f4fb602a2f7 100644 --- a/pkg/utils/typeutil/comparison.go +++ b/pkg/utils/typeutil/comparison.go @@ -17,6 +17,7 @@ package typeutil import ( "math" "sort" + "strings" "time" ) @@ -78,3 +79,13 @@ func AreStringSlicesEquivalent(a, b []string) bool { func Float64Equal(a, b float64) bool { return math.Abs(a-b) <= 1e-6 } + +// EqualBaseURLs compares two URLs without scheme. +func EqualBaseURLs(url1, url2 string) bool { + return TrimScheme(url1) == TrimScheme(url2) +} + +// TrimScheme trims the scheme from the URL. +func TrimScheme(s string) string { + return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://") +} diff --git a/pkg/utils/typeutil/comparison_test.go b/pkg/utils/typeutil/comparison_test.go index b296405b3d5..05f0e5c0baf 100644 --- a/pkg/utils/typeutil/comparison_test.go +++ b/pkg/utils/typeutil/comparison_test.go @@ -71,3 +71,11 @@ func TestAreStringSlicesEquivalent(t *testing.T) { re.False(AreStringSlicesEquivalent([]string{}, []string{"a", "b"})) re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{})) } + +func TestCompareURLsWithoutScheme(t *testing.T) { + re := require.New(t) + re.True(EqualBaseURLs("", "")) + re.True(EqualBaseURLs("http://127.0.0.1", "http://127.0.0.1")) + re.True(EqualBaseURLs("http://127.0.0.1", "https://127.0.0.1")) + re.True(EqualBaseURLs("127.0.0.1", "http://127.0.0.1")) +} diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index a9f042687f6..835b4a5242b 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -413,8 +413,16 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, "existed replica is larger than the new replica") return } + + // check if nodes exist + existMembers := make(map[string]struct{}) + for _, member := range keyspaceGroup.Members { + if exist, addr := manager.IsExistNode(member.Address); exist { + existMembers[addr] = struct{}{} + } + } // get the nodes - nodes, err := manager.AllocNodesForKeyspaceGroup(id, allocParams.Replica) + nodes, err := manager.AllocNodesForKeyspaceGroup(id, existMembers, allocParams.Replica) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return @@ -460,7 +468,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { } // check if node exists for _, node := range setParams.Nodes { - if !manager.IsExistNode(node) { + if exist, _ := manager.IsExistNode(node); !exist { c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist") return } diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index ccec0a7cdc0..33e775211d2 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -318,6 +318,102 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { } } +func (suite *keyspaceGroupTestSuite) TestAllocNodes() { + re := suite.Require() + // add three nodes. + nodes := make(map[string]bs.Server) + var cleanups []func() + defer func() { + for _, cleanup := range cleanups { + cleanup() + } + }() + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ { + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + cleanups = append(cleanups, cleanup) + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(re, nodes) + + // create a keyspace group. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Standard.String(), + }, + }} + code := suite.tryCreateKeyspaceGroup(re, kgs) + re.Equal(http.StatusOK, code) + + // alloc nodes for the keyspace group + var kg *endpoint.KeyspaceGroup + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount + }) + stopNode := kg.Members[0].Address + // close one of members + nodes[stopNode].Close() + + // the member list will be updated + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + for _, member := range kg.Members { + if member.Address == stopNode { + return false + } + } + return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount + }) +} + +func (suite *keyspaceGroupTestSuite) TestAllocOneNode() { + re := suite.Require() + // add one tso server + nodes := make(map[string]bs.Server) + oldTSOServer, cleanupOldTSOserver := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + defer cleanupOldTSOserver() + nodes[oldTSOServer.GetAddr()] = oldTSOServer + + tests.WaitForPrimaryServing(re, nodes) + + // create a keyspace group. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Standard.String(), + }, + }} + code := suite.tryCreateKeyspaceGroup(re, kgs) + re.Equal(http.StatusOK, code) + + // alloc nodes for the keyspace group + var kg *endpoint.KeyspaceGroup + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + return code == http.StatusOK && kg != nil && len(kg.Members) == 1 + }) + stopNode := kg.Members[0].Address + // close old tso server + nodes[stopNode].Close() + + // create a new tso server + newTSOServer, cleanupNewTSOServer := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + defer cleanupNewTSOServer() + nodes[newTSOServer.GetAddr()] = newTSOServer + + tests.WaitForPrimaryServing(re, nodes) + + // the member list will be updated + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + if len(kg.Members) != 0 && kg.Members[0].Address == stopNode { + return false + } + return code == http.StatusOK && kg != nil && len(kg.Members) == 1 + }) +} + func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(re *require.Assertions, id int, request *handlers.AllocNodesForKeyspaceGroupParams) ([]endpoint.KeyspaceGroupMember, int) { data, err := json.Marshal(request) re.NoError(err) From a20565e3290d40d4ffd7b4c2955cd2f5606ef043 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Aug 2024 11:07:20 +0800 Subject: [PATCH 5/8] mcs: tso server compare address without scheme (#8283) (#8470) close tikv/pd#8284 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: lhy1024 --- pkg/keyspace/tso_keyspace_group.go | 2 +- pkg/mcs/tso/server/grpc_service.go | 3 +-- pkg/storage/endpoint/tso_keyspace_group.go | 9 +++++++++ pkg/tso/keyspace_group_manager.go | 8 ++++---- pkg/tso/keyspace_group_manager_test.go | 2 +- server/apiv2/handlers/tso_keyspace_group.go | 2 +- 6 files changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 372356d716c..40d21b4111f 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -876,7 +876,7 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior inKeyspaceGroup := false members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members)) for _, member := range kg.Members { - if member.Address == node { + if member.IsAddressEquivalent(node) { inKeyspaceGroup = true member.Priority = priority } diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index 31a74f2a688..b9f95be15e0 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -19,7 +19,6 @@ import ( "io" "net/http" "strconv" - "strings" "time" "github.com/pingcap/errors" @@ -164,7 +163,7 @@ func (s *Service) FindGroupByKeyspaceID( Address: member.Address, // TODO: watch the keyspace groups' primary serving address changes // to get the latest primary serving addresses of all keyspace groups. - IsPrimary: strings.EqualFold(member.Address, am.GetLeaderAddr()), + IsPrimary: member.IsAddressEquivalent(am.GetLeaderAddr()), }) } diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 39a08afe937..dd3d4f91eb3 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -20,6 +20,7 @@ import ( "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" ) @@ -80,6 +81,14 @@ type KeyspaceGroupMember struct { Priority int `json:"priority"` } +// IsAddressEquivalent compares the address with the given address. +// It compares the address without the scheme. +// Otherwise, it will not work when we update the scheme from http to https. +// Issue: https://github.com/tikv/pd/issues/8284 +func (m *KeyspaceGroupMember) IsAddressEquivalent(addr string) bool { + return typeutil.EqualBaseURLs(m.Address, addr) +} + // SplitState defines the split state of a keyspace group. type SplitState struct { // SplitSource is the current keyspace group ID from which the keyspace group is split. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d259ab27a5b..79521e90e8f 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -290,7 +290,7 @@ func (s *state) getNextPrimaryToReset( if member.Priority > maxPriority { maxPriority = member.Priority } - if member.Address == localAddress { + if member.IsAddressEquivalent(localAddress) { localPriority = member.Priority } } @@ -625,7 +625,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { if member != nil { aliveTSONodes := make(map[string]struct{}) kgm.tsoNodes.Range(func(key, _ any) bool { - aliveTSONodes[key.(string)] = struct{}{} + aliveTSONodes[typeutil.TrimScheme(key.(string))] = struct{}{} return true }) if len(aliveTSONodes) == 0 { @@ -638,7 +638,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { if member.Priority <= localPriority { continue } - if _, ok := aliveTSONodes[member.Address]; ok { + if _, ok := aliveTSONodes[typeutil.TrimScheme(member.Address)]; ok { resetLeader = true break } @@ -667,7 +667,7 @@ func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { return slice.AnyOf(group.Members, func(i int) bool { - return group.Members[i].Address == kgm.tsoServiceID.ServiceAddr + return group.Members[i].IsAddressEquivalent(kgm.tsoServiceID.ServiceAddr) }) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 9fe6da6edc9..4c9c9f44707 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -891,7 +891,7 @@ func collectAssignedKeyspaceGroupIDs(re *require.Assertions, kgm *KeyspaceGroupM re.Equal(i, int(am.kgID)) re.Equal(i, int(kg.ID)) for _, m := range kg.Members { - if m.Address == kgm.tsoServiceID.ServiceAddr { + if m.IsAddressEquivalent(kgm.tsoServiceID.ServiceAddr) { ids = append(ids, uint32(i)) break } diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 835b4a5242b..b4149c23f03 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -520,7 +520,7 @@ func SetPriorityForKeyspaceGroup(c *gin.Context) { // check if node exists members := kg.Members if slice.NoneOf(members, func(i int) bool { - return members[i].Address == node + return members[i].IsAddressEquivalent(node) }) { c.AbortWithStatusJSON(http.StatusBadRequest, "tso node does not exist in the keyspace group") } From 18892e5bbfee71053308f61a965866b32a627722 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Aug 2024 11:23:50 +0800 Subject: [PATCH 6/8] apiutil/middleware: add retry logic for obtaining PD leader in redirector (#8216) (#8466) close tikv/pd#8142 Add retry logic to improve PD HTTP request forwarding success rate during PD leader switch. Signed-off-by: JmPotato Co-authored-by: JmPotato --- pkg/utils/apiutil/serverapi/middleware.go | 52 +++++++++++++++++++---- tests/server/api/api_test.go | 19 +++++++++ 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 2432e15c967..ce3617453d2 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -18,7 +18,9 @@ import ( "net/http" "net/url" "strings" + "time" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -204,20 +206,19 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http clientUrls = append(clientUrls, targetAddr) // Add a header to the response, it is used to mark whether the request has been forwarded to the micro service. w.Header().Add(apiutil.XForwardedToMicroServiceHeader, "true") - } else { - leader := h.s.GetMember().GetLeader() + } else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 { + leader := h.waitForLeader(r) if leader == nil { http.Error(w, "no leader", http.StatusServiceUnavailable) return } clientUrls = leader.GetClientUrls() - // Prevent more than one redirection among PD/API servers. - if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) - http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) - return - } r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) + } else { + // Prevent more than one redirection among PD/API servers. + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) + http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) + return } urls := make([]url.URL, 0, len(clientUrls)) @@ -233,3 +234,38 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http client := h.s.GetHTTPClient() apiutil.NewCustomReverseProxies(client, urls).ServeHTTP(w, r) } + +const ( + backoffMaxDelay = 3 * time.Second + backoffInterval = 100 * time.Millisecond +) + +// If current server does not have a leader, backoff to increase the chance of success. +func (h *redirector) waitForLeader(r *http.Request) (leader *pdpb.Member) { + var ( + interval = backoffInterval + maxDelay = backoffMaxDelay + curDelay = time.Duration(0) + ) + for { + leader = h.s.GetMember().GetLeader() + if leader != nil { + return + } + select { + case <-time.After(interval): + curDelay += interval + if curDelay >= maxDelay { + return + } + interval *= 2 + if curDelay+interval > maxDelay { + interval = maxDelay - curDelay + } + case <-r.Context().Done(): + return + case <-h.s.Context().Done(): + return + } + } +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b70c688993d..b32739b631e 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -605,6 +605,25 @@ func (suite *redirectorTestSuite) TestRedirect() { re.Equal(h, header) } } + // Test redirect during leader election. + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + err := leader.ResignLeader() + re.NoError(err) + for _, svr := range suite.cluster.GetServers() { + request, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/pd/api/v1/version", svr.GetServer().GetAddr()), http.NoBody) + re.NoError(err) + testutil.Eventually(re, func() bool { + resp, err := dialClient.Do(request) + re.NoError(err) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + re.NoError(err) + // Should not meet 503 since the retry logic ensure the request is sent to the new leader eventually. + re.NotEqual(http.StatusServiceUnavailable, resp.StatusCode) + return resp.StatusCode == http.StatusOK + }) + } } func (suite *redirectorTestSuite) TestAllowFollowerHandle() { From 52ec66d9b36365f776f64474dd13e937b3e618bc Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Aug 2024 11:50:50 +0800 Subject: [PATCH 7/8] api: remove set node replica check (#8153) (#8467) ref tikv/pd#8154 Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/apiv2/handlers/tso_keyspace_group.go | 5 ----- tests/integrations/mcs/keyspace/tso_keyspace_group_test.go | 2 +- tools/pd-ctl/tests/keyspace/keyspace_group_test.go | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index b4149c23f03..6dafc98e603 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -461,11 +461,6 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist") return } - // check if nodes is less than default replica count - if len(setParams.Nodes) < utils.DefaultKeyspaceGroupReplicaCount { - c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes") - return - } // check if node exists for _, node := range setParams.Nodes { if exist, _ := manager.IsExistNode(node); !exist { diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 33e775211d2..715fbf825b9 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -280,7 +280,7 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() { Nodes: []string{nodesList[0]}, } _, code = suite.trySetNodesForKeyspaceGroup(re, id, params) - re.Equal(http.StatusBadRequest, code) + re.Equal(http.StatusOK, code) // the keyspace group is not exist. id = 2 diff --git a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go index 5d85f35dacf..b999b1bd0fd 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go @@ -263,7 +263,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { args := []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, tsoAddrs[0]} output, err := tests.ExecuteCommand(cmd, args...) re.NoError(err) - re.Contains(string(output), "invalid num of nodes") + re.Contains(string(output), "Success!") args = []string{"-u", pdAddr, "keyspace-group", "set-node", defaultKeyspaceGroupID, "", ""} output, err = tests.ExecuteCommand(cmd, args...) re.NoError(err) From 5e84948d47a8d523e7cfa5b8d8401b8e09b7e82c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Aug 2024 13:23:50 +0800 Subject: [PATCH 8/8] client/http, api/middleware: enhance the retry logic of the HTTP client (#8229) (#8464) ref tikv/pd#7300 Schedule a member change check when the HTTP status code is 503 or receives a leader/primary change error. Signed-off-by: JmPotato Co-authored-by: JmPotato --- client/client.go | 11 ----- client/errs/errno.go | 13 +++-- client/errs/errs.go | 18 +++++++ client/http/client.go | 49 +++++++++++++------ client/http/request_info.go | 11 +++++ client/pd_service_discovery_test.go | 3 +- client/resource_manager_client.go | 7 +-- client/tso_dispatcher.go | 2 +- errors.toml | 10 ++++ pkg/errs/errno.go | 9 ++-- .../apiutil/multiservicesapi/middleware.go | 4 +- pkg/utils/apiutil/serverapi/middleware.go | 4 +- server/apiv2/middlewares/redirector.go | 4 +- tests/integrations/client/client_test.go | 3 +- .../mcs/tso/keyspace_group_manager_test.go | 5 +- tests/server/cluster/cluster_test.go | 2 +- 16 files changed, 99 insertions(+), 56 deletions(-) diff --git a/client/client.go b/client/client.go index 8211711bc38..534f683ebdb 100644 --- a/client/client.go +++ b/client/client.go @@ -1424,17 +1424,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint return resp, nil } -// IsLeaderChange will determine whether there is a leader change. -func IsLeaderChange(err error) bool { - if err == errs.ErrClientTSOStreamClosed { - return true - } - errMsg := err.Error() - return strings.Contains(errMsg, errs.NotLeaderErr) || - strings.Contains(errMsg, errs.MismatchLeaderErr) || - strings.Contains(errMsg, errs.NotServedErr) -} - const ( httpSchemePrefix = "http://" httpsSchemePrefix = "https://" diff --git a/client/errs/errno.go b/client/errs/errno.go index 50c136dd5f2..0dbcb4fe147 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -20,21 +20,20 @@ import ( "github.com/pingcap/errors" ) +// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly. const ( + // NoLeaderErr indicates there is no leader in the cluster currently. + NoLeaderErr = "no leader" // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. - NotLeaderErr = "is not leader" + NotLeaderErr = "not leader" // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" // NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it. - // Note: keep the same as the ones defined on the server side, because the client side checks if an error message - // contains this string to judge whether the leader is changed. NotServedErr = "is not served" // RetryTimeoutErr indicates the server is busy. RetryTimeoutErr = "retry timeout" + // NotPrimaryErr indicates the non-primary member received the requests which should be received by primary. + NotPrimaryErr = "not primary" ) // client errors diff --git a/client/errs/errs.go b/client/errs/errs.go index 47f7c29a467..da333efda4c 100644 --- a/client/errs/errs.go +++ b/client/errs/errs.go @@ -15,11 +15,29 @@ package errs import ( + "strings" + "github.com/pingcap/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// IsLeaderChange will determine whether there is a leader/primary change. +func IsLeaderChange(err error) bool { + if err == nil { + return false + } + if err == ErrClientTSOStreamClosed { + return true + } + errMsg := err.Error() + return strings.Contains(errMsg, NoLeaderErr) || + strings.Contains(errMsg, NotLeaderErr) || + strings.Contains(errMsg, MismatchLeaderErr) || + strings.Contains(errMsg, NotServedErr) || + strings.Contains(errMsg, NotPrimaryErr) +} + // ZapError is used to make the log output easier. func ZapError(err error, causeError ...error) zap.Field { if err == nil { diff --git a/client/http/client.go b/client/http/client.go index 30144ebe2c5..a282904bb91 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -120,10 +120,25 @@ func (ci *clientInner) requestWithRetry( headerOpts ...HeaderOption, ) error { var ( + serverURL string + isLeader bool statusCode int err error + logFields = append(reqInfo.logFields(), zap.String("source", ci.source)) ) execFunc := func() error { + defer func() { + // If the status code is 503, it indicates that there may be PD leader/follower changes. + // If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change. + if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) { + ci.sd.ScheduleCheckMemberChanged() + } + log.Debug("[pd] http request finished", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) + }() // It will try to send the request to the PD leader first and then try to send the request to the other PD followers. clients := ci.sd.GetAllServiceClients() if len(clients) == 0 { @@ -131,17 +146,21 @@ func (ci *clientInner) requestWithRetry( } skipNum := 0 for _, cli := range clients { - url := cli.GetURL() - if reqInfo.targetURL != "" && reqInfo.targetURL != url { + serverURL = cli.GetURL() + isLeader = cli.IsConnectedToLeader() + if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL { skipNum++ continue } - statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...) + statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...) if err == nil || noNeedRetry(statusCode) { return err } - log.Debug("[pd] request url failed", - zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err)) + log.Debug("[pd] http request url failed", append(logFields, + zap.String("server-url", serverURL), + zap.Bool("is-leader", isLeader), + zap.Int("status-code", statusCode), + zap.Error(err))...) } if skipNum == len(clients) { return errs.ErrClientNoTargetMember @@ -168,26 +187,21 @@ func noNeedRetry(statusCode int) bool { func (ci *clientInner) doRequest( ctx context.Context, - url string, reqInfo *requestInfo, + serverURL string, reqInfo *requestInfo, headerOpts ...HeaderOption, ) (int, error) { var ( - source = ci.source callerID = reqInfo.callerID name = reqInfo.name method = reqInfo.method body = reqInfo.body res = reqInfo.res respHandler = reqInfo.respHandler + url = reqInfo.getURL(serverURL) + logFields = append(reqInfo.logFields(), + zap.String("source", ci.source), + zap.String("url", url)) ) - url = reqInfo.getURL(url) - logFields := []zap.Field{ - zap.String("source", source), - zap.String("name", name), - zap.String("url", url), - zap.String("method", method), - zap.String("caller-id", callerID), - } log.Debug("[pd] request the http url", logFields...) req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body)) if err != nil { @@ -228,11 +242,14 @@ func (ci *clientInner) doRequest( if readErr != nil { logFields = append(logFields, zap.NamedError("read-body-error", err)) } else { + // API server will return a JSON body containing the detailed error message + // when the status code is not `http.StatusOK` 200. + bs = bytes.TrimSpace(bs) logFields = append(logFields, zap.ByteString("body", bs)) } log.Error("[pd] request failed with a non-200 status", logFields...) - return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status) + return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s', body: '%s'", resp.Status, bs) } if res == nil { diff --git a/client/http/request_info.go b/client/http/request_info.go index 0ce7072d1ba..79fe3c6e567 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/tikv/pd/client/retry" + "go.uber.org/zap" ) // The following constants are the names of the requests. @@ -156,3 +157,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo { func (ri *requestInfo) getURL(addr string) string { return fmt.Sprintf("%s%s", addr, ri.uri) } + +func (ri *requestInfo) logFields() []zap.Field { + return []zap.Field{ + zap.String("caller-id", ri.callerID), + zap.String("name", ri.name), + zap.String("uri", ri.uri), + zap.String("method", ri.method), + zap.String("target-url", ri.targetURL), + } +} diff --git a/client/pd_service_discovery_test.go b/client/pd_service_discovery_test.go index f4cde0e1911..44171873b1a 100644 --- a/client/pd_service_discovery_test.go +++ b/client/pd_service_discovery_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/testutil" "google.golang.org/grpc" @@ -205,7 +206,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() { re.NotNil(leaderConn) _, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"}) - re.ErrorContains(err, "not leader") + re.ErrorContains(err, errs.NotLeaderErr) resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"}) re.NoError(err) re.Equal("Hello pd", resp.GetMessage()) diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 872b241cfe7..98b123c0823 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -16,7 +16,6 @@ package pd import ( "context" - "strings" "time" "github.com/gogo/protobuf/proto" @@ -35,10 +34,6 @@ const ( modify actionType = 1 groupSettingsPathPrefix = "resource_group/settings" controllerConfigPathPrefix = "resource_group/controller" - // errNotPrimary is returned when the requested server is not primary. - errNotPrimary = "not primary" - // errNotLeader is returned when the requested server is not pd leader. - errNotLeader = "not leader" ) // GroupSettingsPathPrefixBytes is used to watch or get resource groups. @@ -83,7 +78,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *client) gRPCErrorHandler(err error) { - if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) { + if errs.IsLeaderChange(err) { c.pdSvcDiscovery.ScheduleCheckMemberChanged() } } diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 460e9b697b9..eb89f892f75 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -522,7 +522,7 @@ tsoBatchLoop: cancel() stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. - if IsLeaderChange(err) { + if errs.IsLeaderChange(err) { if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil { select { case <-dispatcherCtx.Done(): diff --git a/errors.toml b/errors.toml index 64101000478..a61c23a6fbd 100644 --- a/errors.toml +++ b/errors.toml @@ -16,11 +16,21 @@ error = ''' redirect failed ''' +["PD:apiutil:ErrRedirectNoLeader"] +error = ''' +redirect finds no leader +''' + ["PD:apiutil:ErrRedirectToNotLeader"] error = ''' redirect to not leader ''' +["PD:apiutil:ErrRedirectToNotPrimary"] +error = ''' +redirect to not primary +''' + ["PD:autoscaling:ErrEmptyMetricsResponse"] error = ''' metrics response from Prometheus is empty diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 8c3e914531b..1f56a821032 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -195,10 +195,11 @@ var ( // apiutil errors var ( - ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect")) - ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist")) - // ErrRedirectToNotLeader is the error message for redirect to not leader. - ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader")) + ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect")) + ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist")) + ErrRedirectNoLeader = errors.Normalize("redirect finds no leader", errors.RFCCodeText("PD:apiutil:ErrRedirectNoLeader")) + ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader")) + ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary")) ) // grpcutil errors diff --git a/pkg/utils/apiutil/multiservicesapi/middleware.go b/pkg/utils/apiutil/multiservicesapi/middleware.go index ed34ecc6afb..4343adcc981 100644 --- a/pkg/utils/apiutil/multiservicesapi/middleware.go +++ b/pkg/utils/apiutil/multiservicesapi/middleware.go @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc { // Prevent more than one redirection. if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) - c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) + log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary)) + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error()) return } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index ce3617453d2..cb2340978b7 100755 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -209,14 +209,14 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http } else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 { leader := h.waitForLeader(r) if leader == nil { - http.Error(w, "no leader", http.StatusServiceUnavailable) + http.Error(w, errs.ErrRedirectNoLeader.FastGenByArgs().Error(), http.StatusServiceUnavailable) return } clientUrls = leader.GetClientUrls() r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name()) } else { // Prevent more than one redirection among PD/API servers. - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader)) http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) return } diff --git a/server/apiv2/middlewares/redirector.go b/server/apiv2/middlewares/redirector.go index 37c06de1585..9c2c4081175 100644 --- a/server/apiv2/middlewares/redirector.go +++ b/server/apiv2/middlewares/redirector.go @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc { // Prevent more than one redirection. if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { - log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect)) - c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error()) + log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader)) + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error()) return } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index da4be99638d..7fadddf4532 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -39,6 +39,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -493,7 +494,7 @@ func TestGlobalAndLocalTSO(t *testing.T) { re.NotEmpty(cluster.WaitLeader()) _, _, err = cli.GetTS(ctx) re.Error(err) - re.True(pd.IsLeaderChange(err)) + re.True(clierrs.IsLeaderChange(err)) _, _, err = cli.GetTS(ctx) re.NoError(err) re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember")) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index af33b85b1a1..ba53785b870 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + clierrs "github.com/tikv/pd/client/errs" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -448,8 +449,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( errMsg := err.Error() // Ignore the errors caused by the split and context cancellation. if strings.Contains(errMsg, "context canceled") || - strings.Contains(errMsg, "not leader") || - strings.Contains(errMsg, "not served") || + strings.Contains(errMsg, clierrs.NotLeaderErr) || + strings.Contains(errMsg, clierrs.NotServedErr) || strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { continue diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 3415c22a77b..332fc42431e 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -662,7 +662,7 @@ func TestNotLeader(t *testing.T) { grpcStatus, ok := status.FromError(err) re.True(ok) re.Equal(codes.Unavailable, grpcStatus.Code()) - re.Equal("not leader", grpcStatus.Message()) + re.ErrorContains(server.ErrNotLeader, grpcStatus.Message()) } func TestStoreVersionChange(t *testing.T) {