Skip to content

Commit

Permalink
[Feat] support slb connection rebalance in ODP mode (#123)
Browse files Browse the repository at this point in the history
* [Feat] support slb connection rebalance in ODP mode

* [Fix] review
  • Loading branch information
shenyunlong authored Oct 19, 2023
1 parent 4220b48 commit e192eae
Show file tree
Hide file tree
Showing 16 changed files with 589 additions and 62 deletions.
2 changes: 2 additions & 0 deletions client/obclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ func (c *obClient) init() error {
func (c *obClient) initOdp() error {
// 1. Init odp table
t := NewObTable(c.odpIP, c.odpRpcPort, c.tenantName, c.fullUserName, c.password, c.database)
t.setMaxConnectionAge(c.config.MaxConnectionAge)
t.setEnableSLBLoadBalance(c.config.EnableSLBLoadBalance)
err := t.init(c.config.ConnPoolMaxConnSize, c.config.ConnConnectTimeOut, c.config.ConnLoginTimeout)
// 2. Init sql
// ObVersion will be set when login in init()
Expand Down
29 changes: 22 additions & 7 deletions client/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ObTable struct {

isClosed bool
mutex sync.Mutex

maxConnectionAge time.Duration
enableSLBLoadBalance bool
}

func NewObTable(
Expand All @@ -51,13 +54,15 @@ func NewObTable(
password string,
database string) *ObTable {
return &ObTable{
ip: ip,
port: port,
tenantName: tenantName,
userName: userName,
password: password,
database: database,
isClosed: false,
ip: ip,
port: port,
tenantName: tenantName,
userName: userName,
password: password,
database: database,
isClosed: false,
maxConnectionAge: time.Duration(0),
enableSLBLoadBalance: false,
}
}

Expand All @@ -72,6 +77,8 @@ func (t *ObTable) init(connPoolSize int, connectTimeout time.Duration, loginTime
t.database,
t.userName,
t.password,
t.maxConnectionAge,
t.enableSLBLoadBalance,
)
cli, err := obkvrpc.NewRpcClient(opt)
if err != nil {
Expand All @@ -81,6 +88,14 @@ func (t *ObTable) init(connPoolSize int, connectTimeout time.Duration, loginTime
return nil
}

// setMaxConnectionAge records the maximum connection age for this table.
// The value is read by init when it builds the RPC client options, so it has
// to be set before init is called to take effect.
func (t *ObTable) setMaxConnectionAge(duration time.Duration) {
	t.maxConnectionAge = duration
}

// setEnableSLBLoadBalance records whether SLB load balancing is enabled for
// this table. The flag is read by init when it builds the RPC client options,
// so it has to be set before init is called to take effect.
func (t *ObTable) setEnableSLBLoadBalance(b bool) {
	t.enableSLBLoadBalance = b
}

func (t *ObTable) retry(
ctx context.Context,
request protocol.ObPayload,
Expand Down
10 changes: 9 additions & 1 deletion config/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type ClientConfig struct {
RsListHttpGetRetryInterval time.Duration

EnableRerouting bool

// connection rebalance in ODP mode
MaxConnectionAge time.Duration
EnableSLBLoadBalance bool
}

func NewDefaultClientConfig() *ClientConfig {
Expand All @@ -65,6 +69,8 @@ func NewDefaultClientConfig() *ClientConfig {
RsListHttpGetRetryTimes: 3,
RsListHttpGetRetryInterval: time.Duration(100) * time.Millisecond, // 100ms,
EnableRerouting: false,
MaxConnectionAge: time.Duration(0) * time.Second, // valid iff > 0
EnableSLBLoadBalance: false,
}
}

Expand All @@ -85,6 +91,8 @@ func (c *ClientConfig) String() string {
"RsListHttpGetTimeout:" + c.RsListHttpGetTimeout.String() + ", " +
"RsListHttpGetRetryTimes:" + strconv.Itoa(c.RsListHttpGetRetryTimes) + ", " +
"RsListHttpGetRetryInterval:" + c.RsListHttpGetRetryInterval.String() + ", " +
"EnableRerouting:" + strconv.FormatBool(c.EnableRerouting) +
"EnableRerouting:" + strconv.FormatBool(c.EnableRerouting) + ", " +
"MaxConnectionAge:" + c.MaxConnectionAge.String() + ", " +
"EnableSLBLoadBalance:" + strconv.FormatBool(c.EnableSLBLoadBalance) +
"}"
}
10 changes: 7 additions & 3 deletions config/toml_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ type RsListConfig struct {
}

type ExtraConfig struct {
OperationTimeOut int
LogLevel string
EnableRerouting bool
OperationTimeOut int
LogLevel string
EnableRerouting bool
MaxConnectionAge int
EnableSLBLoadBalance bool
}

func (c *ClientConfiguration) checkClientConfiguration() error {
Expand Down Expand Up @@ -147,6 +149,8 @@ func (c *ClientConfiguration) GetClientConfig() *ClientConfig {
RsListHttpGetRetryTimes: c.RsListConfig.HttpGetRetryTimes,
RsListHttpGetRetryInterval: time.Duration(c.RsListConfig.HttpGetRetryInterval) * time.Millisecond,
EnableRerouting: c.ExtraConfig.EnableRerouting,
MaxConnectionAge: time.Duration(c.ExtraConfig.MaxConnectionAge) * time.Millisecond,
EnableSLBLoadBalance: c.ExtraConfig.EnableSLBLoadBalance,
}
}

Expand Down
2 changes: 2 additions & 0 deletions configurations/obkv-table-default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ HttpGetRetryInterval = 100
OperationTimeOut = 10000
LogLevel = "info"
EnableRerouting = false
MaxConnectionAge = 0
EnableSLBLoadBalance = false
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ go 1.19

require (
github.com/go-sql-driver/mysql v1.7.0
github.com/naoina/toml v0.1.1
github.com/pkg/errors v0.9.1
github.com/scylladb/go-set v1.0.2
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.24.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
15 changes: 6 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA=
github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI=
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hzifhks=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
Expand All @@ -14,8 +16,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -28,11 +30,6 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
12 changes: 9 additions & 3 deletions obkvrpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type Connection struct {

ezHeaderLength int
rpcHeaderLength int
expireTime time.Time
isExpired atomic.Bool
slbLoader *SLBLoader
}

type packet struct {
Expand Down Expand Up @@ -211,6 +214,7 @@ func (c *Connection) Execute(
seq := c.seq.Add(1)

totalBuf := c.encodePacket(seq, request)
trace := fmt.Sprintf("Y%X-%016X", request.UniqueId(), request.Sequence())

call := &call{
err: nil,
Expand All @@ -234,21 +238,21 @@ func (c *Connection) Execute(
c.mutex.Lock()
delete(c.pending, seq)
c.mutex.Unlock()
return errors.WithMessage(ctx.Err(), "wait send packet to channel")
return errors.WithMessage(ctx.Err(), "wait send packet to channel, trace: "+trace)
}

// wait call back
select {
case call = <-call.signal:
if call.err != nil { // transport failed
return errors.WithMessage(call.err, "receive packet")
return errors.WithMessage(call.err, "receive packet, trace: "+trace)
}
case <-ctx.Done():
// timeout
c.mutex.Lock()
delete(c.pending, seq)
c.mutex.Unlock()
return errors.WithMessage(ctx.Err(), "wait transport packet")
return errors.WithMessage(ctx.Err(), "wait transport packet, trace: "+trace)
}

// transport success
Expand Down Expand Up @@ -402,6 +406,7 @@ func (c *Connection) writerWrite(packet packet) {
}

func (c *Connection) Close() {
log.Info(fmt.Sprintf("close connection start, remote addr:%s", c.conn.RemoteAddr().String()))
c.active.Store(false)
c.closeOnce.Do(func() {
close(c.packetChannelClose) // close packet channel
Expand All @@ -415,6 +420,7 @@ func (c *Connection) Close() {
}
c.mutex.Unlock()
})
log.Info(fmt.Sprintf("close connection success, remote addr:%s", c.conn.RemoteAddr().String()))
}

func (c *Connection) encodePacket(seq uint32, request protocol.ObPayload) []byte {
Expand Down
122 changes: 122 additions & 0 deletions obkvrpc/connection_lifecycle_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2023 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package obkvrpc

import (
"context"
"fmt"
"github.com/oceanbase/obkv-table-client-go/log"
"go.uber.org/zap"
"math"
"time"
)

// ConnectionLifeCycleMgr finds connections of a ConnectionPool whose
// expireTime has passed and recreates them in batches (see run).
type ConnectionLifeCycleMgr struct {
	// connPool is the pool whose connections are inspected and recreated.
	connPool *ConnectionPool
	// maxConnectionAge is the configured maximum connection age.
	// NOTE(review): it is only stored and printed in this file; presumably the
	// pool uses the same setting to compute each connection's expireTime — confirm.
	maxConnectionAge time.Duration
	// lastExpireIdx is the pool index of the last connection picked for
	// refresh; the next scan starts at the slot right after it.
	lastExpireIdx int
}

// String returns a human-readable description of the manager for logs.
//
// The connection pool is shown by address only. maxConnectionAge is rendered
// with time.Duration's own formatting (for example "1m30s") rather than as a
// raw nanosecond count, matching how ClientConfig.String prints durations.
func (c *ConnectionLifeCycleMgr) String() string {
	return fmt.Sprintf("ConnectionLifeCycleMgr{connPool: %p, maxConnectionAge: %s, lastExpireIdx: %d}",
		c.connPool, c.maxConnectionAge, c.lastExpireIdx)
}

// NewConnectionLifeCycleMgr creates a life-cycle manager bound to connPool.
// maxConnectionAge is stored as given, and the refresh cursor (lastExpireIdx)
// starts at slot 0.
func NewConnectionLifeCycleMgr(connPool *ConnectionPool, maxConnectionAge time.Duration) *ConnectionLifeCycleMgr {
	mgr := new(ConnectionLifeCycleMgr)
	mgr.connPool = connPool
	mgr.maxConnectionAge = maxConnectionAge
	mgr.lastExpireIdx = 0
	return mgr
}

// run performs one pass of connection refreshing:
//
//  1. collect the pool indexes of all connections whose expireTime has passed;
//  2. pick the first third of them (rounded up) and flag them as expired so
//     that they are no longer handed out (presumably this keeps most of the
//     pool usable during a pass — the selection logic lives in the pool);
//  3. wait for the requests still pending on those connections to finish;
//  4. close each picked connection and recreate it in the same pool slot.
//
// The expired flag is cleared on the picked slots when run returns, on every
// exit path. If draining takes too long or a reconnect fails, the pass is
// abandoned and the remaining connections are left for a later pass.
func (c *ConnectionLifeCycleMgr) run() {
	if c.connPool == nil {
		log.Error("connection pool is null")
		return
	}

	// 1. get all timeout connections
	expiredConnIds := c.expiredConnIndexes()
	if len(expiredConnIds) > 0 {
		log.Info(fmt.Sprintf("Find %d expired connections", len(expiredConnIds)))
		for idx, connIdx := range expiredConnIds {
			log.Info(fmt.Sprintf("%d: ip=%s, port=%d", idx, c.connPool.connections[connIdx].option.ip, c.connPool.connections[connIdx].option.port))
		}
	}

	// 2. mark one third (rounded up) of the expired connections as expired.
	// maxReconnIdx is the number of entries of expiredConnIds handled in this
	// pass, not a pool index.
	maxReconnIdx := int(math.Ceil(float64(len(expiredConnIds)) / 3))
	toRefresh := expiredConnIds[:maxReconnIdx]
	if maxReconnIdx > 0 {
		// Remember where this pass stopped so the next scan starts after it.
		c.lastExpireIdx = expiredConnIds[maxReconnIdx-1]
		log.Info(fmt.Sprintf("Begin to refresh expired connections which idx less than %d", maxReconnIdx))
	}
	for _, idx := range toRefresh {
		// no one can get expired connection
		c.connPool.connections[idx].isExpired.Store(true)
	}
	defer func() {
		// Index the pool again here: after a successful recreate the slot
		// holds the new connection.
		for _, idx := range toRefresh {
			c.connPool.connections[idx].isExpired.Store(false)
		}
	}()

	// 3. wait all expired connection finished
	time.Sleep(DefaultConnectWaitTime)
	for _, idx := range toRefresh {
		if !c.waitPendingDrained(idx) {
			log.Warn("Wait too much time for the connection to end, stop ConnectionLifeCycleMgr")
			return
		}
	}

	// 4. close and reconnect all expired connections.
	// One timeout covers the whole batch; cancel is deferred so the context's
	// timer is released as soon as run returns.
	ctx, cancel := context.WithTimeout(context.Background(), c.connPool.option.connectTimeout)
	defer cancel()
	for _, idx := range toRefresh {
		// close and reconnect
		c.connPool.connections[idx].Close()
		if _, err := c.connPool.RecreateConnection(ctx, idx); err != nil {
			log.Warn("reconnect failed", zap.Error(err))
			return
		}
	}
	if maxReconnIdx > 0 {
		log.Info(fmt.Sprintf("Finish to refresh expired connections which idx less than %d", maxReconnIdx))
	}
}

// expiredConnIndexes returns the pool indexes of connections whose expireTime
// is set and already in the past. The scan walks the pool as a ring starting
// at the slot after lastExpireIdx.
func (c *ConnectionLifeCycleMgr) expiredConnIndexes() []int {
	conns := c.connPool.connections
	expired := make([]int, 0, len(conns))
	now := time.Now()
	for i := 1; i <= len(conns); i++ {
		idx := (i + c.lastExpireIdx) % len(conns)
		// A zero expireTime is skipped, i.e. treated as "never expires".
		if expireAt := conns[idx].expireTime; !expireAt.IsZero() && expireAt.Before(now) {
			expired = append(expired, idx)
		}
	}
	return expired
}

// waitPendingDrained polls the connection in pool slot idx every 10ms until
// it has no pending requests. It logs progress every 100 polls and reports
// false once more than 3000 polls (about 30s) have passed with requests
// still pending.
func (c *ConnectionLifeCycleMgr) waitPendingDrained(idx int) bool {
	const (
		pollInterval = 10 * time.Millisecond
		logEvery     = 100
		maxPolls     = 3000
	)
	for j := 0; c.pendingCount(idx) > 0; j++ {
		time.Sleep(pollInterval)
		if j > 0 && j%logEvery == 0 {
			log.Info(fmt.Sprintf("Wait too long time for the connection to end,"+
				"connection idx: %d, ip:%s, port:%d, current connection pending size: %d",
				idx, c.connPool.connections[idx].option.ip, c.connPool.connections[idx].option.port, c.pendingCount(idx)))
		}

		if j > maxPolls {
			return false
		}
	}
	return true
}

// pendingCount returns the number of pending requests on the connection in
// pool slot idx. The pending map is modified under the connection's mutex by
// the request path, so it is read under the same mutex here.
func (c *ConnectionLifeCycleMgr) pendingCount(idx int) int {
	conns := c.connPool.connections
	conns[idx].mutex.Lock()
	n := len(conns[idx].pending)
	conns[idx].mutex.Unlock()
	return n
}
Loading

0 comments on commit e192eae

Please sign in to comment.