Skip to content

Commit

Permalink
improve batch efficiency for high throughput workloads (#1411)
Browse files Browse the repository at this point in the history
* improve batch efficiency by multiple attempts

Signed-off-by: zyguan <[email protected]>

* add support for aggressive batching

Signed-off-by: zyguan <[email protected]>

* add some metrics for batch client

Signed-off-by: zyguan <[email protected]>

* fetch more requests according to recent wait head durs

Signed-off-by: zyguan <[email protected]>

* add experimental batch options

Signed-off-by: zyguan <[email protected]>

* optimize and refactor

Signed-off-by: zyguan <[email protected]>

* some minor updates

Signed-off-by: zyguan <[email protected]>

* fix the metric of head arrival interval

Signed-off-by: zyguan <[email protected]>

* a minor update

Signed-off-by: zyguan <[email protected]>

* some minor fixes

Signed-off-by: zyguan <[email protected]>

* update according to the spec

Signed-off-by: zyguan <[email protected]>

* fix the ut

Signed-off-by: zyguan <[email protected]>

* fix batch condition

Signed-off-by: zyguan <[email protected]>

* add some doc comments

Signed-off-by: zyguan <[email protected]>

* address #1411 (comment)

Signed-off-by: zyguan <[email protected]>

* rename some vars according to the comments

Signed-off-by: zyguan <[email protected]>

* add more comments

Signed-off-by: zyguan <[email protected]>

---------

Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Aug 13, 2024
1 parent f0f57f2 commit 4c6b217
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 72 deletions.
16 changes: 16 additions & 0 deletions config/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ const (
DefGrpcInitialWindowSize = 1 << 27 // 128MiB
DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB
DefMaxConcurrencyRequestLimit = math.MaxInt64
DefBatchPolicy = BatchPolicyStandard
)

const (
// BatchPolicyBasic is the basic batch policy whose behavior is consistent with versions before v8.3.0.
BatchPolicyBasic = "basic"
// BatchPolicyStandard dynamically batches requests based the arrival time intervals of recent requests.
BatchPolicyStandard = "standard"
// BatchPolicyPositive always performs additional batching.
BatchPolicyPositive = "positive"
// BatchPolicyCustom allows users to customize the internal batch options.
BatchPolicyCustom = "custom"
)

// TiKVClient is the config for tikv client.
Expand All @@ -72,6 +84,9 @@ type TiKVClient struct {
// CommitTimeout is the max time which command 'commit' will wait.
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
AsyncCommit AsyncCommit `toml:"async-commit" json:"async-commit"`

// BatchPolicy is the policy for batching requests.
BatchPolicy string `toml:"batch-policy" json:"batch-policy"`
// MaxBatchSize is the max batch size when calling batch commands API.
MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"`
// If TiKV load is greater than this, TiDB will wait for a while to avoid little batch.
Expand Down Expand Up @@ -153,6 +168,7 @@ func DefaultTiKVClient() TiKVClient {
AllowedClockDrift: 500 * time.Millisecond,
},

BatchPolicy: DefBatchPolicy,
MaxBatchSize: 128,
OverloadThreshold: 200,
MaxBatchWaitTime: 0,
Expand Down
4 changes: 2 additions & 2 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
allowBatch := (cfg.TiKVClient.MaxBatchSize > 0) && enableBatch
if allowBatch {
a.batchConn = newBatchConn(uint(len(a.v)), cfg.TiKVClient.MaxBatchSize, idleNotify)
a.pendingRequests = metrics.TiKVBatchPendingRequests.WithLabelValues(a.target)
a.batchSize = metrics.TiKVBatchRequests.WithLabelValues(a.target)
a.batchConn.initMetrics(a.target)
}
keepAlive := cfg.TiKVClient.GrpcKeepAliveTime
keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout
Expand Down Expand Up @@ -365,6 +364,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
metrics: &a.metrics,
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
Expand Down
Loading

0 comments on commit 4c6b217

Please sign in to comment.