diff --git a/CHANGELOG.md b/CHANGELOG.md index 839523749..0bb0c597f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `meatadata.Middleware` for balancers + ## v3.89.2 * Returned log.XXX methods for create fields, removed from public at v3.85.0 diff --git a/driver.go b/driver.go index b168f87ae..b531c6ca3 100644 --- a/driver.go +++ b/driver.go @@ -96,7 +96,7 @@ type Driver struct { pool *conn.Pool mtx sync.Mutex - balancer *balancer.Balancer + balancer balancer.ConnBalancer children map[uint64]*Driver childrenMtx xsync.Mutex @@ -416,10 +416,11 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.pool = conn.NewPool(ctx, d.config) } if d.balancer == nil { - d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) + b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) if err != nil { return xerrors.WithStackTrace(err) } + d.balancer = balancer.WithMetadata(b, d.config.Meta()) } d.table = xsync.OnceValue(func() (*internalTable.Client, error) { diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 273993ded..005f44ab2 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -34,6 +34,11 @@ type discoveryClient interface { Discover(ctx context.Context) ([]endpoint.Endpoint, error) } +type ConnBalancer interface { + grpc.ClientConnInterface + closer.Closer +} + type Balancer struct { driverConfig *config.Config config balancerConfig.Config @@ -299,10 +304,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc } }() - if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil { - return xerrors.WithStackTrace(err) - } - if err = f(ctx, cc); err != nil { if conn.UseWrapping(ctx) { if credentials.IsAccessError(err) { diff --git a/internal/balancer/shared.go b/internal/balancer/shared.go new file mode 100644 index 000000000..17ab584bc --- /dev/null +++ b/internal/balancer/shared.go @@ -0,0 +1,41 @@ +package balancer + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "google.golang.org/grpc" +) + +type Shared struct { + balancer ConnBalancer + meta *meta.Meta +} + +func WithMetadata(b ConnBalancer, m *meta.Meta) *Shared { + return &Shared{balancer: b, meta: m} +} + +func (s *Shared) Invoke(ctx context.Context, method string, args any, reply any, + opts ...grpc.CallOption) error { + metaCtx, err := s.meta.Context(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + return s.balancer.Invoke(metaCtx, method, args, reply, opts...) + +} + +func (s *Shared) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, + opts ...grpc.CallOption) (grpc.ClientStream, error) { + metaCtx, err := s.meta.Context(ctx) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + return s.balancer.NewStream(metaCtx, desc, method, opts...) +} + +func (s *Shared) Close(ctx context.Context) error { + return s.balancer.Close(ctx) +} diff --git a/options.go b/options.go index 7dfdfc3dd..bef8c88f0 100644 --- a/options.go +++ b/options.go @@ -11,6 +11,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/config" "github.com/ydb-platform/ydb-go-sdk/v3/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer" balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/certificates" "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" @@ -602,8 +603,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { // WithSharedBalancer sets balancer from parent driver to child driver func WithSharedBalancer(parent *Driver) Option { return func(ctx context.Context, c *Driver) error { - c.balancer = parent.balancer - + c.balancer = balancer.WithMetadata(parent.balancer, c.config.Meta()) return nil } }