
Commit 064ca7b
adjust check tso service
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Oct 15, 2024
1 parent edf3908 commit 064ca7b
Showing 3 changed files with 61 additions and 26 deletions.
57 changes: 35 additions & 22 deletions server/cluster/cluster.go
@@ -44,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/ratelimit"
@@ -147,6 +148,7 @@ type RaftCluster struct {
cancel context.CancelFunc

*core.BasicCluster // cached cluster info
member *member.EmbeddedEtcdMember

etcdClient *clientv3.Client
httpClient *http.Client
@@ -181,7 +183,7 @@ type RaftCluster struct {
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager

tsoCh chan struct{}
// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
// miscRunner is used to process the statistics and persistent tasks asynchronously.
@@ -201,17 +203,19 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager, tsoCh chan struct{}) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
member: member,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
tsoAllocator: tsoAllocator,
tsoCh: tsoCh,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
@@ -415,6 +419,10 @@ func (c *RaftCluster) checkTSOService() {
if err := c.startTSOJobs(); err != nil {
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("PD server starts to provide timestamp")
}
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
// If the previous TSO is provided by the PD server, we need to reset the PD's allocator group
// and start to let the TSO service provide the timestamp through SetServiceIndependent.
@@ -427,8 +435,24 @@
}
} else {
// If the PD server is not in the API service mode, PD should provide the TSO service.
if err := c.startTSOJobs(); err != nil {
return
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
return
}
} else {
// leader exits, reset the allocator group
c.stopTSOJobs()

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}
}
}
@@ -448,25 +472,14 @@ func (c *RaftCluster) runServiceCheckJob() {
for {
select {
case <-c.ctx.Done():
if !c.IsServiceIndependent(constant.TSOServiceName) {
// leader exits, reset the allocator group
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}
failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
log.Info("service check job is stopped")
return
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
case <-c.tsoCh:
c.checkTSOService()
}
}
}
@@ -484,13 +497,13 @@ func (c *RaftCluster) startTSOJobs() error {
return err
}
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("PD server starts to provide timestamp")
}
c.UnsetServiceIndependent(constant.TSOServiceName)
return nil
}

func (c *RaftCluster) stopTSOJobs() {
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
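
Taken together, the cluster.go changes make the TSO check leader-aware: runServiceCheckJob now wakes up either on its periodic ticker or on the new tsoCh signal, and checkTSOService starts or stops the TSO jobs depending on whether this member currently holds leadership. The standalone sketch below illustrates only the loop shape; the names serviceChecker and checkTSO and the timings are illustrative, not the actual PD API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// serviceChecker mirrors the loop shape above: a periodic ticker plus a
// buffered notification channel that lets the leader-election path trigger
// an immediate re-check instead of waiting for the next tick.
type serviceChecker struct {
	tsoCh chan struct{} // capacity 1 so repeated notifications coalesce
}

func (c *serviceChecker) run(ctx context.Context, checkTSO func()) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("service check job is stopped")
			return
		case <-ticker.C: // periodic re-check
			checkTSO()
		case <-c.tsoCh: // immediate re-check after a leadership change
			checkTSO()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	c := &serviceChecker{tsoCh: make(chan struct{}, 1)}
	c.tsoCh <- struct{}{} // e.g. sent right after winning the leader election
	c.run(ctx, func() { fmt.Println("checking TSO service") })
}
```
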
24 changes: 23 additions & 1 deletion server/server.go
@@ -236,6 +236,8 @@ type Server struct {

// Cgroup Monitor
cgMonitor cgroup.Monitor

tsoCh chan struct{}
}

// HandlerBuilder builds a server HTTP handler.
@@ -268,6 +270,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
}{
clients: make(map[string]tsopb.TSO_TsoClient),
},
tsoCh: make(chan struct{}, 1),
}
s.handler = newHandler(s)

@@ -489,7 +492,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager)
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.member, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager, s.tsoCh)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
@@ -911,6 +914,11 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// GetTSOCh returns the TSO channel of server.
func (s *Server) GetTSOCh() chan struct{} {
return s.tsoCh
}

// GetKeyspaceManager returns the keyspace manager of server.
func (s *Server) GetKeyspaceManager() *keyspace.Manager {
return s.keyspaceManager
@@ -1713,6 +1721,20 @@ func (s *Server) campaignLeader() {
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

// notify cluster to start TSO service
select {
case s.tsoCh <- struct{}{}:
default:
}

defer func() {
// notify cluster to stop TSO service
select {
case s.tsoCh <- struct{}{}:
default:
}
}()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
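
The campaignLeader change notifies the cluster through tsoCh with a non-blocking send on a capacity-1 channel, so a leadership transition never blocks on the check loop and back-to-back transitions collapse into a single pending signal; the receiver simply re-reads the current state on every wake-up. A small self-contained sketch of that idiom, assuming an illustrative notify helper:

```go
package main

import "fmt"

// notify does a non-blocking send: if the buffered channel already holds a
// pending signal, the new one is dropped, which is safe because the receiver
// re-checks the full leadership/TSO state on every wake-up anyway.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	ch := make(chan struct{}, 1)
	notify(ch) // queued
	notify(ch) // coalesced with the pending signal; does not block
	<-ch
	fmt.Println("woken up once, state re-checked")
}
```
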
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
@@ -910,7 +910,7 @@ func TestLoadClusterInfo(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
@@ -950,7 +950,7 @@
}
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
@@ -1664,7 +1664,7 @@ func TestTransferLeaderBack(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetTSOCh())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
