From aee59e2b4a4a2089b27dc84192cd35b756fb16d1 Mon Sep 17 00:00:00 2001 From: whalecold Date: Sat, 16 Sep 2023 17:40:21 +0800 Subject: [PATCH] modify according comments --- README.md | 109 +++++++++++++++++----------------- README_CN.md | 112 ++++++++++++++++++----------------- client/circuit_breaker.go | 5 +- client/retry.go | 5 +- client/rpc_timeout.go | 5 +- example/client/main.go | 12 +--- example/server/main.go | 12 +--- nacos/env.go | 53 +++++------------ nacos/env_test.go | 31 ++-------- nacos/nacos.go | 120 +++++++++++++++++++++++++++++++++++--- server/limiter.go | 6 +- 11 files changed, 269 insertions(+), 201 deletions(-) diff --git a/README.md b/README.md index 5313cc0..2e71e77 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Nacos as config centre. #### Server ```go +package main + import ( "context" "log" @@ -32,25 +34,20 @@ type EchoImpl struct{} // Echo implements the Echo interface. func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) { klog.Info("echo called") - return &api.Response{Message: req.Message}, nil + return &api.Response{Message: req.Message}, nil } func main() { - nacosClient, err := nacos.DefaultClient() + klog.SetLevel(klog.LevelDebug) + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } - serviceName := "echo" - - opts := []server.Option{ - server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), - } - - opts = append(opts, nacosserver.NewSuite(serviceName, nacosClient).Options()...) - + serviceName := "server" svr := echo.NewServer( new(EchoImpl), - opts..., + server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), + server.WithSuite(nacosserver.NewSuite(serviceName, nacosClient)), ) if err := svr.Run(); err != nil { log.Println("server stopped with error:", err) @@ -63,6 +60,8 @@ func main() { #### Client ```go +package main + import ( "context" "log" @@ -79,7 +78,7 @@ import ( func main() { klog.SetLevel(klog.LevelDebug) - nacosClient, err := nacos.DefaultClient() + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } @@ -88,18 +87,12 @@ func main() { klog.Infof("nacos config %v", cp) } - opts := []client.Option{ - client.WithHostPorts("0.0.0.0:8888"), - } - - serviceName := "echo" - clientName := "test" - - opts = append(opts, nacosclient.NewSuite(serviceName, clientName, nacosClient, fn).Options()...) - + serviceName := "server" + clientName := "client" client, err := echo.NewClient( serviceName, - opts..., + client.WithHostPorts("0.0.0.0:8888"), + client.WithSuite(nacosclient.NewSuite(serviceName, clientName, nacosClient, fn)), ) if err != nil { log.Fatal(err) @@ -118,35 +111,40 @@ func main() { ### Nacos Configuration -The client obtains the nacos address, port and namespace from the environment variables and connects to the nacos server. After the connection is established, the suite subscribes to the appropriate configuration based on configGroup and configDataId and dynamically updates its own policy. See the environment variables below for specific parameters. +The client is initialized according to the parameters of `Options` and connects to the nacos server. If the parameters are empty, the addr, port and namespace of nacos are obtained from the environment variables. After the connection is established, the suite subscribes the appropriate configuration based on `configGroup` and `configDataId` to updates its own policy dynamically. See the environment variables below for specific parameters. -The configured format supports `json` and `yaml` by default. You can use the `SetParser` function to customise the format parsing method, and the `CustomFunction` function to customise the format of the subscription function during `NewSuite`. +The configuration format supports `json` and `yaml`. You can use the [SetParser](https://github.com/kitex-contrib/config-nacos/blob/eb006978517678dd75a81513142d3faed6a66f8d/nacos/nacos.go#L68) function to customise the format parsing method, and the `CustomFunction` function to customise the format of the subscription function during `NewSuite`. #### #### Environment Variable | Environment Variable Name | Environment Variable Default Value | Environment Variable Introduction | | ------------------------- | ---------------------------------- | --------------------------------- | -| serverAddr | 127.0.0.1 | nacos server address | -| serverPort | 8848 | nacos server port | -| namespace | | the namespaceId of nacos | -| configDataId | {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} | Use go [template](https://pkg.go.dev/text/template) syntax rendering to generate the appropriate ID, and use `ClientServiceName` `ServiceName` `Category` three metadata that can be customised | -| configGroup | DEFAULT_GROUP | Use fixed values or dynamic rendering. Usage is the same as configDataId. | +| KITEX_CONFIG_NACOS_SERVER_ADDR | 127.0.0.1 | Nacos server address | +| KITEX_CONFIG_NACOS_SERVER_PORT | 8848 | Nacos server port | +| KITEX_CONFIG_NACOS_NAMESPACE | | The namespaceId of nacos | +| KITEX_CONFIG_NACOS_DATA_ID | {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} | Use go [template](https://pkg.go.dev/text/template) syntax rendering to generate the appropriate ID, and use `ClientServiceName` `ServiceName` `Category` three metadata that can be customised | +| KITEX_CONFIG_NACOS_GROUP | DEFAULT_GROUP | Use fixed values or dynamic rendering. Usage is the same as configDataId. | #### Governance Policy -> The configDataId and configGroup in the following example use default values, the service name is echo and the client name is requester. +> The configDataId and configGroup in the following example use default values, the service name is `ServiceName` and the client name is `ClientName`. ##### Rate Limit Category=limit > Currently, current limiting only supports the server side, so ClientServiceName is empty. [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/limiter/item_limiter.go#L33) +|Variable|Introduction| +|----|----| +|connection_limit| max concurrent connection | +|qps_limit| max request every 100ms | Example: ``` -configDataID: .echo.limit +configDataID: ServiceName.limit + { - "connection_limit": 100, // Maximum 100 concurrent connections - "qps_limit": 2000 // Maximum 2000 QPS per 100ms + "connection_limit": 100, + "qps_limit": 2000 } ``` @@ -159,13 +157,18 @@ Note: ##### Retry Policy Category=retry [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/retry/policy.go#L63) +|Variable|Introduction| +|----|----| +|type| 0: failure_policy 1: backup_policy| +|failure_policy.backoff_policy| can only be set one of `fixed` `none` `random` | + Example: ``` -configDataId: requester.echo.retry +configDataId: ClientName.ServiceName.retry { - "*": { // * default value, If you do not configure all fallbacks to this policy + "*": { "enable": true, - "type": 0, // failed retry(type=0) + "type": 0, "failure_policy": { "stop_policy": { "max_retry_times": 3, @@ -175,35 +178,32 @@ configDataId: requester.echo.retry } }, "backoff_policy": { - "backoff_type": "fixed", + "backoff_type": "fixed", "cfg_items": { "fix_ms": 50 } - } + }, + "retry_same_node": false } }, - "echo": { // the method, lower-case + "echo": { "enable": true, - "type": 1, // backoff_policy(type=1) - "failure_policy": { + "type": 1, + "backup_policy": { + "retry_delay_ms": 100, + "retry_same_node": false, "stop_policy": { - "max_retry_times": 3, - "max_duration_ms": 2000, + "max_retry_times": 2, + "max_duration_ms": 300, "cb_policy": { - "error_rate": 0.5 - } - }, - "backoff_policy": { - "backoff_type": "fixed", - "cfg_items": { - "fix_ms": 50 + "error_rate": 0.2 } } } } } ``` -Note: retry.Container has built-in support for specifying the default configuration using the * wildcard (see the [getRetryer](https://github.com/cloudwego/kitex/blob/v0.5.1/pkg/retry/retryer.go#L240) method for details). +Note: retry.Container has built-in support for specifying the default configuration using the `*` wildcard (see the [getRetryer](https://github.com/cloudwego/kitex/blob/v0.5.1/pkg/retry/retryer.go#L240) method for details). ##### RPC Timeout Category=rpc_timeout @@ -211,7 +211,7 @@ Note: retry.Container has built-in support for specifying the default configurat Example: ``` -configDataId: requester.echo.rpc_timeout +configDataId: ClietnName.ServiceName.rpc_timeout { "*": { "conn_timeout_ms": 100, @@ -229,10 +229,13 @@ Note: The circuit breaker implementation of kitex does not currently support cha [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/circuitbreak/item_circuit_breaker.go#L30) +|Variable|Introduction| +|----|----| +|min_sample| Minimum statistical sample number| Example: ``` The Echo method uses the following configuration (0.3, 100) and other methods use the global default configuration (0.5, 200) -configDataId: `requester.echo.circuit_break` +configDataId: `ClientName.ServiecName.circuit_break` { "Echo": { "enable": true, diff --git a/README_CN.md b/README_CN.md index 40100d5..a0a2e65 100644 --- a/README_CN.md +++ b/README_CN.md @@ -11,6 +11,8 @@ #### 服务端 ```go +package main + import ( "context" "log" @@ -36,21 +38,16 @@ func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Respon } func main() { - nacosClient, err := nacos.DefaultClient() + klog.SetLevel(klog.LevelDebug) + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } - serviceName := "echo" - - opts := []server.Option{ - server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), - } - - opts = append(opts, nacosserver.NewSuite(serviceName, nacosClient).Options()...) - + serviceName := "server" svr := echo.NewServer( new(EchoImpl), - opts..., + server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), + server.WithSuite(nacosserver.NewSuite(serviceName, nacosClient)), ) if err := svr.Run(); err != nil { log.Println("server stopped with error:", err) @@ -63,6 +60,8 @@ func main() { #### 客户端 ```go +package main + import ( "context" "log" @@ -79,7 +78,7 @@ import ( func main() { klog.SetLevel(klog.LevelDebug) - nacosClient, err := nacos.DefaultClient() + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } @@ -88,18 +87,12 @@ func main() { klog.Infof("nacos config %v", cp) } - opts := []client.Option{ - client.WithHostPorts("0.0.0.0:8888"), - } - - serviceName := "echo" - clientName := "test" - - opts = append(opts, nacosclient.NewSuite(serviceName, clientName, nacosClient, fn).Options()...) - + serviceName := "server" + clientName := "client" client, err := echo.NewClient( serviceName, - opts..., + client.WithHostPorts("0.0.0.0:8888"), + client.WithSuite(nacosclient.NewSuite(serviceName, clientName, nacosClient, fn)), ) if err != nil { log.Fatal(err) @@ -117,35 +110,41 @@ func main() { ``` ### Nacos 配置 -client 根据环境变量获取到 nacos 的 addr, port 以及 namespace 链接到 nacos 服务器上,建立链接之后 suite 会根据 configGroup 以及 configDataId 订阅对应的配置并动态更新自身策略,具体参数参考下面环境变量。 +根据 Options 的参数初始化 client, 如果参数为空则会根据环境变量获取到 nacos 的 addr, port 以及 namespace 链接到 nacos 服务器上,建立链接之后 suite 会根据 configGroup 以及 configDataId 订阅对应的配置并动态更新自身策略,具体参数参考下面环境变量。 配置的格式默认支持 `json` 和 `yaml`,可以使用函数 `SetParser` 进行自定义格式解析方式,并在 `NewSuite` 的时候使用 `CustomFunction` 函数修改订阅函数的格式。 #### 环境变量 + | 变量名 | 变量默认值 | 作用 | | ------------------------- | ---------------------------------- | --------------------------------- | -| serverAddr | 127.0.0.1 | nacos 服务器地址 | -| serverPort | 8848 | nacos 服务器端口 | -| namespace | | nacos 中的 namespace Id | -| configDataId | {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} | 使用 go [template](https://pkg.go.dev/text/template) 语法渲染生成对应的 ID, 使用 `ClientServiceName` `ServiceName` `Category` 三个元数据,可以自定义 | -| configGroup | DEFAULT_GROUP | 使用固定值,也可以动态渲染,用法同 configDataId | +| KITEX_CONFIG_NACOS_SERVER_ADDR | 127.0.0.1 | nacos 服务器地址 | +| KITEX_CONFIG_NACOS_SERVER_PORT | 8848 | nacos 服务器端口 | +| KITEX_CONFIG_NACOS_NAMESPACE | | nacos 中的 namespace Id | +| KITEX_CONFIG_NACOS_DATA_ID | {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} | 使用 go [template](https://pkg.go.dev/text/template) 语法渲染生成对应的 ID, 使用 `ClientServiceName` `ServiceName` `Category` 三个元数据,可以自定义 | +| KITEX_CONFIG_NACOS_GROUP | DEFAULT_GROUP | 使用固定值,也可以动态渲染,用法同 configDataId | #### 治理策略 -下面例子中的 configDataId 以及 configGroup 均使用默认值,服务名称为 echo,客户端名称为 requester +下面例子中的 configDataId 以及 configGroup 均使用默认值,服务名称为 ServiceName,客户端名称为 ClientName ##### 限流 Category=limit > 限流目前只支持服务端,所以 ClientServiceName 为空。 [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/limiter/item_limiter.go#L33) +|字段|说明| +|----|----| +|connection_limit|最大并发数量| +|qps_limit|每 100ms 内的最大请求数量| + 例子: ``` -configDataID: .echo.limit +configDataID: ServiceName.limit { - "connection_limit": 100, // 最大100并发 - "qps_limit": 2000 // 每 100ms 内最大 2000QPS + "connection_limit": 100, + "qps_limit": 2000 } ``` 注: @@ -158,13 +157,18 @@ configDataID: .echo.limit [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/retry/policy.go#L63) +|参数|说明| +|----|----| +|type| 0: failure_policy 1: backup_policy| +|failure_policy.backoff_policy| 可以设置的策略: `fixed` `none` `random` | + 例子: ``` -configDataId: requester.echo.retry +configDataId: ClientName.ServiceName.retry { - "*": { // * 默认值,如果不配置全部 fallback 到这个策略 + "*": { "enable": true, - "type": 0, // 失败重试(type=0) + "type": 0, "failure_policy": { "stop_policy": { "max_retry_times": 3, @@ -174,28 +178,25 @@ configDataId: requester.echo.retry } }, "backoff_policy": { - "backoff_type": "fixed", + "backoff_type": "fixed", "cfg_items": { "fix_ms": 50 } - } + }, + "retry_same_node": false } }, - "echo": { // 请求方法名称,要转成全小写 + "echo": { "enable": true, - "type": 1, // 备用请求(type=1) - "failure_policy": { + "type": 1, + "backup_policy": { + "retry_delay_ms": 100, + "retry_same_node": false, "stop_policy": { - "max_retry_times": 3, - "max_duration_ms": 2000, + "max_retry_times": 2, + "max_duration_ms": 300, "cb_policy": { - "error_rate": 0.5 - } - }, - "backoff_policy": { - "backoff_type": "fixed", - "cfg_items": { - "fix_ms": 50 + "error_rate": 0.2 } } } @@ -210,14 +211,14 @@ configDataId: requester.echo.retry 例子: ``` -configDataId: requester.echo.rpc_timeout +configDataId: ClientName.ServiceName.rpc_timeout { "*": { - "conn_timeout_ms": 100, // 其他方法连接超时 100ms、请求超时 3s + "conn_timeout_ms": 100, "rpc_timeout_ms": 3000 }, "echo": { - "conn_timeout_ms": 50, // Echo 方法链接超时 50ms、请求超时 1s, + "conn_timeout_ms": 50, "rpc_timeout_ms": 1000 } } @@ -228,15 +229,18 @@ configDataId: requester.echo.rpc_timeout [JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/circuitbreak/item_circuit_breaker.go#L30) +|参数|说明| +|----|----| +|min_sample| 最小的统计样本数| 例子: ``` Echo 方法使用下面的配置(0.3、100),其他方法使用全局默认配置(0.5、200) -configDataId: `requester.echo.circuit_break` +configDataId: `ClientName.ServiceName.circuit_break` { "Echo": { "enable": true, - "err_rate": 0.3, 错误率 - "min_sample": 100 最小的统计个数 + "err_rate": 0.3, + "min_sample": 100 } } ``` diff --git a/client/circuit_breaker.go b/client/circuit_breaker.go index 0f0ae59..9a13f36 100644 --- a/client/circuit_breaker.go +++ b/client/circuit_breaker.go @@ -31,11 +31,14 @@ import ( func WithCircuitBreaker(dest, src string, nacosClient nacos.Client, cfs ...nacos.CustomFunction, ) []client.Option { - param := nacos.NacosConfigParam(&nacos.ConfigParamConfig{ + param, err := nacosClient.NacosConfigParam(&nacos.ConfigParamConfig{ Category: circuitBreakerConfigName, ServerServiceName: dest, ClientServiceName: src, }, cfs...) + if err != nil { + panic(err) + } cbSuite := initCircuitBreaker(param, dest, src, nacosClient) diff --git a/client/retry.go b/client/retry.go index 31fc0f7..9c32923 100644 --- a/client/retry.go +++ b/client/retry.go @@ -28,11 +28,14 @@ import ( func WithRetryPolicy(dest, src string, nacosClient nacos.Client, cfs ...nacos.CustomFunction, ) []client.Option { - param := nacos.NacosConfigParam(&nacos.ConfigParamConfig{ + param, err := nacosClient.NacosConfigParam(&nacos.ConfigParamConfig{ Category: retryConfigName, ServerServiceName: dest, ClientServiceName: src, }, cfs...) + if err != nil { + panic(err) + } return []client.Option{ client.WithRetryContainer(initRetryContainer(param, dest, nacosClient)), diff --git a/client/rpc_timeout.go b/client/rpc_timeout.go index 063b5be..36ab552 100644 --- a/client/rpc_timeout.go +++ b/client/rpc_timeout.go @@ -27,11 +27,14 @@ import ( func WithRPCTimeout(dest, src string, nacosClient nacos.Client, cfs ...nacos.CustomFunction, ) []client.Option { - param := nacos.NacosConfigParam(&nacos.ConfigParamConfig{ + param, err := nacosClient.NacosConfigParam(&nacos.ConfigParamConfig{ Category: rpcTimeoutConfigName, ServerServiceName: dest, ClientServiceName: src, }, cfs...) + if err != nil { + panic(err) + } return []client.Option{ client.WithTimeoutProvider(initRPCTimeoutContainer(param, dest, nacosClient)), diff --git a/example/client/main.go b/example/client/main.go index f06a875..bc362cf 100644 --- a/example/client/main.go +++ b/example/client/main.go @@ -31,7 +31,7 @@ import ( func main() { klog.SetLevel(klog.LevelDebug) - nacosClient, err := nacos.DefaultClient() + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } @@ -40,18 +40,12 @@ func main() { klog.Infof("nacos config %v", cp) } - opts := []client.Option{ - client.WithHostPorts("0.0.0.0:8888"), - } - serviceName := "echo" clientName := "test" - - opts = append(opts, nacosclient.NewSuite(serviceName, clientName, nacosClient, fn).Options()...) - client, err := echo.NewClient( serviceName, - opts..., + client.WithHostPorts("0.0.0.0:8888"), + client.WithSuite(nacosclient.NewSuite(serviceName, clientName, nacosClient, fn)), ) if err != nil { log.Fatal(err) diff --git a/example/server/main.go b/example/server/main.go index 4ae4201..c9494e5 100644 --- a/example/server/main.go +++ b/example/server/main.go @@ -41,21 +41,15 @@ func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Respon func main() { klog.SetLevel(klog.LevelDebug) - nacosClient, err := nacos.DefaultClient() + nacosClient, err := nacos.New(nacos.Options{}) if err != nil { panic(err) } serviceName := "echo" - - opts := []server.Option{ - server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), - } - - opts = append(opts, nacosserver.NewSuite(serviceName, nacosClient).Options()...) - svr := echo.NewServer( new(EchoImpl), - opts..., + server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}), + server.WithSuite(nacosserver.NewSuite(serviceName, nacosClient)), ) if err := svr.Run(); err != nil { log.Println("server stopped with error:", err) diff --git a/nacos/env.go b/nacos/env.go index 5410b50..48a3cb3 100644 --- a/nacos/env.go +++ b/nacos/env.go @@ -26,14 +26,13 @@ import ( ) const ( - NACOS_ENV_SERVER_ADDR = "serverAddr" - NACOS_ENV_PORT = "serverPort" - NACOS_ENV_NAMESPACE_ID = "namespace" - NACOS_ENV_CONFIG_GROUP = "configGroup" - NACOS_ENV_CONFIG_DATA_ID = "configDataId" + NACOS_ENV_SERVER_ADDR = "KITEX_CONFIG_NACOS_SERVER_ADDR" + NACOS_ENV_PORT = "KITEX_CONFIG_NACOS_SERVER_PORT" + NACOS_ENV_NAMESPACE_ID = "KITEX_CONFIG_NACOS_NAMESPACE" + NACOS_ENV_CONFIG_GROUP = "KITEX_CONFIG_NACOS_GROUP" + NACOS_ENV_CONFIG_DATA_ID = "KITEX_CONFIG_NACOS_DATA_ID" NACOS_DEFAULT_SERVER_ADDR = "127.0.0.1" NACOS_DEFAULT_PORT = 8848 - NACOS_DEFAULT_REGIONID = "cn-hangzhou" NACOS_DEFAULT_CONFIG_GROUP = "DEFAULT_GROUP" NACOS_DEFAULT_DATA_ID = "{{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}}" ) @@ -66,28 +65,8 @@ func render(name, format string, cpc *ConfigParamConfig) string { return tpl.String() } -// NacosConfigParam Get nacos config from environment variables. All the parameters can be customized with CustomFunction. -// ConfigParam explain: -// 1. Type: data format, support JSON and YAML, JSON by default. Could extend it by implementing the ConfigParser interface. -// 2. Content: empty by default. Customize with CustomFunction. -// 3. Group: DEFAULT_GROUP by default. -// 4. DataId: {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} by default. Customize it by CustomFunction or -// use specified format. ref: nacos/env.go:46 -func NacosConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) vo.ConfigParam { - param := vo.ConfigParam{ - DataId: render("dataId", NacosConfigDataId(), cpc), - Group: render("group", NacosConfigGroup(), cpc), - Type: vo.JSON, - Content: defaultContent, - } - for _, cf := range cfs { - cf(¶m) - } - return param -} - -// NacosConfigDataId Get nacos DataId from environment variables -func NacosConfigDataId() string { +// nacosConfigDataId Get nacos DataId from environment variables +func nacosConfigDataId() string { dataId := os.Getenv(NACOS_ENV_CONFIG_DATA_ID) if len(dataId) == 0 { return NACOS_DEFAULT_DATA_ID @@ -95,8 +74,8 @@ func NacosConfigDataId() string { return dataId } -// NacosConfigGroup Get nacos config group from environment variables -func NacosConfigGroup() string { +// nacosConfigGroup Get nacos config group from environment variables +func nacosConfigGroup() string { configGroup := os.Getenv(NACOS_ENV_CONFIG_GROUP) if len(configGroup) == 0 { return NACOS_DEFAULT_CONFIG_GROUP @@ -104,8 +83,8 @@ func NacosConfigGroup() string { return configGroup } -// NacosPort Get Nacos port from environment variables -func NacosPort() int64 { +// nacosPort Get Nacos port from environment variables +func nacosPort() uint64 { portText := os.Getenv(NACOS_ENV_PORT) if len(portText) == 0 { return NACOS_DEFAULT_PORT @@ -115,11 +94,11 @@ func NacosPort() int64 { klog.Errorf("ParseInt failed,err:%s", err.Error()) return NACOS_DEFAULT_PORT } - return port + return uint64(port) } -// NacosAddr Get Nacos addr from environment variables -func NacosAddr() string { +// nacosAddr Get Nacos addr from environment variables +func nacosAddr() string { addr := os.Getenv(NACOS_ENV_SERVER_ADDR) if len(addr) == 0 { return NACOS_DEFAULT_SERVER_ADDR @@ -127,7 +106,7 @@ func NacosAddr() string { return addr } -// NacosNameSpaceId Get Nacos namespace id from environment variables -func NacosNameSpaceId() string { +// nacosNameSpaceId Get Nacos namespace id from environment variables +func nacosNameSpaceId() string { return os.Getenv(NACOS_ENV_NAMESPACE_ID) } diff --git a/nacos/env_test.go b/nacos/env_test.go index ee1a260..90e9559 100644 --- a/nacos/env_test.go +++ b/nacos/env_test.go @@ -17,27 +17,14 @@ package nacos import ( "testing" - "github.com/nacos-group/nacos-sdk-go/vo" "github.com/stretchr/testify/assert" ) // TestEnvFunc test env func func TestEnvFunc(t *testing.T) { - cpc := &ConfigParamConfig{ - Category: "retry", - ServerServiceName: "svc", - ClientServiceName: "cli", - } - - assert.Equal(t, int64(8848), NacosPort()) - assert.Equal(t, "127.0.0.1", NacosAddr()) - assert.Equal(t, "", NacosNameSpaceId()) - assert.Equal(t, vo.ConfigParam{ - Type: vo.JSON, - Group: NACOS_DEFAULT_CONFIG_GROUP, - Content: defaultContent, - DataId: "cli.svc.retry", - }, NacosConfigParam(cpc)) + assert.Equal(t, int64(8848), nacosPort()) + assert.Equal(t, "127.0.0.1", nacosAddr()) + assert.Equal(t, "", nacosNameSpaceId()) t.Setenv(NACOS_ENV_NAMESPACE_ID, "ns") t.Setenv(NACOS_ENV_SERVER_ADDR, "1.1.1.1") @@ -45,13 +32,7 @@ func TestEnvFunc(t *testing.T) { t.Setenv(NACOS_ENV_CONFIG_DATA_ID, "{{.ClientServiceName}}") t.Setenv(NACOS_ENV_CONFIG_GROUP, "{{.Category}}") - assert.Equal(t, int64(80), NacosPort()) - assert.Equal(t, "1.1.1.1", NacosAddr()) - assert.Equal(t, "ns", NacosNameSpaceId()) - assert.Equal(t, vo.ConfigParam{ - Type: vo.JSON, - Group: "retry", - Content: defaultContent, - DataId: "cli", - }, NacosConfigParam(cpc)) + assert.Equal(t, int64(80), nacosPort()) + assert.Equal(t, "1.1.1.1", nacosAddr()) + assert.Equal(t, "ns", nacosNameSpaceId()) } diff --git a/nacos/nacos.go b/nacos/nacos.go index 43b9fd4..7a46499 100644 --- a/nacos/nacos.go +++ b/nacos/nacos.go @@ -15,16 +15,22 @@ package nacos import ( + "bytes" + "strings" + "text/template" + "github.com/cloudwego/kitex/pkg/klog" "github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/common/constant" + "github.com/nacos-group/nacos-sdk-go/common/logger" "github.com/nacos-group/nacos-sdk-go/vo" ) // Client the wrapper of nacos client. type Client interface { SetParser(ConfigParser) + NacosConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (vo.ConfigParam, error) RegisterConfigCallback(vo.ConfigParam, func(string, ConfigParser)) DeregisterConfig(vo.ConfigParam) error } @@ -32,21 +38,57 @@ type Client interface { type client struct { ncli config_client.IConfigClient // support customise parser - parser ConfigParser + parser ConfigParser + groupTemplate *template.Template + dataIDTemplate *template.Template +} + +// Options nacos config options. All the fields have default value. +type Options struct { + Address string + Port uint64 + NamespaceID string + RegionID string + Group string + DataIDFormat string + CustomLogger logger.Logger + ConfigParser ConfigParser } -// DefaultClient Create a default Nacos client +// New Create a default Nacos client // It can create a client with default config by env variable. // See: env.go -func DefaultClient() (Client, error) { +func New(opts Options) (Client, error) { + if opts.Address == "" { + opts.Address = nacosAddr() + } + if opts.Port == 0 { + opts.Port = nacosPort() + } + if opts.NamespaceID == "" { + opts.NamespaceID = nacosNameSpaceId() + } + if opts.CustomLogger == nil { + opts.CustomLogger = NewCustomNacosLogger() + } + if opts.ConfigParser == nil { + opts.ConfigParser = defaultConfigParse() + } + if opts.Group == "" { + opts.Group = nacosConfigGroup() + } + if opts.DataIDFormat == "" { + opts.DataIDFormat = nacosConfigDataId() + } + sc := []constant.ServerConfig{ - *constant.NewServerConfig(NacosAddr(), uint64(NacosPort())), + *constant.NewServerConfig(opts.Address, opts.Port), } cc := constant.ClientConfig{ - NamespaceId: NacosNameSpaceId(), - RegionId: NACOS_DEFAULT_REGIONID, + NamespaceId: opts.NamespaceID, + RegionId: opts.RegionID, NotLoadCacheAtStart: true, - CustomLogger: NewCustomNacosLogger(), + CustomLogger: opts.CustomLogger, } nacosClient, err := clients.NewConfigClient( vo.NacosClientParam{ @@ -57,9 +99,19 @@ func DefaultClient() (Client, error) { if err != nil { return nil, err } + groupTemplate, err := template.New("group").Parse(opts.Group) + if err != nil { + return nil, err + } + dataIDTemplate, err := template.New("dataID").Parse(opts.DataIDFormat) + if err != nil { + return nil, err + } c := &client{ - ncli: nacosClient, - parser: defaultConfigParse(), + ncli: nacosClient, + parser: opts.ConfigParser, + groupTemplate: groupTemplate, + dataIDTemplate: dataIDTemplate, } return c, nil } @@ -69,6 +121,56 @@ func (c *client) SetParser(parser ConfigParser) { c.parser = parser } +func (c *client) renderGroup(cpc *ConfigParamConfig) (string, error) { + var tpl bytes.Buffer + err := c.groupTemplate.Execute(&tpl, cpc) + if err != nil { + return "", err + } + return tpl.String(), nil +} + +func (c *client) renderDataID(cpc *ConfigParamConfig) (string, error) { + var tpl bytes.Buffer + err := c.dataIDTemplate.Execute(&tpl, cpc) + if err != nil { + return "", err + } + return tpl.String(), nil +} + +// NacosConfigParam Get nacos config from environment variables. All the parameters can be customized with CustomFunction. +// ConfigParam explain: +// 1. Type: data format, support JSON and YAML, JSON by default. Could extend it by implementing the ConfigParser interface. +// 2. Content: empty by default. Customize with CustomFunction. +// 3. Group: DEFAULT_GROUP by default. +// 4. DataId: {{.ClientServiceName}}.{{.ServerServiceName}}.{{.Category}} by default. Customize it by CustomFunction or +// use specified format. ref: nacos/env.go:46 +func (c *client) NacosConfigParam(cpc *ConfigParamConfig, cfs ...CustomFunction) (vo.ConfigParam, error) { + param := vo.ConfigParam{ + Type: vo.JSON, + Content: defaultContent, + } + var err error + param.DataId, err = c.renderDataID(cpc) + if err != nil { + return param, err + } + param.Group, err = c.renderGroup(cpc) + if err != nil { + return param, err + } + + // TODO trim the specified prefix string + param.DataId = strings.TrimPrefix(param.DataId, ".") + param.Group = strings.TrimPrefix(param.Group, ".") + + for _, cf := range cfs { + cf(¶m) + } + return param, nil +} + // DeregisterConfig deregister the config. func (c *client) DeregisterConfig(cfg vo.ConfigParam) error { return c.ncli.CancelListenConfig(cfg) diff --git a/server/limiter.go b/server/limiter.go index 63a806c..5e0b654 100644 --- a/server/limiter.go +++ b/server/limiter.go @@ -30,14 +30,16 @@ import ( func WithLimiter(dest string, nacosClient nacos.Client, cfs ...nacos.CustomFunction, ) server.Option { - param := nacos.NacosConfigParam(&nacos.ConfigParamConfig{ + param, err := nacosClient.NacosConfigParam(&nacos.ConfigParamConfig{ Category: limiterConfigName, ServerServiceName: dest, }, cfs...) + if err != nil { + panic(err) + } return server.WithLimit(initLimitOptions(param, dest, nacosClient)) } - func initLimitOptions(param vo.ConfigParam, dest string, nacosClient nacos.Client) *limit.Option { var updater atomic.Value opt := &limit.Option{}