Skip to content

Commit

Permalink
Use the TSO request pool at the tsoClient level to avoid data race
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 16, 2024
1 parent 22543a9 commit 7f5f1e7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
tsoReqPool := c.tsoClient.tsoReqPool
req := tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
req.start = time.Now()
Expand All @@ -814,6 +815,7 @@ func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoReque
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
req.pool = tsoReqPool
return req
}

Expand Down
31 changes: 17 additions & 14 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,8 @@ type tsoRequest struct {
physical int64
logical int64
dcLocation string
}

var tsoReqPool = sync.Pool{
New: func() any {
return &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
}
},
pool *sync.Pool
}

func (req *tsoRequest) tryDone(err error) {
Expand All @@ -84,6 +76,8 @@ type tsoClient struct {
// tso allocator leader is switched.
tsoAllocServingURLSwitchedCallback []func()

// tsoReqPool is the pool to recycle `*tsoRequest`.
tsoReqPool *sync.Pool
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
Expand All @@ -104,11 +98,20 @@ func newTSOClient(
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
ctx: ctx,
cancel: cancel,
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
tsoReqPool: &sync.Pool{
New: func() any {
return &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
}
},
},
checkTSDeadlineCh: make(chan struct{}),
checkTSODispatcherCh: make(chan struct{}, 1),
updateTSOConnectionCtxsCh: make(chan struct{}, 1),
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
case err = <-req.done:
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
err = errors.WithStack(err)
defer tsoReqPool.Put(req)
defer req.pool.Put(req)
if err != nil {
cmdFailDurationTSO.Observe(time.Since(req.start).Seconds())
return 0, 0, err
Expand Down

0 comments on commit 7f5f1e7

Please sign in to comment.