Skip to content

Commit

Permalink
add more error sources
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 committed Nov 21, 2023
1 parent d148588 commit 2cc8f0f
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 98 deletions.
18 changes: 10 additions & 8 deletions pkg/proxy/backend/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func (auth *Authenticator) verifyBackendCaps(logger *zap.Logger, backendCapabili
// The error cannot be sent to the client because the client only expects an initial handshake packet.
// The only way is to log it and disconnect.
logger.Error("require backend capabilities", zap.Stringer("common", commonCaps), zap.Stringer("required", requiredBackendCaps^commonCaps))
return errors.Wrapf(ErrCapabilityNegotiation, "require %s from backend", requiredBackendCaps^commonCaps)
return pnet.WrapUserError(errors.Wrap(ErrBackendHandshake, errors.Wrapf(ErrCapabilityNegotiation, "require %s from backend", requiredBackendCaps^commonCaps)), capabilityErrMsg)
}
if auth.requireBackendTLS && (backendCapability&pnet.ClientSSL == 0) {
return pnet.WrapUserError(errors.New("backend doesn't enable TLS"), requireTiDBTLSErrMsg)
return pnet.WrapUserError(errors.Wrap(ErrBackendHandshake, errors.New("backend doesn't enable TLS")), requireTiDBTLSErrMsg)
}
return nil
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (auth *Authenticator) handshakeFirstTime(logger *zap.Logger, cctx ConnConte
if writeErr := clientIO.WriteErrPacket(mysql.NewDefaultError(mysql.ER_NOT_SUPPORTED_AUTH_MODE)); writeErr != nil {
return writeErr
}
return errors.Wrapf(ErrCapabilityNegotiation, "require %s from frontend", requiredFrontendCaps&^commonCaps)
return errors.Wrap(ErrClientHandshake, errors.Wrapf(ErrCapabilityNegotiation, "require %s from frontend", requiredFrontendCaps&^commonCaps))
}
commonCaps := frontendCapability & proxyCapability
if frontendCapability^commonCaps != 0 {
Expand Down Expand Up @@ -185,7 +185,7 @@ RECONNECT:
}

if err := auth.verifyBackendCaps(logger, backendCapability); err != nil {
return pnet.WrapUserError(err, capabilityErrMsg)
return err
}

if common := proxyCapability & backendCapability; (proxyCapability^common)&^pnet.ClientSSL != 0 {
Expand Down Expand Up @@ -224,12 +224,14 @@ loop:
}
return err
}
var packetErr error
var packetErr *mysql.MyError
if serverPkt[0] == pnet.ErrHeader.Byte() {
packetErr = pnet.ParseErrorPacket(serverPkt)
if handshakeHandler.HandleHandshakeErr(cctx, packetErr.(*mysql.MyError)) {
logger.Warn("handle handshake error, start reconnect", zap.Error(err))
backendIO.Close()
if handshakeHandler.HandleHandshakeErr(cctx, packetErr) {
logger.Warn("handle handshake error, start reconnect", zap.Error(packetErr))
if closeErr := backendIO.Close(); closeErr != nil {
logger.Warn("close backend error", zap.Error(closeErr))
}
goto RECONNECT
}
}
Expand Down
46 changes: 15 additions & 31 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler
// There are 2 types of signals, which may be sent concurrently.
signalReceived: make(chan signalType, signalTypeNums),
redirectResCh: make(chan *redirectResult, 1),
quitSource: SrcClientQuit,
quitSource: SrcNone,
}
mgr.SetValue(ConnContextKeyConnID, connectionID)
return mgr
Expand All @@ -170,13 +170,11 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
mgr.clientIO = clientIO
err := mgr.authenticator.handshakeFirstTime(mgr.logger.Named("authenticator"), mgr, clientIO, mgr.handshakeHandler, mgr.getBackendIO, frontendTLSConfig, backendTLSConfig)
if err != nil {
mgr.setQuitSourceByErr(err)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), err)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), err, Error2Source(err))
clientIO.WriteUserError(err)
return err
}
mgr.resetQuitSource()
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), nil)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), nil, SrcNone)

