From 4c6b2171b262807c9197bdb58a82d4626fb0495f Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 13 Aug 2024 12:55:44 +0800 Subject: [PATCH] improve batch efficiency for high throughput workloads (#1411) * improve batch efficiency by multiple attempts Signed-off-by: zyguan * add support for aggressive batching Signed-off-by: zyguan * add some metrics for batch client Signed-off-by: zyguan * fetch more requests according to recent wait head durs Signed-off-by: zyguan * add experimental batch options Signed-off-by: zyguan * optimize and refactor Signed-off-by: zyguan * some minor updates Signed-off-by: zyguan * fix the metric of head arrival interval Signed-off-by: zyguan * a minor update Signed-off-by: zyguan * some minor fixes Signed-off-by: zyguan * update according to the spec Signed-off-by: zyguan * fix the ut Signed-off-by: zyguan * fix batch condition Signed-off-by: zyguan * add some doc comments Signed-off-by: zyguan * address https://github.com/tikv/client-go/pull/1411#discussion_r1714086744 Signed-off-by: zyguan * rename some vars according to the comments Signed-off-by: zyguan * add more comments Signed-off-by: zyguan --------- Signed-off-by: zyguan --- config/client.go | 16 ++ internal/client/client.go | 4 +- internal/client/client_batch.go | 304 +++++++++++++++++++++++++++----- internal/client/client_test.go | 75 +++++++- metrics/metrics.go | 86 ++++++--- metrics/shortcuts.go | 9 +- 6 files changed, 422 insertions(+), 72 deletions(-) diff --git a/config/client.go b/config/client.go index 4b8811c48..83b4155f5 100644 --- a/config/client.go +++ b/config/client.go @@ -48,6 +48,18 @@ const ( DefGrpcInitialWindowSize = 1 << 27 // 128MiB DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB DefMaxConcurrencyRequestLimit = math.MaxInt64 + DefBatchPolicy = BatchPolicyStandard +) + +const ( + // BatchPolicyBasic is the basic batch policy whose behavior is consistent with versions before v8.3.0. + BatchPolicyBasic = "basic" + // BatchPolicyStandard dynamically batches requests based the arrival time intervals of recent requests. + BatchPolicyStandard = "standard" + // BatchPolicyPositive always performs additional batching. + BatchPolicyPositive = "positive" + // BatchPolicyCustom allows users to customize the internal batch options. + BatchPolicyCustom = "custom" ) // TiKVClient is the config for tikv client. @@ -72,6 +84,9 @@ type TiKVClient struct { // CommitTimeout is the max time which command 'commit' will wait. CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"` AsyncCommit AsyncCommit `toml:"async-commit" json:"async-commit"` + + // BatchPolicy is the policy for batching requests. + BatchPolicy string `toml:"batch-policy" json:"batch-policy"` // MaxBatchSize is the max batch size when calling batch commands API. MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"` // If TiKV load is greater than this, TiDB will wait for a while to avoid little batch. @@ -153,6 +168,7 @@ func DefaultTiKVClient() TiKVClient { AllowedClockDrift: 500 * time.Millisecond, }, + BatchPolicy: DefBatchPolicy, MaxBatchSize: 128, OverloadThreshold: 200, MaxBatchWaitTime: 0, diff --git a/internal/client/client.go b/internal/client/client.go index fb3249957..10bb7ed5b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -300,8 +300,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint allowBatch := (cfg.TiKVClient.MaxBatchSize > 0) && enableBatch if allowBatch { a.batchConn = newBatchConn(uint(len(a.v)), cfg.TiKVClient.MaxBatchSize, idleNotify) - a.pendingRequests = metrics.TiKVBatchPendingRequests.WithLabelValues(a.target) - a.batchSize = metrics.TiKVBatchRequests.WithLabelValues(a.target) + a.batchConn.initMetrics(a.target) } keepAlive := cfg.TiKVClient.GrpcKeepAliveTime keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout @@ -365,6 +364,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint dialTimeout: a.dialTimeout, tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false}, eventListener: eventListener, + metrics: &a.metrics, } batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit) a.batchCommandsClients = append(a.batchCommandsClients, batchClient) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index cfbf88cc5..0b23d72aa 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -37,9 +37,12 @@ package client import ( "context" + "encoding/json" "fmt" "math" + "runtime" "runtime/trace" + "strings" "sync" "sync/atomic" "time" @@ -72,6 +75,11 @@ type batchCommandsEntry struct { canceled int32 err error pri uint64 + + // start indicates when the batch commands entry is generated and sent to the batch conn channel. + start time.Time + sendLat int64 + recvLat int64 } func (b *batchCommandsEntry) isCanceled() bool { @@ -98,6 +106,8 @@ type batchCommandsBuilder struct { requestIDs []uint64 // In most cases, there isn't any forwardingReq. forwardingReqs map[string]*tikvpb.BatchCommandsRequest + + latestReqStartTime time.Time } func (b *batchCommandsBuilder) len() int { @@ -106,6 +116,9 @@ func (b *batchCommandsBuilder) len() int { func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) { b.entries.Push(entry) + if entry.start.After(b.latestReqStartTime) { + b.latestReqStartTime = entry.start + } } const highTaskPriority = 10 @@ -208,6 +221,23 @@ func newBatchCommandsBuilder(maxBatchSize uint) *batchCommandsBuilder { } } +type batchConnMetrics struct { + pendingRequests prometheus.Observer + batchSize prometheus.Observer + + sendLoopWaitHeadDur prometheus.Observer + sendLoopWaitMoreDur prometheus.Observer + sendLoopSendDur prometheus.Observer + + recvLoopRecvDur prometheus.Observer + recvLoopProcessDur prometheus.Observer + + headArrivalInterval prometheus.Observer + batchMoreRequests prometheus.Observer + + bestBatchSize prometheus.Observer +} + type batchConn struct { // An atomic flag indicates whether the batch is idle or not. // 0 for busy, others for idle. @@ -225,10 +255,11 @@ type batchConn struct { idleNotify *uint32 idleDetect *time.Timer - pendingRequests prometheus.Observer - batchSize prometheus.Observer + fetchMoreTimer *time.Timer index uint32 + + metrics batchConnMetrics } func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn { @@ -243,15 +274,27 @@ func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn { } } +func (a *batchConn) initMetrics(target string) { + a.metrics.pendingRequests = metrics.TiKVBatchPendingRequests.WithLabelValues(target) + a.metrics.batchSize = metrics.TiKVBatchRequests.WithLabelValues(target) + a.metrics.sendLoopWaitHeadDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-head") + a.metrics.sendLoopWaitMoreDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-more") + a.metrics.sendLoopSendDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "send") + a.metrics.recvLoopRecvDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "recv") + a.metrics.recvLoopProcessDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "process") + a.metrics.headArrivalInterval = metrics.TiKVBatchHeadArrivalInterval.WithLabelValues(target) + a.metrics.batchMoreRequests = metrics.TiKVBatchMoreRequests.WithLabelValues(target) + a.metrics.bestBatchSize = metrics.TiKVBatchBestSize.WithLabelValues(target) +} + func (a *batchConn) isIdle() bool { return atomic.LoadUint32(&a.idle) != 0 } // fetchAllPendingRequests fetches all pending requests from the channel. -func (a *batchConn) fetchAllPendingRequests( - maxBatchSize int, -) time.Time { +func (a *batchConn) fetchAllPendingRequests(maxBatchSize int) (headRecvTime time.Time, headArrivalInterval time.Duration) { // Block on the first element. + latestReqStartTime := a.reqBuilder.latestReqStartTime var headEntry *batchCommandsEntry select { case headEntry = <-a.batchCommandsCh: @@ -264,14 +307,17 @@ func (a *batchConn) fetchAllPendingRequests( atomic.AddUint32(&a.idle, 1) atomic.CompareAndSwapUint32(a.idleNotify, 0, 1) // This batchConn to be recycled - return time.Now() + return time.Now(), 0 case <-a.closed: - return time.Now() + return time.Now(), 0 } if headEntry == nil { - return time.Now() + return time.Now(), 0 + } + headRecvTime = time.Now() + if headEntry.start.After(latestReqStartTime) && !latestReqStartTime.IsZero() { + headArrivalInterval = headEntry.start.Sub(latestReqStartTime) } - ts := time.Now() a.reqBuilder.push(headEntry) // This loop is for trying best to collect more requests. @@ -279,14 +325,14 @@ func (a *batchConn) fetchAllPendingRequests( select { case entry := <-a.batchCommandsCh: if entry == nil { - return ts + return } a.reqBuilder.push(entry) default: - return ts + return } } - return ts + return } // fetchMorePendingRequests fetches more pending requests from the channel. @@ -296,23 +342,33 @@ func (a *batchConn) fetchMorePendingRequests( maxWaitTime time.Duration, ) { // Try to collect `batchWaitSize` requests, or wait `maxWaitTime`. - after := time.NewTimer(maxWaitTime) + if a.fetchMoreTimer == nil { + a.fetchMoreTimer = time.NewTimer(maxWaitTime) + } else { + a.fetchMoreTimer.Reset(maxWaitTime) + } for a.reqBuilder.len() < batchWaitSize { select { case entry := <-a.batchCommandsCh: if entry == nil { + if !a.fetchMoreTimer.Stop() { + <-a.fetchMoreTimer.C + } return } a.reqBuilder.push(entry) - case <-after.C: + case <-a.fetchMoreTimer.C: return } } - after.Stop() + if !a.fetchMoreTimer.Stop() { + <-a.fetchMoreTimer.C + } // Do an additional non-block try. Here we test the length with `maxBatchSize` instead // of `batchWaitSize` because trying best to fetch more requests is necessary so that // we can adjust the `batchWaitSize` dynamically. + yielded := false for a.reqBuilder.len() < maxBatchSize { select { case entry := <-a.batchCommandsCh: @@ -321,16 +377,140 @@ func (a *batchConn) fetchMorePendingRequests( } a.reqBuilder.push(entry) default: - return + if yielded { + return + } + // yield once to batch more requests. + runtime.Gosched() + yielded = true } } } const idleTimeout = 3 * time.Minute +var ( + // presetBatchPolicies defines a set of [turboBatchOptions] as batch policies. + presetBatchPolicies = map[string]turboBatchOptions{ + config.BatchPolicyBasic: {}, + config.BatchPolicyStandard: {V: turboBatchTimeBased, T: 0.0001, N: 5, W: 0.2, P: 0.8, Q: 0.8}, + config.BatchPolicyPositive: {V: turboBatchAlways, T: 0.0001}, + } +) + +const ( + turboBatchAlways = iota + turboBatchTimeBased + turboBatchProbBased +) + +// turboBatchOptions defines internal options for the [turboBatchTrigger]. +type turboBatchOptions struct { + // V determines the batch strategy: always(v=0), time-based(v=1), prob-based(v=2). + V int `json:"v"` + // N currently is used to determine the max arrival interval (n * t). + N int `json:"n,omitempty"` + // T is the max wait time for the batch. + T float64 `json:"t,omitempty"` + // W is used to adjust the `estArrivalInterval` or `estFetchMoreProb` dynamically. + // - time-based(v=1): estArrivalInterval = w*reqArrivalInterval + (1-w)*estArrivalInterval + // - prob-based(v=2): estFetchMoreProb = w*thisProb + (1-w)*estFetchMoreProb + W float64 `json:"w,omitempty"` + // P is used to determine whether to fetch more requests: + // - time-based(v=1): estArrivalInterval < p * t + // - prob-based(v=2): estFetchMoreProb > p + P float64 `json:"p,omitempty"` + // Q is used to adjust the `batchWaitSize` dynamically. + Q float64 `json:"q,omitempty"` +} + +// turboBatchTrigger is used to trigger the `fetchMorePendingRequests` dynamically according to the request arrival +// intervals. The option `v` indicates the strategy of triggering: +// +// - turboBatchAlways: always fetch more requests. +// +// - turboBatchTimeBased: fetch more requests if estArrivalInterval < p * t +// where estArrivalInterval = w*reqArrivalInterval + (1-w)*estArrivalInterval +// and reqArrivalInterval = min(reqArrivalInterval, n * t) +// +// - turboBatchProbBased: fetch more requests if estFetchMoreProb > p +// where estFetchMoreProb = w*thisProb + (1-w)*estFetchMoreProb +// and thisProb = reqArrivalInterval < t ? 1 : 0 +// +// The option `q` is used to adjust the `batchWaitSize` dynamically. If the fractional part of the `avgBatchWaitSize` is +// greater or equal to `q`, the `batchWaitSize` will be increased by 1. +type turboBatchTrigger struct { + opts turboBatchOptions + + estFetchMoreProb float64 + estArrivalInterval float64 + maxArrivalInterval float64 +} + +func newTurboBatchTriggerFromPolicy(policy string) (trigger turboBatchTrigger, ok bool) { + if opts, found := presetBatchPolicies[policy]; found { + return turboBatchTrigger{opts: opts}, true + } + rawOpts, _ := strings.CutPrefix(policy, config.BatchPolicyCustom) + if err := json.Unmarshal([]byte(strings.TrimSpace(rawOpts)), &trigger.opts); err != nil { + return turboBatchTrigger{opts: presetBatchPolicies[config.DefBatchPolicy]}, false + } + ok = true + return +} + +func (t *turboBatchTrigger) turboWaitSeconds() float64 { + return t.opts.T +} + +func (t *turboBatchTrigger) turboWaitTime() time.Duration { + return time.Duration(t.opts.T * float64(time.Second)) +} + +func (t *turboBatchTrigger) needFetchMore(reqArrivalInterval time.Duration) bool { + if t.opts.V == turboBatchTimeBased { + thisArrivalInterval := reqArrivalInterval.Seconds() + if t.maxArrivalInterval == 0 { + t.maxArrivalInterval = t.turboWaitSeconds() * float64(t.opts.N) + } + if thisArrivalInterval > t.maxArrivalInterval { + thisArrivalInterval = t.maxArrivalInterval + } + if t.estArrivalInterval == 0 { + t.estArrivalInterval = thisArrivalInterval + } else { + t.estArrivalInterval = t.opts.W*thisArrivalInterval + (1-t.opts.W)*t.estArrivalInterval + } + return t.estArrivalInterval < t.turboWaitSeconds()*t.opts.P + } else if t.opts.V == turboBatchProbBased { + thisProb := .0 + if reqArrivalInterval.Seconds() < t.turboWaitSeconds() { + thisProb = 1 + } + t.estFetchMoreProb = t.opts.W*thisProb + (1-t.opts.W)*t.estFetchMoreProb + return t.estFetchMoreProb > t.opts.P + } else { + return true + } +} + +func (t *turboBatchTrigger) preferredBatchWaitSize(avgBatchWaitSize float64, defBatchWaitSize int) int { + if t.opts.V == turboBatchAlways { + return defBatchWaitSize + } + n, m := math.Modf(avgBatchWaitSize) + batchWaitSize := int(n) + if m >= t.opts.Q { + batchWaitSize++ + } + return batchWaitSize +} + // BatchSendLoopPanicCounter is only used for testing. var BatchSendLoopPanicCounter int64 = 0 +var initBatchPolicyWarn sync.Once + func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { defer func() { if r := recover(); r != nil { @@ -344,11 +524,20 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } }() - bestBatchWaitSize := cfg.BatchWaitSize + trigger, ok := newTurboBatchTriggerFromPolicy(cfg.BatchPolicy) + if !ok { + initBatchPolicyWarn.Do(func() { + logutil.BgLogger().Warn("fallback to default batch policy due to invalid value", zap.String("value", cfg.BatchPolicy)) + }) + } + turboBatchWaitTime := trigger.turboWaitTime() + + avgBatchWaitSize := float64(cfg.BatchWaitSize) for { + sendLoopStartTime := time.Now() a.reqBuilder.reset() - start := a.fetchAllPendingRequests(int(cfg.MaxBatchSize)) + headRecvTime, headArrivalInterval := a.fetchAllPendingRequests(int(cfg.MaxBatchSize)) // curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil { @@ -357,27 +546,37 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } } - if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { - // If the target TiKV is overload, wait a while to collect more requests. - if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) { + if batchSize := a.reqBuilder.len(); batchSize < int(cfg.MaxBatchSize) { + if cfg.MaxBatchWaitTime > 0 && atomic.LoadUint64(&a.tikvTransportLayerLoad) > uint64(cfg.OverloadThreshold) { + // If the target TiKV is overload, wait a while to collect more requests. metrics.TiKVBatchWaitOverLoad.Inc() - a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(bestBatchWaitSize), cfg.MaxBatchWaitTime) + a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(cfg.BatchWaitSize), cfg.MaxBatchWaitTime) + } else if turboBatchWaitTime > 0 && headArrivalInterval > 0 && trigger.needFetchMore(headArrivalInterval) { + batchWaitSize := trigger.preferredBatchWaitSize(avgBatchWaitSize, int(cfg.BatchWaitSize)) + a.fetchMorePendingRequests(int(cfg.MaxBatchSize), batchWaitSize, turboBatchWaitTime) + a.metrics.batchMoreRequests.Observe(float64(a.reqBuilder.len() - batchSize)) } } - a.pendingRequests.Observe(float64(len(a.batchCommandsCh) + a.reqBuilder.len())) length := a.reqBuilder.len() + a.metrics.pendingRequests.Observe(float64(len(a.batchCommandsCh) + length)) if uint(length) == 0 { // The batch command channel is closed. return - } else if uint(length) < bestBatchWaitSize && bestBatchWaitSize > 1 { - // Waits too long to collect requests, reduce the target batch size. - bestBatchWaitSize-- - } else if uint(length) > bestBatchWaitSize+4 && bestBatchWaitSize < cfg.MaxBatchSize { - bestBatchWaitSize++ + } else { + avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize } + a.metrics.bestBatchSize.Observe(avgBatchWaitSize) + a.metrics.headArrivalInterval.Observe(headArrivalInterval.Seconds()) + a.metrics.sendLoopWaitHeadDur.Observe(headRecvTime.Sub(sendLoopStartTime).Seconds()) + a.metrics.sendLoopWaitMoreDur.Observe(time.Since(sendLoopStartTime).Seconds()) a.getClientAndSend() - metrics.TiKVBatchSendLatency.Observe(float64(time.Since(start))) + + sendLoopEndTime := time.Now() + a.metrics.sendLoopSendDur.Observe(sendLoopEndTime.Sub(sendLoopStartTime).Seconds()) + if dur := sendLoopEndTime.Sub(headRecvTime); dur > 5*time.Millisecond { + metrics.TiKVBatchSendTailLatency.Observe(dur.Seconds()) + } } } @@ -429,10 +628,12 @@ func (a *batchConn) getClientAndSend() { } defer cli.unlockForSend() available := cli.available() + reqSendTime := time.Now() batch := 0 req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) { cli.batched.Store(id, e) cli.sent.Add(1) + atomic.StoreInt64(&e.sendLat, int64(reqSendTime.Sub(e.start))) if trace.IsEnabled() { trace.Log(e.ctx, "rpc", "send") } @@ -446,7 +647,7 @@ func (a *batchConn) getClientAndSend() { cli.send(forwardedHost, req) } if batch > 0 { - a.batchSize.Observe(float64(batch)) + a.metrics.batchSize.Observe(float64(batch)) } } @@ -490,7 +691,6 @@ type batchCommandsStream struct { } func (s *batchCommandsStream) recv() (resp *tikvpb.BatchCommandsResponse, err error) { - now := time.Now() defer func() { if r := recover(); r != nil { metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() @@ -499,11 +699,6 @@ func (s *batchCommandsStream) recv() (resp *tikvpb.BatchCommandsResponse, err er zap.Stack("stack")) err = errors.New("batch conn recv paniced") } - if err == nil { - metrics.BatchRecvHistogramOK.Observe(float64(time.Since(now))) - } else { - metrics.BatchRecvHistogramError.Observe(float64(time.Since(now))) - } }() if _, err := util.EvalFailpoint("gotErrorInRecvLoop"); err == nil { return nil, errors.New("injected error in batchRecvLoop") @@ -567,6 +762,8 @@ type batchCommandsClient struct { // eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic // pointer to make setting thread-safe. eventListener *atomic.Pointer[ClientEventListener] + + metrics *batchConnMetrics } func (c *batchCommandsClient) isStopped() bool { @@ -708,7 +905,7 @@ func (c *batchCommandsClient) recreateStreamingClientOnce(streamClient *batchCom return err } -func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64, streamClient *batchCommandsStream) { +func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64, connMetrics *batchConnMetrics, streamClient *batchCommandsStream) { defer func() { if r := recover(); r != nil { metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() @@ -716,13 +913,16 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport zap.Any("r", r), zap.Stack("stack")) logutil.BgLogger().Info("restart batchRecvLoop") - go c.batchRecvLoop(cfg, tikvTransportLayerLoad, streamClient) + go c.batchRecvLoop(cfg, tikvTransportLayerLoad, connMetrics, streamClient) } }() epoch := atomic.LoadUint64(&c.epoch) for { + recvLoopStartTime := time.Now() resp, err := streamClient.recv() + respRecvTime := time.Now() + connMetrics.recvLoopRecvDur.Observe(respRecvTime.Sub(recvLoopStartTime).Seconds()) if err != nil { if c.isStopped() { return @@ -764,6 +964,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport } entry := value.(*batchCommandsEntry) + atomic.StoreInt64(&entry.recvLat, int64(respRecvTime.Sub(entry.start))) if trace.IsEnabled() { trace.Log(entry.ctx, "rpc", "received") } @@ -781,6 +982,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport // We need to consider TiKV load only if batch-wait strategy is enabled. atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad) } + connMetrics.recvLoopProcessDur.Observe(time.Since(recvLoopStartTime).Seconds()) } } @@ -875,7 +1077,7 @@ func (c *batchCommandsClient) initBatchClient(forwardedHost string) error { } else { c.forwardedClients[forwardedHost] = streamClient } - go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad, streamClient) + go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad, c.metrics, streamClient) return nil } @@ -908,11 +1110,20 @@ func sendBatchRequest( canceled: 0, err: nil, pri: priority, + start: time.Now(), } timer := time.NewTimer(timeout) - defer timer.Stop() + defer func() { + timer.Stop() + if sendLat := atomic.LoadInt64(&entry.sendLat); sendLat > 0 { + metrics.BatchRequestDurationSend.Observe(time.Duration(sendLat).Seconds()) + } + if recvLat := atomic.LoadInt64(&entry.recvLat); recvLat > 0 { + metrics.BatchRequestDurationRecv.Observe(time.Duration(recvLat).Seconds()) + } + metrics.BatchRequestDurationDone.Observe(time.Since(entry.start).Seconds()) + }() - start := time.Now() select { case batchConn.batchCommandsCh <- entry: case <-ctx.Done(): @@ -925,8 +1136,6 @@ func sendBatchRequest( case <-timer.C: return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop") } - waitSendDuration := time.Since(start) - metrics.TiKVBatchWaitDuration.Observe(float64(waitSendDuration)) select { case res, ok := <-entry.res: @@ -945,8 +1154,13 @@ func sendBatchRequest( return nil, errors.New("batchConn closed") case <-timer.C: atomic.StoreInt32(&entry.canceled, 1) - reason := fmt.Sprintf("wait recvLoop timeout, timeout:%s, wait_send_duration:%s, wait_recv_duration:%s", - timeout, util.FormatDuration(waitSendDuration), util.FormatDuration(time.Since(start)-waitSendDuration)) + reason := fmt.Sprintf("wait recvLoop timeout, timeout:%s", timeout) + if sendLat := atomic.LoadInt64(&entry.sendLat); sendLat > 0 { + reason += fmt.Sprintf(", send:%s", util.FormatDuration(time.Duration(sendLat))) + if recvLat := atomic.LoadInt64(&entry.recvLat); recvLat > 0 { + reason += fmt.Sprintf(", recv:%s", util.FormatDuration(time.Duration(recvLat-sendLat))) + } + } return nil, errors.WithMessage(context.DeadlineExceeded, reason) } } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 6098829e8..68ae0124f 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -148,7 +148,7 @@ func TestSendWhenReconnect(t *testing.T) { req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{}) _, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second) - require.Regexp(t, "wait recvLoop timeout, timeout:5s, wait_send_duration:.*, wait_recv_duration:.*: context deadline exceeded", err.Error()) + require.Regexp(t, "wait recvLoop timeout, timeout:5s: context deadline exceeded", err.Error()) server.Stop() } @@ -1067,3 +1067,76 @@ func TestConcurrentCloseConnPanic(t *testing.T) { }() wg.Wait() } + +func TestBatchPolicy(t *testing.T) { + t.Run(config.BatchPolicyBasic, func(t *testing.T) { + trigger, ok := newTurboBatchTriggerFromPolicy(config.BatchPolicyBasic) + require.True(t, ok) + require.False(t, trigger.turboWaitTime() > 0) + }) + t.Run(config.BatchPolicyPositive, func(t *testing.T) { + trigger, ok := newTurboBatchTriggerFromPolicy(config.BatchPolicyPositive) + require.True(t, ok) + require.Equal(t, trigger.turboWaitTime(), 100*time.Microsecond) + require.True(t, trigger.needFetchMore(time.Hour)) + require.True(t, trigger.needFetchMore(time.Millisecond)) + require.Equal(t, 8, trigger.preferredBatchWaitSize(1, 8)) + require.Equal(t, 8, trigger.preferredBatchWaitSize(1.2, 8)) + require.Equal(t, 8, trigger.preferredBatchWaitSize(1.8, 8)) + }) + t.Run(config.BatchPolicyStandard, func(t *testing.T) { + trigger, ok := newTurboBatchTriggerFromPolicy(config.BatchPolicyStandard) + require.True(t, ok) + require.Equal(t, 1, trigger.preferredBatchWaitSize(1, 8)) + require.Equal(t, 1, trigger.preferredBatchWaitSize(1.2, 8)) + require.Equal(t, 2, trigger.preferredBatchWaitSize(1.8, 8)) + require.Equal(t, trigger.turboWaitTime(), 100*time.Microsecond) + require.False(t, trigger.needFetchMore(100*time.Microsecond)) + require.False(t, trigger.needFetchMore(80*time.Microsecond)) + require.True(t, trigger.needFetchMore(10*time.Microsecond)) + require.True(t, trigger.needFetchMore(80*time.Microsecond)) + require.False(t, trigger.needFetchMore(90*time.Microsecond)) + + for i := 0; i < 50; i++ { + trigger.needFetchMore(time.Hour) + } + require.Less(t, trigger.estArrivalInterval, trigger.maxArrivalInterval) + for i := 0; i < 8; i++ { + require.False(t, trigger.needFetchMore(10*time.Microsecond)) + } + require.True(t, trigger.needFetchMore(10*time.Microsecond)) + }) + t.Run(config.BatchPolicyCustom, func(t *testing.T) { + trigger, ok := newTurboBatchTriggerFromPolicy(config.BatchPolicyCustom + " {} ") + require.True(t, ok) + require.Equal(t, trigger.opts, presetBatchPolicies[config.BatchPolicyBasic]) + + trigger, ok = newTurboBatchTriggerFromPolicy(`{"t":0.0001}`) + require.True(t, ok) + require.Equal(t, trigger.opts, presetBatchPolicies[config.BatchPolicyPositive]) + + trigger, ok = newTurboBatchTriggerFromPolicy(`{"v":1,"t":0.0001,"n":5,"w":0.2,"p":0.8,"q":0.8}`) + require.True(t, ok) + require.Equal(t, trigger.opts, presetBatchPolicies[config.BatchPolicyStandard]) + + trigger, ok = newTurboBatchTriggerFromPolicy(`{"v":2,"t":0.001,"w":0.2,"p":0.5}`) + require.True(t, ok) + require.Equal(t, 2, trigger.preferredBatchWaitSize(1, 8)) + require.Equal(t, 2, trigger.preferredBatchWaitSize(1.2, 8)) + require.Equal(t, trigger.turboWaitTime(), time.Millisecond) + require.False(t, trigger.needFetchMore(time.Millisecond-time.Microsecond)) + require.False(t, trigger.needFetchMore(time.Millisecond-time.Microsecond)) + require.False(t, trigger.needFetchMore(time.Millisecond-time.Microsecond)) + require.True(t, trigger.needFetchMore(time.Millisecond-time.Microsecond)) + require.False(t, trigger.needFetchMore(time.Millisecond)) + }) + t.Run("invalid", func(t *testing.T) { + for _, val := range []string{ + "", "invalid", "custom", "custom {x:1}", + } { + trigger, ok := newTurboBatchTriggerFromPolicy(val) + require.False(t, ok) + require.Equal(t, trigger.opts, presetBatchPolicies[config.DefBatchPolicy]) + } + }) +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 0cf74ea62..e608d800e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -64,15 +64,19 @@ var ( TiKVLocalLatchWaitTimeHistogram prometheus.Histogram TiKVStatusDuration *prometheus.HistogramVec TiKVStatusCounter *prometheus.CounterVec - TiKVBatchWaitDuration prometheus.Histogram - TiKVBatchSendLatency prometheus.Histogram + TiKVBatchSendTailLatency prometheus.Histogram + TiKVBatchSendLoopDuration *prometheus.SummaryVec + TiKVBatchRecvLoopDuration *prometheus.SummaryVec + TiKVBatchHeadArrivalInterval *prometheus.SummaryVec + TiKVBatchBestSize *prometheus.SummaryVec + TiKVBatchMoreRequests *prometheus.SummaryVec TiKVBatchWaitOverLoad prometheus.Counter TiKVBatchPendingRequests *prometheus.HistogramVec TiKVBatchRequests *prometheus.HistogramVec + TiKVBatchRequestDuration *prometheus.SummaryVec TiKVBatchClientUnavailable prometheus.Histogram TiKVBatchClientWaitEstablish prometheus.Histogram TiKVBatchClientRecycle prometheus.Histogram - TiKVBatchRecvLatency *prometheus.HistogramVec TiKVRangeTaskStats *prometheus.GaugeVec TiKVRangeTaskPushDuration *prometheus.HistogramVec TiKVTokenWaitDuration prometheus.Histogram @@ -358,35 +362,60 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblResult}) - TiKVBatchWaitDuration = prometheus.NewHistogram( + TiKVBatchSendTailLatency = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "batch_wait_duration", - Buckets: prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s - Help: "batch wait duration", + Name: "batch_send_tail_latency_seconds", + Buckets: prometheus.ExponentialBuckets(0.005, 2, 10), // 5ms ~ 2.56s + Help: "batch send tail latency", ConstLabels: constLabels, }) - TiKVBatchSendLatency = prometheus.NewHistogram( - prometheus.HistogramOpts{ + TiKVBatchSendLoopDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "batch_send_latency", - Buckets: prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s - Help: "batch send latency", + Name: "batch_send_loop_duration_seconds", + Help: "batch send loop duration breakdown by steps", ConstLabels: constLabels, - }) + }, []string{"store", "step"}) - TiKVBatchRecvLatency = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ + TiKVBatchRecvLoopDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "batch_recv_latency", - Buckets: prometheus.ExponentialBuckets(1000, 2, 34), // 1us ~ 8000s - Help: "batch recv latency", + Name: "batch_recv_loop_duration_seconds", + Help: "batch recv loop duration breakdown by steps", ConstLabels: constLabels, - }, []string{LblResult}) + }, []string{"store", "step"}) + + TiKVBatchHeadArrivalInterval = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "batch_head_arrival_interval_seconds", + Help: "arrival interval of the head request in batch", + ConstLabels: constLabels, + }, []string{"store"}) + + TiKVBatchBestSize = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "batch_best_size", + Help: "best batch size estimated by the batch client", + ConstLabels: constLabels, + }, []string{"store"}) + + TiKVBatchMoreRequests = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "batch_more_requests_total", + Help: "number of requests batched by extra fetch", + ConstLabels: constLabels, + }, []string{"store"}) TiKVBatchWaitOverLoad = prometheus.NewCounter( prometheus.CounterOpts{ @@ -417,6 +446,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{"store"}) + TiKVBatchRequestDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "batch_request_duration_seconds", + Help: "batch request duration breakdown by steps", + ConstLabels: constLabels, + }, []string{"step"}) + TiKVBatchClientUnavailable = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: namespace, @@ -839,12 +877,16 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram) prometheus.MustRegister(TiKVStatusDuration) prometheus.MustRegister(TiKVStatusCounter) - prometheus.MustRegister(TiKVBatchWaitDuration) - prometheus.MustRegister(TiKVBatchSendLatency) - prometheus.MustRegister(TiKVBatchRecvLatency) + prometheus.MustRegister(TiKVBatchSendTailLatency) + prometheus.MustRegister(TiKVBatchSendLoopDuration) + prometheus.MustRegister(TiKVBatchRecvLoopDuration) + prometheus.MustRegister(TiKVBatchHeadArrivalInterval) + prometheus.MustRegister(TiKVBatchBestSize) + prometheus.MustRegister(TiKVBatchMoreRequests) prometheus.MustRegister(TiKVBatchWaitOverLoad) prometheus.MustRegister(TiKVBatchPendingRequests) prometheus.MustRegister(TiKVBatchRequests) + prometheus.MustRegister(TiKVBatchRequestDuration) prometheus.MustRegister(TiKVBatchClientUnavailable) prometheus.MustRegister(TiKVBatchClientWaitEstablish) prometheus.MustRegister(TiKVBatchClientRecycle) diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index 1041c8087..72f27934d 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -173,6 +173,10 @@ var ( StaleReadLocalOutBytes prometheus.Counter StaleReadRemoteInBytes prometheus.Counter StaleReadRemoteOutBytes prometheus.Counter + + BatchRequestDurationSend prometheus.Observer + BatchRequestDurationRecv prometheus.Observer + BatchRequestDurationDone prometheus.Observer ) func initShortcuts() { @@ -287,8 +291,9 @@ func initShortcuts() { OnePCTxnCounterError = TiKVOnePCTxnCounter.WithLabelValues("err") OnePCTxnCounterFallback = TiKVOnePCTxnCounter.WithLabelValues("fallback") - BatchRecvHistogramOK = TiKVBatchRecvLatency.WithLabelValues("ok") - BatchRecvHistogramError = TiKVBatchRecvLatency.WithLabelValues("err") + BatchRequestDurationSend = TiKVBatchRequestDuration.WithLabelValues("send") + BatchRequestDurationRecv = TiKVBatchRequestDuration.WithLabelValues("recv") + BatchRequestDurationDone = TiKVBatchRequestDuration.WithLabelValues("done") PrewriteAssertionUsageCounterNone = TiKVPrewriteAssertionUsageCounter.WithLabelValues("none") PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")