Skip to content

Commit

Permalink
Added balancer with metadata customization
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Golov committed Nov 2, 2024
1 parent 17a901b commit a789c3c
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added internal balancer wuth metadata customization

## v3.89.2
* Returned log.XXX methods for create fields, removed from public at v3.85.0

Expand Down
50 changes: 50 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ydb

import (
"context"

"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

type balancerWithMeta struct {
balancer *balancer.Balancer
meta *meta.Meta
}

func newBalancerWithMeta(b *balancer.Balancer, m *meta.Meta) *balancerWithMeta {
return &balancerWithMeta{balancer: b, meta: m}
}

func (b *balancerWithMeta) Balancer() *balancer.Balancer {
return b.balancer
}

func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any,
opts ...grpc.CallOption,
) error {
metaCtx, err := b.meta.Context(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return b.balancer.Invoke(metaCtx, method, args, reply, opts...)
}

func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
metaCtx, err := b.meta.Context(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return b.balancer.NewStream(metaCtx, desc, method, opts...)
}

func (b *balancerWithMeta) Close(ctx context.Context) error {
return b.balancer.Close(ctx)
}
5 changes: 3 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Driver struct {
pool *conn.Pool

mtx sync.Mutex
balancer *balancer.Balancer
balancer *balancerWithMeta

children map[uint64]*Driver
childrenMtx xsync.Mutex
Expand Down Expand Up @@ -416,10 +416,11 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.pool = conn.NewPool(ctx, d.config)
}
if d.balancer == nil {
d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.balancer = newBalancerWithMeta(b, d.config.Meta())
}

d.table = xsync.OnceValue(func() (*internalTable.Client, error) {
Expand Down
4 changes: 0 additions & 4 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
}
}()

if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil {
return xerrors.WithStackTrace(err)
}

if err = f(ctx, cc); err != nil {
if conn.UseWrapping(ctx) {
if credentials.IsAccessError(err) {
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option {
// WithSharedBalancer sets balancer from parent driver to child driver
func WithSharedBalancer(parent *Driver) Option {
return func(ctx context.Context, c *Driver) error {
c.balancer = parent.balancer
c.balancer = newBalancerWithMeta(parent.balancer.Balancer(), c.config.Meta())

return nil
}
Expand Down

0 comments on commit a789c3c

Please sign in to comment.