Skip to content

Commit

Permalink
Cherry-pick afbce6a with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed May 23, 2024
1 parent 09edbca commit 0ed053a
Show file tree
Hide file tree
Showing 11 changed files with 1,924 additions and 19 deletions.
762 changes: 762 additions & 0 deletions go/pools/smartconnpool/pool.go

Large diffs are not rendered by default.

1,082 changes: 1,082 additions & 0 deletions go/pools/smartconnpool/pool_test.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,10 @@ func TestSidecarTables(t *testing.T) {
}

func TestConsolidation(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.PoolSize())
framework.Server.SetPoolSize(1)
defer framework.Server.SetPoolSize(context.Background(), framework.Server.PoolSize())

err := framework.Server.SetPoolSize(context.Background(), 1)
require.NoError(t, err)

const tag = "Waits/Histograms/Consolidations/Count"

Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/endtoend/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package endtoend

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -98,11 +99,13 @@ func TestStreamConsolidation(t *testing.T) {

defaultPoolSize := framework.Server.StreamPoolSize()

framework.Server.SetStreamPoolSize(4)
err = framework.Server.SetStreamPoolSize(context.Background(), 4)
require.NoError(t, err)

framework.Server.SetStreamConsolidationBlocking(true)

defer func() {
framework.Server.SetStreamPoolSize(defaultPoolSize)
_ = framework.Server.SetStreamPoolSize(context.Background(), defaultPoolSize)
framework.Server.SetStreamConsolidationBlocking(false)
}()

Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,9 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// ErrConnPoolClosed is returned when the connection pool is closed.
var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed")

const (
getWithoutS = "GetWithoutSettings"
getWithS = "GetWithSettings"
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestConnPoolTimeout(t *testing.T) {
require.NoError(t, err)
defer dbConn.Recycle()
_, err = connPool.Get(context.Background(), nil)
assert.EqualError(t, err, "resource pool timed out")
assert.EqualError(t, err, "connection pool timed out")
}

func TestConnPoolMaxWaiters(t *testing.T) {
Expand Down Expand Up @@ -181,6 +181,7 @@ func TestConnPoolSetCapacity(t *testing.T) {
connPool := newPool()
connPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer connPool.Close()
<<<<<<< HEAD
err := connPool.SetCapacity(-10)
if err == nil {
t.Fatalf("set capacity should return error for negative capacity")
Expand All @@ -189,6 +190,14 @@ func TestConnPoolSetCapacity(t *testing.T) {
if err != nil {
t.Fatalf("set capacity should succeed")
}
=======

assert.Panics(t, func() {
_ = connPool.SetCapacity(context.Background(), -10)
})
err := connPool.SetCapacity(context.Background(), 10)
assert.NoError(t, err)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
if connPool.Capacity() != 10 {
t.Fatalf("capacity should be 10")
}
Expand Down
18 changes: 15 additions & 3 deletions go/vt/vttablet/tabletserver/debugenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tabletserver

import (
"context"
"encoding/json"
"fmt"
"html"
Expand Down Expand Up @@ -82,6 +83,17 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
f(ival)
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
}
setIntValCtx := func(f func(context.Context, int) error) {
ival, err := strconv.Atoi(value)
if err == nil {
err = f(r.Context(), ival)
if err == nil {
msg = fmt.Sprintf("Setting %v to: %v", varname, value)
return
}
}
msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err)
}
setInt64Val := func(f func(int64)) {
ival, err := strconv.ParseInt(value, 10, 64)
if err != nil {
Expand Down Expand Up @@ -111,11 +123,11 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
}
switch varname {
case "PoolSize":
setIntVal(tsv.SetPoolSize)
setIntValCtx(tsv.SetPoolSize)
case "StreamPoolSize":
setIntVal(tsv.SetStreamPoolSize)
setIntValCtx(tsv.SetStreamPoolSize)
case "TxPoolSize":
setIntVal(tsv.SetTxPoolSize)
setIntValCtx(tsv.SetTxPoolSize)
case "MaxResultSize":
setIntVal(tsv.SetMaxResultSize)
case "WarnResultSize":
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getConn")
defer span.Finish()

<<<<<<< HEAD
start := time.Now()
conn, err := qre.tsv.qe.conns.Get(ctx, qre.setting)

Expand All @@ -783,12 +784,19 @@ func (qre *QueryExecutor) getConn() (*connpool.DBConn, error) {
return nil, err
}
return nil, err
=======
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
return qre.tsv.qe.conns.Get(ctx, qre.setting)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
span, ctx := trace.NewSpan(qre.ctx, "QueryExecutor.getStreamConn")
defer span.Finish()

<<<<<<< HEAD
start := time.Now()
conn, err := qre.tsv.qe.streamConns.Get(ctx, qre.setting)
switch err {
Expand All @@ -799,6 +807,12 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.DBConn, error) {
return nil, err
}
return nil, err
=======
defer func(start time.Time) {
qre.logStats.WaitingForConnection += time.Since(start)
}(time.Now())
return qre.tsv.qe.streamConns.Get(ctx, qre.setting)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// txFetch fetches from a TxConnection.
Expand Down
18 changes: 16 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,11 +1979,15 @@ func (tsv *TabletServer) EnableHistorian(enabled bool) {
}

// SetPoolSize changes the pool size to the specified value.
func (tsv *TabletServer) SetPoolSize(val int) {
func (tsv *TabletServer) SetPoolSize(ctx context.Context, val int) error {
if val <= 0 {
return
return nil
}
<<<<<<< HEAD
tsv.qe.conns.SetCapacity(val)
=======
return tsv.qe.conns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// PoolSize returns the pool size.
Expand All @@ -1992,8 +1996,13 @@ func (tsv *TabletServer) PoolSize() int {
}

// SetStreamPoolSize changes the pool size to the specified value.
<<<<<<< HEAD
func (tsv *TabletServer) SetStreamPoolSize(val int) {
tsv.qe.streamConns.SetCapacity(val)
=======
func (tsv *TabletServer) SetStreamPoolSize(ctx context.Context, val int) error {
return tsv.qe.streamConns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// SetStreamConsolidationBlocking sets whether the stream consolidator should wait for slow clients
Expand All @@ -2007,8 +2016,13 @@ func (tsv *TabletServer) StreamPoolSize() int {
}

// SetTxPoolSize changes the tx pool size to the specified value.
<<<<<<< HEAD
func (tsv *TabletServer) SetTxPoolSize(val int) {
tsv.te.txPool.scp.conns.SetCapacity(val)
=======
func (tsv *TabletServer) SetTxPoolSize(ctx context.Context, val int) error {
return tsv.te.txPool.scp.conns.SetCapacity(ctx, int64(val))
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))
}

// TxPoolSize returns the tx pool size.
Expand Down
12 changes: 9 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2046,23 +2046,29 @@ func TestConfigChanges(t *testing.T) {
newSize := 10
newDuration := time.Duration(10 * time.Millisecond)

tsv.SetPoolSize(newSize)
err := tsv.SetPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.PoolSize(); val != newSize {
t.Errorf("PoolSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.conns.Capacity()); val != newSize {
t.Errorf("tsv.qe.connPool.Capacity: %d, want %d", val, newSize)
}

tsv.SetStreamPoolSize(newSize)
err = tsv.SetStreamPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.StreamPoolSize(); val != newSize {
t.Errorf("StreamPoolSize: %d, want %d", val, newSize)
}
if val := int(tsv.qe.streamConns.Capacity()); val != newSize {
t.Errorf("tsv.qe.streamConnPool.Capacity: %d, want %d", val, newSize)
}

tsv.SetTxPoolSize(newSize)
err = tsv.SetTxPoolSize(context.Background(), newSize)
require.NoError(t, err)

if val := tsv.TxPoolSize(); val != newSize {
t.Errorf("TxPoolSize: %d, want %d", val, newSize)
}
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletserver/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,15 @@ func primeTxPoolWithConnection(t *testing.T, ctx context.Context) (*fakesqldb.DB
db := fakesqldb.New(t)
txPool, _ := newTxPool()
// Set the capacity to 1 to ensure that the db connection is reused.
<<<<<<< HEAD
txPool.scp.conns.SetCapacity(1)
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
=======
err := txPool.scp.conns.SetCapacity(context.Background(), 1)
require.NoError(t, err)
params := dbconfigs.New(db.ConnParams())
txPool.Open(params, params, params)
>>>>>>> afbce6aa87 (connpool: Allow time out during shutdown (#15979))

// Run a query to trigger a database connection. That connection will be
// reused by subsequent transactions.
Expand Down

0 comments on commit 0ed053a

Please sign in to comment.