Skip to content

Commit

Permalink
Extract connectionCtxsUpdater
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 25, 2024
1 parent 4eaa182 commit d64c3f6
Showing 1 changed file with 53 additions and 44 deletions.
97 changes: 53 additions & 44 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,50 +336,7 @@ func (c *tsoClient) handleDispatcher(
c.wg.Done()
}()
// Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event.
go func() {
var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(ticker *time.Ticker) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)

for {
c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs)
select {
case <-ctx.Done():
return
case <-c.option.enableTSOFollowerProxyCh:
// TODO: implement TSO Follower Proxy support for the Local TSO.
if dc != globalDCLocation {
continue
}
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
// The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered
continue
}
case <-updateTicker.C:
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateTSOConnectionCtxsCh:
// Triggered by the Global TSO Allocator leader change.
}
}
}()
go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs)

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
Expand Down Expand Up @@ -521,6 +478,58 @@ tsoBatchLoop:
}
}

// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly.
func (c *tsoClient) connectionCtxsUpdater(
ctx context.Context,
dc string,
connectionCtxs *sync.Map,
) {
log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc))
var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(ticker *time.Ticker) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)

for {
c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs)
select {
case <-ctx.Done():
log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc))
return
case <-c.option.enableTSOFollowerProxyCh:
// TODO: implement TSO Follower Proxy support for the Local TSO.
if dc != globalDCLocation {
continue
}
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {

Check warning on line 521 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L520-L521

Added lines #L520 - L521 were not covered by tests
// The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered
continue

Check warning on line 523 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L523

Added line #L523 was not covered by tests
}
case <-updateTicker.C:

Check warning on line 525 in client/tso_dispatcher.go

View check run for this annotation

Codecov / codecov/patch

client/tso_dispatcher.go#L525

Added line #L525 was not covered by tests
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateTSOConnectionCtxsCh:
// Triggered by the leader/follower change.
}
}
}

// chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off.
func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) {
Expand Down

0 comments on commit d64c3f6

Please sign in to comment.