Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Skyenought committed Oct 3, 2023
1 parent 40428fb commit d9aa7a7
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 2 deletions.
100 changes: 99 additions & 1 deletion etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
"io/ioutil"
"net/url"
"os"
"sync"
"testing"
"time"

"github.com/cloudwego/hertz/pkg/app/client/discovery"
"github.com/stretchr/testify/require"

"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/client"
"github.com/cloudwego/hertz/pkg/app/client/discovery"
"github.com/cloudwego/hertz/pkg/app/middlewares/client/sd"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/app/server/registry"
"github.com/cloudwego/hertz/pkg/common/config"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/protocol/consts"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -397,6 +399,23 @@ func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) {
teardownEmbedEtcd(s)
}

func TestEtcdRegistryKeepRegister(t *testing.T) {
etcdClient, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:3564"},
})
etcd := &etcdRegistry{
etcdClient: etcdClient,
}
reqCount, b1, b2 := etcd.keepRegisterForTest("testKey", "testValue", &retryCfg{
maxAttemptTimes: 3,
observeDelay: 2 * time.Second,
retryDelay: 1 * time.Second,
})
assert.Equal(t, 3, reqCount)
assert.True(t, b1)
assert.True(t, b2)
}

func setupEmbedEtcd(t *testing.T) (*embed.Etcd, string) {
endpoint := fmt.Sprintf("unix://localhost:%06d", os.Getpid())
u, err := url.Parse(endpoint)
Expand All @@ -421,3 +440,82 @@ func teardownEmbedEtcd(s *embed.Etcd) {
s.Close()
_ = os.RemoveAll(s.Config().Dir)
}

func (e *etcdRegistry) keepRegisterForTest(key, val string, retryConfig *retryCfg) (reqTime int, observeDelayCorrect, retryDelayCorrect bool) {
var (
failedTimes uint
resp *clientv3.GetResponse
err error
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
)

observeStartTime := time.Now()
delay := retryConfig.observeDelay
// if maxAttemptTimes is 0, keep register forever
for retryConfig.maxAttemptTimes == 0 || failedTimes < retryConfig.maxAttemptTimes {
select {
case _, ok := <-e.stop:
if !ok {
close(e.stop)
}
return
case <-time.After(delay):
if delay == retryConfig.observeDelay {
observeEndTime := time.Now()
actualObserveDelay := observeEndTime.Sub(observeStartTime)
observeDelayCorrect = actualObserveDelay >= retryConfig.observeDelay
} else if delay == retryConfig.retryDelay {
retryDelayCorrect = true
}
}

wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel = context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err = e.etcdClient.Get(ctx, key)
reqTime++
}()
wg.Wait()

if err != nil {
delay = retryConfig.retryDelay
failedTimes++
continue
}

if len(resp.Kvs) == 0 {
delay = retryConfig.retryDelay
leaseID, err := e.grantLease()
if err != nil {
failedTimes++
continue
}

_, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID))
if err != nil {
failedTimes++
continue
}

meta := registerMeta{
leaseID: leaseID,
}
meta.ctx, meta.cancel = context.WithCancel(context.Background())
if err := e.keepalive(meta); err != nil {
failedTimes++
continue
}
e.meta.cancel()
e.meta = &meta
delay = retryConfig.observeDelay
observeStartTime = time.Now()
}
failedTimes = 0
}
hlog.Errorf("keep register service %s failed times:%d", key, failedTimes)
return reqTime, observeDelayCorrect, retryDelayCorrect
}
2 changes: 1 addition & 1 deletion etcd/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func (e *etcdRegistry) keepRegister(key, val string, retryConfig *retryCfg) {
go func() {
defer wg.Done()
ctx, cancel = context.WithTimeout(context.Background(), time.Second*3)
resp, err = e.etcdClient.Get(ctx, key)
defer cancel()
resp, err = e.etcdClient.Get(ctx, key)
}()
wg.Wait()

Expand Down

0 comments on commit d9aa7a7

Please sign in to comment.