Skip to content

Commit

Permalink
config: support online reload for some config (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Nov 10, 2023
1 parent d394ee5 commit 39626dd
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 111 deletions.
5 changes: 2 additions & 3 deletions lib/config/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ type FrontendNamespace struct {
}

type BackendNamespace struct {
Instances []string `yaml:"instances" json:"instances" toml:"instances"`
SelectorType string `yaml:"selector-type" json:"selector-type" toml:"selector-type"`
Security TLSConfig `yaml:"security" json:"security" toml:"security"`
Instances []string `yaml:"instances" json:"instances" toml:"instances"`
Security TLSConfig `yaml:"security" json:"security" toml:"security"`
//HealthCheck HealthCheck `yaml:"health-check" json:"health-check" toml:"health-check"`
}

Expand Down
3 changes: 1 addition & 2 deletions lib/config/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ var testNamespaceConfig = Namespace{
},
},
Backend: BackendNamespace{
Instances: []string{"127.0.0.1:4000", "127.0.0.1:4001"},
SelectorType: "random",
Instances: []string{"127.0.0.1:4000", "127.0.0.1:4001"},
Security: TLSConfig{
CA: "t",
Cert: "t",
Expand Down
10 changes: 3 additions & 7 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type KeepAlive struct {
}

type ProxyServerOnline struct {
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
Expand All @@ -62,17 +63,12 @@ type ProxyServerOnline struct {
type ProxyServer struct {
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty"`
ServerVersion string `yaml:"server-version,omitempty" toml:"server-version,omitempty" json:"server-version,omitempty"`
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
ProxyServerOnline `yaml:",inline" toml:",inline" json:",inline"`
}

type API struct {
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
User string `yaml:"user,omitempty" toml:"user,omitempty" json:"user,omitempty"`
Password string `yaml:"password,omitempty" toml:"password,omitempty" json:"password,omitempty"`
EnableBasicAuth bool `yaml:"enable-basic-auth,omitempty" toml:"enable-basic-auth,omitempty" json:"enable-basic-auth,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
}

type Advance struct {
Expand Down
11 changes: 4 additions & 7 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ var testProxyConfig = Config{
IgnoreWrongNamespace: true,
},
Proxy: ProxyServer{
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
RequireBackendTLS: true,
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
ProxyServerOnline: ProxyServerOnline{
RequireBackendTLS: true,
MaxConnections: 1,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
Expand All @@ -30,10 +30,7 @@ var testProxyConfig = Config{
},
},
API: API{
Addr: "0.0.0.0:3080",
EnableBasicAuth: false,
User: "user",
Password: "pwd",
Addr: "0.0.0.0:3080",
},
Metrics: Metrics{
MetricsAddr: "127.0.0.1:9021",
Expand Down
85 changes: 57 additions & 28 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/systimemon"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -52,12 +53,12 @@ func NewMetricsManager() *MetricsManager {
var registerOnce = &sync.Once{}

// Init registers metrics and pushes metrics to prometheus.
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, metricsAddr string, metricsInterval uint, proxyAddr string) {
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
mm.logger = logger
registerOnce.Do(registerProxyMetrics)
ctx, mm.cancel = context.WithCancel(ctx)
mm.setupMonitor(ctx)
mm.pushMetric(ctx, metricsAddr, time.Duration(metricsInterval)*time.Second, proxyAddr)
mm.pushMetric(ctx, proxyAddr, cfg, cfgch)
}

// Close stops all goroutines.
Expand Down Expand Up @@ -89,17 +90,64 @@ func (mm *MetricsManager) setupMonitor(ctx context.Context) {
}

// pushMetric pushes metrics in background.
func (mm *MetricsManager) pushMetric(ctx context.Context, addr string, interval time.Duration, proxyAddr string) {
if interval == time.Duration(0) || len(addr) == 0 {
mm.logger.Info("disable Prometheus push client")
return
}
mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.String("interval", interval.String()))
func (mm *MetricsManager) pushMetric(ctx context.Context, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
mm.wg.Run(func() {
prometheusPushClient(ctx, mm.logger, addr, interval, proxyAddr)
proxyInstance := instanceName(proxyAddr)
addr := cfg.MetricsAddr
interval := time.Duration(cfg.MetricsInterval) * time.Second
pusher := mm.buildPusher(addr, interval, proxyInstance)

for ctx.Err() == nil {
select {
case newCfg := <-cfgch:
if newCfg == nil {
return
}
interval = time.Duration(newCfg.Metrics.MetricsInterval) * time.Second
if addr != newCfg.Metrics.MetricsAddr {
addr = newCfg.Metrics.MetricsAddr
pusher = mm.buildPusher(addr, interval, proxyInstance)
}
default:
}

// Wait until the config is legal.
if interval == 0 || pusher == nil {
select {
case <-time.After(time.Second):
continue
case <-ctx.Done():
return
}
}

if err := pusher.Push(); err != nil {
mm.logger.Error("could not push metrics to prometheus pushgateway", zap.Error(err))
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
})
}

func (mm *MetricsManager) buildPusher(addr string, interval time.Duration, proxyInstance string) *push.Pusher {
var pusher *push.Pusher
if len(addr) > 0 {
// Create a new pusher when the address changes.
mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.Stringer("interval", interval))
pusher = push.New(addr, "tiproxy")
pusher = pusher.Gatherer(prometheus.DefaultGatherer)
pusher = pusher.Grouping("instance", proxyInstance)
} else {
mm.logger.Info("disable prometheus push client")
pusher = nil
}
return pusher
}

// registerProxyMetrics registers metrics.
func registerProxyMetrics() {
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
Expand All @@ -122,25 +170,6 @@ func registerProxyMetrics() {
prometheus.MustRegister(MigrateDurationHistogram)
}

// prometheusPushClient pushes metrics to Prometheus Pushgateway.
func prometheusPushClient(ctx context.Context, logger *zap.Logger, addr string, interval time.Duration, proxyAddr string) {
job := "tiproxy"
pusher := push.New(addr, job)
pusher = pusher.Gatherer(prometheus.DefaultGatherer)
pusher = pusher.Grouping("instance", instanceName(proxyAddr))
for ctx.Err() == nil {
err := pusher.Push()
if err != nil {
logger.Error("could not push metrics to prometheus pushgateway", zap.String("err", err.Error()))
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
}

func instanceName(proxyAddr string) string {
hostname, err := os.Hostname()
if err != nil {
Expand Down
81 changes: 49 additions & 32 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/stretchr/testify/require"
)
Expand All @@ -21,64 +22,80 @@ import (
func TestPushMetrics(t *testing.T) {
proxyAddr := "0.0.0.0:6000"
labelName := fmt.Sprintf("%s_%s_maxprocs", ModuleProxy, LabelServer)
hostname, err := os.Hostname()
require.NoError(t, err)
expectedPath := fmt.Sprintf("/metrics/job/tiproxy/instance/%s_6000", hostname)
bodyCh := make(chan string)
pgwOK := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyCh <- string(body)
require.Equal(t, expectedPath, r.URL.EscapedPath())
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusOK)
}),
)
defer pgwOK.Close()
bodyCh1, bodyCh2 := make(chan string), make(chan string)
pgwOK1, pgwOK2 := setupServer(t, bodyCh1), setupServer(t, bodyCh2)
log, _ := logger.CreateLoggerForTest(t)

tests := []struct {
metricsAddr string
metricsInterval uint
pushed bool
pushedCh chan string
}{
{
metricsAddr: pgwOK.URL,
metricsAddr: pgwOK1.URL,
metricsInterval: 1,
pushed: true,
pushedCh: bodyCh1,
},
{
metricsAddr: "",
metricsAddr: pgwOK1.URL,
metricsInterval: 0,
pushedCh: nil,
},
{
metricsAddr: pgwOK2.URL,
metricsInterval: 1,
pushed: false,
pushedCh: bodyCh2,
},
{
metricsAddr: pgwOK.URL,
metricsInterval: 0,
pushed: false,
metricsAddr: "",
metricsInterval: 1,
pushedCh: nil,
},
}
mm := NewMetricsManager()
cfgCh := make(chan *config.Config, 1)
mm.Init(context.Background(), log, proxyAddr, config.Metrics{}, cfgCh)
for _, tt := range tests {
for len(bodyCh) > 0 {
<-bodyCh
cfgCh <- &config.Config{
Metrics: config.Metrics{
MetricsAddr: tt.metricsAddr,
MetricsInterval: tt.metricsInterval,
},
}
mm := NewMetricsManager()
mm.Init(context.Background(), log, tt.metricsAddr, tt.metricsInterval, proxyAddr)
if tt.pushed {
if tt.pushedCh != nil {
select {
case body := <-bodyCh:
case body := <-tt.pushedCh:
require.Contains(t, body, labelName)
case <-time.After(2 * time.Second):
t.Fatal("not pushed")
}
} else {
select {
case <-bodyCh:
t.Fatal("pushed")
case <-bodyCh1:
t.Fatal("pushed 1")
case <-bodyCh2:
t.Fatal("pushed 2")
case <-time.After(2 * time.Second):
}
}
mm.Close()
}
mm.Close()
}

func setupServer(t *testing.T, bodyCh chan string) *httptest.Server {
hostname, err := os.Hostname()
require.NoError(t, err)
expectedPath := fmt.Sprintf("/metrics/job/tiproxy/instance/%s_6000", hostname)
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyCh <- string(body)
require.Equal(t, expectedPath, r.URL.EscapedPath())
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusOK)
}),
)
t.Cleanup(server.Close)
return server
}
11 changes: 3 additions & 8 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,12 @@ type HandshakeHandler interface {
}

type DefaultHandshakeHandler struct {
nsManager *namespace.NamespaceManager
serverVersion string
nsManager *namespace.NamespaceManager
}

func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager, serverVersion string) *DefaultHandshakeHandler {
func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager) *DefaultHandshakeHandler {
return &DefaultHandshakeHandler{
nsManager: nsManager,
serverVersion: serverVersion,
nsManager: nsManager,
}
}

Expand Down Expand Up @@ -128,9 +126,6 @@ func (handler *DefaultHandshakeHandler) GetCapability() pnet.Capability {
}

func (handler *DefaultHandshakeHandler) GetServerVersion() string {
if len(handler.serverVersion) > 0 {
return handler.serverVersion
}
// TiProxy sends the server version before getting the router, so we don't know which router to get.
// Just get the default one.
if ns, ok := handler.nsManager.GetNamespace("default"); ok {
Expand Down
Loading

0 comments on commit 39626dd

Please sign in to comment.