From d64c3f60a5f010cf11877d114960aecc4a51ca20 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 25 Apr 2024 11:55:46 +0800 Subject: [PATCH] Extract connectionCtxsUpdater Signed-off-by: JmPotato --- client/tso_dispatcher.go | 97 ++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 9fc7596b2368..d14e9222d3ed 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -336,50 +336,7 @@ func (c *tsoClient) handleDispatcher( c.wg.Done() }() // Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event. - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker - } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) - - for { - c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) - select { - case <-ctx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - // TODO: implement TSO Follower Proxy support for the Local TSO. - if dc != globalDCLocation { - continue - } - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), - zap.Bool("enable", enableTSOFollowerProxy)) - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - // Triggered periodically when the TSO Follower Proxy is enabled. - case <-c.updateTSOConnectionCtxsCh: - // Triggered by the Global TSO Allocator leader change. - } - } - }() + go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) @@ -521,6 +478,58 @@ tsoBatchLoop: } } +// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +func (c *tsoClient) connectionCtxsUpdater( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) { + log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs) + select { + case <-ctx.Done(): + log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) + return + case <-c.option.enableTSOFollowerProxyCh: + // TODO: implement TSO Follower Proxy support for the Local TSO. + if dc != globalDCLocation { + continue + } + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. + setNewUpdateTicker(&time.Ticker{}) + } else { + // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered + continue + } + case <-updateTicker.C: + // Triggered periodically when the TSO Follower Proxy is enabled. + case <-c.updateTSOConnectionCtxsCh: + // Triggered by the leader/follower change. + } + } +} + // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. // connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) {