Skip to content

Commit

Permalink
*: use a unified session pool definition AMAP (#55170)
Browse files Browse the repository at this point in the history
ref #54434
  • Loading branch information
JmPotato authored Aug 5, 2024
1 parent 6e03d23 commit 5aad7df
Show file tree
Hide file tree
Showing 35 changed files with 234 additions and 190 deletions.
2 changes: 1 addition & 1 deletion pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/sessionctx/variable",
"//pkg/types",
"//pkg/types/parser_driver",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/hack",
"//pkg/util/hint",
Expand All @@ -41,7 +42,6 @@ go_library(
"//pkg/util/stmtsummary/v2:stmtsummary",
"//pkg/util/stringutil",
"//pkg/util/table-filter",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_sync//singleflight",
Expand Down
5 changes: 3 additions & 2 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
driver "github.com/pingcap/tidb/pkg/types/parser_driver"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/hint"
utilparser "github.com/pingcap/tidb/pkg/util/parser"
Expand Down Expand Up @@ -109,7 +110,7 @@ type GlobalBindingHandle interface {

// globalBindingHandle is used to handle all global sql bind operations.
type globalBindingHandle struct {
sPool SessionPool
sPool util.SessionPool

fuzzyBindingCache atomic.Value

Expand Down Expand Up @@ -149,7 +150,7 @@ const (
)

// NewGlobalBindingHandle creates a new GlobalBindingHandle.
func NewGlobalBindingHandle(sPool SessionPool) GlobalBindingHandle {
func NewGlobalBindingHandle(sPool util.SessionPool) GlobalBindingHandle {
handle := &globalBindingHandle{sPool: sPool}
handle.Reset()
return handle
Expand Down
5 changes: 3 additions & 2 deletions pkg/bindinfo/global_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,5 +609,6 @@ func (p *mockSessionPool) Get() (pools.Resource, error) {
return p.se, nil
}

func (p *mockSessionPool) Put(pools.Resource) {
}
func (p *mockSessionPool) Put(pools.Resource) {}

func (p *mockSessionPool) Close() {}
7 changes: 0 additions & 7 deletions pkg/bindinfo/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package bindinfo
import (
"context"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand All @@ -37,9 +36,3 @@ func execRows(sctx sessionctx.Context, sql string, args ...any) (rows []chunk.Ro
return sqlExec.ExecRestrictedSQL(kv.WithInternalSourceType(context.Background(), kv.InternalTxnBindInfo),
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}

// SessionPool is used to recycle sessionctx.
type SessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ go_library(
"//pkg/kv",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/cpu",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//util",
Expand Down
15 changes: 5 additions & 10 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import (
"sync/atomic"

"github.com/docker/go-units"
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/util"
clitutil "github.com/tikv/client-go/v2/util"
)

const (
Expand Down Expand Up @@ -99,12 +99,7 @@ type TaskHandle interface {

// TaskManager is the manager of task and subtask.
type TaskManager struct {
sePool sessionPool
}

type sessionPool interface {
Get() (pools.Resource, error)
Put(resource pools.Resource)
sePool util.SessionPool
}

var _ SessionExecutor = &TaskManager{}
Expand All @@ -117,7 +112,7 @@ var (
)

// NewTaskManager creates a new task manager.
func NewTaskManager(sePool sessionPool) *TaskManager {
func NewTaskManager(sePool util.SessionPool) *TaskManager {
return &TaskManager{
sePool: sePool,
}
Expand Down Expand Up @@ -149,7 +144,7 @@ func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) err

// WithNewTxn executes the fn in a new transaction.
func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error {
ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
ctx = clitutil.WithInternalSourceType(ctx, kv.InternalDistTask)
return mgr.WithNewSession(func(se sessionctx.Context) (err error) {
_, err = sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), "begin")
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,11 @@ go_test(
"ru_stats_test.go",
"schema_checker_test.go",
"schema_validator_test.go",
"session_pool_test.go",
"topn_slow_query_test.go",
],
embed = [":domain"],
flaky = True,
shard_count = 29,
shard_count = 28,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand All @@ -151,7 +150,6 @@ go_test(
"//pkg/parser/terror",
"//pkg/server",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/store/mockstore",
"//pkg/testkit",
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func TestAbnormalSessionPool(t *testing.T) {
info.SetSessionManager(svr)

pool := domain.SysSessionPool()
failpoint.Enable("github.com/pingcap/tidb/pkg/domain/mockSessionPoolReturnError", "return")
failpoint.Enable("github.com/pingcap/tidb/pkg/util/mockSessionPoolReturnError", "return")
se, err := pool.Get()
require.Error(t, err)
failpoint.Disable("github.com/pingcap/tidb/pkg/domain/mockSessionPoolReturnError")
failpoint.Disable("github.com/pingcap/tidb/pkg/util/mockSessionPoolReturnError")
require.Equal(t, svr.InternalSessionExists(se), false)
}
103 changes: 19 additions & 84 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ type Domain struct {
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m syncutil.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
sysSessionPool util.SessionPool
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
Expand Down Expand Up @@ -1221,9 +1221,21 @@ const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool wil
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain {
capacity := 200 // capacity of the sysSessionPool size
do := &Domain{
store: store,
exit: make(chan struct{}),
sysSessionPool: newSessionPool(capacity, factory),
store: store,
exit: make(chan struct{}),
sysSessionPool: util.NewSessionPool(
capacity, factory,
func(r pools.Resource) {
_, ok := r.(sessionctx.Context)
intest.Assert(ok)
infosync.StoreInternalSession(r)
},
func(r pools.Resource) {
_, ok := r.(sessionctx.Context)
intest.Assert(ok)
infosync.DeleteInternalSession(r)
},
),
statsLease: statsLease,
slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum),
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}},
Expand Down Expand Up @@ -1726,84 +1738,8 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
}
}

type sessionPool struct {
resources chan pools.Resource
factory pools.Factory
mu struct {
sync.RWMutex
closed bool
}
}

func newSessionPool(capacity int, factory pools.Factory) *sessionPool {
return &sessionPool{
resources: make(chan pools.Resource, capacity),
factory: factory,
}
}

func (p *sessionPool) Get() (resource pools.Resource, err error) {
var ok bool
select {
case resource, ok = <-p.resources:
if !ok {
err = errors.New("session pool closed")
}
default:
resource, err = p.factory()
}

// Put the internal session to the map of SessionManager
failpoint.Inject("mockSessionPoolReturnError", func() {
err = errors.New("mockSessionPoolReturnError")
})

if nil == err {
_, ok = resource.(sessionctx.Context)
intest.Assert(ok)
infosync.StoreInternalSession(resource)
}

return
}

func (p *sessionPool) Put(resource pools.Resource) {
_, ok := resource.(sessionctx.Context)
intest.Assert(ok)

p.mu.RLock()
defer p.mu.RUnlock()
// Delete the internal session to the map of SessionManager
infosync.DeleteInternalSession(resource)
if p.mu.closed {
resource.Close()
return
}

select {
case p.resources <- resource:
default:
resource.Close()
}
}

func (p *sessionPool) Close() {
p.mu.Lock()
if p.mu.closed {
p.mu.Unlock()
return
}
p.mu.closed = true
close(p.resources)
p.mu.Unlock()

for r := range p.resources {
r.Close()
}
}

// SysSessionPool returns the system session pool.
func (do *Domain) SysSessionPool() *sessionPool {
func (do *Domain) SysSessionPool() util.SessionPool {
return do.sysSessionPool
}

Expand Down Expand Up @@ -2796,12 +2732,11 @@ func (do *Domain) NotifyUpdatePrivilege() error {
}

// update locally
sysSessionPool := do.SysSessionPool()
ctx, err := sysSessionPool.Get()
ctx, err := do.sysSessionPool.Get()
if err != nil {
return err
}
defer sysSessionPool.Put(ctx)
defer do.sysSessionPool.Put(ctx)
return do.PrivilegeHandle().Update(ctx.(sessionctx.Context))
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/domain/ru_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
pd "github.com/tikv/pd/client"
Expand All @@ -46,7 +47,7 @@ type RUStatsWriter struct {
RMClient pd.ResourceManagerClient
InfoCache *infoschema.InfoCache
store kv.Storage
sessPool *sessionPool
sessPool util.SessionPool
// current time, cache it here to make unit test easier.
StartTime time.Time
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (do *Domain) handleRemoveStaleRunawayWatch(record *resourcegroup.Quarantine
return err
}

func execRestrictedSQL(sessPool *sessionPool, sql string, params []any) ([]chunk.Row, error) {
func execRestrictedSQL(sessPool util.SessionPool, sql string, params []any) ([]chunk.Row, error) {
se, err := sessPool.Get()
defer func() {
sessPool.Put(se)
Expand Down Expand Up @@ -484,11 +484,11 @@ func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.
type runawaySyncer struct {
newWatchReader *SystemTableReader
deletionWatchReader *SystemTableReader
sysSessionPool *sessionPool
sysSessionPool util.SessionPool
mu sync.Mutex
}

func newRunawaySyncer(sysSessionPool *sessionPool) *runawaySyncer {
func newRunawaySyncer(sysSessionPool util.SessionPool) *runawaySyncer {
return &runawaySyncer{
sysSessionPool: sysSessionPool,
newWatchReader: &SystemTableReader{
Expand Down
5 changes: 2 additions & 3 deletions pkg/domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,11 @@ func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
newSessionCache := make(map[string]string)
newGlobalCache := make(map[string]string)
if ctx == nil {
sysSessionPool := do.SysSessionPool()
res, err := sysSessionPool.Get()
res, err := do.sysSessionPool.Get()
if err != nil {
return err
}
defer sysSessionPool.Put(res)
defer do.sysSessionPool.Put(res)
ctx = res.(sessionctx.Context)
}
// Only one rebuild can be in progress at a time, this prevents a lost update race
Expand Down
7 changes: 1 addition & 6 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,12 +1163,7 @@ func sqlForLog(sql string) string {
return executor.QueryReplacer.Replace(sql)
}

type sessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}

func (s *session) sysSessionPool() sessionPool {
func (s *session) sysSessionPool() util.SessionPool {
return domain.GetDomain(s).SysSessionPool()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//pkg/statistics/handle/usage",
"//pkg/statistics/handle/util",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/memory",
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
pkgutil "github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -110,7 +111,7 @@ func NewHandle(
_, /* ctx, keep it for feature usage */
initStatsCtx sessionctx.Context,
lease time.Duration,
pool util.SessionPool,
pool pkgutil.SessionPool,
tracker sysproctrack.Tracker,
autoAnalyzeProcIDGetter func() uint64,
releaseAutoAnalyzeProcID func(uint64),
Expand Down
Loading

0 comments on commit 5aad7df

Please sign in to comment.