Skip to content

Commit

Permalink
Merge pull request #20 from configcat/grpc-status-metrics-improvements
Browse files Browse the repository at this point in the history
gRPC / status / metrics improvements
  • Loading branch information
z4kn4fein authored Mar 14, 2024
2 parents 95e8a28 + 824c85e commit 27566ea
Show file tree
Hide file tree
Showing 52 changed files with 1,795 additions and 379 deletions.
47 changes: 44 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"fmt"
"github.com/configcat/configcat-proxy/log"
"github.com/configcat/configcat-proxy/model"
"google.golang.org/grpc/keepalive"
"gopkg.in/yaml.v3"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"time"
)

const defaultConfigName = "options.yml"
Expand Down Expand Up @@ -58,9 +60,20 @@ type SDKConfig struct {
}

type GrpcConfig struct {
Enabled bool `yaml:"enabled"`
Port int `yaml:"port"`
Log LogConfig
Enabled bool `yaml:"enabled"`
Port int `yaml:"port"`
ServerReflectionEnabled bool `yaml:"server_reflection_enabled"`
HealthCheckEnabled bool `yaml:"health_check_enabled"`
KeepAlive KeepAliveConfig `yaml:"keep_alive"`
Log LogConfig
}

type KeepAliveConfig struct {
MaxConnectionIdle int `yaml:"max_connection_idle"`
MaxConnectionAge int `yaml:"max_connection_age"`
MaxConnectionAgeGrace int `yaml:"max_connection_age_grace"`
Time int `yaml:"time"`
Timeout int `yaml:"timeout"`
}

