diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index e9e5c6b82..fdd1a81c5 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -76,6 +76,7 @@ type batchCommandsEntry struct { err error pri uint64 + // start indicates when the batch commands entry is generated and sent to the batch conn channel. start time.Time sendLat int64 recvLat int64 @@ -389,6 +390,7 @@ func (a *batchConn) fetchMorePendingRequests( const idleTimeout = 3 * time.Minute var ( + // presetBatchPolicies defines a set of [turboBatchOptions] as batch policies. presetBatchPolicies = map[string]turboBatchOptions{ config.BatchPolicyBasic: {}, config.BatchPolicyStandard: {V: turboBatchTimeBased, T: 0.0001, N: 5, W: 0.2, P: 0.8, Q: 0.8}, @@ -402,6 +404,7 @@ const ( turboBatchProbBased ) +// turboBatchOptions defines internal options for the [turboBatchTrigger]. type turboBatchOptions struct { V int `json:"v"` N int `json:"n,omitempty"` @@ -411,6 +414,21 @@ type turboBatchOptions struct { Q float64 `json:"q,omitempty"` } +// turboBatchTrigger is used to trigger the `fetchMorePendingRequests` dynamically according to the request arrival +// intervals. The option `v` indicates the strategy of triggering: +// +// - turboBatchAlways: always fetch more requests. +// +// - turboBatchTimeBased: fetch more requests if estArrivalInterval < p * t +// where estArrivalInterval = w*reqArrivalInterval + (1-w)*estArrivalInterval +// and reqArrivalInterval = min(reqArrivalInterval, n * t) +// +// - turboBatchProbBased: fetch more requests if estFetchMoreProb > p +// where estFetchMoreProb = w*thisProb + (1-w)*estFetchMoreProb +// and thisProb = reqArrivalInterval < t ? 1 : 0 +// +// The option `q` is used to adjust the `batchWaitSize` dynamically. If the fractional part of the `avgBatchWaitSize` is +// greater or equal to `q`, the `batchWaitSize` will be increased by 1. type turboBatchTrigger struct { opts turboBatchOptions