diff --git a/CHANGELOG.md b/CHANGELOG.md index 839523749..19cd13fe3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added internal balancer wuth metadata customization + ## v3.89.2 * Returned log.XXX methods for create fields, removed from public at v3.85.0 diff --git a/balancer.go b/balancer.go new file mode 100644 index 000000000..e2e2c2c75 --- /dev/null +++ b/balancer.go @@ -0,0 +1,49 @@ +package ydb + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "google.golang.org/grpc" +) + +type balancerWithMeta struct { + balancer *balancer.Balancer + meta *meta.Meta +} + +func newBalancerWithMeta(b *balancer.Balancer, m *meta.Meta) *balancerWithMeta { + return &balancerWithMeta{balancer: b, meta: m} +} + +func (b *balancerWithMeta) Balancer() *balancer.Balancer { + return b.balancer +} + +func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any, + opts ...grpc.CallOption, +) error { + metaCtx, err := b.meta.Context(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return b.balancer.Invoke(metaCtx, method, args, reply, opts...) +} + +func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, + opts ...grpc.CallOption, +) (grpc.ClientStream, error) { + metaCtx, err := b.meta.Context(ctx) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return b.balancer.NewStream(metaCtx, desc, method, opts...) +} + +func (b *balancerWithMeta) Close(ctx context.Context) error { + return b.balancer.Close(ctx) +} diff --git a/driver.go b/driver.go index b168f87ae..b1e8f34af 100644 --- a/driver.go +++ b/driver.go @@ -96,7 +96,7 @@ type Driver struct { pool *conn.Pool mtx sync.Mutex - balancer *balancer.Balancer + balancer *balancerWithMeta children map[uint64]*Driver childrenMtx xsync.Mutex @@ -416,10 +416,11 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.pool = conn.NewPool(ctx, d.config) } if d.balancer == nil { - d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) + b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) if err != nil { return xerrors.WithStackTrace(err) } + d.balancer = newBalancerWithMeta(b, d.config.Meta()) } d.table = xsync.OnceValue(func() (*internalTable.Client, error) { diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 273993ded..03376ee81 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -299,10 +299,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc } }() - if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil { - return xerrors.WithStackTrace(err) - } - if err = f(ctx, cc); err != nil { if conn.UseWrapping(ctx) { if credentials.IsAccessError(err) { diff --git a/options.go b/options.go index 7dfdfc3dd..a8039ddef 100644 --- a/options.go +++ b/options.go @@ -602,7 +602,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { // WithSharedBalancer sets balancer from parent driver to child driver func WithSharedBalancer(parent *Driver) Option { return func(ctx context.Context, c *Driver) error { - c.balancer = parent.balancer + c.balancer = newBalancerWithMeta(parent.balancer.Balancer(), c.config.Meta()) return nil }