Skip to content

Commit

Permalink
Added metadata middleware for child balancers
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Golov committed Nov 1, 2024
1 parent 17a901b commit 7ab3bba
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `meatadata.Middleware` for balancers

## v3.89.2
* Returned log.XXX methods for create fields, removed from public at v3.85.0

Expand Down
5 changes: 3 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Driver struct {
pool *conn.Pool

mtx sync.Mutex
balancer *balancer.Balancer
balancer balancer.ConnBalancer

children map[uint64]*Driver
childrenMtx xsync.Mutex
Expand Down Expand Up @@ -416,10 +416,11 @@ func (d *Driver) connect(ctx context.Context) (err error) {
d.pool = conn.NewPool(ctx, d.config)
}
if d.balancer == nil {
d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.balancer = balancer.WithMetadata(b, d.config.Meta())
}

d.table = xsync.OnceValue(func() (*internalTable.Client, error) {
Expand Down
9 changes: 5 additions & 4 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type discoveryClient interface {
Discover(ctx context.Context) ([]endpoint.Endpoint, error)
}

type ConnBalancer interface {
grpc.ClientConnInterface
closer.Closer
}

type Balancer struct {
driverConfig *config.Config
config balancerConfig.Config
Expand Down Expand Up @@ -299,10 +304,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
}
}()

if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil {
return xerrors.WithStackTrace(err)
}

if err = f(ctx, cc); err != nil {
if conn.UseWrapping(ctx) {
if credentials.IsAccessError(err) {
Expand Down
41 changes: 41 additions & 0 deletions internal/balancer/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package balancer

import (
"context"

Check failure on line 5 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"google.golang.org/grpc"

Check failure on line 8 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/ydb-platform/ydb-go-sdk/v3) (gci)
)

type Shared struct {
balancer ConnBalancer
meta *meta.Meta
}

func WithMetadata(b ConnBalancer, m *meta.Meta) *Shared {
return &Shared{balancer: b, meta: m}
}

func (s *Shared) Invoke(ctx context.Context, method string, args any, reply any,
opts ...grpc.CallOption) error {

Check failure on line 21 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
metaCtx, err := s.meta.Context(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}
return s.balancer.Invoke(metaCtx, method, args, reply, opts...)

Check failure on line 26 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

return with no blank line before (nlreturn)

Check failure on line 27 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
}

Check failure on line 28 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary trailing newline (whitespace)

func (s *Shared) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string,
opts ...grpc.CallOption) (grpc.ClientStream, error) {

Check failure on line 31 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
metaCtx, err := s.meta.Context(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
return s.balancer.NewStream(metaCtx, desc, method, opts...)

Check failure on line 36 in internal/balancer/shared.go

View workflow job for this annotation

GitHub Actions / golangci-lint

return with no blank line before (nlreturn)
}

func (s *Shared) Close(ctx context.Context) error {
return s.balancer.Close(ctx)
}
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/config"
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/certificates"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
Expand Down Expand Up @@ -602,8 +603,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option {
// WithSharedBalancer sets balancer from parent driver to child driver
func WithSharedBalancer(parent *Driver) Option {
return func(ctx context.Context, c *Driver) error {
c.balancer = parent.balancer

c.balancer = balancer.WithMetadata(parent.balancer, c.config.Meta())
return nil

Check failure on line 607 in options.go

View workflow job for this annotation

GitHub Actions / golangci-lint

return with no blank line before (nlreturn)
}
}
Expand Down

0 comments on commit 7ab3bba

Please sign in to comment.