From d5d9eabe8d22ff7999970c803b9b075459c8021f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 22 May 2024 14:58:17 +0800 Subject: [PATCH 01/11] controller: Reduce the frequency of printing logs (#8160) (#8191) close tikv/pd#8159 Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: ShuNing Co-authored-by: nolouch --- client/resource_group/controller/limiter.go | 23 ++++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 61c2350153b..f509acbc991 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -329,6 +329,8 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 { return tokens } +const reserveWarnLogInterval = 10 * time.Millisecond + // reserveN is a helper method for Reserve. // maxFutureReserve specifies the maximum reservation wait duration allowed. // reserveN returns Reservation, not *Reservation. @@ -374,14 +376,19 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur lim.tokens = tokens lim.maybeNotify() } else { - log.Warn("[resource group controller] cannot reserve enough tokens", - zap.Duration("need-wait-duration", waitDuration), - zap.Duration("max-wait-duration", maxFutureReserve), - zap.Float64("current-ltb-tokens", lim.tokens), - zap.Float64("current-ltb-rate", float64(lim.limit)), - zap.Float64("request-tokens", n), - zap.Int64("burst", lim.burst), - zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + // print log if the limiter cannot reserve for a while. + if time.Since(lim.last) > reserveWarnLogInterval { + log.Warn("[resource group controller] cannot reserve enough tokens", + zap.Duration("need-wait-duration", waitDuration), + zap.Duration("max-wait-duration", maxFutureReserve), + zap.Float64("current-ltb-tokens", lim.tokens), + zap.Float64("current-ltb-rate", float64(lim.limit)), + zap.Float64("request-tokens", n), + zap.Float64("notify-threshold", lim.notifyThreshold), + zap.Bool("is-low-process", lim.isLowProcess), + zap.Int64("burst", lim.burst), + zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + } lim.last = last if lim.limit == 0 { lim.notify() From 9d9b9448fc990f88679f53994392a7f24263b38f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 31 May 2024 17:19:22 +0800 Subject: [PATCH 02/11] controller: fix error retry and add more metrics (#8219) (#8232) close tikv/pd#8217 controller: fix error retry and add more metrics Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: ShuNing Co-authored-by: nolouch --- .../resource_group/controller/controller.go | 71 ++++++++++++------- .../controller/controller_test.go | 15 ++++ client/resource_group/controller/limiter.go | 27 ++++++- client/resource_group/controller/metrics.go | 20 ++++-- 4 files changed, 102 insertions(+), 31 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 0c8eaeac828..1834e509696 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -491,7 +491,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex request := gc.collectRequestAndConsumption(typ) if request != nil { c.run.currentRequests = append(c.run.currentRequests, request) - gc.tokenRequestCounter.Inc() + gc.metrics.tokenRequestCounter.Inc() } return true }) @@ -576,13 +576,9 @@ type groupCostController 
struct { calculators []ResourceCalculator handleRespFunc func(*rmpb.TokenBucketResponse) - successfulRequestDuration prometheus.Observer - failedLimitReserveDuration prometheus.Observer - requestRetryCounter prometheus.Counter - failedRequestCounter prometheus.Counter - tokenRequestCounter prometheus.Counter - - mu struct { + // metrics + metrics *groupMetricsCollection + mu struct { sync.Mutex consumption *rmpb.Consumption storeCounter map[uint64]*rmpb.Consumption @@ -629,6 +625,30 @@ type groupCostController struct { tombstone bool } +type groupMetricsCollection struct { + successfulRequestDuration prometheus.Observer + failedLimitReserveDuration prometheus.Observer + requestRetryCounter prometheus.Counter + failedRequestCounterWithOthers prometheus.Counter + failedRequestCounterWithThrottled prometheus.Counter + tokenRequestCounter prometheus.Counter +} + +func initMetrics(name string) *groupMetricsCollection { + const ( + otherType = "others" + throttledType = "throttled" + ) + return &groupMetricsCollection{ + successfulRequestDuration: successfulRequestDuration.WithLabelValues(name), + failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(name), + failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(name, otherType), + failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(name, throttledType), + requestRetryCounter: requestRetryCounter.WithLabelValues(name), + tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(name), + } +} + type tokenCounter struct { getTokenBucketFunc func() *rmpb.TokenBucket @@ -669,16 +689,13 @@ func newGroupCostController( default: return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } + ms := initMetrics(group.Name) gc := &groupCostController{ - meta: group, - name: group.Name, - mainCfg: mainCfg, - mode: group.GetMode(), - successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), - failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name), - failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), - requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), - tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), + meta: group, + name: group.Name, + mainCfg: mainCfg, + mode: group.GetMode(), + metrics: ms, calculators: []ResourceCalculator{ newKVCalculator(mainCfg), newSQLCalculator(mainCfg), @@ -733,7 +750,7 @@ func (gc *groupCostController) initRunState() { case rmpb.GroupMode_RUMode: gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter) for typ := range requestUnitLimitTypeList { - limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, @@ -747,7 +764,7 @@ func (gc *groupCostController) initRunState() { case rmpb.GroupMode_RawMode: gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter) for typ := range requestResourceLimitTypeList { - limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) + limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan) counter := &tokenCounter{ limiter: limiter, avgRUPerSec: 0, @@ -1176,7 +1193,7 @@ func (gc *groupCostController) onRequestWait( 
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } - if d, err = WaitReservations(ctx, now, res); err == nil { + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { break retryLoop } case rmpb.GroupMode_RUMode: @@ -1186,17 +1203,19 @@ func (gc *groupCostController) onRequestWait( res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } - if d, err = WaitReservations(ctx, now, res); err == nil { + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { break retryLoop } } - gc.requestRetryCounter.Inc() + gc.metrics.requestRetryCounter.Inc() time.Sleep(retryInterval) } if err != nil { - gc.failedRequestCounter.Inc() - if d.Seconds() > 0 { - gc.failedLimitReserveDuration.Observe(d.Seconds()) + if errs.ErrClientResourceGroupThrottled.Equal(err) { + gc.metrics.failedRequestCounterWithThrottled.Inc() + gc.metrics.failedLimitReserveDuration.Observe(d.Seconds()) + } else { + gc.metrics.failedRequestCounterWithOthers.Inc() } gc.mu.Lock() sub(gc.mu.consumption, delta) @@ -1206,7 +1225,7 @@ func (gc *groupCostController) onRequestWait( }) return nil, nil, err } - gc.successfulRequestDuration.Observe(d.Seconds()) + gc.metrics.successfulRequestDuration.Observe(d.Seconds()) } gc.mu.Lock() diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 165d501ddb1..3696dcba845 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -26,6 +26,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/errs" ) func createTestGroupCostController(re *require.Assertions) *groupCostController { @@ -112,3 +113,17 @@ func TestRequestAndResponseConsumption(t *testing.T) { re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum) } } + +func TestResourceGroupThrottledError(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + gc.initRunState() + req := &TestRequestInfo{ + isWrite: true, + writeBytes: 10000000, + } + // The group is throttled + _, _, err := gc.onRequestWait(context.TODO(), req) + re.Error(err) + re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) +} diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index f509acbc991..bb1bc18dbfc 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -26,6 +26,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" "go.uber.org/zap" ) @@ -81,6 +82,15 @@ type Limiter struct { isLowProcess bool // remainingNotifyTimes is used to limit notify when the speed limit is already set. remainingNotifyTimes int + name string + + // metrics + metrics *limiterMetricsCollection +} + +// limiterMetricsCollection is a collection of metrics for a limiter. +type limiterMetricsCollection struct { + lowTokenNotifyCounter prometheus.Counter } // Limit returns the maximum overall event rate. @@ -106,8 +116,9 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. 
-func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { lim := &Limiter{ + name: name, limit: Limit(cfg.NewRate), last: now, tokens: cfg.NewTokens, @@ -115,6 +126,9 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN notifyThreshold: cfg.NotifyThreshold, lowTokensNotifyChan: lowTokensNotifyChan, } + lim.metrics = &limiterMetricsCollection{ + lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name), + } log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) return lim } @@ -223,6 +237,14 @@ func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64) lim.notifyThreshold = threshold } +// SetName sets the name of the limiter. +func (lim *Limiter) SetName(name string) *Limiter { + lim.mu.Lock() + defer lim.mu.Unlock() + lim.name = name + return lim +} + // notify tries to send a non-blocking notification on notifyCh and disables // further notifications (until the next Reconfigure or StartNotification). func (lim *Limiter) notify() { @@ -233,6 +255,9 @@ func (lim *Limiter) notify() { lim.isLowProcess = true select { case lim.lowTokensNotifyChan <- struct{}{}: + if lim.metrics != nil { + lim.metrics.lowTokenNotifyCounter.Inc() + } default: } } diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go index 7e6a559265b..a340cb93a0d 100644 --- a/client/resource_group/controller/metrics.go +++ b/client/resource_group/controller/metrics.go @@ -21,7 +21,9 @@ const ( requestSubsystem = "request" tokenRequestSubsystem = "token_request" - resourceGroupNameLabel = "name" + resourceGroupNameLabel = "name" + newResourceGroupNameLabel = "resource_group" + errType = "type" ) var ( @@ -38,7 +40,7 @@ var ( Namespace: namespace, Subsystem: requestSubsystem, Name: "success", - Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h Help: "Bucketed histogram of wait duration of successful request.", }, []string{resourceGroupNameLabel}) @@ -47,7 +49,7 @@ var ( Namespace: namespace, Subsystem: requestSubsystem, Name: "limit_reserve_time_failed", - Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h Help: "Bucketed histogram of wait duration of failed request.", }, []string{resourceGroupNameLabel}) @@ -57,7 +59,7 @@ var ( Subsystem: requestSubsystem, Name: "fail", Help: "Counter of failed request.", - }, []string{resourceGroupNameLabel}) + }, []string{resourceGroupNameLabel, errType}) requestRetryCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -71,6 +73,7 @@ var ( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: tokenRequestSubsystem, + Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 8s Name: "duration", Help: "Bucketed histogram of latency(s) of token request.", }, []string{"type"}) @@ -82,6 +85,14 @@ var ( Name: "resource_group", Help: "Counter of token request by every resource group.", }, []string{resourceGroupNameLabel}) + + lowTokenRequestNotifyCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: tokenRequestSubsystem, + Name: 
"low_token_notified", + Help: "Counter of low token request.", + }, []string{newResourceGroupNameLabel}) ) var ( @@ -98,4 +109,5 @@ func init() { prometheus.MustRegister(requestRetryCounter) prometheus.MustRegister(tokenRequestDuration) prometheus.MustRegister(resourceGroupTokenRequestCounter) + prometheus.MustRegister(lowTokenRequestNotifyCounter) } From 8ed3b31ce7f5ed829e7797cbcb13679be9e0970f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 7 Jun 2024 13:34:27 +0800 Subject: [PATCH 03/11] chore: add prow OWNERS files to control the approvals for critical configuration files (#8218) (#8250) close tikv/pd#8167 Signed-off-by: wuhuizuo Co-authored-by: wuhuizuo --- OWNERS_ALIASES | 6 ++++++ client/resource_group/controller/OWNERS | 7 +++++++ client/tlsutil/OWNERS | 7 +++++++ conf/OWNERS | 7 +++++++ pkg/encryption/OWNERS | 7 +++++++ pkg/mcs/resourcemanager/server/OWNERS | 7 +++++++ pkg/mcs/scheduling/server/config/OWNERS | 7 +++++++ pkg/mcs/tso/server/OWNERS | 7 +++++++ pkg/schedule/config/OWNERS | 7 +++++++ pkg/schedule/schedulers/OWNERS | 7 +++++++ server/config/OWNERS | 7 +++++++ 11 files changed, 76 insertions(+) create mode 100644 OWNERS_ALIASES create mode 100644 client/resource_group/controller/OWNERS create mode 100644 client/tlsutil/OWNERS create mode 100644 conf/OWNERS create mode 100644 pkg/encryption/OWNERS create mode 100644 pkg/mcs/resourcemanager/server/OWNERS create mode 100644 pkg/mcs/scheduling/server/config/OWNERS create mode 100644 pkg/mcs/tso/server/OWNERS create mode 100644 pkg/schedule/config/OWNERS create mode 100644 pkg/schedule/schedulers/OWNERS create mode 100644 server/config/OWNERS diff --git a/OWNERS_ALIASES b/OWNERS_ALIASES new file mode 100644 index 00000000000..516a466c91e --- /dev/null +++ b/OWNERS_ALIASES @@ -0,0 +1,6 @@ +# Sort the member alphabetically. 
+aliases: + sig-critical-approvers-config: + - easonn7 + - kevin-xianliu + - niubell diff --git a/client/resource_group/controller/OWNERS b/client/resource_group/controller/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/client/resource_group/controller/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/client/tlsutil/OWNERS b/client/tlsutil/OWNERS new file mode 100644 index 00000000000..211db06feee --- /dev/null +++ b/client/tlsutil/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|tlsconfig\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/conf/OWNERS b/conf/OWNERS new file mode 100644 index 00000000000..1a435c49089 --- /dev/null +++ b/conf/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.toml)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/encryption/OWNERS b/pkg/encryption/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/encryption/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/resourcemanager/server/OWNERS b/pkg/mcs/resourcemanager/server/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/resourcemanager/server/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/scheduling/server/config/OWNERS b/pkg/mcs/scheduling/server/config/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/scheduling/server/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/tso/server/OWNERS b/pkg/mcs/tso/server/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/tso/server/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/schedule/config/OWNERS b/pkg/schedule/config/OWNERS new file mode 100644 index 00000000000..ce5d15ddc19 --- /dev/null +++ b/pkg/schedule/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|(config|store_config)\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/schedule/schedulers/OWNERS b/pkg/schedule/schedulers/OWNERS new file mode 100644 index 00000000000..ae96e4f1f42 --- /dev/null +++ b/pkg/schedule/schedulers/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|hot_region_config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/server/config/OWNERS b/server/config/OWNERS new file mode 100644 index 00000000000..179de4843e6 --- /dev/null +++ b/server/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at 
https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|(config|service_middleware_config)\\.go)$": + approvers: + - sig-critical-approvers-config From 7f4c846a32f4df3b86dd14df19696bf63caf611f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 11 Jun 2024 18:24:00 +0800 Subject: [PATCH 04/11] OWNERS: Auto Sync OWNERS files from community membership (#8163) (#8273) Signed-off-by: Ti Chi Robot --- OWNERS | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 OWNERS diff --git a/OWNERS b/OWNERS new file mode 100644 index 00000000000..5911dfd3b66 --- /dev/null +++ b/OWNERS @@ -0,0 +1,26 @@ +# See the OWNERS docs at https://go.k8s.io/owners +approvers: + - AndreMouche + - binshi-bing + - bufferflies + - CabinfeverB + - Connor1996 + - disksing + - huachaohuang + - HunDunDM + - HuSharp + - JmPotato + - lhy1024 + - nolouch + - overvenus + - qiuyesuifeng + - rleungx + - siddontang + - Yisaer + - zhouqiang-cl +reviewers: + - BusyJay + - howardlau1999 + - Luffbee + - shafreeck + - xhebox From 22c5c9ee64cac3da40890763626c7d394708fc40 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 17 Jun 2024 11:40:16 +0800 Subject: [PATCH 05/11] rc: fix group change will meet data race (#8268) (#8294) close tikv/pd#8267 Signed-off-by: husharp Co-authored-by: husharp --- pkg/mcs/resource_manager/server/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 8e4b23c8c9f..fee844e6e96 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -120,7 +120,9 @@ func (m *Manager) Init(ctx context.Context) error { return err } // Load resource group meta info from storage. + m.Lock() m.groups = make(map[string]*ResourceGroup) + m.Unlock() handler := func(k, v string) { group := &rmpb.ResourceGroup{} if err := proto.Unmarshal([]byte(v), group); err != nil { From 50c70afbbfb8afded1ad1b85bf139f9f2b1520f1 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 26 Jun 2024 10:14:51 +0800 Subject: [PATCH 06/11] config: fix the panic caused by zero RegionSplitSizeMB (#8324) (#8328) close tikv/pd#8323 Bypass the case that `RegionSplitSizeMB` might be zero in `(*StoreConfig) CheckRegionSize`. 
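
For illustration only, a minimal standalone sketch (not the PD implementation; the helper name `checkRegionSize` and the sample values are made up here): in Go, the integer remainder operator panics with "integer divide by zero" when the divisor is 0, which is why `size % regionSplitSize` needs the zero guard this patch adds.

```go
package main

import "fmt"

// checkRegionSize mirrors the guarded shape of the check: bail out early
// when the split size is zero so the modulo below cannot panic.
func checkRegionSize(size, splitSize, mergeSize uint64) error {
	if splitSize == 0 {
		// Without this guard, size % splitSize would panic at runtime.
		return nil
	}
	if small := size % splitSize; small <= mergeSize && small != 0 {
		return fmt.Errorf("smallest split region (%d MiB) would be merged again", small)
	}
	return nil
}

func main() {
	fmt.Println(checkRegionSize(100, 0, 20)) // <nil>: zero split size is bypassed, no panic
	fmt.Println(checkRegionSize(96, 40, 20)) // error: smallest split region (16 MiB) would be merged again
}
```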
Signed-off-by: ti-chi-bot Signed-off-by: JmPotato Co-authored-by: JmPotato --- pkg/utils/typeutil/size_test.go | 16 ++++++++++++---- server/config/store_config.go | 9 +++++++-- server/config/store_config_test.go | 6 ++++++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/utils/typeutil/size_test.go b/pkg/utils/typeutil/size_test.go index 57c246953e4..f05e1521281 100644 --- a/pkg/utils/typeutil/size_test.go +++ b/pkg/utils/typeutil/size_test.go @@ -40,6 +40,8 @@ func TestSizeJSON(t *testing.T) { } func TestParseMbFromText(t *testing.T) { + const defaultValue = 2 + t.Parallel() re := require.New(t) testCases := []struct { @@ -47,18 +49,24 @@ func TestParseMbFromText(t *testing.T) { size uint64 }{{ body: []string{"10Mib", "10MiB", "10M", "10MB"}, - size: uint64(10), + size: 10, }, { body: []string{"10GiB", "10Gib", "10G", "10GB"}, - size: uint64(10 * units.GiB / units.MiB), + size: 10 * units.GiB / units.MiB, + }, { + body: []string{"1024KiB", "1048576"}, + size: 1, + }, { + body: []string{"100KiB", "1023KiB", "1048575", "0"}, + size: 0, }, { body: []string{"10yiB", "10aib"}, - size: uint64(1), + size: defaultValue, }} for _, testCase := range testCases { for _, b := range testCase.body { - re.Equal(int(testCase.size), int(ParseMBFromText(b, 1))) + re.Equal(testCase.size, ParseMBFromText(b, defaultValue)) } } } diff --git a/server/config/store_config.go b/server/config/store_config.go index 8f4baa91522..b7ad34a6b51 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -164,9 +164,14 @@ func (c *StoreConfig) CheckRegionSize(size, mergeSize uint64) error { if size < c.GetRegionMaxSize() { return nil } - + // This could happen when the region split size is set to a value less than 1MiB, + // which is a very extreme case, we just pass the check here to prevent panic. + regionSplitSize := c.GetRegionSplitSize() + if regionSplitSize == 0 { + return nil + } // the smallest of the split regions can not be merge again, so it's size should less merge size. - if smallSize := size % c.GetRegionSplitSize(); smallSize <= mergeSize && smallSize != 0 { + if smallSize := size % regionSplitSize; smallSize <= mergeSize && smallSize != 0 { log.Debug("region size is too small", zap.Uint64("size", size), zap.Uint64("merge-size", mergeSize), zap.Uint64("small-size", smallSize)) return errs.ErrCheckerMergeAgain.FastGenByArgs("the smallest region of the split regions is less than max-merge-region-size, " + "it will be merged again") diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 05fb43ab5f6..b78ddd42377 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -23,6 +23,7 @@ import ( "github.com/docker/go-units" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/utils/typeutil" ) func TestTiKVConfig(t *testing.T) { @@ -165,4 +166,9 @@ func TestMergeCheck(t *testing.T) { re.Error(config.CheckRegionKeys(v.keys, v.mergeKeys)) } } + // Test CheckRegionSize when the region split size is 0. 
+ config.RegionSplitSize = "100KiB" + config.RegionSplitSizeMB = typeutil.ParseMBFromText(config.RegionSplitSize, defaultRegionSplitSize) + re.Empty(config.GetRegionSplitSize()) + re.NoError(config.CheckRegionSize(defaultRegionMaxSize, 50)) } From d8883142bb47a89819b96127f6f6c074aeb90d57 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 4 Jul 2024 15:41:58 +0800 Subject: [PATCH 07/11] client/controller: record context error and add slowlog about token bucket (#8344) (#8354) close tikv/pd#8343, ref tikv/pd#8349 client/controller: record context error and add slowlog about token bucket - record low process start time, and log it if it's too slow - record the context error Signed-off-by: ti-chi-bot Signed-off-by: Shuning Chen Co-authored-by: ShuNing Co-authored-by: Shuning Chen --- .../resource_group/controller/controller.go | 47 +++++++++++-------- .../controller/controller_test.go | 2 +- client/resource_group/controller/limiter.go | 43 ++++++++++------- .../resource_group/controller/limiter_test.go | 21 +++++++-- 4 files changed, 72 insertions(+), 41 deletions(-) diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 1834e509696..29f0853cb92 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -36,12 +36,13 @@ import ( ) const ( - controllerConfigPath = "resource_group/controller" - maxRetry = 10 - retryInterval = 50 * time.Millisecond - maxNotificationChanLen = 200 - needTokensAmplification = 1.1 - trickleReserveDuration = 1250 * time.Millisecond + controllerConfigPath = "resource_group/controller" + maxRetry = 10 + retryInterval = 50 * time.Millisecond + maxNotificationChanLen = 200 + needTokensAmplification = 1.1 + trickleReserveDuration = 1250 * time.Millisecond + slowNotifyFilterDuration = 10 * time.Millisecond watchRetryInterval = 30 * time.Second ) @@ -115,7 +116,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator // When a signal is received, it means the number of available token is low. - lowTokenNotifyChan chan struct{} + lowTokenNotifyChan chan notifyMsg // When a token bucket response received from server, it will be sent to the channel. tokenResponseChan chan []*rmpb.TokenBucketResponse // When the token bucket of a resource group is updated, it will be sent to the channel. 
@@ -157,7 +158,7 @@ func NewResourceGroupController( clientUniqueID: clientUniqueID, provider: provider, ruConfig: ruConfig, - lowTokenNotifyChan: make(chan struct{}, 1), + lowTokenNotifyChan: make(chan notifyMsg, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), opts: opts, @@ -262,7 +263,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{}) } case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { @@ -300,11 +301,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-c.lowTokenNotifyChan: + case notifyMsg := <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) } if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) @@ -484,7 +485,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB } } -func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) { +func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) { c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0) c.groupsController.Range(func(name, value any) bool { gc := value.(*groupCostController) @@ -496,11 +497,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex return true }) if len(c.run.currentRequests) > 0 { - c.sendTokenBucketRequests(ctx, c.run.currentRequests, source) + c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg) } } -func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) { +func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) { now := time.Now() req := &rmpb.TokenBucketsRequest{ Requests: requests, @@ -518,13 +519,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, if err != nil { // Don't log any errors caused by the stopper canceling the context. 
if !errors.ErrorEqual(err, context.Canceled) { - log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err) + log.Error("[resource group controller] token bucket rpc error", zap.Error(err)) } resp = nil failedTokenRequestDuration.Observe(latency.Seconds()) } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } + if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration { + log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime))) + } logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp }() @@ -588,7 +592,7 @@ type groupCostController struct { // fast path to make once token limit with un-limit burst. burstable *atomic.Bool - lowRUNotifyChan chan<- struct{} + lowRUNotifyChan chan<- notifyMsg tokenBucketUpdateChan chan<- *groupCostController // run contains the state that is updated by the main loop. @@ -678,7 +682,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, mainCfg *RUConfig, - lowRUNotifyChan chan struct{}, + lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { switch group.Mode { @@ -797,7 +801,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -997,7 +1001,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { cfg.NewRate = 99999999 }) counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess()) - log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) + log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) } } @@ -1051,6 +1055,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket timerDuration = (trickleDuration + trickleReserveDuration) / 2 } counter.notify.mu.Lock() + if counter.notify.setupNotificationTimer != nil { + counter.notify.setupNotificationTimer.Stop() + } counter.notify.setupNotificationTimer = time.NewTimer(timerDuration) counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C counter.notify.setupNotificationThreshold = 1 @@ -1221,7 +1228,7 @@ func (gc *groupCostController) onRequestWait( sub(gc.mu.consumption, delta) gc.mu.Unlock() failpoint.Inject("triggerUpdate", func() { - gc.lowRUNotifyChan <- struct{}{} + gc.lowRUNotifyChan <- notifyMsg{} }) return nil, nil, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 3696dcba845..fbd3ab0548f 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -41,7 +41,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController }, }, } - ch1 := make(chan struct{}) 
+ ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index bb1bc18dbfc..1d250c91214 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -75,7 +75,7 @@ type Limiter struct { // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 - lowTokensNotifyChan chan<- struct{} + lowTokensNotifyChan chan<- notifyMsg // To prevent too many chan sent, the notifyThreshold is set to 0 after notify. // So the notifyThreshold cannot show whether the limiter is in the low token state, // isLowProcess is used to check it. @@ -88,6 +88,11 @@ type Limiter struct { metrics *limiterMetricsCollection } +// notifyMsg is a message to notify the low token state. +type notifyMsg struct { + startTime time.Time +} + // limiterMetricsCollection is a collection of metrics for a limiter. type limiterMetricsCollection struct { lowTokenNotifyCounter prometheus.Counter @@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ limit: r, last: now, @@ -116,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ name: name, limit: Limit(cfg.NewRate), @@ -136,13 +141,14 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time - needWaitDurtion time.Duration + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDuration time.Duration // This is the Limit at reservation time, it can change later. 
limit Limit + err error } // OK returns whether the limiter can provide the requested number of tokens @@ -217,7 +223,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now select { case <-ctx.Done(): return &Reservation{ - ok: false, + ok: false, + err: ctx.Err(), } default: } @@ -254,7 +261,7 @@ func (lim *Limiter) notify() { lim.notifyThreshold = 0 lim.isLowProcess = true select { - case lim.lowTokensNotifyChan <- struct{}{}: + case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}: if lim.metrics != nil { lim.metrics.lowTokenNotifyCounter.Inc() } @@ -386,10 +393,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, - needWaitDurtion: waitDuration, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDuration: waitDuration, } if ok { r.tokens = n @@ -412,7 +419,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur zap.Float64("notify-threshold", lim.notifyThreshold), zap.Bool("is-low-process", lim.isLowProcess), zap.Int64("burst", lim.burst), - zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + zap.Int("remaining-notify-times", lim.remainingNotifyTimes), + zap.String("name", lim.name)) } lim.last = last if lim.limit == 0 { @@ -493,7 +501,10 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() - return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled + if res.err != nil { + return res.needWaitDuration, res.err + } + return res.needWaitDuration, errs.ErrClientResourceGroupThrottled } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index d8703a1e674..ba4771810d8 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -118,7 +118,7 @@ func TestReconfig(t *testing.T) { } func TestNotify(t *testing.T) { - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) args := tokenBucketReconfigureArgs{ @@ -139,7 +139,7 @@ func TestCancel(t *testing.T) { ctx := context.Background() ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim1 := NewLimiter(t0, 1, 0, 10, nc) lim2 := NewLimiter(t0, 1, 0, 0, nc) @@ -177,3 +177,16 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t5, 15) checkTokens(re, lim2, t5, 5) } + +func TestCancelErrorOfReservation(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 10, 0, 10, nc) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + r := lim.Reserve(ctx, 
InfDuration, t0, 5) + d, err := WaitReservations(context.Background(), t0, []*Reservation{r}) + re.Equal(0*time.Second, d) + re.Error(err) + re.Contains(err.Error(), "context canceled") +} From 9a296de213c71c117c47c1112976e249770e6b24 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 4 Jul 2024 17:19:28 +0800 Subject: [PATCH 08/11] resource_control: allow configuration of the maximum retry time for the local bucket (#8352) (#8364) close tikv/pd#8349 resource_control: allow configuration of the maximum retry time for the local bucket - Added config `ltb-token-rpc-max-delay` - Increased default max delay from 500ms to 1s Signed-off-by: nolouch Co-authored-by: nolouch --- client/resource_group/controller/config.go | 72 +++++++++++++++++-- .../resource_group/controller/controller.go | 36 +++++++--- pkg/mcs/resource_manager/server/config.go | 25 +++++-- .../resource_manager/server/config_test.go | 8 ++- .../resource_manager/resource_manager_test.go | 24 +++++-- .../resource_manager_command_test.go | 10 +-- 6 files changed, 138 insertions(+), 37 deletions(-) diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index fe30cc445fd..8bcebf09485 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -53,6 +53,12 @@ const ( defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second + // defaultWaitRetryTimes is the times to retry when waiting for the token. + defaultWaitRetryTimes = 20 + // defaultWaitRetryInterval is the interval to retry when waiting for the token. + defaultWaitRetryInterval = 50 * time.Millisecond ) const ( @@ -67,17 +73,35 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = 0 + defaultDegradedModeWaitDuration = time.Duration(0) ) -// Config is the configuration of the resource manager controller which includes some option for client needed. -type Config struct { +// TokenRPCParams is the parameters for local bucket RPC. +type TokenRPCParams struct { + // WaitRetryInterval is the interval to retry when waiting for the token. + WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"` + + // WaitRetryTimes is the times to retry when waiting for the token. + WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"` +} + +// LocalBucketConfig is the configuration for local bucket. not export to server side. +type LocalBucketConfig struct { + TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"` +} + +// BaseConfig is the configuration of the resource manager controller which includes some option for client needed. +// TODO: unified the configuration for client and server, server side in pkg/mcs/resourcemanger/config.go. +type BaseConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` // LTBMaxWaitDuration is the max wait time duration for local token bucket. 
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -86,13 +110,43 @@ type Config struct { EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"` } +// Config is the configuration of the resource manager controller. +type Config struct { + BaseConfig + LocalBucketConfig +} + +// Adjust adjusts the configuration. +func (c *Config) Adjust() { + // valid the configuration, TODO: separately add the valid function. + if c.BaseConfig.LTBMaxWaitDuration.Duration == 0 { + c.BaseConfig.LTBMaxWaitDuration = NewDuration(defaultMaxWaitDuration) + } + if c.LocalBucketConfig.WaitRetryInterval.Duration == 0 { + c.LocalBucketConfig.WaitRetryInterval = NewDuration(defaultWaitRetryInterval) + } + // adjust the client settings. calculate the retry times. + if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes { + c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration) + } +} + // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ - DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), - LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), - RequestUnit: DefaultRequestUnitConfig(), - EnableControllerTraceLog: false, + BaseConfig: BaseConfig{ + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + RequestUnit: DefaultRequestUnitConfig(), + EnableControllerTraceLog: false, + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), + LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay), + }, + LocalBucketConfig: LocalBucketConfig{ + TokenRPCParams: TokenRPCParams{ + WaitRetryInterval: NewDuration(defaultWaitRetryInterval), + WaitRetryTimes: defaultWaitRetryTimes, + }, + }, } } @@ -140,6 +194,8 @@ type RUConfig struct { // some config for client LTBMaxWaitDuration time.Duration + WaitRetryInterval time.Duration + WaitRetryTimes int DegradedModeWaitDuration time.Duration } @@ -159,6 +215,8 @@ func GenerateRUConfig(config *Config) *RUConfig { WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + WaitRetryInterval: config.WaitRetryInterval.Duration, + WaitRetryTimes: config.WaitRetryTimes, DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 29f0853cb92..06e9bb780ef 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -37,8 +37,6 @@ import ( const ( controllerConfigPath = "resource_group/controller" - maxRetry = 10 - retryInterval = 50 * time.Millisecond maxNotificationChanLen = 200 needTokensAmplification = 1.1 trickleReserveDuration = 1250 * time.Millisecond @@ -101,6 +99,20 @@ func WithMaxWaitDuration(d time.Duration) 
ResourceControlCreateOption { } } +// WithWaitRetryInterval is the option to set the retry interval when waiting for the token. +func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryInterval = d + } +} + +// WithWaitRetryTimes is the option to set the times to retry when waiting for the token. +func WithWaitRetryTimes(times int) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryTimes = times + } +} + var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) // ResourceGroupsController implements ResourceGroupKVInterceptor. @@ -169,6 +181,7 @@ func NewResourceGroupController( log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} controller.safeRuConfig.Store(controller.ruConfig) + enableControllerTraceLog.Store(config.EnableControllerTraceLog) return controller, nil } @@ -177,12 +190,14 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con if err != nil { return nil, err } - if len(resp.Kvs) == 0 { + config := DefaultConfig() + defer config.Adjust() + kvs := resp.GetKvs() + if len(kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") - return DefaultConfig(), nil + return config, nil } - config := &Config{} - err = json.Unmarshal(resp.Kvs[0].GetValue(), config) + err = json.Unmarshal(kvs[0].GetValue(), config) if err != nil { return nil, err } @@ -284,7 +299,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchRetryTimer.Reset(watchRetryInterval) } } - case <-emergencyTokenAcquisitionTicker.C: c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) /* channels */ @@ -362,10 +376,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } for _, item := range resp { cfgRevision = item.Kv.ModRevision - config := &Config{} + config := DefaultConfig() if err := json.Unmarshal(item.Kv.Value, config); err != nil { continue } + config.Adjust() c.ruConfig = GenerateRUConfig(config) // Stay compatible with serverless @@ -379,7 +394,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } - case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -1191,7 +1205,7 @@ func (gc *groupCostController) onRequestWait( var i int var d time.Duration retryLoop: - for i = 0; i < maxRetry; i++ { + for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ { switch gc.mode { case rmpb.GroupMode_RawMode: res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) @@ -1215,7 +1229,7 @@ func (gc *groupCostController) onRequestWait( } } gc.metrics.requestRetryCounter.Inc() - time.Sleep(retryInterval) + time.Sleep(gc.mainCfg.WaitRetryInterval) } if err != nil { if errs.ErrClientResourceGroupThrottled.Equal(err) { diff --git a/pkg/mcs/resource_manager/server/config.go b/pkg/mcs/resource_manager/server/config.go index b3a29446b4f..2c45ce5c355 100644 --- a/pkg/mcs/resource_manager/server/config.go +++ b/pkg/mcs/resource_manager/server/config.go @@ -53,6 +53,8 @@ const ( defaultDegradedModeWaitDuration = time.Second * 0 // 
defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second ) // Config is the configuration for the resource manager. @@ -90,6 +92,9 @@ type ControllerConfig struct { // LTBMaxWaitDuration is the max wait time duration for local token bucket. LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -103,10 +108,16 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { if rmc == nil { return } - rmc.RequestUnit.Adjust() - - configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) - configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) + rmc.RequestUnit.Adjust(meta.Child("request-unit")) + if !meta.IsDefined("degraded-mode-wait-duration") { + configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + } + if !meta.IsDefined("ltb-max-wait-duration") { + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) + } + if !meta.IsDefined("ltb-token-rpc-max-delay") { + configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay) + } failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) @@ -131,7 +142,7 @@ type RequestUnitConfig struct { } // Adjust adjusts the configuration and initializes it with the default value if necessary. -func (ruc *RequestUnitConfig) Adjust() { +func (ruc *RequestUnitConfig) Adjust(_ *configutil.ConfigMetaData) { if ruc == nil { return } @@ -182,11 +193,11 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error { configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") - return c.Adjust(meta, false) + return c.Adjust(meta) } // Adjust is used to adjust the resource manager configurations. 
-func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { +func (c *Config) Adjust(meta *toml.MetaData) error { configMetaData := configutil.NewConfigMetadata(meta) warningMsgs := make([]string, 0) if err := configMetaData.CheckUndecoded(); err != nil { diff --git a/pkg/mcs/resource_manager/server/config_test.go b/pkg/mcs/resource_manager/server/config_test.go index dd8dd2d2814..ae9dfc2cad3 100644 --- a/pkg/mcs/resource_manager/server/config_test.go +++ b/pkg/mcs/resource_manager/server/config_test.go @@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) { cfgData := ` [controller] ltb-max-wait-duration = "60s" +ltb-token-rpc-max-delay = "500ms" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -39,11 +40,12 @@ read-cpu-ms-cost = 5.0 cfg := NewConfig() meta, err := toml.Decode(cfgData, &cfg) re.NoError(err) - err = cfg.Adjust(&meta, false) + err = cfg.Adjust(&meta) re.NoError(err) - re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) - re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) + re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration) + re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration) + re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 17d1a3090c4..e34d00a1ea5 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/client/resource_group/controller" "github.com/tikv/pd/pkg/mcs/resource_manager/server" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -1235,16 +1236,24 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh configURL := "/resource-manager/api/v1/config/controller" waitDuration := 10 * time.Second + tokenRPCMaxDelay := 2 * time.Second readBaseCost := 1.5 defaultCfg := controller.DefaultConfig() - // failpoint enableDegradedMode will setup and set it be 1s. - defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectCfg := server.ControllerConfig{ + // failpoint enableDegradedMode will setup and set it be 1s. 
+ DegradedModeWaitDuration: typeutil.NewDuration(time.Second), + LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration), + LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay), + RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit), + EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog, + } expectRUCfg := controller.GenerateRUConfig(defaultCfg) + expectRUCfg.DegradedModeWaitDuration = time.Second // initial config verification respString := sendRequest("GET", getAddr()+configURL, nil) - defaultString, err := json.Marshal(defaultCfg) + expectStr, err := json.Marshal(expectCfg) re.NoError(err) - re.JSONEq(string(respString), string(defaultString)) + re.JSONEq(string(respString), string(expectStr)) re.EqualValues(expectRUCfg, c1.GetConfig()) testCases := []struct { @@ -1257,6 +1266,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh value: waitDuration, expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, }, + { + configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { + ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval) + }, + }, { configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), value: waitDuration, diff --git a/tests/pdctl/resourcemanager/resource_manager_command_test.go b/tests/pdctl/resourcemanager/resource_manager_command_test.go index d44edec2d10..d4416172fbf 100644 --- a/tests/pdctl/resourcemanager/resource_manager_command_test.go +++ b/tests/pdctl/resourcemanager/resource_manager_command_test.go @@ -56,7 +56,7 @@ func (s *testResourceManagerSuite) TearDownSuite() { } func (s *testResourceManagerSuite) TestConfigController() { - expectCfg := server.ControllerConfig{} + expectCfg := server.Config{} expectCfg.Adjust(nil) // Show controller config checkShow := func() { @@ -67,7 +67,7 @@ func (s *testResourceManagerSuite) TestConfigController() { actualCfg := server.ControllerConfig{} err = json.Unmarshal(output, &actualCfg) s.Nil(err) - s.Equal(expectCfg, actualCfg) + s.Equal(expectCfg.Controller, actualCfg) } // Check default config @@ -78,20 +78,20 @@ func (s *testResourceManagerSuite) TestConfigController() { output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour} + expectCfg.Controller.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour} checkShow() args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "enable-controller-trace-log", "true"} output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.EnableControllerTraceLog = true + expectCfg.Controller.EnableControllerTraceLog = true checkShow() args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "write-base-cost", "2"} output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) 
s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.RequestUnit.WriteBaseCost = 2 + expectCfg.Controller.RequestUnit.WriteBaseCost = 2 checkShow() } From 05207d6819fb14095aecd4017099da9d1b3c0c1f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 8 Jul 2024 15:51:02 +0800 Subject: [PATCH 09/11] controller: fix the low_ru request missed (#8368) (#8371) close tikv/pd#8349 controller: fix the low_ru request missed The problem is that `c.run.currentRequests` is shared by all groups. If one group triggers a token request that isn't handled by the response, the other group's requests will be discarded. Here, we do not discard the low_ru triggers. Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: ShuNing Co-authored-by: nolouch --- client/go.mod | 1 + client/go.sum | 1 + .../resource_group/controller/controller.go | 12 +- .../controller/controller_test.go | 139 ++++++++++++++++++ .../resource_group/controller/limiter_test.go | 15 +- 5 files changed, 164 insertions(+), 4 deletions(-) diff --git a/client/go.mod b/client/go.mod index dcbbd02ad9b..62ffc7e7a04 100644 --- a/client/go.mod +++ b/client/go.mod @@ -31,6 +31,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/client/go.sum b/client/go.sum index 8b35ed71bd8..d945175c38e 100644 --- a/client/go.sum +++ b/client/go.sum @@ -118,6 +118,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 06e9bb780ef..84629090e04 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -318,9 +318,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case notifyMsg := <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) - } + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } @@ -1136,11 +1134,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType switch selectTyp { case periodicReport: selected = selected || gc.shouldReportConsumption() + failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) { + selected = gc.name == val.(string) + }) fallthrough case lowToken: if 
counter.limiter.IsLowTokens() { selected = true } + failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) { + if selectTyp == lowToken { + selected = gc.name == val.(string) + } + }) } request := &rmpb.RequestUnitItem{ Type: typ, diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index fbd3ab0548f..cf209afed27 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -24,8 +24,12 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/meta_storagepb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" ) @@ -127,3 +131,138 @@ func TestResourceGroupThrottledError(t *testing.T) { re.Error(err) re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) } + +// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface. +type MockResourceGroupProvider struct { + mock.Mock +} + +func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) { + args := m.Called(ctx, resourceGroupName) + return args.Get(0).(*rmpb.ResourceGroup), args.Error(1) +} + +func (m *MockResourceGroupProvider) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { + args := m.Called(ctx) + return args.Get(0).([]*rmpb.ResourceGroup), args.Error(1) +} + +func (m *MockResourceGroupProvider) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { + args := m.Called(ctx, metaGroup) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { + args := m.Called(ctx, metaGroup) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { + args := m.Called(ctx, resourceGroupName) + return args.String(0), args.Error(1) +} + +func (m *MockResourceGroupProvider) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { + args := m.Called(ctx, request) + return args.Get(0).([]*rmpb.TokenBucketResponse), args.Error(1) +} + +func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { + args := m.Called(ctx) + return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2) +} + +func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { + args := m.Called(ctx, key, opts) + return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1) +} + +func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { + args := m.Called(ctx, key, opts) + return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1) +} + +func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockProvider := new(MockResourceGroupProvider) + + mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) + // LoadResourceGroups + 
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) + // Watch + mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport") + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group"))) + defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") + + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller.Start(ctx) + + defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) + + c1, err := controller.tryGetResourceGroup(ctx, "default") + re.NoError(err) + re.Equal(defaultResourceGroup, c1.meta) + + c2, err := controller.tryGetResourceGroup(ctx, "test-group") + re.NoError(err) + re.Equal(testResourceGroup, c2.meta) + + var expectResp []*rmpb.TokenBucketResponse + recTestGroupAcquireTokenRequest := make(chan bool) + mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + request := args.Get(1).(*rmpb.TokenBucketsRequest) + var responses []*rmpb.TokenBucketResponse + for _, req := range request.Requests { + if req.ResourceGroupName == "default" { + // no response the default group request, that's mean `len(c.run.currentRequests) != 0` always. + time.Sleep(100 * time.Second) + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: "default", + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } else { + responses = append(responses, &rmpb.TokenBucketResponse{ + ResourceGroupName: req.ResourceGroupName, + GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ + { + GrantedTokens: &rmpb.TokenBucket{ + Tokens: 100000, + }, + }, + }, + }) + } + } + // receive test-group request + if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" { + recTestGroupAcquireTokenRequest <- true + } + expectResp = responses + }).Return(expectResp, nil) + // wait default group request token by PeriodicReport. 
+ time.Sleep(2 * time.Second)
+ counter := c2.run.requestUnitTokens[0]
+ counter.limiter.mu.Lock()
+ counter.limiter.notify()
+ counter.limiter.mu.Unlock()
+ select {
+ case res := <-recTestGroupAcquireTokenRequest:
+ re.True(res)
+ case <-time.After(5 * time.Second):
+ re.Fail("timeout")
+ }
+}
diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go
index ba4771810d8..8854d4b2803 100644
--- a/client/resource_group/controller/limiter_test.go
+++ b/client/resource_group/controller/limiter_test.go
@@ -44,6 +44,18 @@ var (
 t8 = t0.Add(time.Duration(8) * d)
 )
 
+func resetTime() {
+ t0 = time.Now()
+ t1 = t0.Add(time.Duration(1) * d)
+ t2 = t0.Add(time.Duration(2) * d)
+ t3 = t0.Add(time.Duration(3) * d)
+ t4 = t0.Add(time.Duration(4) * d)
+ t5 = t0.Add(time.Duration(5) * d)
+ t6 = t0.Add(time.Duration(6) * d)
+ t7 = t0.Add(time.Duration(7) * d)
+ t8 = t0.Add(time.Duration(8) * d)
+}
+
 type request struct {
 t time.Time
 n float64
@@ -136,6 +148,7 @@ func TestNotify(t *testing.T) {
 }
 
 func TestCancel(t *testing.T) {
+ resetTime()
 ctx := context.Background()
 ctx1, cancel1 := context.WithDeadline(ctx, t2)
 re := require.New(t)
@@ -153,8 +166,8 @@ func TestCancel(t *testing.T) {
 checkTokens(re, lim1, t2, 7)
 checkTokens(re, lim2, t2, 2)
 d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
- re.Equal(d, 4*time.Second)
 re.Error(err)
+ re.Equal(4*time.Second, d)
 checkTokens(re, lim1, t3, 13)
 checkTokens(re, lim2, t3, 3)
 cancel1()

From 0adedca0a4e4ad2234ec90a54e37f6e4c4694597 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 25 Jul 2024 12:00:06 +0800
Subject: [PATCH 10/11] controller: fix limiter cannot work well in high concurrency scenario (#8436) (#8439)

close tikv/pd#8435

controller: fix the limiter not working well in high-concurrency scenarios

- In high-concurrency scenarios the `now` value is read outside the lock and passed in, so heavy mutex contention leads to non-sequential execution orders and time can appear to roll back.
- A time rollback lets the limiter advance extra tokens, which can cause the issue and even leave the controller with no limit at all.
- Fix the problem by refusing to move the limiter's timestamp backwards rather than re-acquiring the time inside the lock, since reading the clock that frequently would be costly (a minimal illustration follows after this list).
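Editor's note: the essence of the fix is that the limiter's refill timestamp must never move backwards, even when concurrent callers hand it stale `now` values read outside the lock. The following is a minimal, self-contained sketch of that idea; the `bucket` type and its fields are illustrative only and are not PD's limiter, whose actual change is the `updateLast` helper in the diff below.

package main

import (
	"fmt"
	"time"
)

// bucket is a stripped-down token bucket used only to illustrate the
// time-rollback hazard described in the commit message above.
type bucket struct {
	last   time.Time // timestamp of the last refill
	tokens float64   // currently available tokens
	rate   float64   // tokens added per second
}

// advance refills tokens up to `now`. Because callers read the clock before
// taking the lock, a delayed goroutine may pass a `now` that is older than
// `last`; accepting it would make the elapsed time negative on this call and
// artificially large on the next one, over-granting tokens. Keeping `last`
// monotonic avoids that.
func (b *bucket) advance(now time.Time) {
	if now.Before(b.last) {
		return // stale timestamp: ignore it instead of rolling time back
	}
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	b.last = now
}

func main() {
	start := time.Now()
	b := &bucket{last: start, rate: 100}

	b.advance(start.Add(50 * time.Millisecond)) // in-order refill: +5 tokens
	b.advance(start.Add(10 * time.Millisecond)) // stale timestamp: ignored
	fmt.Printf("tokens: %.1f\n", b.tokens)      // prints 5.0, not inflated
}

The real fix in limiter.go takes the same approach: `updateLast` only assigns `lim.last` when the new timestamp is later, so delayed goroutines cannot roll the limiter's clock back.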
Signed-off-by: nolouch Co-authored-by: nolouch --- client/resource_group/controller/limiter.go | 16 +++- .../resource_group/controller/limiter_test.go | 79 +++++++++++++++++++ 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 1d250c91214..576475008af 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -334,7 +334,7 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) + logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) if args.NewBurst < 0 { lim.last = now lim.tokens = args.NewTokens @@ -350,7 +350,7 @@ func (lim *Limiter) Reconfigure(now time.Time, opt(lim) } lim.maybeNotify() - logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) + logControllerTrace("[resource group controller] after reconfigure", zap.String("name", lim.name), zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) } // AvailableTokens decreases the amount of tokens currently available. @@ -361,6 +361,14 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 { return tokens } +func (lim *Limiter) updateLast(t time.Time) { + // make sure lim.last is monotonic + // see issue: https://github.com/tikv/pd/issues/8435. + if lim.last.Before(t) { + lim.last = t + } +} + const reserveWarnLogInterval = 10 * time.Millisecond // reserveN is a helper method for Reserve. 
@@ -404,7 +412,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur } // Update state if ok { - lim.last = now + lim.updateLast(now) lim.tokens = tokens lim.maybeNotify() } else { @@ -422,7 +430,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur zap.Int("remaining-notify-times", lim.remainingNotifyTimes), zap.String("name", lim.name)) } - lim.last = last + lim.updateLast(last) if lim.limit == 0 { lim.notify() } else if lim.remainingNotifyTimes > 0 { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index 8854d4b2803..6dfe4d1a32e 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -20,8 +20,10 @@ package controller import ( "context" + "fmt" "math" "sync" + "sync/atomic" "testing" "time" @@ -203,3 +205,80 @@ func TestCancelErrorOfReservation(t *testing.T) { re.Error(err) re.Contains(err.Error(), "context canceled") } + +func TestQPS(t *testing.T) { + re := require.New(t) + cases := []struct { + concurrency int + reserveN int64 + ruPerSec int64 + }{ + {1000, 10, 400000}, + } + + for _, tc := range cases { + t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.ruPerSec), func(t *testing.T) { + qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.ruPerSec) + t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime)) + re.LessOrEqual(math.Abs(float64(tc.ruPerSec)-ruSec), float64(100)*float64(tc.reserveN)) + re.LessOrEqual(math.Abs(float64(tc.ruPerSec)/float64(tc.reserveN)-qps), float64(100)) + }) + } +} + +const testCaseRunTime = 4 * time.Second + +func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) { + nc := make(chan notifyMsg, 1) + lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc) + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + var totalRequests int64 + start := time.Now() + + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + r := lim.Reserve(context.Background(), 30*time.Second, time.Now(), float64(reserveN)) + if r.OK() { + delay := r.DelayFrom(time.Now()) + <-time.After(delay) + } else { + panic("r not ok") + } + atomic.AddInt64(&totalRequests, 1) + } + }() + } + var vQPS atomic.Value + var wait time.Duration + ch := make(chan struct{}) + go func() { + var windowRequests int64 + for { + elapsed := time.Since(start) + if elapsed >= testCaseRunTime { + close(ch) + break + } + windowRequests = atomic.SwapInt64(&totalRequests, 0) + vQPS.Store(float64(windowRequests)) + r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN)) + wait = r.Delay() + time.Sleep(1 * time.Second) + } + }() + <-ch + cancel() + wg.Wait() + qps = vQPS.Load().(float64) + return qps, qps * float64(reserveN), wait +} From fb162bf0aa3f0de23a303aa25d8869f3c593645d Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 25 Jul 2024 15:07:35 +0800 Subject: [PATCH 11/11] *: fix the initialization of encryption key manager (#8383) (#8390) close tikv/pd#8384 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/server.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/server.go 
b/server/server.go index 5830f2e0a87..2943640eefd 100644 --- a/server/server.go +++ b/server/server.go @@ -425,6 +425,10 @@ func (s *Server) startServer(ctx context.Context) error { Label: idAllocLabel, Member: s.member.MemberValue(), }) + s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption) + if err != nil { + return err + } regionStorage, err := storage.NewStorageWithLevelDBBackend(ctx, filepath.Join(s.cfg.DataDir, "region-meta"), s.encryptionKeyManager) if err != nil { return err @@ -449,11 +453,6 @@ func (s *Server) startServer(ctx context.Context) error { } } - s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption) - if err != nil { - return err - } - s.gcSafePointManager = gc.NewSafePointManager(s.storage) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
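Editor's note on this last patch: the change moves the construction of `s.encryptionKeyManager` ahead of the region storage that consumes it, whereas the old code created the storage first and only initialized the manager afterwards, so the storage captured a nil manager. The sketch below is a hypothetical, minimal reproduction of that use-before-initialization pattern; the types and constructor are illustrative and not PD's real API.

package main

import "fmt"

// keyManager and regionStorage stand in for the encryption key manager and
// the region storage; they are illustrative only.
type keyManager struct{ ready bool }

type regionStorage struct{ km *keyManager }

func newRegionStorage(km *keyManager) *regionStorage { return &regionStorage{km: km} }

func main() {
	var km *keyManager

	// Buggy order: the storage captures the still-nil manager, and the later
	// assignment to km is invisible to it.
	broken := newRegionStorage(km)
	km = &keyManager{ready: true}
	fmt.Println("broken storage has a manager:", broken.km != nil) // false

	// Fixed order (what the patch does): initialize the dependency first,
	// then pass it to every consumer.
	fixed := newRegionStorage(km)
	fmt.Println("fixed storage has a manager:", fixed.km != nil) // true
}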