diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 7a16d88..f12e2b3 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -21,18 +21,20 @@ import ( "io/ioutil" "net/url" "os" + "sync" "testing" "time" - "github.com/cloudwego/hertz/pkg/app/client/discovery" "github.com/stretchr/testify/require" "github.com/cloudwego/hertz/pkg/app" "github.com/cloudwego/hertz/pkg/app/client" + "github.com/cloudwego/hertz/pkg/app/client/discovery" "github.com/cloudwego/hertz/pkg/app/middlewares/client/sd" "github.com/cloudwego/hertz/pkg/app/server" "github.com/cloudwego/hertz/pkg/app/server/registry" "github.com/cloudwego/hertz/pkg/common/config" + "github.com/cloudwego/hertz/pkg/common/hlog" "github.com/cloudwego/hertz/pkg/common/utils" "github.com/cloudwego/hertz/pkg/protocol/consts" "github.com/stretchr/testify/assert" @@ -397,6 +399,23 @@ func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) { teardownEmbedEtcd(s) } +func TestEtcdRegistryKeepRegister(t *testing.T) { + etcdClient, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:3564"}, + }) + etcd := &etcdRegistry{ + etcdClient: etcdClient, + } + reqCount, b1, b2 := etcd.keepRegisterForTest("testKey", "testValue", &retryCfg{ + maxAttemptTimes: 3, + observeDelay: 2 * time.Second, + retryDelay: 1 * time.Second, + }) + assert.Equal(t, 3, reqCount) + assert.True(t, b1) + assert.True(t, b2) +} + func setupEmbedEtcd(t *testing.T) (*embed.Etcd, string) { endpoint := fmt.Sprintf("unix://localhost:%06d", os.Getpid()) u, err := url.Parse(endpoint) @@ -421,3 +440,82 @@ func teardownEmbedEtcd(s *embed.Etcd) { s.Close() _ = os.RemoveAll(s.Config().Dir) } + +func (e *etcdRegistry) keepRegisterForTest(key, val string, retryConfig *retryCfg) (reqTime int, observeDelayCorrect, retryDelayCorrect bool) { + var ( + failedTimes uint + resp *clientv3.GetResponse + err error + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + ) + + observeStartTime := time.Now() + delay := retryConfig.observeDelay + // if maxAttemptTimes is 0, keep register forever + for retryConfig.maxAttemptTimes == 0 || failedTimes < retryConfig.maxAttemptTimes { + select { + case _, ok := <-e.stop: + if !ok { + close(e.stop) + } + return + case <-time.After(delay): + if delay == retryConfig.observeDelay { + observeEndTime := time.Now() + actualObserveDelay := observeEndTime.Sub(observeStartTime) + observeDelayCorrect = actualObserveDelay >= retryConfig.observeDelay + } else if delay == retryConfig.retryDelay { + retryDelayCorrect = true + } + } + + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel = context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resp, err = e.etcdClient.Get(ctx, key) + reqTime++ + }() + wg.Wait() + + if err != nil { + delay = retryConfig.retryDelay + failedTimes++ + continue + } + + if len(resp.Kvs) == 0 { + delay = retryConfig.retryDelay + leaseID, err := e.grantLease() + if err != nil { + failedTimes++ + continue + } + + _, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID)) + if err != nil { + failedTimes++ + continue + } + + meta := registerMeta{ + leaseID: leaseID, + } + meta.ctx, meta.cancel = context.WithCancel(context.Background()) + if err := e.keepalive(meta); err != nil { + failedTimes++ + continue + } + e.meta.cancel() + e.meta = &meta + delay = retryConfig.observeDelay + observeStartTime = time.Now() + } + failedTimes = 0 + } + hlog.Errorf("keep register service %s failed times:%d", key, failedTimes) + return reqTime, observeDelayCorrect, retryDelayCorrect +} diff --git a/etcd/registry.go b/etcd/registry.go index f628014..073db93 100644 --- a/etcd/registry.go +++ b/etcd/registry.go @@ -221,8 +221,8 @@ func (e *etcdRegistry) keepRegister(key, val string, retryConfig *retryCfg) { go func() { defer wg.Done() ctx, cancel = context.WithTimeout(context.Background(), time.Second*3) - resp, err = e.etcdClient.Get(ctx, key) defer cancel() + resp, err = e.etcdClient.Get(ctx, key) }() wg.Wait()