From 94ce97af7e2fb7127ee4856242ea3972cef3426c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 16 Oct 2024 12:25:50 +0800 Subject: [PATCH] move tso to independent thread Signed-off-by: Ryan Leung --- client/client.go | 4 +- server/cluster/cluster.go | 104 +++++++++++++++++++++++---- server/server.go | 23 +++++- tests/server/cluster/cluster_test.go | 6 +- 4 files changed, 117 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index 27952df13cd..a4d9e8e1907 100644 --- a/client/client.go +++ b/client/client.go @@ -203,7 +203,9 @@ func (k *serviceModeKeeper) close() { defer k.Unlock() switch k.serviceMode { case pdpb.ServiceMode_API_SVC_MODE: - k.tsoSvcDiscovery.Close() + if k.tsoSvcDiscovery != nil { + k.tsoSvcDiscovery.Close() + } fallthrough case pdpb.ServiceMode_PD_SVC_MODE: if k.tsoClient != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4cce39fa093..459c1079c9f 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "encoding/json" + errorspkg "errors" "fmt" "io" "math" @@ -43,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/ratelimit" @@ -56,6 +58,7 @@ import ( "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/syncer" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/unsaferecovery" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/keypath" @@ -88,12 +91,13 @@ const ( // nodeStateCheckJobInterval is the interval to run node state check job. nodeStateCheckJobInterval = 10 * time.Second // metricsCollectionJobInterval is the interval to run metrics collection job. - metricsCollectionJobInterval = 10 * time.Second - updateStoreStatsInterval = 9 * time.Millisecond - clientTimeout = 3 * time.Second - defaultChangedRegionsLimit = 10000 - gcTombstoneInterval = 30 * 24 * time.Hour - serviceCheckInterval = 10 * time.Second + metricsCollectionJobInterval = 10 * time.Second + updateStoreStatsInterval = 9 * time.Millisecond + clientTimeout = 3 * time.Second + defaultChangedRegionsLimit = 10000 + gcTombstoneInterval = 30 * 24 * time.Hour + schedulingServiceCheckInterval = 10 * time.Second + tsoServiceCheckInterval = 100 * time.Millisecond // persistLimitRetryTimes is used to reduce the probability of the persistent error // since the once the store is added or removed, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 @@ -144,6 +148,7 @@ type RaftCluster struct { cancel context.CancelFunc *core.BasicCluster // cached cluster info + member *member.EmbeddedEtcdMember etcdClient *clientv3.Client httpClient *http.Client @@ -174,6 +179,8 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + tsoAllocator *tso.AllocatorManager + checkTSOCh chan struct{} // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner @@ -194,16 +201,19 @@ type Status struct { } // NewRaftCluster create a new cluster. -func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, - httpClient *http.Client) *RaftCluster { +func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, + httpClient *http.Client, tsoAllocator *tso.AllocatorManager, checkTSOCh chan struct{}) *RaftCluster { return &RaftCluster{ serverCtx: ctx, clusterID: clusterID, + member: member, regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, BasicCluster: basicCluster, storage: storage, + tsoAllocator: tsoAllocator, + checkTSOCh: checkTSOCh, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), @@ -314,11 +324,13 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.checkTSOService() cluster, err := c.LoadClusterInfo() if err != nil { return err } if cluster == nil { + log.Warn("cluster is not bootstrapped") return nil } @@ -351,7 +363,7 @@ func (c *RaftCluster) Start(s Server) error { return err } } - c.checkServices() + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() go c.runMetricsCollectionJob() @@ -370,7 +382,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } -func (c *RaftCluster) checkServices() { +func (c *RaftCluster) checkSchedulingService() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { @@ -390,27 +402,89 @@ func (c *RaftCluster) checkServices() { } } +// checkTSOService checks the TSO service. +func (c *RaftCluster) checkTSOService() { + if !c.isAPIServiceMode { + if c.member.IsLeader() { + if err := c.startTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. + return + } + } else { + // leader exits, reset the allocator group + if err := c.stopTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. + return + } + + failpoint.Inject("updateAfterResetTSO", func() { + allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) + } + } +} + func (c *RaftCluster) runServiceCheckJob() { defer logutil.LogPanic() defer c.wg.Done() - ticker := time.NewTicker(serviceCheckInterval) + schedulingTicker := time.NewTicker(schedulingServiceCheckInterval) failpoint.Inject("highFrequencyClusterJobs", func() { - ticker.Reset(time.Millisecond) + schedulingTicker.Reset(time.Millisecond) }) - defer ticker.Stop() + defer schedulingTicker.Stop() + tsoTicker := time.NewTicker(tsoServiceCheckInterval) + defer tsoTicker.Stop() for { select { case <-c.ctx.Done(): log.Info("service check job is stopped") return - case <-ticker.C: - c.checkServices() + case <-schedulingTicker.C: + c.checkSchedulingService() + case <-tsoTicker.C: + c.checkTSOService() + case <-c.checkTSOCh: + c.checkTSOService() } } } +func (c *RaftCluster) startTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if !allocator.IsInitialize() { + log.Info("initializing the global TSO allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) + return err + } + } + return nil +} + +func (c *RaftCluster) stopTSOJobs() error { + allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get global TSO allocator", errs.ZapError(err)) + return err + } + if allocator.IsInitialize() { + c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) + } + return nil +} + // startGCTuner func (c *RaftCluster) startGCTuner() { defer logutil.LogPanic() diff --git a/server/server.go b/server/server.go index c79f51d8153..6ef50e1653b 100644 --- a/server/server.go +++ b/server/server.go @@ -237,6 +237,8 @@ type Server struct { // Cgroup Monitor cgMonitor cgroup.Monitor + + checkTSOCh chan struct{} } // HandlerBuilder builds a server HTTP handler. @@ -269,6 +271,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le }{ clients: make(map[string]tsopb.TSO_TsoClient), }, + checkTSOCh: make(chan struct{}, 1), } s.handler = newHandler(s) @@ -490,7 +493,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager, s.checkTSOCh) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -912,6 +915,11 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { return s.tsoAllocatorManager } +// GetCheckTSOCh returns the check TSO channel of server. +func (s *Server) GetCheckTSOCh() chan struct{} { + return s.checkTSOCh +} + // GetKeyspaceManager returns the keyspace manager of server. func (s *Server) GetKeyspaceManager() *keyspace.Manager { return s.keyspaceManager @@ -1715,6 +1723,19 @@ func (s *Server) campaignLeader() { s.member.KeepLeader(ctx) log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) + // notify cluster to start TSO service + select { + case s.checkTSOCh <- struct{}{}: + default: + } + + defer func() { + select { + case s.checkTSOCh <- struct{}{}: + default: + } + }() + if !s.IsAPIServiceMode() { allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) if err != nil { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index a9be92d19e9..df9d0ba0e84 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh()) // Cluster is not bootstrapped. rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) @@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) { } re.NoError(testStorage.Flush()) - raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh()) raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) @@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123}