Add implementation of making tsoStream asynchronous
Signed-off-by: MyonKeminta <[email protected]>
MyonKeminta committed Aug 5, 2024
1 parent 9af28fc commit 140a7c2
Showing 4 changed files with 362 additions and 108 deletions.
24 changes: 17 additions & 7 deletions client/metrics.go
@@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
}

var (
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
@@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "The status to indicate if the request is forwarded",
ConstLabels: constLabels,
}, []string{"host", "delegate"})

ongoingRequestCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "ongoing_requests_count",
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
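The new gauge is keyed by a per-stream label. As a minimal, hypothetical sketch (not part of this commit), it could be driven as follows when a batch of TSO requests is dispatched asynchronously and later completed; the helper name, streamURL, and done channel are illustrative:

// Hypothetical helper: track in-flight batch TSO requests for one stream
// using the ongoingRequestCountGauge defined above. Only the gauge and its
// "stream" label come from the diff; everything else is illustrative.
func trackOngoingBatch(streamURL string, batchSize int, done <-chan struct{}) {
	gauge := ongoingRequestCountGauge.WithLabelValues(streamURL)
	gauge.Add(float64(batchSize)) // the batch is now in flight on this stream
	go func() {
		<-done // the response (or an error) has finished the batch
		gauge.Sub(float64(batchSize))
	}()
}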
83 changes: 56 additions & 27 deletions client/tso_batch_controller.go
@@ -30,42 +30,79 @@ type tsoBatchController struct {
// bestBatchSize is a dynamic size that changes based on the current batch effect.
bestBatchSize int

tsoRequestCh chan *tsoRequest
collectedRequests []*tsoRequest
collectedRequestCount int

batchStartTime time.Time
// The time when both the first request and the token have been obtained, right before performing extra batching.
extraBatchingStartTime time.Time
}

func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
func newTSOBatchController(maxBatchSize int) *tsoBatchController {
return &tsoBatchController{
maxBatchSize: maxBatchSize,
bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it converges to (current_batch_size - 4) */
tsoRequestCh: tsoRequestCh,
collectedRequests: make([]*tsoRequest, maxBatchSize+1),
collectedRequestCount: 0,
}
}
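The constructor comment above says bestBatchSize must start low so that it converges to (current_batch_size - 4). As an illustration of why that convergence target makes sense (the actual adjustment logic lives elsewhere in the client and is not shown in this diff, so treat this purely as a sketch), an adjustment rule with that behavior could look like:

// Illustration only, not the client's real adjustment code: grow the best
// size when a batch overshoots it by more than 4 requests, shrink it when a
// batch fails to fill it, so it settles near (typical batch size - 4).
func illustrateBestBatchSizeAdjustment(tbc *tsoBatchController, batchSize int) {
	if batchSize > tbc.bestBatchSize+4 && tbc.bestBatchSize < tbc.maxBatchSize {
		tbc.bestBatchSize++
	} else if batchSize < tbc.bestBatchSize && tbc.bestBatchSize > 1 {
		tbc.bestBatchSize--
	}
}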

// fetchPendingRequests will start a new round of batch collecting from the channel.
// It returns true if everything goes well, otherwise false which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstRequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
// It returns a nil error if everything goes well, otherwise a non-nil error, which means we should stop the service.
// It's guaranteed that if this function fails after collecting some requests, these requests will be cancelled
// when the function returns, so the caller doesn't need to clear them manually.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequestCh <-chan *tsoRequest, tokenCh chan struct{}, maxBatchWaitInterval time.Duration) (errRet error) {
var tokenAcquired bool
defer func() {
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
tokenCh <- struct{}{}
}
if tbc.collectedRequestCount > 0 {
tbc.finishCollectedRequests(0, 0, 0, errRet)
}
}
}()

// Wait until BOTH the first request and the token have arrived.
// TODO: `tbc.collectedRequestCount` should always be zero here. Consider adding an assertion.
tbc.collectedRequestCount = 0
tbc.pushRequest(firstRequest)
for {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
// Start to batch when the first TSO request arrives.
tbc.pushRequest(req)
// A request arrived but the token is not ready yet. Keep waiting, while still allowing the next
// request to be collected if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}

// The token is ready. If the first request hasn't arrived yet, wait for it.
if tbc.collectedRequestCount == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest := <-tsoRequestCh:
tbc.pushRequest(firstRequest)
}
}

// Both the token and the first request have arrived.
break
}

tbc.extraBatchingStartTime = time.Now()

// This loop tries its best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -88,7 +125,7 @@ fetchPendingRequestsLoop:
defer after.Stop()
for tbc.collectedRequestCount < tbc.bestBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -103,7 +140,7 @@ fetchPendingRequestsLoop:
// we can adjust the `tbc.bestBatchSize` dynamically later.
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -149,18 +186,10 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.tryDone(err)
}
}

func (tbc *tsoBatchController) clear() {
log.Info("[pd] clear the tso batch controller",
zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
zap.Int("collected-request-count", tbc.collectedRequestCount))
tsoErr := errors.WithStack(errClosing)
tbc.finishCollectedRequests(0, 0, 0, tsoErr)
tbc.revokePendingRequests(tsoErr)
}
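fetchPendingRequests now takes a token channel alongside the request channel: a buffered tokenCh acts as a semaphore whose capacity bounds how many batches may be in flight on the asynchronous tsoStream at once, and the function only returns a batch once it holds both a token and at least one request. The dispatcher that actually drives this presumably lives in the commit's other changed files, which are not rendered above; the following is only a hedged sketch of the pattern, with all names illustrative:

// Hypothetical driver loop, for illustration only. The capacity of tokenCh
// bounds the number of concurrently in-flight batches; fetchPendingRequests
// takes one token per returned batch, and the token must be handed back once
// the asynchronous response (or an error) for that batch has been processed.
func runBatchLoopSketch(ctx context.Context, tbc *tsoBatchController, tsoRequestCh chan *tsoRequest, maxInFlight int) {
	tokenCh := make(chan struct{}, maxInFlight)
	for i := 0; i < maxInFlight; i++ {
		tokenCh <- struct{}{}
	}
	for {
		if err := tbc.fetchPendingRequests(ctx, tsoRequestCh, tokenCh, 0); err != nil {
			// Any collected requests were already cancelled by the deferred
			// cleanup inside fetchPendingRequests.
			return
		}
		// Send the collected batch on the tsoStream asynchronously here; once
		// its response or error has been handled, return the token:
		//     tokenCh <- struct{}{}
	}
}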