From 064ca7bd3404171915a8b203ca150dce09864fc9 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 15 Oct 2024 17:45:21 +0800 Subject: [PATCH] adjust check tso service Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 57 +++++++++++++++++----------- server/server.go | 24 +++++++++++- tests/server/cluster/cluster_test.go | 6 +-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 618b83b094a..f7638904c8d 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -44,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/ratelimit" @@ -147,6 +148,7 @@ type RaftCluster struct { cancel context.CancelFunc *core.BasicCluster // cached cluster info + member *member.EmbeddedEtcdMember etcdClient *clientv3.Client httpClient *http.Client @@ -181,7 +183,7 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams tsoAllocator *tso.AllocatorManager - + tsoCh chan struct{} // heartbeatRunner is used to process the subtree update task asynchronously. heartbeatRunner ratelimit.Runner // miscRunner is used to process the statistics and persistent tasks asynchronously. @@ -201,17 +203,19 @@ type Status struct { } // NewRaftCluster create a new cluster. -func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, - httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster { +func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, + httpClient *http.Client, tsoAllocator *tso.AllocatorManager, tsoCh chan struct{}) *RaftCluster { return &RaftCluster{ serverCtx: ctx, clusterID: clusterID, + member: member, regionSyncer: regionSyncer, httpClient: httpClient, etcdClient: etcdClient, BasicCluster: basicCluster, storage: storage, tsoAllocator: tsoAllocator, + tsoCh: tsoCh, heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), @@ -415,6 +419,10 @@ func (c *RaftCluster) checkTSOService() { if err := c.startTSOJobs(); err != nil { return } + if c.IsServiceIndependent(constant.TSOServiceName) { + log.Info("PD server starts to provide timestamp") + } + c.UnsetServiceIndependent(constant.TSOServiceName) } else { // If the previous TSO is provided by the PD server, we need to reset the PD's allocator group // and start to let the TSO service provide the timestamp through SetServiceIndependent. @@ -427,8 +435,24 @@ func (c *RaftCluster) checkTSOService() { } } else { // If the PD server is not in the API service mode, PD should provide the TSO service. - if err := c.startTSOJobs(); err != nil { - return + if c.member.IsLeader() { + if err := c.startTSOJobs(); err != nil { + // If there is an error, need to wait for the next check. 
+ return + } + } else { + // leader exits, reset the allocator group + c.stopTSOJobs() + + failpoint.Inject("updateAfterResetTSO", func() { + allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) + if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { + log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) + } + if allocator.IsInitialize() { + log.Panic("the allocator should be uninitialized after reset") + } + }) } } } @@ -448,25 +472,14 @@ func (c *RaftCluster) runServiceCheckJob() { for { select { case <-c.ctx.Done(): - if !c.IsServiceIndependent(constant.TSOServiceName) { - // leader exits, reset the allocator group - c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) - } - failpoint.Inject("updateAfterResetTSO", func() { - allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) - if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) { - log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err)) - } - if allocator.IsInitialize() { - log.Panic("the allocator should be uninitialized after reset") - } - }) log.Info("service check job is stopped") return case <-schedulingTicker.C: c.checkSchedulingService() case <-tsoTicker.C: c.checkTSOService() + case <-c.tsoCh: + c.checkTSOService() } } } @@ -484,13 +497,13 @@ func (c *RaftCluster) startTSOJobs() error { return err } } - if c.IsServiceIndependent(constant.TSOServiceName) { - log.Info("PD server starts to provide timestamp") - } - c.UnsetServiceIndependent(constant.TSOServiceName) return nil } +func (c *RaftCluster) stopTSOJobs() { + c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) +} + // startGCTuner func (c *RaftCluster) startGCTuner() { defer logutil.LogPanic() diff --git a/server/server.go b/server/server.go index 2197bc597b1..08bdfd2b0da 100644 --- a/server/server.go +++ b/server/server.go @@ -236,6 +236,8 @@ type Server struct { // Cgroup Monitor cgMonitor cgroup.Monitor + + tsoCh chan struct{} } // HandlerBuilder builds a server HTTP handler. @@ -268,6 +270,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le }{ clients: make(map[string]tsopb.TSO_TsoClient), }, + tsoCh: make(chan struct{}, 1), } s.handler = newHandler(s) @@ -489,7 +492,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.member, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager, s.tsoCh) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -911,6 +914,11 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { return s.tsoAllocatorManager } +// GetTSOCh returns the TSO channel of server. +func (s *Server) GetTSOCh() chan struct{} { + return s.tsoCh +} + // GetKeyspaceManager returns the keyspace manager of server. 
func (s *Server) GetKeyspaceManager() *keyspace.Manager { return s.keyspaceManager @@ -1713,6 +1721,20 @@ func (s *Server) campaignLeader() { s.member.KeepLeader(ctx) log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name())) + // notify cluster to start TSO service + select { + case s.tsoCh <- struct{}{}: + default: + } + + defer func() { + // notify cluster to stop TSO service + select { + case s.tsoCh <- struct{}{}: + default: + } + }() + if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) return diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 58c1609041a..8bf8758c640 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -910,7 +910,7 @@ func TestLoadClusterInfo(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh()) // Cluster is not bootstrapped. rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) @@ -950,7 +950,7 @@ func TestLoadClusterInfo(t *testing.T) { } re.NoError(testStorage.Flush()) - raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) + raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh()) raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) @@ -1664,7 +1664,7 @@ func TestTransferLeaderBack(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() svr := leaderServer.GetServer() - rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager()) + rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh()) rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123}
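
Note on the notification mechanism (a standalone sketch, not part of the patch): the campaignLeader change sends on s.tsoCh, a buffered channel of capacity 1, inside a select with a default case, so the leader loop never blocks and repeated leadership events collapse into a single pending wake-up for runServiceCheckJob. The snippet below illustrates that pattern in isolation; the notify helper and the printed messages are hypothetical and only mirror the shape of the patch's code.

    package main

    import (
    	"fmt"
    	"time"
    )

    // notify performs a non-blocking send on a buffered channel of capacity 1.
    // If a notification is already pending, the new one is dropped (coalesced),
    // mirroring how campaignLeader signals the cluster through s.tsoCh.
    func notify(ch chan struct{}) {
    	select {
    	case ch <- struct{}{}:
    	default:
    	}
    }

    func main() {
    	tsoCh := make(chan struct{}, 1)

    	// Several rapid notifications collapse into at most one pending signal.
    	notify(tsoCh)
    	notify(tsoCh)
    	notify(tsoCh)

    	// The consumer (like runServiceCheckJob's select loop) drains the
    	// channel and reacts once per pending signal.
    	select {
    	case <-tsoCh:
    		fmt.Println("received TSO service check notification")
    	case <-time.After(time.Second):
    		fmt.Println("no notification pending")
    	}
    }

With a capacity of 1 plus the select/default send, the sender is never blocked by a slow consumer, and back-to-back leadership transitions result in a single extra checkTSOService run rather than a queue of redundant checks.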