move tso to independent thread
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Oct 16, 2024
1 parent 3d4c416 commit 94ce97a
Showing 4 changed files with 117 additions and 20 deletions.
4 changes: 3 additions & 1 deletion client/client.go
@@ -203,7 +203,9 @@ func (k *serviceModeKeeper) close() {
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
k.tsoSvcDiscovery.Close()
if k.tsoSvcDiscovery != nil {
k.tsoSvcDiscovery.Close()
}
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
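
The added guard presumably protects the close path when the client runs in API service mode but the TSO service discovery was never created; calling Close through a nil interface value would panic. A minimal sketch of the pattern, using hypothetical types in place of the client's internals:

```go
package main

import "fmt"

// serviceDiscovery mimics the interface-typed field in the client: invoking a
// method on a nil interface value panics, hence the nil check before Close.
type serviceDiscovery interface{ Close() }

type fakeDiscovery struct{}

func (fakeDiscovery) Close() { fmt.Println("tso service discovery closed") }

type keeper struct {
	tsoSvcDiscovery serviceDiscovery // stays nil if TSO discovery was never started
}

func (k *keeper) close() {
	if k.tsoSvcDiscovery != nil { // guard: Close on a nil discovery would panic
		k.tsoSvcDiscovery.Close()
	}
}

func main() {
	(&keeper{}).close()                                 // safe no-op
	(&keeper{tsoSvcDiscovery: fakeDiscovery{}}).close() // closes the discovery
}
```
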
104 changes: 89 additions & 15 deletions server/cluster/cluster.go
@@ -17,6 +17,7 @@ package cluster
import (
"context"
"encoding/json"
errorspkg "errors"
"fmt"
"io"
"math"
@@ -43,6 +44,7 @@ import (
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/ratelimit"
@@ -56,6 +58,7 @@ import (
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/keypath"
@@ -88,12 +91,13 @@ const (
// nodeStateCheckJobInterval is the interval to run node state check job.
nodeStateCheckJobInterval = 10 * time.Second
// metricsCollectionJobInterval is the interval to run metrics collection job.
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
serviceCheckInterval = 10 * time.Second
metricsCollectionJobInterval = 10 * time.Second
updateStoreStatsInterval = 9 * time.Millisecond
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
gcTombstoneInterval = 30 * 24 * time.Hour
schedulingServiceCheckInterval = 10 * time.Second
tsoServiceCheckInterval = 100 * time.Millisecond
// persistLimitRetryTimes is used to reduce the probability of the persistent error
// since once the store is added or removed, we shouldn't return an error even if the store limit fails to persist.
persistLimitRetryTimes = 5
@@ -144,6 +148,7 @@ type RaftCluster struct {
cancel context.CancelFunc

*core.BasicCluster // cached cluster info
member *member.EmbeddedEtcdMember

etcdClient *clientv3.Client
httpClient *http.Client
@@ -174,6 +179,8 @@ type RaftCluster struct {
keyspaceGroupManager *keyspace.GroupManager
independentServices sync.Map
hbstreams *hbstream.HeartbeatStreams
tsoAllocator *tso.AllocatorManager
checkTSOCh chan struct{}

// heartbeatRunner is used to process the subtree update task asynchronously.
heartbeatRunner ratelimit.Runner
@@ -194,16 +201,19 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, clusterID uint64, member *member.EmbeddedEtcdMember, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client,
httpClient *http.Client, tsoAllocator *tso.AllocatorManager, checkTSOCh chan struct{}) *RaftCluster {
return &RaftCluster{
serverCtx: ctx,
clusterID: clusterID,
member: member,
regionSyncer: regionSyncer,
httpClient: httpClient,
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
tsoAllocator: tsoAllocator,
checkTSOCh: checkTSOCh,
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
@@ -314,11 +324,13 @@ func (c *RaftCluster) Start(s Server) error {
if err != nil {
return err
}
c.checkTSOService()
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
}
if cluster == nil {
log.Warn("cluster is not bootstrapped")
return nil
}

@@ -351,7 +363,7 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
}
c.checkServices()
c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
go c.runMetricsCollectionJob()
@@ -370,7 +382,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

func (c *RaftCluster) checkServices() {
func (c *RaftCluster) checkSchedulingService() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
@@ -390,27 +402,89 @@ func (c *RaftCluster) checkServices() {
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if !c.isAPIServiceMode {
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, wait for the next check.
return
}
} else {
// leader exits, reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, wait for the next check.
return
}

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}
}
}

func (c *RaftCluster) runServiceCheckJob() {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(serviceCheckInterval)
schedulingTicker := time.NewTicker(schedulingServiceCheckInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Reset(time.Millisecond)
schedulingTicker.Reset(time.Millisecond)
})
defer ticker.Stop()
defer schedulingTicker.Stop()
tsoTicker := time.NewTicker(tsoServiceCheckInterval)
defer tsoTicker.Stop()

for {
select {
case <-c.ctx.Done():
log.Info("service check job is stopped")
return
case <-ticker.C:
c.checkServices()
case <-schedulingTicker.C:
c.checkSchedulingService()
case <-tsoTicker.C:
c.checkTSOService()
case <-c.checkTSOCh:
c.checkTSOService()
}
}
}

func (c *RaftCluster) startTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if !allocator.IsInitialize() {
log.Info("initializing the global TSO allocator")
if err := allocator.Initialize(0); err != nil {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return err
}
}
return nil
}

func (c *RaftCluster) stopTSOJobs() error {
allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err != nil {
log.Error("failed to get global TSO allocator", errs.ZapError(err))
return err
}
if allocator.IsInitialize() {
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true)
}
return nil
}

// startGCTuner
func (c *RaftCluster) startGCTuner() {
defer logutil.LogPanic()
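
Taken together, the cluster.go changes fold TSO leadership handling into the periodic service-check loop: a fast ticker (tsoServiceCheckInterval) and an on-demand wakeup channel both trigger checkTSOService, which initializes the global TSO allocator while this member is the leader and resets the allocator group once leadership is lost. Below is a minimal, self-contained sketch of that loop shape; it is not the PD implementation, and the tsoState type, intervals, and helper names are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// tsoState stands in for the global TSO allocator: it is initialized while we
// hold leadership and reset after leadership is lost.
type tsoState struct{ initialized bool }

func (t *tsoState) start() { // analogous to startTSOJobs
	if !t.initialized {
		fmt.Println("initializing TSO allocator")
		t.initialized = true
	}
}

func (t *tsoState) stop() { // analogous to stopTSOJobs
	if t.initialized {
		fmt.Println("resetting TSO allocator")
		t.initialized = false
	}
}

// runServiceCheckLoop mixes a slow scheduling check, a fast TSO check, and an
// on-demand wakeup channel, mirroring the structure of runServiceCheckJob.
func runServiceCheckLoop(ctx context.Context, isLeader func() bool, wakeup <-chan struct{}) {
	state := &tsoState{}
	checkTSO := func() {
		if isLeader() {
			state.start()
		} else {
			state.stop()
		}
	}
	schedulingTicker := time.NewTicker(10 * time.Second) // scheduling service check
	defer schedulingTicker.Stop()
	tsoTicker := time.NewTicker(100 * time.Millisecond) // frequent TSO leadership check
	defer tsoTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-schedulingTicker.C:
			// the scheduling service check would run here
		case <-tsoTicker.C:
			checkTSO()
		case <-wakeup:
			checkTSO() // leadership just changed; re-check immediately
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	wakeup := make(chan struct{}, 1)
	wakeup <- struct{}{} // simulate a leadership-change notification
	runServiceCheckLoop(ctx, func() bool { return true }, wakeup)
}
```
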
23 changes: 22 additions & 1 deletion server/server.go
@@ -237,6 +237,8 @@ type Server struct {

// Cgroup Monitor
cgMonitor cgroup.Monitor

checkTSOCh chan struct{}
}

// HandlerBuilder builds a server HTTP handler.
@@ -269,6 +271,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
}{
clients: make(map[string]tsopb.TSO_TsoClient),
},
checkTSOCh: make(chan struct{}, 1),
}
s.handler = newHandler(s)

@@ -490,7 +493,7 @@ func (s *Server) startServer(ctx context.Context) error {

s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager, s.checkTSOCh)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
RootPath: s.rootPath,
@@ -912,6 +915,11 @@ func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.tsoAllocatorManager
}

// GetCheckTSOCh returns the check TSO channel of server.
func (s *Server) GetCheckTSOCh() chan struct{} {
return s.checkTSOCh
}

// GetKeyspaceManager returns the keyspace manager of server.
func (s *Server) GetKeyspaceManager() *keyspace.Manager {
return s.keyspaceManager
@@ -1715,6 +1723,19 @@ func (s *Server) campaignLeader() {
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))

// notify cluster to start TSO service
select {
case s.checkTSOCh <- struct{}{}:
default:
}

defer func() {
select {
case s.checkTSOCh <- struct{}{}:
default:
}
}()

if !s.IsAPIServiceMode() {
allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
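
In campaignLeader, the new notifications go through checkTSOCh, which CreateServer makes with capacity 1, using a non-blocking send: if a wakeup is already pending, the extra notification is dropped rather than blocking the campaign path, so repeated leadership transitions coalesce into at most one pending check. A small sketch of the idiom, with hypothetical names:

```go
package main

import "fmt"

// notify performs a non-blocking send on a capacity-1 channel: if a wakeup is
// already buffered, the new one is coalesced into it and the caller never blocks.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	checkTSOCh := make(chan struct{}, 1)
	notify(checkTSOCh) // first notification is buffered
	notify(checkTSOCh) // second is dropped; one pending wakeup is enough
	fmt.Println("pending wakeups:", len(checkTSOCh)) // prints 1
}
```
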
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
@@ -912,7 +912,7 @@ func TestLoadClusterInfo(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
@@ -952,7 +952,7 @@ func TestLoadClusterInfo(t *testing.T) {
}
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), basicCluster, testStorage, syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
@@ -1666,7 +1666,7 @@ func TestTransferLeaderBack(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), svr.GetMember(), svr.GetBasicCluster(), svr.GetStorage(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient(), svr.GetTSOAllocatorManager(), svr.GetCheckTSOCh())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetHBStreams(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
