diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index f12e2b3..2a1d761 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "net/url" "os" - "sync" "testing" "time" @@ -34,7 +33,6 @@ import ( "github.com/cloudwego/hertz/pkg/app/server" "github.com/cloudwego/hertz/pkg/app/server/registry" "github.com/cloudwego/hertz/pkg/common/config" - "github.com/cloudwego/hertz/pkg/common/hlog" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/protocol/consts" "github.com/stretchr/testify/assert" @@ -399,21 +397,24 @@ func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) { teardownEmbedEtcd(s) } -func TestEtcdRegistryKeepRegister(t *testing.T) { - etcdClient, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{"127.0.0.1:3564"}, - }) - etcd := &etcdRegistry{ - etcdClient: etcdClient, - } - reqCount, b1, b2 := etcd.keepRegisterForTest("testKey", "testValue", &retryCfg{ - maxAttemptTimes: 3, - observeDelay: 2 * time.Second, - retryDelay: 1 * time.Second, - }) - assert.Equal(t, 3, reqCount) - assert.True(t, b1) - assert.True(t, b2) +func TestRetryOption(t *testing.T) { + o := newOptionForServer([]string{"127.0.0.1:2345"}) + assert.Equal(t, o.etcdCfg.Endpoints, []string{"127.0.0.1:2345"}) + assert.Equal(t, uint(5), o.retryCfg.maxAttemptTimes) + assert.Equal(t, 30*time.Second, o.retryCfg.observeDelay) + assert.Equal(t, 10*time.Second, o.retryCfg.retryDelay) +} + +func TestRetryCustomConfig(t *testing.T) { + o := newOptionForServer( + []string{"127.0.0.1:2345"}, + WithMaxAttemptTimes(10), + WithObserveDelay(20*time.Second), + WithRetryDelay(5*time.Second), + ) + assert.Equal(t, uint(10), o.retryCfg.maxAttemptTimes) + assert.Equal(t, 20*time.Second, o.retryCfg.observeDelay) + assert.Equal(t, 5*time.Second, o.retryCfg.retryDelay) } func setupEmbedEtcd(t *testing.T) (*embed.Etcd, string) { @@ -440,82 +441,3 @@ func teardownEmbedEtcd(s *embed.Etcd) { s.Close() _ = os.RemoveAll(s.Config().Dir) } - -func (e *etcdRegistry) keepRegisterForTest(key, val string, retryConfig *retryCfg) (reqTime int, observeDelayCorrect, retryDelayCorrect bool) { - var ( - failedTimes uint - resp *clientv3.GetResponse - err error - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - ) - - observeStartTime := time.Now() - delay := retryConfig.observeDelay - // if maxAttemptTimes is 0, keep register forever - for retryConfig.maxAttemptTimes == 0 || failedTimes < retryConfig.maxAttemptTimes { - select { - case _, ok := <-e.stop: - if !ok { - close(e.stop) - } - return - case <-time.After(delay): - if delay == retryConfig.observeDelay { - observeEndTime := time.Now() - actualObserveDelay := observeEndTime.Sub(observeStartTime) - observeDelayCorrect = actualObserveDelay >= retryConfig.observeDelay - } else if delay == retryConfig.retryDelay { - retryDelayCorrect = true - } - } - - wg.Add(1) - go func() { - defer wg.Done() - ctx, cancel = context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - resp, err = e.etcdClient.Get(ctx, key) - reqTime++ - }() - wg.Wait() - - if err != nil { - delay = retryConfig.retryDelay - failedTimes++ - continue - } - - if len(resp.Kvs) == 0 { - delay = retryConfig.retryDelay - leaseID, err := e.grantLease() - if err != nil { - failedTimes++ - continue - } - - _, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID)) - if err != nil { - failedTimes++ - continue - } - - meta := registerMeta{ - leaseID: leaseID, - } - meta.ctx, meta.cancel = context.WithCancel(context.Background()) - if err := e.keepalive(meta); err != nil { - failedTimes++ - continue - } - e.meta.cancel() - e.meta = &meta - delay = retryConfig.observeDelay - observeStartTime = time.Now() - } - failedTimes = 0 - } - hlog.Errorf("keep register service %s failed times:%d", key, failedTimes) - return reqTime, observeDelayCorrect, retryDelayCorrect -} diff --git a/etcd/registry.go b/etcd/registry.go index 073db93..77e7563 100644 --- a/etcd/registry.go +++ b/etcd/registry.go @@ -55,18 +55,7 @@ type registerMeta struct { // NewEtcdRegistry creates a etcd based registry. func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) { - cfg := &option{ - etcdCfg: clientv3.Config{ - Endpoints: endpoints, - }, - retryCfg: &retryCfg{ - maxAttemptTimes: 5, - observeDelay: 30 * time.Second, - retryDelay: 10 * time.Second, - }, - } - cfg.apply(opts...) - + cfg := newOptionForServer(endpoints, opts...) etcdClient, err := clientv3.New(cfg.etcdCfg) if err != nil { return nil, err @@ -313,3 +302,18 @@ func getLocalIPv4Host() (string, error) { } return "", fmt.Errorf("not found ipv4 address") } + +func newOptionForServer(endpoints []string, opts ...Option) *option { + cfg := &option{ + etcdCfg: clientv3.Config{ + Endpoints: endpoints, + }, + retryCfg: &retryCfg{ + maxAttemptTimes: 5, + observeDelay: 30 * time.Second, + retryDelay: 10 * time.Second, + }, + } + cfg.apply(opts...) + return cfg +}