type SseConfig struct {
Expand All @@ -77,6 +90,7 @@ type CertConfig struct {
}

type HttpConfig struct {
Enabled bool
Port int `yaml:"port"`
CdnProxy CdnProxyConfig `yaml:"cdn_proxy"`
Log LogConfig
Expand Down Expand Up @@ -250,9 +264,12 @@ func (t *TlsConfig) GetVersion() uint16 {

func (c *Config) setDefaults() {
c.Http.Port = 8050
c.Http.Enabled = true

c.Grpc.Enabled = true
c.Grpc.Port = 50051
c.Grpc.HealthCheckEnabled = true
c.Grpc.ServerReflectionEnabled = false

c.Diag.Enabled = true
c.Diag.Status.Enabled = true
Expand Down Expand Up @@ -380,6 +397,30 @@ func (c *CORSConfig) compileRegexes() error {
return nil
}

func (k *KeepAliveConfig) ToParams() (keepalive.ServerParameters, bool) {
if k.MaxConnectionIdle == 0 && k.MaxConnectionAge == 0 && k.MaxConnectionAgeGrace == 0 &&
k.Time == 0 && k.Timeout == 0 {
return keepalive.ServerParameters{}, false
}
param := keepalive.ServerParameters{}
if k.MaxConnectionIdle != 0 {
param.MaxConnectionIdle = time.Duration(k.MaxConnectionIdle) * time.Second
}
if k.MaxConnectionAge != 0 {
param.MaxConnectionAge = time.Duration(k.MaxConnectionAge) * time.Second
}
if k.MaxConnectionAgeGrace != 0 {
param.MaxConnectionAgeGrace = time.Duration(k.MaxConnectionAgeGrace) * time.Second
}
if k.Time != 0 {
param.Time = time.Duration(k.Time) * time.Second
}
if k.Timeout != 0 {
param.Timeout = time.Duration(k.Timeout) * time.Second
}
return param, true
}

func defaultConfigPath() (string, bool) {
switch runtime.GOOS {
case "windows":
Expand Down
43 changes: 43 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestConfig_Defaults(t *testing.T) {
conf, err := LoadConfigFromFileAndEnvironment("")
require.NoError(t, err)

assert.Equal(t, 8050, conf.Http.Port)
assert.True(t, conf.Http.Enabled)

assert.Equal(t, 50051, conf.Grpc.Port)
assert.True(t, conf.Grpc.Enabled)
assert.True(t, conf.Grpc.HealthCheckEnabled)
assert.False(t, conf.Grpc.ServerReflectionEnabled)

assert.Equal(t, 8051, conf.Diag.Port)
assert.True(t, conf.Diag.Enabled)
Expand Down Expand Up @@ -400,6 +404,7 @@ diag:
func TestHttpConfig_YAML(t *testing.T) {
utils.UseTempFile(`
http:
enabled: true
port: 8090
log:
level: "info"
Expand Down Expand Up @@ -445,6 +450,7 @@ http:
conf, err := LoadConfigFromFileAndEnvironment(file)
require.NoError(t, err)

assert.True(t, conf.Http.Enabled)
assert.Equal(t, log.Info, conf.Http.Log.GetLevel())
assert.Equal(t, 8090, conf.Http.Port)
assert.True(t, conf.Http.Webhook.Enabled)
Expand Down Expand Up @@ -546,6 +552,14 @@ func TestGrpcConfig_YAML(t *testing.T) {
grpc:
enabled: true
port: 8060
server_reflection_enabled: true
health_check_enabled: false
keep_alive:
max_connection_idle: 1
max_connection_age: 2
max_connection_age_grace: 3
time: 4
timeout: 5
log:
level: "error"
`, func(file string) {
Expand All @@ -555,6 +569,13 @@ grpc:
assert.Equal(t, log.Error, conf.Grpc.Log.GetLevel())
assert.Equal(t, 8060, conf.Grpc.Port)
assert.True(t, conf.Grpc.Enabled)
assert.True(t, conf.Grpc.ServerReflectionEnabled)
assert.False(t, conf.Grpc.HealthCheckEnabled)
assert.Equal(t, 1, conf.Grpc.KeepAlive.MaxConnectionIdle)
assert.Equal(t, 2, conf.Grpc.KeepAlive.MaxConnectionAge)
assert.Equal(t, 3, conf.Grpc.KeepAlive.MaxConnectionAgeGrace)
assert.Equal(t, 4, conf.Grpc.KeepAlive.Time)
assert.Equal(t, 5, conf.Grpc.KeepAlive.Timeout)
})
}

Expand Down Expand Up @@ -591,3 +612,25 @@ default_user_attributes:
assert.Equal(t, []string{"a", "b"}, conf.DefaultAttrs["attr6"])
})
}

func TestGrpcConfig_KeepAlive(t *testing.T) {
conf := KeepAliveConfig{MaxConnectionIdle: 1, MaxConnectionAge: 2, MaxConnectionAgeGrace: 3, Time: 4, Timeout: 5}
param, ok := conf.ToParams()

assert.True(t, ok)
assert.Equal(t, 1*time.Second, param.MaxConnectionIdle)
assert.Equal(t, 2*time.Second, param.MaxConnectionAge)
assert.Equal(t, 3*time.Second, param.MaxConnectionAgeGrace)
assert.Equal(t, 4*time.Second, param.Time)
assert.Equal(t, 5*time.Second, param.Timeout)

conf = KeepAliveConfig{MaxConnectionIdle: 1}
param, ok = conf.ToParams()

assert.True(t, ok)
assert.Equal(t, 1*time.Second, param.MaxConnectionIdle)
assert.Equal(t, time.Duration(0), param.MaxConnectionAge)
assert.Equal(t, time.Duration(0), param.MaxConnectionAgeGrace)
assert.Equal(t, time.Duration(0), param.Time)
assert.Equal(t, time.Duration(0), param.Timeout)
}
29 changes: 29 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (s *SDKConfig) loadEnv(prefix string) error {

func (h *HttpConfig) loadEnv(prefix string) error {
prefix = concatPrefix(prefix, "HTTP")
if err := readEnv(prefix, "ENABLED", &h.Enabled, toBool); err != nil {
return err
}
if err := readEnv(prefix, "PORT", &h.Port, toInt); err != nil {
return err
}
Expand Down Expand Up @@ -147,9 +150,35 @@ func (g *GrpcConfig) loadEnv(prefix string) error {
if err := readEnv(prefix, "PORT", &g.Port, toInt); err != nil {
return err
}
if err := readEnv(prefix, "HEALTH_CHECK_ENABLED", &g.HealthCheckEnabled, toBool); err != nil {
return err
}
if err := readEnv(prefix, "SERVER_REFLECTION_ENABLED", &g.ServerReflectionEnabled, toBool); err != nil {
return err
}
if err := g.KeepAlive.loadEnv(prefix); err != nil {
return err
}
return g.Log.loadEnv(prefix)
}

func (k *KeepAliveConfig) loadEnv(prefix string) error {
prefix = concatPrefix(prefix, "KEEP_ALIVE")
if err := readEnv(prefix, "MAX_CONNECTION_IDLE", &k.MaxConnectionIdle, toInt); err != nil {
return err
}
if err := readEnv(prefix, "MAX_CONNECTION_AGE", &k.MaxConnectionAge, toInt); err != nil {
return err
}
if err := readEnv(prefix, "MAX_CONNECTION_AGE_GRACE", &k.MaxConnectionAgeGrace, toInt); err != nil {
return err
}
if err := readEnv(prefix, "TIME", &k.Time, toInt); err != nil {
return err
}
return readEnv(prefix, "TIMEOUT", &k.Timeout, toInt)
}

func (h *HttpProxyConfig) loadEnv(prefix string) error {
prefix = concatPrefix(prefix, "HTTP_PROXY")
readEnvString(prefix, "URL", &h.Url)
Expand Down
16 changes: 16 additions & 0 deletions config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func TestGlobalOfflineConfig_ENV(t *testing.T) {

func TestHttpConfig_ENV(t *testing.T) {
t.Setenv("CONFIGCAT_HTTP_PORT", "8090")
t.Setenv("CONFIGCAT_HTTP_ENABLED", "true")
t.Setenv("CONFIGCAT_HTTP_LOG_LEVEL", "info")
t.Setenv("CONFIGCAT_HTTP_WEBHOOK_ENABLED", "true")
t.Setenv("CONFIGCAT_HTTP_WEBHOOK_AUTH_USER", "mickey")
Expand All @@ -189,6 +190,7 @@ func TestHttpConfig_ENV(t *testing.T) {

assert.Equal(t, log.Info, conf.Http.Log.GetLevel())
assert.Equal(t, 8090, conf.Http.Port)
assert.True(t, conf.Http.Enabled)
assert.True(t, conf.Http.Webhook.Enabled)
assert.Equal(t, "mickey", conf.Http.Webhook.Auth.User)
assert.Equal(t, "pass", conf.Http.Webhook.Auth.Password)
Expand Down Expand Up @@ -257,13 +259,27 @@ func TestGrpcConfig_ENV(t *testing.T) {
t.Setenv("CONFIGCAT_GRPC_PORT", "8060")
t.Setenv("CONFIGCAT_GRPC_LOG_LEVEL", "error")
t.Setenv("CONFIGCAT_GRPC_ENABLED", "true")
t.Setenv("CONFIGCAT_GRPC_HEALTH_CHECK_ENABLED", "false")
t.Setenv("CONFIGCAT_GRPC_SERVER_REFLECTION_ENABLED", "true")
t.Setenv("CONFIGCAT_GRPC_KEEP_ALIVE_MAX_CONNECTION_IDLE", "1")
t.Setenv("CONFIGCAT_GRPC_KEEP_ALIVE_MAX_CONNECTION_AGE", "2")
t.Setenv("CONFIGCAT_GRPC_KEEP_ALIVE_MAX_CONNECTION_AGE_GRACE", "3")
t.Setenv("CONFIGCAT_GRPC_KEEP_ALIVE_TIME", "4")
t.Setenv("CONFIGCAT_GRPC_KEEP_ALIVE_TIMEOUT", "5")

conf, err := LoadConfigFromFileAndEnvironment("")
require.NoError(t, err)

assert.Equal(t, log.Error, conf.Grpc.Log.GetLevel())
assert.Equal(t, 8060, conf.Grpc.Port)
assert.True(t, conf.Grpc.Enabled)
assert.True(t, conf.Grpc.ServerReflectionEnabled)
assert.False(t, conf.Grpc.HealthCheckEnabled)
assert.Equal(t, 1, conf.Grpc.KeepAlive.MaxConnectionIdle)
assert.Equal(t, 2, conf.Grpc.KeepAlive.MaxConnectionAge)
assert.Equal(t, 3, conf.Grpc.KeepAlive.MaxConnectionAgeGrace)
assert.Equal(t, 4, conf.Grpc.KeepAlive.Time)
assert.Equal(t, 5, conf.Grpc.KeepAlive.Timeout)
}

func TestHttpProxyConfig_ENV(t *testing.T) {
Expand Down
56 changes: 40 additions & 16 deletions diag/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,37 @@ import (
type Reporter interface {
IncrementConnection(sdkId string, streamType string, flag string)
DecrementConnection(sdkId string, streamType string, flag string)
AddSentMessageCount(count int, sdkId string, streamType string, flag string)

HttpHandler() http.Handler
}

type reporter struct {
registry *prometheus.Registry
responseTime *prometheus.HistogramVec
sdkResponseTime *prometheus.HistogramVec
connections *prometheus.GaugeVec
registry *prometheus.Registry
httpResponseTime *prometheus.HistogramVec
grpcResponseTime *prometheus.HistogramVec
sdkResponseTime *prometheus.HistogramVec
connections *prometheus.GaugeVec
streamMessageSent *prometheus.CounterVec
}

func NewReporter() Reporter {
reg := prometheus.NewRegistry()

respTime := prometheus.NewHistogramVec(prometheus.HistogramOpts{
httpRespTime := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "configcat",
Name: "http_request_duration_seconds",
Help: "Histogram of Proxy HTTP response time in seconds.",
Buckets: prometheus.DefBuckets,
}, []string{"route", "method", "status"})

grpcRespTime := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "configcat",
Name: "grpc_rpc_duration_seconds",
Help: "Histogram of RPC response latency in seconds.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "code"})

sdkRespTime := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "configcat",
Name: "sdk_http_request_duration_seconds",
Expand All @@ -44,30 +54,44 @@ func NewReporter() Reporter {
Help: "Number of active client connections per stream.",
}, []string{"sdk", "type", "flag"})

streamMessageSent := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "configcat",
Name: "stream_msg_sent_total",
Help: "Total number of stream messages sent by the server.",
}, []string{"sdk", "type", "flag"})

reg.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
respTime,
httpRespTime,
grpcRespTime,
sdkRespTime,
connections,
streamMessageSent,
)

return &reporter{
registry: reg,
responseTime: respTime,
sdkResponseTime: sdkRespTime,
connections: connections,
registry: reg,
httpResponseTime: httpRespTime,
grpcResponseTime: grpcRespTime,
sdkResponseTime: sdkRespTime,
connections: connections,
streamMessageSent: streamMessageSent,
}
}

func (h *reporter) HttpHandler() http.Handler {
return promhttp.HandlerFor(h.registry, promhttp.HandlerOpts{Registry: h.registry})
func (r *reporter) HttpHandler() http.Handler {
return promhttp.HandlerFor(r.registry, promhttp.HandlerOpts{Registry: r.registry})
}

func (r *reporter) IncrementConnection(sdkId string, streamType string, flag string) {
r.connections.WithLabelValues(sdkId, streamType, flag).Inc()
}

func (h *reporter) IncrementConnection(sdkId string, streamType string, flag string) {
h.connections.WithLabelValues(sdkId, streamType, flag).Inc()
func (r *reporter) DecrementConnection(sdkId string, streamType string, flag string) {
r.connections.WithLabelValues(sdkId, streamType, flag).Dec()
}

func (h *reporter) DecrementConnection(sdkId string, streamType string, flag string) {
h.connections.WithLabelValues(sdkId, streamType, flag).Dec()
func (r *reporter) AddSentMessageCount(count int, sdkId string, streamType string, flag string) {
r.streamMessageSent.WithLabelValues(sdkId, streamType, flag).Add(float64(count))
}
7 changes: 7 additions & 0 deletions diag/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ func TestConnection(t *testing.T) {
handler.IncrementConnection("test", "t2", "n1")
handler.IncrementConnection("test", "t2", "n1")

handler.AddSentMessageCount(1, "test", "t1", "n1")
handler.AddSentMessageCount(4, "test", "t2", "n1")

assert.Equal(t, 2, testutil.CollectAndCount(handler.connections))
assert.Equal(t, 2, testutil.CollectAndCount(handler.streamMessageSent))

assert.Equal(t, float64(2), testutil.ToFloat64(handler.connections.WithLabelValues("test", "t1", "n1")))
assert.Equal(t, float64(3), testutil.ToFloat64(handler.connections.WithLabelValues("test", "t2", "n1")))

assert.Equal(t, float64(1), testutil.ToFloat64(handler.streamMessageSent.WithLabelValues("test", "t1", "n1")))
assert.Equal(t, float64(4), testutil.ToFloat64(handler.streamMessageSent.WithLabelValues("test", "t2", "n1")))

handler.DecrementConnection("test", "t1", "n1")
handler.DecrementConnection("test", "t2", "n1")
handler.DecrementConnection("test", "t2", "n1")
Expand Down
Loading

0 comments on commit 27566ea

Please sign in to comment.