Skip to content

Commit

Permalink
check target before redirection
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 committed Dec 14, 2023
1 parent f22f82b commit 8ef7f18
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 73 deletions.
8 changes: 4 additions & 4 deletions pkg/manager/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (

type BackendStatus int

func (bs *BackendStatus) ToScore() int {
return statusScores[*bs]
func (bs BackendStatus) ToScore() int {
return statusScores[bs]
}

func (bs *BackendStatus) String() string {
status, ok := statusNames[*bs]
func (bs BackendStatus) String() string {
status, ok := statusNames[bs]
if !ok {
return "unknown"
}
Expand Down
57 changes: 54 additions & 3 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package router

import (
"sync"
"time"

glist "github.com/bahlo/generic-list-go"
Expand Down Expand Up @@ -63,14 +64,23 @@ type RedirectableConn interface {
SetValue(key, val any)
Value(key any) any
// Redirect returns false if the current conn is not redirectable.
Redirect(addr string) bool
Redirect(backend BackendInst) bool
NotifyBackendStatus(status BackendStatus)
ConnectionID() uint64
}

// BackendInst defines a backend that a connection is redirecting to.
type BackendInst interface {
Addr() string
Healthy() bool
}

// backendWrapper contains the connections on the backend.
type backendWrapper struct {
*backendHealth
mu struct {
sync.RWMutex
backendHealth
}
addr string
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
// connScore = connList.Len() + incoming connections - outgoing connections.
Expand All @@ -80,9 +90,50 @@ type backendWrapper struct {
connList *glist.List[*connWrapper]
}

func (b *backendWrapper) setHealth(health backendHealth) {
b.mu.Lock()
b.mu.backendHealth = health
b.mu.Unlock()
}

// score calculates the score of the backend. Larger score indicates higher load.
func (b *backendWrapper) score() int {
return b.status.ToScore() + b.connScore
b.mu.RLock()
score := b.mu.status.ToScore() + b.connScore
b.mu.RUnlock()
return score
}

func (b *backendWrapper) Addr() string {
return b.addr
}

func (b *backendWrapper) Status() BackendStatus {
b.mu.RLock()
status := b.mu.status
b.mu.RUnlock()
return status
}

func (b *backendWrapper) Healthy() bool {
b.mu.RLock()
healthy := b.mu.status == StatusHealthy
b.mu.RUnlock()
return healthy
}

func (b *backendWrapper) ServerVersion() string {
b.mu.RLock()
version := b.mu.serverVersion
b.mu.RUnlock()
return version
}

func (b *backendWrapper) String() string {
b.mu.RLock()
str := b.mu.String()
b.mu.RUnlock()
return str
}

// connWrapper wraps RedirectableConn.
Expand Down
69 changes: 37 additions & 32 deletions pkg/manager/router/router_score.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
for be := router.backends.Back(); be != nil; be = be.Prev() {
backend := be.Value
// These backends may be recycled, so we should not connect to them again.
switch backend.status {
switch backend.Status() {
case StatusCannotConnect, StatusSchemaOutdated:
continue
}
Expand All @@ -97,7 +97,7 @@ func (router *ScoreBasedRouter) routeOnce(excluded []string) (string, error) {
}
if !found {
backend.connScore++
router.adjustBackendList(be)
router.adjustBackendList(be, false)
return backend.addr, nil
}
}
Expand All @@ -123,29 +123,29 @@ func (router *ScoreBasedRouter) onCreateConn(addr string, conn RedirectableConn,
conn.SetEventReceiver(router)
} else {
backend.connScore--
router.adjustBackendList(be)
router.adjustBackendList(be, true)
}
}

func (router *ScoreBasedRouter) removeConn(be *glist.Element[*backendWrapper], ce *glist.Element[*connWrapper]) {
backend := be.Value
backend.connList.Remove(ce)
setBackendConnMetrics(backend.addr, backend.connList.Len())
router.adjustBackendList(be)
router.adjustBackendList(be, true)
}

func (router *ScoreBasedRouter) addConn(be *glist.Element[*backendWrapper], conn *connWrapper) {
backend := be.Value
ce := backend.connList.PushBack(conn)
setBackendConnMetrics(backend.addr, backend.connList.Len())
router.setConnWrapper(conn, ce)
conn.NotifyBackendStatus(backend.status)
router.adjustBackendList(be)
conn.NotifyBackendStatus(backend.Status())
router.adjustBackendList(be, false)
}

// adjustBackendList moves `be` after the score of `be` changes to keep the list ordered.
func (router *ScoreBasedRouter) adjustBackendList(be *glist.Element[*backendWrapper]) {
if router.removeBackendIfEmpty(be) {
func (router *ScoreBasedRouter) adjustBackendList(be *glist.Element[*backendWrapper], removeEmpty bool) {
if removeEmpty && router.removeBackendIfEmpty(be) {
return
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func (router *ScoreBasedRouter) RedirectConnections() error {
if connWrapper.phase != phaseRedirectNotify {
connWrapper.phase = phaseRedirectNotify
// we dont care the results
_ = connWrapper.Redirect(backend.addr)
_ = connWrapper.Redirect(backend)
}
}
}
Expand Down Expand Up @@ -225,14 +225,15 @@ func (router *ScoreBasedRouter) ensureBackend(addr string, forward bool) *glist.
if be == nil {
// The backend should always exist if it will be needed. Add a warning and add it back.
router.logger.Warn("backend is not found in the router", zap.String("backend_addr", addr), zap.Stack("stack"))
be = router.backends.PushFront(&backendWrapper{
backendHealth: &backendHealth{
status: StatusCannotConnect,
},
backend := &backendWrapper{
addr: addr,
connList: glist.New[*connWrapper](),
}
backend.setHealth(backendHealth{
status: StatusCannotConnect,
})
router.adjustBackendList(be)
be = router.backends.PushFront(backend)
router.adjustBackendList(be, false)
}
return be
}
Expand Down Expand Up @@ -261,9 +262,9 @@ func (router *ScoreBasedRouter) onRedirectFinished(from, to string, conn Redirec
connWrapper.phase = phaseRedirectEnd
} else {
fromBe.Value.connScore++
router.adjustBackendList(fromBe)
router.adjustBackendList(fromBe, false)
toBe.Value.connScore--
router.adjustBackendList(toBe)
router.adjustBackendList(toBe, true)
connWrapper.phase = phaseRedirectFail
}
addMigrateMetrics(from, to, succeed, connWrapper.lastRedirect)
Expand Down Expand Up @@ -291,18 +292,19 @@ func (router *ScoreBasedRouter) OnBackendChanged(backends map[string]*backendHea
if be == nil && health.status != StatusCannotConnect {
router.logger.Info("update backend", zap.String("backend_addr", addr),
zap.String("prev", "none"), zap.String("cur", health.String()))
be = router.backends.PushBack(&backendWrapper{
backendHealth: health,
addr: addr,
connList: glist.New[*connWrapper](),
})
router.adjustBackendList(be)
backend := &backendWrapper{
addr: addr,
connList: glist.New[*connWrapper](),
}
backend.setHealth(*health)
be = router.backends.PushBack(backend)
router.adjustBackendList(be, false)
} else if be != nil {
backend := be.Value
router.logger.Info("update backend", zap.String("backend_addr", addr),
zap.String("prev", backend.String()), zap.String("cur", health.String()))
backend.backendHealth = health
router.adjustBackendList(be)
zap.String("prev", backend.mu.String()), zap.String("cur", health.String()))
backend.setHealth(*health)
router.adjustBackendList(be, true)
for ele := backend.connList.Front(); ele != nil; ele = ele.Next() {
conn := ele.Value
conn.NotifyBackendStatus(health.status)
Expand Down Expand Up @@ -371,20 +373,20 @@ func (router *ScoreBasedRouter) rebalance(maxNum int) {
zap.String("from", busiestBackend.addr), zap.String("to", idlestBackend.addr),
zap.Int("from_score", busiestBackend.score()), zap.Int("to_score", idlestBackend.score()))
busiestBackend.connScore--
router.adjustBackendList(busiestEle)
router.adjustBackendList(busiestEle, true)
idlestBackend.connScore++
router.adjustBackendList(idlestEle)
router.adjustBackendList(idlestEle, false)
conn.phase = phaseRedirectNotify
conn.lastRedirect = curTime
conn.Redirect(idlestBackend.addr)
conn.Redirect(idlestBackend)
}
}

func (router *ScoreBasedRouter) removeBackendIfEmpty(be *glist.Element[*backendWrapper]) bool {
backend := be.Value
// If connList.Len() == 0, there won't be any outgoing connections.
// And if also connScore == 0, there won't be any incoming connections.
if backend.status == StatusCannotConnect && backend.connList.Len() == 0 && backend.connScore <= 0 {
if backend.Status() == StatusCannotConnect && backend.connList.Len() == 0 && backend.connScore <= 0 {
router.backends.Remove(be)
return true
}
Expand All @@ -406,9 +408,12 @@ func (router *ScoreBasedRouter) ConnCount() int {
func (router *ScoreBasedRouter) updateServerVersion() {
for be := router.backends.Front(); be != nil; be = be.Next() {
backend := be.Value
if backend.backendHealth.status != StatusCannotConnect && len(backend.serverVersion) > 0 {
router.serverVersion = backend.serverVersion
return
if backend.Status() != StatusCannotConnect {
serverVersion := backend.ServerVersion()
if len(serverVersion) > 0 {
router.serverVersion = serverVersion
return
}
}
}
}
Expand Down
Loading

0 comments on commit 8ef7f18

Please sign in to comment.