mgr.cmdProcessor.capability = mgr.authenticator.capability
childCtx, cancelFunc := context.WithCancel(ctx)
Expand Down Expand Up @@ -235,8 +233,7 @@ func (mgr *BackendConnManager) getBackendIO(cctx ConnContext, auth *Authenticato
backoff.WithContext(backoff.NewConstantBackOff(200*time.Millisecond), bctx),
func(err error, d time.Duration) {
origErr = err
mgr.setQuitSourceByErr(err)
mgr.handshakeHandler.OnHandshake(cctx, addr, err)
mgr.handshakeHandler.OnHandshake(cctx, addr, err, Error2Source(err))
},
)
cancel()
Expand Down Expand Up @@ -431,7 +428,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
// If the backend connection is closed, also close the client connection.
// Otherwise, if the client is idle, the mgr will keep retrying.
if errors.Is(rs.err, net.ErrClosed) || pnet.IsDisconnectError(rs.err) || errors.Is(rs.err, os.ErrDeadlineExceeded) {
mgr.quitSource = SrcBackendQuit
mgr.quitSource = SrcBackendNetwork
if ignoredErr := mgr.clientIO.GracefulClose(); ignoredErr != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("client_addr", mgr.clientIO.RemoteAddr()), zap.Error(ignoredErr))
}
Expand All @@ -442,21 +439,18 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
return
}

defer mgr.resetQuitSource()
var cn net.Conn
cn, rs.err = net.DialTimeout("tcp", rs.to, DialTimeout)
if rs.err != nil {
mgr.quitSource = SrcBackendQuit
mgr.handshakeHandler.OnHandshake(mgr, rs.to, rs.err)
mgr.handshakeHandler.OnHandshake(mgr, rs.to, rs.err, SrcBackendNetwork)
return
}
newBackendIO := pnet.NewPacketIO(cn, mgr.logger, mgr.config.ConnBufferSize, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))

if rs.err = mgr.authenticator.handshakeSecondTime(mgr.logger, mgr.clientIO, newBackendIO, mgr.backendTLS, sessionToken); rs.err == nil {
rs.err = mgr.initSessionStates(newBackendIO, sessionStates)
} else {
mgr.setQuitSourceByErr(rs.err)
mgr.handshakeHandler.OnHandshake(mgr, newBackendIO.RemoteAddr().String(), rs.err)
mgr.handshakeHandler.OnHandshake(mgr, newBackendIO.RemoteAddr().String(), rs.err, Error2Source(rs.err))
}
if rs.err != nil {
if ignoredErr := newBackendIO.Close(); ignoredErr != nil && !pnet.IsDisconnectError(ignoredErr) {
Expand All @@ -469,7 +463,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
}
mgr.backendIO.Store(newBackendIO)
mgr.setKeepAlive(mgr.config.HealthyKeepAlive)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), nil)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), nil, SrcNone)
}

// The original db in the auth info may be dropped during the session, so we need to authenticate with the current db.
Expand Down Expand Up @@ -553,7 +547,7 @@ func (mgr *BackendConnManager) checkBackendActive() {
if !backendIO.IsPeerActive() {
mgr.logger.Info("backend connection is closed, close client connection",
zap.Stringer("client_addr", mgr.clientIO.RemoteAddr()), zap.Stringer("backend_addr", backendIO.RemoteAddr()))
mgr.quitSource = SrcBackendQuit
mgr.quitSource = SrcBackendNetwork
if err := mgr.clientIO.GracefulClose(); err != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("client_addr", mgr.clientIO.RemoteAddr()), zap.Error(err))
}
Expand Down Expand Up @@ -627,7 +621,7 @@ func (mgr *BackendConnManager) Close() error {
}
mgr.wg.Wait()

handErr := mgr.handshakeHandler.OnConnClose(mgr)
handErr := mgr.handshakeHandler.OnConnClose(mgr, mgr.quitSource)

var connErr error
var addr string
Expand Down Expand Up @@ -677,26 +671,16 @@ func (mgr *BackendConnManager) setKeepAlive(cfg config.KeepAlive) {
}
}

