Skip to content

Commit

Permalink
Allow to set preferred node id to execute query
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Nov 1, 2024
1 parent 17a901b commit 66aa643
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 90 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Allow to set preferred node id to execute query

## v3.89.2
* Returned log.XXX methods for create fields, removed from public at v3.85.0

Expand Down
43 changes: 27 additions & 16 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type (
Item interface {
IsAlive() bool
Close(ctx context.Context) error
NodeID() uint32
}
ItemConstraint[T any] interface {
*T
Expand All @@ -30,7 +31,7 @@ type (
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeId uint32) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
Expand All @@ -48,7 +49,7 @@ type (
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeId uint32) (PT, error)
closeItem func(ctx context.Context, item PT)

mu xsync.RWMutex
Expand All @@ -61,9 +62,12 @@ type (
done chan struct{}
}
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
SessionCallOption struct {
preferredNodeIdOption uint32
}
)

func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context, preferredNodeId uint32) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
}
Expand Down Expand Up @@ -173,7 +177,7 @@ func New[PT ItemConstraint[T], T any](
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context, uint32) (PT, error) {
var item T

return &item, nil
Expand All @@ -182,8 +186,8 @@ func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error)
// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
) func(ctx context.Context, preferredNodeId uint32) (PT, error) {
return func(ctx context.Context, preferredNodeId uint32) (PT, error) {
if !xsync.WithLock(&p.mu, func() bool {
if len(p.index)+p.createInProgress < p.config.limit {
p.createInProgress++
Expand Down Expand Up @@ -222,7 +226,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
defer cancelCreate()
}

newItem, err := p.config.createItem(createCtx)
newItem, err := p.config.createItem(createCtx, preferredNodeId)
if newItem != nil {
p.mu.WithLock(func() {
var useCounter uint64
Expand Down Expand Up @@ -314,7 +318,7 @@ func (p *Pool[PT, T]) changeState(changeState func() Stats) {
}
}

func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error, preferredNodeId uint32) (finalErr error) {
if onTry := p.config.trace.OnTry; onTry != nil {
onDone := onTry(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
Expand All @@ -334,7 +338,7 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
default:
}

item, err := p.getItem(ctx)
item, err := p.getItem(ctx, preferredNodeId)
if err != nil {
if xerrors.IsYdb(err) {
return xerrors.WithStackTrace(xerrors.Retryable(err))
Expand All @@ -358,6 +362,7 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
func (p *Pool[PT, T]) With(
ctx context.Context,
f func(ctx context.Context, item PT) error,
preferredNodeId uint32,
opts ...retry.Option,
) (finalErr error) {
var attempts int
Expand All @@ -375,7 +380,7 @@ func (p *Pool[PT, T]) With(

err := retry.Retry(ctx, func(ctx context.Context) error {
attempts++
err := p.try(ctx, f)
err := p.try(ctx, f, preferredNodeId)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -460,8 +465,13 @@ func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic
}

// p.mu must be held.
func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
func (p *Pool[PT, T]) peekFirstIdle(preferredNodeId uint32) (item PT, touched time.Time) {
el := p.idle.Front()
if preferredNodeId != 0 {
for el != nil && el.Value.NodeID() != preferredNodeId {
el = el.Next()
}
}
if el == nil {
return
}
Expand All @@ -478,8 +488,8 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
// to prevent session from dying in the internalPoolGC after it was returned
// to be used only in outgoing functions that make session busy.
// p.mu must be held.
func (p *Pool[PT, T]) removeFirstIdle() PT {
idle, _ := p.peekFirstIdle()
func (p *Pool[PT, T]) removeFirstIdle(preferredNodeId uint32) PT {
idle, _ := p.peekFirstIdle(preferredNodeId)
if idle != nil {
info := p.removeIdle(idle)
p.index[idle] = info
Expand Down Expand Up @@ -567,7 +577,7 @@ func (p *Pool[PT, T]) pushIdle(item PT, now time.Time) {

const maxAttempts = 100

func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { //nolint:funlen
func (p *Pool[PT, T]) getItem(ctx context.Context, preferredNodeId uint32) (item PT, finalErr error) { //nolint:funlen
var (
start = p.config.clock.Now()
attempt int
Expand All @@ -593,8 +603,9 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}

if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
return p.removeFirstIdle()
return p.removeFirstIdle(preferredNodeId)
}); item != nil {

if item.IsAlive() {
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {
info, has := p.index[item]
Expand Down Expand Up @@ -625,7 +636,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

item, err := p.createItem(ctx)
item, err := p.createItem(ctx, preferredNodeId)
if item != nil {
return item, nil
}
Expand Down
Loading

0 comments on commit 66aa643

Please sign in to comment.