Skip to content

Commit

Permalink
client/tso: decouple tsoClient and tsoDispatcher (#8134)
Browse files Browse the repository at this point in the history
ref #8047

- Decouple `tsoClient` and `tsoDispatcher` to appropriately categorize different methods.
- Update `connectionCtxsUpdater` to support the Local TSO.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Apr 30, 2024
1 parent 8ee18a4 commit 1679dbc
Show file tree
Hide file tree
Showing 5 changed files with 450 additions and 419 deletions.
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (k *serviceModeKeeper) close() {
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if k.tsoClient != nil {
k.tsoClient.Close()
k.tsoClient.close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
Expand Down Expand Up @@ -651,11 +651,11 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
log.Warn("[pd] intend to switch to unknown service mode, just return")
return
}
newTSOCli.Setup()
newTSOCli.setup()
// Replace the old TSO client.
oldTSOClient := c.tsoClient
c.tsoClient = newTSOCli
oldTSOClient.Close()
oldTSOClient.close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and
Expand Down
4 changes: 3 additions & 1 deletion client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() // nolint
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
Expand Down
Loading

0 comments on commit 1679dbc

Please sign in to comment.