// quitSource will be read by OnHandshake and OnConnClose, so setQuitSourceByErr should be called before them.
func (mgr *BackendConnManager) setQuitSourceByErr(err error) {
// Do not update the source if err is nil. It may be already be set.
if err == nil {
return
}
if errors.Is(err, ErrBackendConn) {
mgr.quitSource = SrcBackendQuit
} else if IsMySQLError(err) {
mgr.quitSource = SrcClientErr
} else if !errors.Is(err, ErrClientConn) {
mgr.quitSource = SrcProxyErr
// The source may be already be set.
// E.g. quitSource is set before TiProxy shuts down and client connection error is caused by shutdown instead of network.
if mgr.quitSource != SrcNone {
return
}
}

func (mgr *BackendConnManager) resetQuitSource() {
// SrcClientQuit is by default.
// Sometimes ErrClientConn is caused by GracefulClose and the quitSource is already set.
// Error maybe set during handshake for OnHandshake. If handshake finally succeeds, we reset it.
mgr.quitSource = SrcClientQuit
mgr.quitSource = Error2Source(err)
}

func (mgr *BackendConnManager) UpdateLogger(fields ...zap.Field) {
Expand Down
115 changes: 113 additions & 2 deletions pkg/proxy/backend/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package backend

import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tiproxy/lib/util/errors"
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
)

const (
Expand All @@ -18,6 +20,115 @@ const (
)

var (
ErrClientConn = errors.New("this is an error from client")
ErrBackendConn = errors.New("this is an error from backend")
ErrClientConn = errors.New("read or write client connection fail")
ErrClientHandshake = errors.New("handshake with client fail")
ErrBackendConn = errors.New("read or write backend connection fail")
ErrBackendHandshake = errors.New("handshake with backend fail")
)

type SourceComp int

const (
CompNone SourceComp = iota
CompClient
CompProxy
CompBackend
)

type ErrorSource int

const (
// SrcNone includes: succeed for OnHandshake; client normally quit for OnConnClose
SrcNone ErrorSource = iota
// SrcClientNetwork includes: EOF; reset by peer; connection refused; TLS handshake fails
SrcClientNetwork
// SrcClientHandshake includes: client capability unsupported
SrcClientHandshake
// SrcClientSQLErr includes: backend returns auth fail; SQL error
SrcClientSQLErr
// SrcProxyQuit includes: proxy graceful shutdown
SrcProxyQuit
// SrcProxyMalformed includes: malformed packet; invalid sequence
SrcProxyMalformed
// SrcProxyGetBackend includes: no backends
SrcProxyGetBackend
// SrcProxyErr includes: HandshakeHandler returns error; proxy disables TLS
SrcProxyErr
// SrcBackendNetwork includes: EOF; reset by peer; connection refused; TLS handshake fails
SrcBackendNetwork
// SrcBackendHandshake includes: backend capability unsupported; backend disables TLS
SrcBackendHandshake
)

// Error2Source returns the ErrorSource by the error.
func Error2Source(err error) ErrorSource {
switch {
case err == nil:
return SrcNone
case errors.Is(err, pnet.ErrInvalidSequence) || errors.Is(err, gomysql.ErrMalformPacket):
// We assume the clients and TiDB are right and treat it as TiProxy bugs.
// ErrInvalidSequence may be wrapped with ErrClientConn or ErrBackendConn, so put it before other conditions.
return SrcProxyErr
case errors.Is(err, ErrClientConn):
return SrcClientNetwork
case errors.Is(err, ErrBackendConn):
return SrcBackendNetwork
case errors.Is(err, ErrClientHandshake):
return SrcClientHandshake
case errors.Is(err, ErrBackendHandshake):
return SrcBackendHandshake
case IsMySQLError(err):
return SrcClientSQLErr
default:
return SrcProxyErr
}
}

func (es ErrorSource) String() string {
switch es {
case SrcNone:
return "ok"
case SrcClientNetwork:
return "client network break"
case SrcClientHandshake:
return "client handshake fail"
case SrcClientSQLErr:
return "client SQL error"
case SrcProxyQuit:
return "proxy shutdown"
case SrcProxyMalformed:
return "malformed packet"
case SrcProxyGetBackend:
return "proxy get backend fail"
case SrcProxyErr:
return "proxy error"
case SrcBackendNetwork:
return "backend network break"
case SrcBackendHandshake:
return "backend handshake fail"
}
return "unknown"
}

// GetSourceComp returns which component does this error belong to.
func (es ErrorSource) GetSourceComp() SourceComp {
switch es {
case SrcClientNetwork, SrcClientHandshake, SrcClientSQLErr:
return CompClient
case SrcProxyQuit, SrcProxyMalformed, SrcProxyGetBackend, SrcProxyErr:
return CompProxy
case SrcBackendNetwork, SrcBackendHandshake:
return CompBackend
default:
return CompNone
}
}

// Normal returns whether this error source is expected.
func (es ErrorSource) Normal() bool {
switch es {
case SrcNone, SrcProxyQuit:
return true
}
return false
}
Loading

0 comments on commit 2cc8f0f

Please sign in to comment.