Skip to content

Commit

Permalink
Merge pull request #1538 from ydb-platform/re-create-discovery-client…
Browse files Browse the repository at this point in the history
…-on-re-discovery

* Changed behaviour on re-discovery: always open new grpc connection for discovery request
  • Loading branch information
asmyasnikov authored Nov 3, 2024
2 parents 43f2131 + 7397acf commit d7f0965
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 299 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Changed behaviour on re-discovery: always open new grpc connection for discovery request

## v3.89.3
* Wrapped internal balancer with metadata middleware

Expand Down
46 changes: 0 additions & 46 deletions balancer.go

This file was deleted.

129 changes: 75 additions & 54 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter"
Expand Down Expand Up @@ -50,74 +51,94 @@ import (

var _ Connection = (*Driver)(nil)

// Driver type provide access to YDB service clients
type Driver struct {
ctxCancel context.CancelFunc
type (
// Driver type provide access to YDB service clients
Driver struct {
ctxCancel context.CancelFunc

userInfo *dsn.UserInfo
userInfo *dsn.UserInfo

logger log.Logger
loggerOpts []log.Option
loggerDetails trace.Detailer
logger log.Logger
loggerOpts []log.Option
loggerDetails trace.Detailer

opts []Option
opts []Option

config *config.Config
options []config.Option
config *config.Config
options []config.Option

discovery *xsync.Once[*internalDiscovery.Client]
discoveryOptions []discoveryConfig.Option
discovery *xsync.Once[*internalDiscovery.Client]
discoveryOptions []discoveryConfig.Option

operation *xsync.Once[*operation.Client]
operation *xsync.Once[*operation.Client]

table *xsync.Once[*internalTable.Client]
tableOptions []tableConfig.Option
table *xsync.Once[*internalTable.Client]
tableOptions []tableConfig.Option

query *xsync.Once[*internalQuery.Client]
queryOptions []queryConfig.Option
query *xsync.Once[*internalQuery.Client]
queryOptions []queryConfig.Option

scripting *xsync.Once[*internalScripting.Client]
scriptingOptions []scriptingConfig.Option
scripting *xsync.Once[*internalScripting.Client]
scriptingOptions []scriptingConfig.Option

scheme *xsync.Once[*internalScheme.Client]
schemeOptions []schemeConfig.Option
scheme *xsync.Once[*internalScheme.Client]
schemeOptions []schemeConfig.Option

coordination *xsync.Once[*internalCoordination.Client]
coordinationOptions []coordinationConfig.Option
coordination *xsync.Once[*internalCoordination.Client]
coordinationOptions []coordinationConfig.Option

ratelimiter *xsync.Once[*internalRatelimiter.Client]
ratelimiterOptions []ratelimiterConfig.Option
ratelimiter *xsync.Once[*internalRatelimiter.Client]
ratelimiterOptions []ratelimiterConfig.Option

topic *xsync.Once[*topicclientinternal.Client]
topicOptions []topicoptions.TopicOption
topic *xsync.Once[*topicclientinternal.Client]
topicOptions []topicoptions.TopicOption

databaseSQLOptions []xsql.ConnectorOption
databaseSQLOptions []xsql.ConnectorOption

pool *conn.Pool
pool *conn.Pool

mtx sync.Mutex
balancer *balancerWithMeta
mtx sync.Mutex
metaBalancer *balancerWithMeta

children map[uint64]*Driver
childrenMtx xsync.Mutex
onClose []func(c *Driver)
children map[uint64]*Driver
childrenMtx xsync.Mutex
onClose []func(c *Driver)

panicCallback func(e interface{})
panicCallback func(e interface{})
}
balancerWithMeta struct {
balancer *balancer.Balancer
meta *meta.Meta
}
)

func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any,
opts ...grpc.CallOption,
) error {
metaCtx, err := b.meta.Context(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}

return b.balancer.Invoke(metaCtx, method, args, reply, opts...)
}

func (d *Driver) trace() *trace.Driver {
if d.config != nil {
return d.config.Trace()
func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
metaCtx, err := b.meta.Context(ctx)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return &trace.Driver{}
return b.balancer.NewStream(metaCtx, desc, method, opts...)
}

// Close closes Driver and clear resources
//
//nolint:nonamedreturns
func (d *Driver) Close(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnClose(d.trace(), &ctx,
onDone := trace.DriverOnClose(d.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.(*Driver).Close"),
)
defer func() {
Expand Down Expand Up @@ -155,7 +176,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
d.query.Close,
d.topic.Close,
d.discovery.Close,
d.balancer.Close,
d.metaBalancer.balancer.Close,
d.pool.Release,
)

Expand Down Expand Up @@ -263,7 +284,7 @@ func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, _ error)
}

onDone := trace.DriverOnInit(
d.trace(), &ctx,
d.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.Open"),
d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
Expand Down Expand Up @@ -305,7 +326,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint:
}

onDone := trace.DriverOnInit(
d.trace(), &ctx,
d.config.Trace(), &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.New"),
d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
Expand Down Expand Up @@ -415,17 +436,17 @@ func (d *Driver) connect(ctx context.Context) (err error) {
if d.pool == nil {
d.pool = conn.NewPool(ctx, d.config)
}
if d.balancer == nil {
if d.metaBalancer == nil {
b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.balancer = newBalancerWithMeta(b, d.config.Meta())
d.metaBalancer = &balancerWithMeta{balancer: b, meta: d.config.Meta()}
}

d.table = xsync.OnceValue(func() (*internalTable.Client, error) {
return internalTable.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
tableConfig.New(
append(
// prepend common params from root config
Expand All @@ -440,7 +461,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.query = xsync.OnceValue(func() (*internalQuery.Client, error) {
return internalQuery.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
queryConfig.New(
append(
// prepend common params from root config
Expand All @@ -458,7 +479,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.scheme = xsync.OnceValue(func() (*internalScheme.Client, error) {
return internalScheme.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
schemeConfig.New(
append(
// prepend common params from root config
Expand All @@ -474,7 +495,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.coordination = xsync.OnceValue(func() (*internalCoordination.Client, error) {
return internalCoordination.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
coordinationConfig.New(
append(
// prepend common params from root config
Expand All @@ -489,7 +510,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.ratelimiter = xsync.OnceValue(func() (*internalRatelimiter.Client, error) {
return internalRatelimiter.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
Expand Down Expand Up @@ -523,13 +544,13 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.operation = xsync.OnceValue(func() (*operation.Client, error) {
return operation.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
), nil
})

d.scripting = xsync.OnceValue(func() (*internalScripting.Client, error) {
return internalScripting.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
scriptingConfig.New(
append(
// prepend common params from root config
Expand All @@ -544,7 +565,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {

d.topic = xsync.OnceValue(func() (*topicclientinternal.Client, error) {
return topicclientinternal.New(xcontext.ValueOnly(ctx),
d.balancer,
d.metaBalancer,
d.config.Credentials(),
append(
// prepend common params from root config
Expand All @@ -565,5 +586,5 @@ func (d *Driver) connect(ctx context.Context) (err error) {
//
// Warning: for connect to driver-unsupported YDB services
func GRPCConn(cc *Driver) grpc.ClientConnInterface {
return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping)
return conn.WithContextModifier(cc.metaBalancer, conn.WithoutWrapping)
}
Loading

0 comments on commit d7f0965

Please sign in to comment.