Skip to content

Commit

Permalink
Merge branch 'etcd'
Browse files Browse the repository at this point in the history
  • Loading branch information
zhu-mi-shan committed Jun 18, 2024
2 parents d90e5b2 + 266977f commit 184fb79
Show file tree
Hide file tree
Showing 19 changed files with 260 additions and 238 deletions.
8 changes: 3 additions & 5 deletions etcd/client/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package client
package etcdclient

import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"text/template"
"time"
)
Expand All @@ -13,7 +12,6 @@ type ReaderOptions struct {
Prefix string
PathFormat string
Timeout time.Duration
LoggerConfig *zap.Config
ConfigParser ConfigParser
MyConfig Config
}
Expand All @@ -35,8 +33,8 @@ func NewReader(opts ReaderOptions) (*EtcdReader, error) {
opts.PathFormat = EtcdClientDefaultPath
}
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: opts.Node,
LogConfig: opts.LoggerConfig,
Endpoints: opts.Node,
DialTimeout: opts.Timeout,
})
if err != nil {
return nil, err
Expand Down
56 changes: 36 additions & 20 deletions etcd/client/decoder.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package client
package etcdclient

import (
"encoding/json"
"fmt"
"time"
"strings"
)

type ConfigParser interface {
Decode(data []byte, config EtcdConfig) error
Decode(data []byte, config *EtcdConfig) error
}

type defaultParser struct {
}

func (p *defaultParser) Decode(data []byte, config EtcdConfig) error {
return json.Unmarshal(data, &config)
func (p *defaultParser) Decode(data []byte, config *EtcdConfig) error {
return json.Unmarshal(data, config)
}

type Config interface {
Expand All @@ -23,25 +23,41 @@ type Config interface {

type EtcdConfig struct {
ClientBasicInfo *EndpointBasicInfo `mapstructure:"ClientBasicInfo"`
HostPorts *string `mapstructure:"HostPorts"`
HostPorts []string `mapstructure:"HostPorts"`
DestService *string `mapstructure:"DestService"`
Protocol *string `mapstructure:"Protocol"`
Connection *Connection `mapstructure:"Connection"`
MyConfig Config `mapstructure:"MyConfig"`
}

func (c *EtcdConfig) String() string {
baseInfo := "nil"
var builder strings.Builder

if c.ClientBasicInfo != nil {
builder.WriteString(fmt.Sprintf("ClientBasicInfo: %v\n", *c.ClientBasicInfo))
}

if c.HostPorts != nil {
builder.WriteString(fmt.Sprintf("HostPorts: %v\n", c.HostPorts))
}

if c.DestService != nil {
builder.WriteString(fmt.Sprintf("DestService: %v\n", *c.DestService))
}

if c.Protocol != nil {
builder.WriteString(fmt.Sprintf("Protocol: %v\n", *c.Protocol))
}

if c.Connection != nil {
builder.WriteString(fmt.Sprintf("Connection: %v\n", *c.Connection))
}

if c.MyConfig != nil {
baseInfo = c.MyConfig.String()
builder.WriteString(c.MyConfig.String())
}
return fmt.Sprintf("ClientBasicInfo: %v\n"+
" HostPorts: %s\n"+
" DestService: %s\n"+
" Protocol: %s\n"+
" Connection: %v\n"+
" MyConfig: %s\n",
*c.ClientBasicInfo, *c.HostPorts, *c.DestService, *c.Protocol, *c.Connection, baseInfo)

return builder.String()
}

type EndpointBasicInfo struct {
Expand All @@ -51,13 +67,13 @@ type EndpointBasicInfo struct {
}

type IdleConfig struct {
MinIdlePerAddress int `mapstructure:"MinIdlePerAddress"`
MaxIdlePerAddress int `mapstructure:"MaxIdlePerAddress"`
MaxIdleGlobal int `mapstructure:"MaxIdleGlobal"`
MaxIdleTimeout time.Duration `mapstructure:"MaxIdleTimeout"`
MinIdlePerAddress int `mapstructure:"MinIdlePerAddress"`
MaxIdlePerAddress int `mapstructure:"MaxIdlePerAddress"`
MaxIdleGlobal int `mapstructure:"MaxIdleGlobal"`
MaxIdleTimeout string `mapstructure:"MaxIdleTimeout"`
}
type MuxConnection struct {
ConnNum int `json:"connNum"`
ConnNum int `mapstructure:"ConnNum"`
}

type Connection struct {
Expand Down
7 changes: 6 additions & 1 deletion etcd/client/loader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package etcdclient

import (
kitexclient "github.com/cloudwego/kitex/client"
Expand All @@ -22,6 +22,11 @@ type EtcdLoader struct {
}

func (l *EtcdLoader) Load() error {
path := Path{ClientServiceName: l.ClientServiceName, ServerServiceName: l.ServerServiceName}
err := l.reader.ReadToConfig(&path)
if err != nil {
return err
}
config, err := l.reader.GetConfig()
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions etcd/client/reader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package etcdclient

import (
"bytes"
Expand All @@ -13,7 +13,7 @@ const (
EtcdDefaultNode = "http://127.0.0.1:2379"
EtcdDefaultConfigPrefix = "/KitexConfig"
EtcdDefaultTimeout = 5 * time.Second
EtcdClientDefaultPath = "{{.ClientServiceName}}/{{.ServerServiceName}}"
EtcdClientDefaultPath = "/{{.ClientServiceName}}/{{.ServerServiceName}}"
)

type Reader interface {
Expand Down Expand Up @@ -55,7 +55,7 @@ func (r *EtcdReader) ReadToConfig(p *Path) error {
klog.Debugf("[etcd] key: %s config get value failed", key)
return err
}
err = r.parser.Decode(data.Kvs[0].Value, *r.config)
err = r.parser.Decode(data.Kvs[0].Value, r.config)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion etcd/client/suite.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package etcdclient

import (
"github.com/cloudwego/kitex/client"
Expand Down
38 changes: 23 additions & 15 deletions etcd/client/translator.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package client
package etcdclient

import (
"fmt"
"github.com/Printemps417/optionloader/utils"
kitexclient "github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/connpool"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/transport"
"strconv"
"strings"
)

// Protocol indicates the transport protocol.
Expand Down Expand Up @@ -38,6 +37,9 @@ var protocolMap = map[string]Protocol{

func basicInfoTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
c := config.ClientBasicInfo
if c == nil {
return nil, nil
}
var res []kitexclient.Option
rpcInfo := rpcinfo.EndpointBasicInfo{
ServiceName: c.ServiceName,
Expand All @@ -49,6 +51,9 @@ func basicInfoTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
}
func protocolTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
c := config.Protocol
if c == nil {
return nil, nil
}
var res []kitexclient.Option
protocol, ok := protocolMap[*c]
if !ok {
Expand All @@ -60,40 +65,43 @@ func protocolTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
}
func destServiceTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
c := config.DestService
if c == nil {
return nil, nil
}
var res []kitexclient.Option
res = append(res, kitexclient.WithDestService(*c))
return res, nil
}

func hostPortsTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
c := config.HostPorts
var res []kitexclient.Option

ports := strings.Split(*c, ",")
for _, port := range ports {
hostPort := strings.Split(port, ":")
portNum, err := strconv.Atoi(hostPort[len(hostPort)-1])
if err != nil || portNum < 1 || portNum > 65535 {
return nil, fmt.Errorf("invalid port number: %s", port)
}
if c == nil {
return nil, nil
}

res = append(res, kitexclient.WithHostPorts(*c))
var res []kitexclient.Option
res = append(res, kitexclient.WithHostPorts(c...))
return res, nil
}
func connectionTranslator(config *EtcdConfig) ([]kitexclient.Option, error) {
c := config.Connection
if c == nil {
return nil, nil
}
var res []kitexclient.Option

switch c.Method {
case "ShortConnection":
res = append(res, kitexclient.WithShortConnection())
case "LongConnection":
MaxIdleTimeout, err := utils.ParseDuration(c.LongConnection.MaxIdleTimeout)
if err != nil {
return nil, err
}
idleConfig := connpool.IdleConfig{
MinIdlePerAddress: c.LongConnection.MinIdlePerAddress,
MaxIdlePerAddress: c.LongConnection.MaxIdlePerAddress,
MaxIdleGlobal: c.LongConnection.MaxIdleGlobal,
MaxIdleTimeout: c.LongConnection.MaxIdleTimeout,
MaxIdleTimeout: MaxIdleTimeout,
}
res = append(res, kitexclient.WithLongConnection(idleConfig))
case "MuxConnection":
Expand Down
8 changes: 4 additions & 4 deletions etcd/server/decoder.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package server
package etcdserver

import (
"encoding/json"
"fmt"
)

type ConfigParser interface {
Decode(data []byte, config EtcdConfig) error
Decode(data []byte, config *EtcdConfig) error
}

type defaultParser struct {
}

func (p *defaultParser) Decode(data []byte, config EtcdConfig) error {
return json.Unmarshal(data, &config)
func (p *defaultParser) Decode(data []byte, config *EtcdConfig) error {
return json.Unmarshal(data, config)
}

type Config interface {
Expand Down
7 changes: 6 additions & 1 deletion etcd/server/loader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package etcdserver

import (
"github.com/cloudwego/kitex/pkg/klog"
Expand All @@ -21,6 +21,11 @@ type EtcdLoader struct {
}

func (l *EtcdLoader) Load() error {
path := Path{ServerServiceName: l.ServerServiceName}
err := l.reader.ReadToConfig(&path)
if err != nil {
return err
}
config, err := l.reader.GetConfig()
if err != nil {
return err
Expand Down
7 changes: 3 additions & 4 deletions etcd/server/reader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package etcdserver

import (
"bytes"
Expand All @@ -13,7 +13,7 @@ const (
EtcdDefaultNode = "http://127.0.0.1:2379"
EtcdDefaultConfigPrefix = "/KitexConfig"
EtcdDefaultTimeout = 5 * time.Second
EtcdServerDefaultPath = "{{.ServerServiceName}}"
EtcdServerDefaultPath = "/{{.ServerServiceName}}"
)

type Reader interface {
Expand All @@ -33,7 +33,6 @@ type EtcdReader struct {
}

type Path struct {
ClientServiceName string
ServerServiceName string
}

Expand All @@ -55,7 +54,7 @@ func (r *EtcdReader) ReadToConfig(p *Path) error {
klog.Debugf("[etcd] key: %s config get value failed", key)
return err
}
err = r.parser.Decode(data.Kvs[0].Value, *r.config)
err = r.parser.Decode(data.Kvs[0].Value, r.config)
if err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions etcd/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package server
package etcdserver

import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"text/template"
"time"
)
Expand All @@ -13,7 +12,6 @@ type ReaderOptions struct {
Prefix string
PathFormat string
Timeout time.Duration
LoggerConfig *zap.Config
ConfigParser ConfigParser
MyConfig Config
}
Expand All @@ -35,8 +33,8 @@ func NewReader(opts ReaderOptions) (*EtcdReader, error) {
opts.PathFormat = EtcdServerDefaultPath
}
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: opts.Node,
LogConfig: opts.LoggerConfig,
Endpoints: opts.Node,
DialTimeout: opts.Timeout,
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion etcd/server/suite.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package server
package etcdserver

import (
"github.com/cloudwego/kitex/server"
Expand Down
Loading

0 comments on commit 184fb79

Please sign in to comment.