Skip to content

Commit

Permalink
register immediately
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Mar 11, 2024
1 parent acf9c7d commit 0dfa1a4
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,38 +739,48 @@ func (gta *GlobalTSOAllocator) deregisterAllocator() error {
}

func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse {
// register immediately
kresp, needRetry := gta.register()
if !needRetry {
return kresp
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
outerLoop:
for {
select {
case <-gta.ctx.Done():
return nil
case <-ticker.C:
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix())
cancel()
if err != nil {
continue
}
// wait for the previous allocator with different mode to be deregistered
if len(resp.Kvs) > 0 {
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.Contains(key, gta.am.allocatorKeyPrefix) {
continue outerLoop
}
}
if kresp, needRetry := gta.register(); !needRetry {
return kresp

Check warning on line 755 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L755

Added line #L755 was not covered by tests
}
id, err := gta.txnWithTTL(gta.am.allocatorKey, "")
if err != nil {
continue
}
kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id)
if err != nil {
continue
}
}
}

func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveResponse, bool) {
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix())
cancel()
if err != nil {
return nil, true
}
// wait for the previous allocator with different mode to be deregistered
if len(resp.Kvs) > 0 {
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.Contains(key, gta.am.allocatorKeyPrefix) {
return nil, true

Check warning on line 773 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L773

Added line #L773 was not covered by tests
}
return kresp
}
}
id, err := gta.txnWithTTL(gta.am.allocatorKey, "")
if err != nil {
return nil, true
}
kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id)
if err != nil {
return nil, true

Check warning on line 783 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L783

Added line #L783 was not covered by tests
}
return kresp, false
}

0 comments on commit 0dfa1a4

Please sign in to comment.