Skip to content

Commit

Permalink
graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 committed Nov 15, 2023
1 parent b8ce9c0 commit d53bf9a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 37 deletions.
12 changes: 10 additions & 2 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
# "v2" => accept proxy protocol if any, require backends to support proxy protocol.
# proxy-protocol = ""

# graceful-wait-before-shutdown is recommended to be set to 0 when there's no other proxy (e.g. NLB) between the client and TiProxy.
# possible values:
# 0 => disable graceful shutdown.
# 30 => graceful shutdown waiting time in 30 seconds.
# 0 => begin to drain clients immediately.
# 30 => the HTTP status returns unhealthy while the SQL port keeps accepting new connections for 30 seconds. After that, new connections are refused and clients are drained.
# graceful-wait-before-shutdown = 0

# graceful-close-conn-timeout is recommended to be set longer than the lifecycle of a transaction.
# possible values:
# 0 => force closing connections immediately.
# 15 => close connections when they have finished current transactions (AKA drain clients). After 15s, force closing all the connections.

graceful-close-conn-timeout = 15

# possible values:
# "" => enable static routing.
# "pd-addr:pd-port" => automatic TiDB discovery.
Expand Down
2 changes: 2 additions & 0 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ProxyServerOnline struct {
BackendUnhealthyKeepalive KeepAlive `yaml:"backend-unhealthy-keepalive" toml:"backend-unhealthy-keepalive" json:"backend-unhealthy-keepalive"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
GracefulWaitBeforeShutdown int `yaml:"graceful-wait-before-shutdown,omitempty" toml:"graceful-wait-before-shutdown,omitempty" json:"graceful-wait-before-shutdown,omitempty"`
GracefulCloseConnTimeout int `yaml:"graceful-close-conn-timeout,omitempty" toml:"graceful-close-conn-timeout,omitempty" json:"graceful-close-conn-timeout,omitempty"`
}

type ProxyServer struct {
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewConfig() *Config {
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.RequireBackendTLS = true
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15

cfg.API.Addr = "0.0.0.0:3080"

Expand Down
60 changes: 40 additions & 20 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ import (
"go.uber.org/zap"
)

type serverStatus int

const (
// statusNormal: normal status
statusNormal serverStatus = iota

Check failure on line 30 in pkg/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / lint / make (ubuntu-latest)

const `statusNormal` is unused (unused)
// statusWaitShutdown: during graceful-wait-before-shutdown
statusWaitShutdown
// statusDrainClient: during graceful-close-conn-timeout
statusDrainClient
)

type serverState struct {
sync.RWMutex
healthyKeepAlive config.KeepAlive
Expand All @@ -34,8 +45,9 @@ type serverState struct {
requireBackendTLS bool
tcpKeepAlive bool
proxyProtocol bool
gracefulWait int
inShutdown bool
gracefulWait int // graceful-wait-before-shutdown
gracefulClose int // graceful-close-conn-timeout
status serverStatus
}

type SQLServer struct {
Expand Down Expand Up @@ -84,6 +96,7 @@ func (s *SQLServer) reset(cfg *config.ProxyServerOnline) {
s.mu.requireBackendTLS = cfg.RequireBackendTLS
s.mu.proxyProtocol = cfg.ProxyProtocol != ""
s.mu.gracefulWait = cfg.GracefulWaitBeforeShutdown
s.mu.gracefulClose = cfg.GracefulCloseConnTimeout
s.mu.healthyKeepAlive = cfg.BackendHealthyKeepalive
s.mu.unhealthyKeepAlive = cfg.BackendUnhealthyKeepalive
s.mu.connBufferSize = cfg.ConnBufferSize
Expand Down Expand Up @@ -148,11 +161,6 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
s.logger.Warn("too many connections", zap.Uint64("max connections", maxConns), zap.String("client_addr", conn.RemoteAddr().Network()), zap.Error(conn.Close()))
return
}
if s.mu.inShutdown {
s.mu.Unlock()
s.logger.Warn("in shutdown", zap.String("client_addr", conn.RemoteAddr().Network()), zap.Error(conn.Close()))
return
}

connID := s.mu.connID
s.mu.connID++
Expand Down Expand Up @@ -192,29 +200,44 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) {
clientConn.Run(ctx)
}

// IsClosing tells the HTTP API whether it should return healthy status.
func (s *SQLServer) IsClosing() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mu.inShutdown
return s.mu.status >= statusWaitShutdown
}

// Graceful shutdown doesn't close the listener but rejects new connections.
// Whether this affects NLB is to be tested.
func (s *SQLServer) gracefulShutdown() {
// Step 1: HTTP status returns unhealthy so that NLB takes this instance offline and then new connections won't come.
s.mu.Lock()
gracefulWait := s.mu.gracefulWait
if gracefulWait == 0 {
s.mu.status = statusWaitShutdown
s.mu.Unlock()
s.logger.Info("SQL server prepares for shutdown", zap.Int("graceful_wait", gracefulWait))
if gracefulWait > 0 {
time.Sleep(time.Duration(gracefulWait) * time.Second)
}

// Step 2: reject new connections and drain clients.
for i := range s.listeners {
if err := s.listeners[i].Close(); err != nil {
s.logger.Warn("closing listener fails", zap.Error(err))
}
}
s.mu.Lock()
s.mu.status = statusDrainClient
gracefulClose := s.mu.gracefulClose
s.logger.Info("SQL server is shutting down", zap.Int("graceful_close", gracefulClose), zap.Int("conn_count", len(s.mu.clients)))
if gracefulClose <= 0 {
s.mu.Unlock()
return
}
s.mu.inShutdown = true
for _, conn := range s.mu.clients {
conn.GracefulClose()
}
s.mu.Unlock()
s.logger.Info("SQL server is shutting down", zap.Int("graceful_wait", gracefulWait))

timer := time.NewTimer(time.Duration(gracefulWait) * time.Second)
timer := time.NewTimer(time.Duration(gracefulClose) * time.Second)
defer timer.Stop()
for {
select {
Expand All @@ -239,19 +262,16 @@ func (s *SQLServer) Close() error {
s.cancelFunc()
s.cancelFunc = nil
}
errs := make([]error, 0, 4)
for i := range s.listeners {
errs = append(errs, s.listeners[i].Close())
}

s.mu.RLock()
s.logger.Info("force closing connections", zap.Int("conn_count", len(s.mu.clients)))
for _, conn := range s.mu.clients {
if err := conn.Close(); err != nil {
errs = append(errs, err)
s.logger.Warn("close connection error", zap.Error(err))
}
}
s.mu.RUnlock()

s.wg.Wait()
return errors.Collect(ErrCloseServer, errs...)
return nil
}
68 changes: 53 additions & 15 deletions pkg/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
"github.com/pingcap/tiproxy/pkg/manager/cert"
"github.com/pingcap/tiproxy/pkg/manager/router"
"github.com/pingcap/tiproxy/pkg/proxy/backend"
Expand All @@ -24,15 +25,16 @@ import (
"github.com/stretchr/testify/require"
)

func TestGracefulShutdown(t *testing.T) {
func TestGracefulCloseConn(t *testing.T) {
// Graceful shutdown finishes immediately if there's no connection.
lg, _ := logger.CreateLoggerForTest(t)
hsHandler := backend.NewDefaultHandshakeHandler(nil)
server, err := NewSQLServer(lg, config.ProxyServer{
cfg := config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 10,
GracefulCloseConnTimeout: 10,
},
}, nil, hsHandler)
}
server, err := NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
finish := make(chan struct{})
go func() {
Expand Down Expand Up @@ -62,11 +64,7 @@ func TestGracefulShutdown(t *testing.T) {
}

// Graceful shutdown will be blocked if there are alive connections.
server, err = NewSQLServer(lg, config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 10,
},
}, nil, hsHandler)
server, err = NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
clientConn := createClientConn()
go func() {
Expand All @@ -89,12 +87,9 @@ func TestGracefulShutdown(t *testing.T) {
case <-finish:
}

// Graceful shutdown will shut down after GracefulWaitBeforeShutdown.
server, err = NewSQLServer(lg, config.ProxyServer{
ProxyServerOnline: config.ProxyServerOnline{
GracefulWaitBeforeShutdown: 1,
},
}, nil, hsHandler)
// Graceful shutdown will shut down after GracefulCloseConnTimeout.
cfg.GracefulCloseConnTimeout = 1
server, err = NewSQLServer(lg, cfg, nil, hsHandler)
require.NoError(t, err)
createClientConn()
go func() {
Expand All @@ -108,6 +103,49 @@ func TestGracefulShutdown(t *testing.T) {
}
}

// TestGracefulShutDown checks the two phases of a graceful shutdown:
// once the server reports IsClosing, the listener must still accept new
// connections during graceful-wait-before-shutdown, and it must refuse them
// after that wait has elapsed.
//
// All assertions run on the test goroutine. require.* and t.Fatal call
// t.FailNow, which must not be called from goroutines created by the test,
// so only the blocking server.Close() call is moved to the background.
func TestGracefulShutDown(t *testing.T) {
	lg, _ := logger.CreateLoggerForTest(t)
	hsHandler := backend.NewDefaultHandshakeHandler(nil)
	cfg := config.ProxyServer{
		ProxyServerOnline: config.ProxyServerOnline{
			GracefulWaitBeforeShutdown: 2,
			GracefulCloseConnTimeout:   10,
		},
	}
	server, err := NewSQLServer(lg, cfg, nil, hsHandler)
	require.NoError(t, err)
	// Capture the address up front so the checks below don't touch the
	// listener slice while Close is working on it.
	addr := server.listeners[0].Addr().String()

	// Close is expected to block for the whole graceful shutdown, so it runs
	// in the background and only records its result; the result is asserted
	// on the test goroutine after wg.Wait().
	var closeErr error
	var wg waitgroup.WaitGroup
	wg.Run(func() {
		closeErr = server.Close()
	})
	// Never let the background goroutine outlive the test, even when an
	// assertion below fails early.
	defer wg.Wait()

	// Wait until the server begins to shut down. The condition contains no
	// assertions, so it is safe for Eventually to evaluate it elsewhere.
	require.Eventually(t, server.IsClosing, 500*time.Millisecond, 10*time.Millisecond)

	// The listener should be open.
	conn1, err := net.Dial("tcp", addr)
	require.NoError(t, err)

	// The listener should be closed after GracefulWaitBeforeShutdown.
	deadline := time.Now().Add(3 * time.Second)
	for {
		conn, err := net.Dial("tcp", addr)
		if err != nil {
			require.ErrorContains(t, err, "connection refused")
			break
		}
		require.NoError(t, conn.Close())
		require.True(t, time.Now().Before(deadline), "listener is still open after the graceful wait")
		time.Sleep(100 * time.Millisecond)
	}
	require.NoError(t, conn1.Close())

	wg.Wait()
	require.NoError(t, closeErr)
}

func TestMultiAddr(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
certManager := cert.NewCertManager()
Expand Down

0 comments on commit d53bf9a

Please sign in to comment.