diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 7a16d88..2a1d761 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -24,11 +24,11 @@ import ( "testing" "time" - "github.com/cloudwego/hertz/pkg/app/client/discovery" "github.com/stretchr/testify/require" "github.com/cloudwego/hertz/pkg/app" "github.com/cloudwego/hertz/pkg/app/client" + "github.com/cloudwego/hertz/pkg/app/client/discovery" "github.com/cloudwego/hertz/pkg/app/middlewares/client/sd" "github.com/cloudwego/hertz/pkg/app/server" "github.com/cloudwego/hertz/pkg/app/server/registry" @@ -397,6 +397,26 @@ func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) { teardownEmbedEtcd(s) } +func TestRetryOption(t *testing.T) { + o := newOptionForServer([]string{"127.0.0.1:2345"}) + assert.Equal(t, o.etcdCfg.Endpoints, []string{"127.0.0.1:2345"}) + assert.Equal(t, uint(5), o.retryCfg.maxAttemptTimes) + assert.Equal(t, 30*time.Second, o.retryCfg.observeDelay) + assert.Equal(t, 10*time.Second, o.retryCfg.retryDelay) +} + +func TestRetryCustomConfig(t *testing.T) { + o := newOptionForServer( + []string{"127.0.0.1:2345"}, + WithMaxAttemptTimes(10), + WithObserveDelay(20*time.Second), + WithRetryDelay(5*time.Second), + ) + assert.Equal(t, uint(10), o.retryCfg.maxAttemptTimes) + assert.Equal(t, 20*time.Second, o.retryCfg.observeDelay) + assert.Equal(t, 5*time.Second, o.retryCfg.retryDelay) +} + func setupEmbedEtcd(t *testing.T) (*embed.Etcd, string) { endpoint := fmt.Sprintf("unix://localhost:%06d", os.Getpid()) u, err := url.Parse(endpoint) diff --git a/etcd/example/server/retry/main.go b/etcd/example/server/retry/main.go new file mode 100644 index 0000000..8ed9ecb --- /dev/null +++ b/etcd/example/server/retry/main.go @@ -0,0 +1,51 @@ +// Copyright 2021 CloudWeGo Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "time" + + "github.com/cloudwego/hertz/pkg/app" + "github.com/cloudwego/hertz/pkg/app/server" + "github.com/cloudwego/hertz/pkg/app/server/registry" + "github.com/cloudwego/hertz/pkg/common/utils" + "github.com/cloudwego/hertz/pkg/protocol/consts" + "github.com/hertz-contrib/registry/etcd" +) + +func main() { + r, _ := etcd.NewEtcdRegistry( + []string{"127.0.0.1:2379"}, + etcd.WithMaxAttemptTimes(10), + etcd.WithObserveDelay(20*time.Second), + etcd.WithRetryDelay(5*time.Second), + ) + + addr := "127.0.0.1:8888" + h := server.Default( + server.WithHostPorts(addr), + server.WithRegistry(r, &registry.Info{ + ServiceName: "hertz.test.demo", + Addr: utils.NewNetAddr("tcp", addr), + Weight: 10, + Tags: nil, + }), + ) + h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) { + ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"}) + }) + h.Spin() +} diff --git a/etcd/example/server/main.go b/etcd/example/server/simple/main.go similarity index 100% rename from etcd/example/server/main.go rename to etcd/example/server/simple/main.go diff --git a/etcd/common.go b/etcd/option.go similarity index 69% rename from etcd/common.go rename to etcd/option.go index 078da91..53a4f93 100644 --- a/etcd/common.go +++ b/etcd/option.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "os" "strconv" + "time" "github.com/cloudwego/hertz/pkg/app/server/registry" "github.com/cloudwego/hertz/pkg/common/hlog" @@ -32,6 +33,50 @@ const ( defaultTTL = 60 ) +type option struct { + // etcd client config + etcdCfg clientv3.Config + retryCfg *retryCfg 
+} + +type retryCfg struct { + // The maximum number of call attempt times, including the initial call + maxAttemptTimes uint + // observeDelay is the delay time for checking the service status under normal conditions + observeDelay time.Duration + // retryDelay is the delay time for attempting to register the service after disconnecting + retryDelay time.Duration +} + +type Option func(o *option) + +// WithMaxAttemptTimes sets the maximum number of call attempt times, including the initial call +func WithMaxAttemptTimes(maxAttemptTimes uint) Option { + return func(o *option) { + o.retryCfg.maxAttemptTimes = maxAttemptTimes + } +} + +// WithObserveDelay sets the delay time for checking the service status under normal conditions +func WithObserveDelay(observeDelay time.Duration) Option { + return func(o *option) { + o.retryCfg.observeDelay = observeDelay + } +} + +// WithRetryDelay sets the delay time of retry +func WithRetryDelay(t time.Duration) Option { + return func(o *option) { + o.retryCfg.retryDelay = t + } +} + +func (o *option) apply(opts ...Option) { + for _, opt := range opts { + opt(o) + } +} + // instanceInfo used to stored service basic info in etcd. type instanceInfo struct { Network string `json:"network"` @@ -74,25 +119,22 @@ func getTTL() int64 { return ttl } -// Option sets options such as username, tls etc. -type Option func(cfg *clientv3.Config) - // WithTLSOpt returns a option that authentication by tls/ssl. func WithTLSOpt(certFile, keyFile, caFile string) Option { - return func(cfg *clientv3.Config) { + return func(o *option) { tlsCfg, err := newTLSConfig(certFile, keyFile, caFile, "") if err != nil { hlog.Errorf("HERTZ: tls failed with err: %v , skipping tls.", err) } - cfg.TLS = tlsCfg + o.etcdCfg.TLS = tlsCfg } } // WithAuthOpt returns an option that authentication by username and password. 
func WithAuthOpt(username, password string) Option { - return func(cfg *clientv3.Config) { - cfg.Username = username - cfg.Password = password + return func(o *option) { + o.etcdCfg.Username = username + o.etcdCfg.Password = password } } diff --git a/etcd/readme.md b/etcd/readme.md index 8392633..0413363 100644 --- a/etcd/readme.md +++ b/etcd/readme.md @@ -191,6 +191,61 @@ func main() { } } ``` +## Retry + +After the service is registered to `ETCD`, it will regularly check the status of the service. If any abnormal status is found, it will try to register the service again. `observeDelay` is the delay time for checking the service status under normal conditions, and `retryDelay` is the delay time for attempting to register the service after disconnecting. + +### Default Retry Config + +| Config Name | Default Value | Description | +|:--------------------|:-----------------|:------------------------------------------------------------------------------------------| +| WithMaxAttemptTimes | 5 | Used to set the maximum number of attempts, if 0, it means infinite attempts | +| WithObserveDelay | 30 * time.Second | Used to set the delay time for checking service status under normal connection conditions | +| WithRetryDelay | 10 * time.Second | Used to set the retry delay time after disconnecting | + +### Example + +```go +package main + +import ( + "context" + "time" + + "github.com/cloudwego/hertz/pkg/app" + "github.com/cloudwego/hertz/pkg/app/server" + "github.com/cloudwego/hertz/pkg/app/server/registry" + "github.com/cloudwego/hertz/pkg/common/utils" + "github.com/cloudwego/hertz/pkg/protocol/consts" + "github.com/hertz-contrib/registry/etcd" +) + +func main() { + r, _ := etcd.NewEtcdRegistry( + []string{"127.0.0.1:2379"}, + etcd.WithMaxAttemptTimes(10), + etcd.WithObserveDelay(20*time.Second), + etcd.WithRetryDelay(5*time.Second), + ) + + addr := "127.0.0.1:8888" + h := server.Default( + server.WithHostPorts(addr), + server.WithRegistry(r, &registry.Info{ 
ServiceName: "hertz.test.demo", + Addr: utils.NewNetAddr("tcp", addr), + Weight: 10, + Tags: nil, + }), + ) + h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) { + ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"}) + }) + h.Spin() +} + +``` + ## How to Dynamically specify ip and port To dynamically specify an IP and port, one should first set the environment variables `HERTZ_IP_TO_REGISTRY` and `HERTZ_PORT_TO_REGISTRY`. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address. diff --git a/etcd/registry.go b/etcd/registry.go index 0edd762..77e7563 100644 --- a/etcd/registry.go +++ b/etcd/registry.go @@ -38,10 +38,13 @@ const ( ) type etcdRegistry struct { - etcdClient *clientv3.Client - leaseTTL int64 - meta *registerMeta - mu sync.Mutex + etcdClient *clientv3.Client + retryConfig *retryCfg + + leaseTTL int64 + meta *registerMeta + mu sync.Mutex + stop chan struct{} } type registerMeta struct { @@ -52,19 +55,16 @@ type registerMeta struct { // NewEtcdRegistry creates a etcd based registry. func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, - } - for _, opt := range opts { - opt(&cfg) - } - etcdClient, err := clientv3.New(cfg) + cfg := newOptionForServer(endpoints, opts...) 
+ etcdClient, err := clientv3.New(cfg.etcdCfg) if err != nil { return nil, err } return &etcdRegistry{ - etcdClient: etcdClient, - leaseTTL: getTTL(), + etcdClient: etcdClient, + leaseTTL: getTTL(), + retryConfig: cfg.retryCfg, + stop: make(chan struct{}, 1), }, nil } @@ -84,12 +84,13 @@ func (e *etcdRegistry) Register(info *registry.Info) error { leaseID: leaseID, } meta.ctx, meta.cancel = context.WithCancel(context.Background()) - if err := e.keepalive(&meta); err != nil { + if err := e.keepalive(meta); err != nil { return err } e.mu.Lock() e.meta = &meta e.mu.Unlock() + return nil } @@ -107,7 +108,7 @@ func (e *etcdRegistry) Deregister(info *registry.Info) error { } func (e *etcdRegistry) grantLease() (clientv3.LeaseID, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*100) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := e.etcdClient.Grant(ctx, e.leaseTTL) if err != nil { @@ -133,7 +134,16 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() _, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID)) - return err + if err != nil { + return err + } + + // retry start + go func(key, val string) { + e.keepRegister(key, val, e.retryConfig) + }(serviceKey(info.ServiceName, addr), string(val)) + + return nil } func (e *etcdRegistry) deregister(info *registry.Info) error { @@ -144,11 +154,15 @@ func (e *etcdRegistry) deregister(info *registry.Info) error { return err } _, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr)) - return err + if err != nil { + return err + } + e.stop <- struct{}{} + return nil } // keepalive keep the lease alive -func (e *etcdRegistry) keepalive(meta *registerMeta) error { +func (e *etcdRegistry) keepalive(meta registerMeta) error { keepAlive, err := 
e.etcdClient.KeepAlive(meta.ctx, meta.leaseID) if err != nil { return err @@ -168,6 +182,81 @@ func (e *etcdRegistry) keepalive(meta *registerMeta) error { return nil } +// keepRegister keep register service by retryConfig +func (e *etcdRegistry) keepRegister(key, val string, retryConfig *retryCfg) { + var ( + failedTimes uint + resp *clientv3.GetResponse + err error + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + ) + + delay := retryConfig.observeDelay + // if maxAttemptTimes is 0, keep register forever + for retryConfig.maxAttemptTimes == 0 || failedTimes < retryConfig.maxAttemptTimes { + select { + case _, ok := <-e.stop: + if !ok { + close(e.stop) + } + hlog.Infof("stop keep register service %s", key) + return + case <-time.After(delay): + } + + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel = context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resp, err = e.etcdClient.Get(ctx, key) + }() + wg.Wait() + + if err != nil { + hlog.Warnf("keep register get %s failed with err: %v", key, err) + delay = retryConfig.retryDelay + failedTimes++ + continue + } + + if len(resp.Kvs) == 0 { + hlog.Infof("keep register service %s", key) + delay = retryConfig.retryDelay + leaseID, err := e.grantLease() + if err != nil { + hlog.Warnf("keep register grant lease %s failed with err: %v", key, err) + failedTimes++ + continue + } + + _, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID)) + if err != nil { + hlog.Warnf("keep register put %s failed with err: %v", key, err) + failedTimes++ + continue + } + + meta := registerMeta{ + leaseID: leaseID, + } + meta.ctx, meta.cancel = context.WithCancel(context.Background()) + if err := e.keepalive(meta); err != nil { + hlog.Warnf("keep register keepalive %s failed with err: %v", key, err) + failedTimes++ + continue + } + e.meta.cancel() + e.meta = &meta + delay = retryConfig.observeDelay + } + failedTimes = 0 + } + hlog.Errorf("keep register service %s failed times:%d", 
key, failedTimes) +} + // getAddressOfRegistration returns the address of the service registration. func (e *etcdRegistry) getAddressOfRegistration(info *registry.Info) (string, error) { host, port, err := net.SplitHostPort(info.Addr.String()) @@ -213,3 +302,18 @@ func getLocalIPv4Host() (string, error) { } return "", fmt.Errorf("not found ipv4 address") } + +func newOptionForServer(endpoints []string, opts ...Option) *option { + cfg := &option{ + etcdCfg: clientv3.Config{ + Endpoints: endpoints, + }, + retryCfg: &retryCfg{ + maxAttemptTimes: 5, + observeDelay: 30 * time.Second, + retryDelay: 10 * time.Second, + }, + } + cfg.apply(opts...) + return cfg +} diff --git a/etcd/resolver.go b/etcd/resolver.go index 1710c5b..bcd119a 100644 --- a/etcd/resolver.go +++ b/etcd/resolver.go @@ -16,8 +16,8 @@ package etcd import ( "context" - "encoding/json" + "github.com/bytedance/sonic" "github.com/cloudwego/hertz/pkg/app/client/discovery" "github.com/cloudwego/hertz/pkg/app/server/registry" "github.com/cloudwego/hertz/pkg/common/hlog" @@ -32,13 +32,13 @@ type etcdResolver struct { // NewEtcdResolver creates a etcd based resolver. func NewEtcdResolver(endpoints []string, opts ...Option) (discovery.Resolver, error) { - cfg := clientv3.Config{ - Endpoints: endpoints, + cfg := &option{ + etcdCfg: clientv3.Config{ + Endpoints: endpoints, + }, } - for _, opt := range opts { - opt(&cfg) - } - etcdClient, err := clientv3.New(cfg) + cfg.apply(opts...) + etcdClient, err := clientv3.New(cfg.etcdCfg) if err != nil { return nil, err } @@ -58,7 +58,7 @@ func (e *etcdResolver) Resolve(ctx context.Context, desc string) (discovery.Resu var eps []discovery.Instance for _, kv := range resp.Kvs { var info instanceInfo - err := json.Unmarshal(kv.Value, &info) + err := sonic.Unmarshal(kv.Value, &info) if err != nil { hlog.Warnf("HERTZ: fail to unmarshal with err: %v, ignore key: %v", err, string(kv.Key)) continue