Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to set preferred node id to execute query #1547

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Added option to set preferred node id to execute query
* Set the `pick_first` balancer for short-lived grpc connection inside ydb cluster discovery attempt

## v3.90.1
Expand Down
5 changes: 5 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ func WithOperationTimeout(ctx context.Context, operationTimeout time.Duration) c
func WithOperationCancelAfter(ctx context.Context, operationCancelAfter time.Duration) context.Context {
return operation.WithCancelAfter(ctx, operationCancelAfter)
}

// WithPreferredNodeID allows to set preferred node to get session from
func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
return operation.WithPreferredNodeID(ctx, nodeID)
}
12 changes: 12 additions & 0 deletions internal/conn/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"google.golang.org/grpc"

balancerContext "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
)

var _ grpc.ClientConnInterface = (*middleware)(nil)
Expand All @@ -30,6 +32,16 @@ func (m *middleware) NewStream(
return m.newStream(ctx, desc, method, opts...)
}

func ModifyConn(cc grpc.ClientConnInterface, nodeID uint32) grpc.ClientConnInterface {
if nodeID != 0 {
return WithContextModifier(cc, func(ctx context.Context) context.Context {
return balancerContext.WithNodeID(ctx, nodeID)
})
}

return cc
}

func WithContextModifier(
cc grpc.ClientConnInterface,
modifyCtx func(ctx context.Context) context.Context,
Expand Down
18 changes: 18 additions & 0 deletions internal/operation/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
type (
ctxOperationTimeoutKey struct{}
ctxOperationCancelAfterKey struct{}
ctxWithPreferredNodeIDKey struct{}
)

// WithTimeout returns a copy of parent context in which YDB operation timeout
Expand All @@ -33,6 +34,10 @@ func WithCancelAfter(ctx context.Context, operationCancelAfter time.Duration) co
return context.WithValue(ctx, ctxOperationCancelAfterKey{}, operationCancelAfter)
}

func WithPreferredNodeID(ctx context.Context, nodeID uint32) context.Context {
return context.WithValue(ctx, ctxWithPreferredNodeIDKey{}, nodeID)
}

// ctxTimeout returns the timeout within given context after which
// YDB should try to cancel operation and return result regardless of the cancelation.
func ctxTimeout(ctx context.Context) (d time.Duration, ok bool) {
Expand All @@ -57,3 +62,16 @@ func ctxUntilDeadline(ctx context.Context) (time.Duration, bool) {

return 0, false
}

func CtxPreferredNodeID(ctx context.Context) uint32 {
x := ctx.Value(ctxWithPreferredNodeIDKey{})
if x == nil {
return 0
}
val, ok := x.(uint32)
if !ok {
return 0
}

return val
}
35 changes: 22 additions & 13 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jonboulle/clockwork"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
Expand All @@ -20,6 +21,7 @@ type (
Item interface {
IsAlive() bool
Close(ctx context.Context) error
NodeID() uint32
}
ItemConstraint[T any] interface {
*T
Expand All @@ -30,7 +32,7 @@ type (
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
Expand All @@ -48,7 +50,7 @@ type (
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
createItem func(ctx context.Context, preferredNodeID uint32) (PT, error)
closeItem func(ctx context.Context, item PT)

mu xsync.RWMutex
Expand All @@ -63,7 +65,7 @@ type (
Option[PT ItemConstraint[T], T any] func(c *Config[PT, T])
)

func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(context.Context, uint32) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func New[PT ItemConstraint[T], T any](
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context, uint32) (PT, error) {
var item T

return &item, nil
Expand All @@ -182,8 +184,8 @@ func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error)
// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
) func(ctx context.Context) (PT, error) {
return func(ctx context.Context) (PT, error) {
) func(ctx context.Context, preferredNodeID uint32) (PT, error) {
return func(ctx context.Context, preferredNodeID uint32) (PT, error) {
if !xsync.WithLock(&p.mu, func() bool {
if len(p.index)+p.createInProgress < p.config.limit {
p.createInProgress++
Expand Down Expand Up @@ -222,7 +224,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
defer cancelCreate()
}

newItem, err := p.config.createItem(createCtx)
newItem, err := p.config.createItem(createCtx, preferredNodeID)
if newItem != nil {
p.mu.WithLock(func() {
var useCounter uint64
Expand Down Expand Up @@ -314,7 +316,7 @@ func (p *Pool[PT, T]) changeState(changeState func() Stats) {
}
}

func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) {
func (p *Pool[PT, T]) try(ctx context.Context, f func(context.Context, PT) error) (finalErr error) {
if onTry := p.config.trace.OnTry; onTry != nil {
onDone := onTry(&ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/pool.(*Pool).try"),
Expand Down Expand Up @@ -460,8 +462,13 @@ func (p *Pool[PT, T]) putWaitCh(ch *chan PT) { //nolint:gocritic
}

// p.mu must be held.
func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
func (p *Pool[PT, T]) peekFirstIdle(preferredNodeID uint32) (item PT, touched time.Time) {
el := p.idle.Front()
if preferredNodeID != 0 {
for el != nil && el.Value.NodeID() != preferredNodeID {
el = el.Next()
}
}
if el == nil {
return
}
Expand All @@ -478,8 +485,8 @@ func (p *Pool[PT, T]) peekFirstIdle() (item PT, touched time.Time) {
// to prevent session from dying in the internalPoolGC after it was returned
// to be used only in outgoing functions that make session busy.
// p.mu must be held.
func (p *Pool[PT, T]) removeFirstIdle() PT {
idle, _ := p.peekFirstIdle()
func (p *Pool[PT, T]) removeFirstIdle(preferredNodeID uint32) PT {
idle, _ := p.peekFirstIdle(preferredNodeID)
if idle != nil {
info := p.removeIdle(idle)
p.index[idle] = info
Expand Down Expand Up @@ -585,6 +592,8 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

preferredNodeID := operation.CtxPreferredNodeID(ctx)

for ; attempt < maxAttempts; attempt++ {
select {
case <-p.done:
Expand All @@ -593,7 +602,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}

if item := xsync.WithLock(&p.mu, func() PT { //nolint:nestif
return p.removeFirstIdle()
return p.removeFirstIdle(preferredNodeID)
}); item != nil {
if item.IsAlive() {
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {
Expand Down Expand Up @@ -625,7 +634,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

item, err := p.createItem(ctx)
item, err := p.createItem(ctx, preferredNodeID)
if item != nil {
return item, nil
}
Expand Down
Loading
Loading