Skip to content

Commit

Permalink
use Addr
Browse files Browse the repository at this point in the history
Signed-off-by: xhe <[email protected]>
  • Loading branch information
xhebox committed Nov 3, 2023
1 parent 4f218b5 commit b05fb26
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 25 deletions.
10 changes: 4 additions & 6 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,10 @@ type ProxyServerOnline struct {
}

type ProxyServer struct {
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
// Addrs will attach server to additional addrs.
Addrs []string `yaml:"addrs,omitempty" toml:"addrs,omitempty" json:"addrs,omitempty"`
PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty"`
ServerVersion string `yaml:"server-version,omitempty" toml:"server-version,omitempty" json:"server-version,omitempty"`
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty"`
ServerVersion string `yaml:"server-version,omitempty" toml:"server-version,omitempty" json:"server-version,omitempty"`
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
ProxyServerOnline `yaml:",inline" toml:",inline" json:",inline"`
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/manager/config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,17 @@ func (e *ConfigManager) Close() error {
e.cancel()
e.cancel = nil
}
if e.wch != nil {
wcherr = e.wch.Close()
e.wch = nil
}
e.sts.Lock()
for _, ch := range e.sts.listeners {
close(ch)
}
e.sts.listeners = nil
e.sts.Unlock()
e.wg.Wait()
// close after all goroutines are done
if e.wch != nil {
wcherr = e.wch.Close()
e.wch = nil
}
return wcherr
}
3 changes: 2 additions & 1 deletion pkg/manager/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func (is *InfoSyncer) getTopologyInfo(cfg *config.Config) (*TopologyInfo, error)
s = ""
}
dir := path.Dir(s)
ip, port, err := net.SplitHostPort(cfg.Proxy.Addr)
addrs := strings.Split(cfg.Proxy.Addr, ",")
ip, port, err := net.SplitHostPort(addrs[0])
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tiproxy_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@
"expr": "tiproxy_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}} | {{addr}}",
"refId": "A"
},
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tiproxy_summary.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ local connectionP = graphPanel.new(
.addTarget(
prometheus.target(
'tiproxy_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='{{instance}}',
legendFormat='{{instance}} | {{addr}}',
)
)
.addTarget(
Expand Down
5 changes: 3 additions & 2 deletions pkg/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ import (

const (
LblType = "type"
LblAddr = "addr"

EventStart = "start"
EventClose = "close"
)

var (
ConnGauge = prometheus.NewGauge(
ConnGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: ModuleProxy,
Subsystem: LabelServer,
Name: "connections",
Help: "Number of connections.",
})
}, []string{LblAddr})

MaxProcsGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Expand Down
17 changes: 9 additions & 8 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package proxy

import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -66,10 +68,8 @@ func NewSQLServer(logger *zap.Logger, cfg config.ProxyServer, certMgr *cert.Cert

s.reset(&cfg.ProxyServerOnline)

s.addrs = make([]string, 0, len(cfg.Addrs)+1)
s.addrs = append(s.addrs, cfg.Addr)
s.addrs = append(s.addrs, cfg.Addrs...)

s.addrs = strings.Split(cfg.Addr, ",")
fmt.Printf("xhe %s\n", s.addrs)
s.listeners = make([]net.Listener, len(s.addrs))
for i, addr := range s.addrs {
s.listeners[i], err = net.Listen("tcp", addr)
Expand Down Expand Up @@ -113,13 +113,14 @@ func (s *SQLServer) Run(ctx context.Context, cfgch <-chan *config.Config) {
})

for i := range s.listeners {
j := i
s.wg.Run(func() {
for {
select {
case <-ctx.Done():
return
default:
conn, err := s.listeners[i].Accept()
conn, err := s.listeners[j].Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
Expand All @@ -130,7 +131,7 @@ func (s *SQLServer) Run(ctx context.Context, cfgch <-chan *config.Config) {
}

s.wg.Run(func() {
util.WithRecovery(func() { s.onConn(ctx, conn, s.addrs[i]) }, nil, s.logger)
util.WithRecovery(func() { s.onConn(ctx, conn, s.addrs[j]) }, nil, s.logger)
})
}
}
Expand Down Expand Up @@ -172,7 +173,7 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
s.mu.Unlock()

logger.Info("new connection")
metrics.ConnGauge.Inc()
metrics.ConnGauge.WithLabelValues(addr).Inc()

defer func() {
s.mu.Lock()
Expand All @@ -184,7 +185,7 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
} else {
logger.Info("connection closed")
}
metrics.ConnGauge.Dec()
metrics.ConnGauge.WithLabelValues(addr).Dec()
}()

if err := keepalive.SetKeepalive(conn, config.KeepAlive{Enabled: tcpKeepAlive}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func TestGracefulShutdown(t *testing.T) {
}
}

func TestMultiPorts(t *testing.T) {
func TestMultiAddr(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
certManager := cert.NewCertManager()
err := certManager.Init(&config.Config{}, lg, nil)
require.NoError(t, err)
server, err := NewSQLServer(lg, config.ProxyServer{
Addrs: []string{"0.0.0.0:0"},
Addr: "0.0.0.0:0,0.0.0.0:0",
}, certManager, &panicHsHandler{})
require.NoError(t, err)
server.Run(context.Background(), nil)
Expand Down

0 comments on commit b05fb26

Please sign in to comment.