diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index cd2a099..0eaa968 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -9,7 +9,7 @@ jobs: - uses: actions/checkout@v3 - name: Check License Header - uses: apache/skywalking-eyes/header@main + uses: apache/skywalking-eyes/header@v0.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index de39d04..fed44ed 100644 --- a/README.md +++ b/README.md @@ -288,7 +288,24 @@ The echo method uses the following configuration (0.3, 100) and other methods us } } ``` +##### Degradation: Category=degradation + +| Variable | Introduction | +|------------|------------------------------------| +| enable | Whether to enable degradation | +| percentage | The percentage of dropped requests | + +Example: + +> configPath: /KitexConfig/ClientName/ServiceName/degradation +```json +{ + "enable": true, + "percentage": 30 +} +``` +Note: Degradation is not enabled by default. ### More Info Refer to [example](https://github.com/kitex-contrib/config-consul/tree/main/example) for more usage. diff --git a/README_CN.md b/README_CN.md index 01b2bb5..bb7e87b 100644 --- a/README_CN.md +++ b/README_CN.md @@ -289,6 +289,25 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认 } ``` +##### 降级: Category=degradation + +| 参数 | 说明 | +|------------|----------| +| enable | 是否开启降级策略 | +| percentage | 丢弃请求的比例 | + +例子: + +> configPath: /KitexConfig/ClientName/ServiceName/degradation + +```json +{ + "enable": true, + "percentage": 30 +} +``` + +注:默认不开启降级(enable为false) ### 更多信息 更多示例请参考 [example](https://github.com/kitex-contrib/config-consul/tree/main/example) diff --git a/check_branch_name.sh b/check_branch_name.sh old mode 100755 new mode 100644 diff --git a/client/degradation.go b/client/degradation.go new file mode 100644 index 0000000..05958e4 --- /dev/null +++ b/client/degradation.go @@ -0,0 +1,62 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/klog" + "github.com/kitex-contrib/config-consul/consul" + "github.com/kitex-contrib/config-consul/pkg/degradation" + "github.com/kitex-contrib/config-consul/utils" +) + +func WithDegradation(dest, src string, consulClient consul.Client, uniqueID int64, opts utils.Options) []client.Option { + param, err := consulClient.ClientConfigParam(&consul.ConfigParamConfig{ + Category: degradationConfigName, + ServerServiceName: dest, + ClientServiceName: src, + }) + if err != nil { + panic(err) + } + for _, f := range opts.ConsulCustomFunctions { + f(¶m) + } + key := param.Prefix + "/" + param.Path + container := initDegradationOptions(param.Type, key, dest, uniqueID, consulClient) + return []client.Option{ + client.WithACLRules(container.GetAclRule()), + client.WithCloseCallbacks(func() error { + // cancel the configuration listener when client is closed. + consulClient.DeregisterConfig(key, uniqueID) + return nil + }), + } +} + +func initDegradationOptions(configType consul.ConfigType, key, dest string, uniqueID int64, consulClient consul.Client) *degradation.DegradationContainer { + container := degradation.NewDegradationContainer() + onChangeCallback := func(data string, parser consul.ConfigParser) { + config := °radation.DegradationConfig{} + err := parser.Decode(configType, data, config) + if err != nil { + klog.Warnf("[consul] %s server consul degradation config: unmarshal data %s failed: %s, skip...", key, data, err) + return + } + container.NotifyPolicyChange(config) + } + consulClient.RegisterConfigCallback(key, uniqueID, onChangeCallback) + return container +} diff --git a/client/suite.go b/client/suite.go index b9b549a..e7681e7 100644 --- a/client/suite.go +++ b/client/suite.go @@ -25,6 +25,7 @@ const ( retryConfigName = "retry" rpcTimeoutConfigName = "rpc_timeout" circuitBreakerConfigName = "circuit_break" + degradationConfigName = "degradation" ) type ConsulClientSuite struct { @@ -59,5 +60,6 @@ func (s *ConsulClientSuite) Options() []client.Option { opts = append(opts, WithCircuitBreaker(s.service, s.client, s.consulClient, s.uid, s.opts)...) opts = append(opts, WithRetryPolicy(s.service, s.client, s.consulClient, s.uid, s.opts)...) opts = append(opts, WithRPCTimeout(s.service, s.client, s.consulClient, s.uid, s.opts)...) + opts = append(opts, WithDegradation(s.service, s.client, s.consulClient, s.uid, s.opts)...) return opts } diff --git a/pkg/degradation/item_degradation.go b/pkg/degradation/item_degradation.go new file mode 100644 index 0000000..21ae40b --- /dev/null +++ b/pkg/degradation/item_degradation.go @@ -0,0 +1,81 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "sync/atomic" + + "github.com/bytedance/gopkg/lang/fastrand" + "github.com/cloudwego/configmanager/iface" + "github.com/cloudwego/kitex/pkg/acl" +) + +var errRejected = errors.New("rejected by client degradation config") + +var defaultDegradationConfig = &DegradationConfig{ + Enable: false, + Percentage: 0, +} + +type DegradationConfig struct { + Enable bool `json:"enable"` + Percentage int `json:"percentage"` +} + +// DeepCopy returns a copy of the current DegradationConfig +func (c *DegradationConfig) DeepCopy() iface.ConfigValueItem { + result := &DegradationConfig{ + Enable: c.Enable, + Percentage: c.Percentage, + } + return result +} + +// EqualsTo returns true if the current DegradationConfig equals to the other DegradationConfig +func (c *DegradationConfig) EqualsTo(other iface.ConfigValueItem) bool { + o := other.(*DegradationConfig) + return c.Enable == o.Enable && c.Percentage == o.Percentage +} + +// DegradationContainer is a wrapper for DegradationConfig +type DegradationContainer struct { + config atomic.Value +} + +func NewDegradationContainer() *DegradationContainer { + c := &DegradationContainer{} + c.config.Store(defaultDegradationConfig) + return c +} + +// NotifyPolicyChange to receive policy when it changes +func (c *DegradationContainer) NotifyPolicyChange(cfg *DegradationConfig) { + c.config.Store(cfg) +} + +func (c *DegradationContainer) GetAclRule() acl.RejectFunc { + return func(ctx context.Context, request interface{}) (reason error) { + cfg := c.config.Load().(*DegradationConfig) + if !cfg.Enable { + return nil + } + if fastrand.Intn(100) < cfg.Percentage { + return errRejected + } + return nil + } +} diff --git a/pkg/degradation/item_degradation_test.go b/pkg/degradation/item_degradation_test.go new file mode 100644 index 0000000..22223e4 --- /dev/null +++ b/pkg/degradation/item_degradation_test.go @@ -0,0 +1,40 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package degradation + +import ( + "context" + "errors" + "testing" + + "github.com/cloudwego/kitex/pkg/acl" + "github.com/cloudwego/thriftgo/pkg/test" +) + +var errFake = errors.New("fake error") + +func invoke(ctx context.Context, request, response interface{}) error { + return errFake +} + +func TestNewContainer(t *testing.T) { + container := NewDegradationContainer() + aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetAclRule()}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&DegradationConfig{Enable: false, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake)) + container.NotifyPolicyChange(&DegradationConfig{Enable: true, Percentage: 100}) + test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errRejected)) +} diff --git a/server/suit.go b/server/suit.go index d3e6a1a..d4f3535 100644 --- a/server/suit.go +++ b/server/suit.go @@ -49,7 +49,7 @@ func NewSuite(service string, cli consul.Client, return su } -// Options return a list client.Option +// Options return a list server.Option func (s *ConsulServerSuite) Options() []server.Option { opts := make([]server.Option, 0, 2) opts = append(opts, WithLimiter(s.service, s.consulClient, s.uid, s.opts))