Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Apr 15, 2024
1 parent 7595213 commit 7f23d25
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 5 deletions.
3 changes: 2 additions & 1 deletion pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func NewAllocatorManager(
cfg Config,
startGlobalLeaderLoop bool,
allocatorKeyPrefix string,
allocatorKey string,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -225,8 +226,8 @@ func NewAllocatorManager(
leaderLease: cfg.GetLeaderLease(),
maxResetTSGap: cfg.GetMaxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
allocatorKey: path.Join(allocatorKeyPrefix, fmt.Sprintf("keyspace_group_%d", keyspaceGroupID)),
allocatorKeyPrefix: allocatorKeyPrefix,
allocatorKey: allocatorKey,
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
6 changes: 4 additions & 2 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"google.golang.org/grpc"
)

const ttlSeconds = 3

// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
Expand Down Expand Up @@ -696,7 +698,7 @@ func (gta *GlobalTSOAllocator) registerAllocator() error {
}

func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(3) * time.Second / 2)
t := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 2)
defer t.Stop()
for {
select {
Expand All @@ -712,7 +714,7 @@ func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveR
func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
grantResp, err := gta.am.etcdClient.Grant(ctx, 3)
grantResp, err := gta.am.etcdClient.Grant(ctx, ttlSeconds)
if err != nil {
return 0, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID)
am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true,
path.Join(endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID), "tso"))
allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", group.ID)))
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
Expand Down
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,9 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
if !s.IsAPIServiceMode() {
allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load())
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false,
path.Join(endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()), "pd"))
allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd"))
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down

0 comments on commit 7f23d25

Please sign in to comment.