diff --git a/CHANGELOG.md b/CHANGELOG.md index 7041da367..6fe338f5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Changed behaviour on re-discovery: always open new grpc connection for discovery request + ## v3.89.3 * Wrapped internal balancer with metadata middleware diff --git a/balancer.go b/balancer.go deleted file mode 100644 index 2459b9693..000000000 --- a/balancer.go +++ /dev/null @@ -1,46 +0,0 @@ -package ydb - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" -) - -type balancerWithMeta struct { - balancer *balancer.Balancer - meta *meta.Meta -} - -func newBalancerWithMeta(b *balancer.Balancer, m *meta.Meta) *balancerWithMeta { - return &balancerWithMeta{balancer: b, meta: m} -} - -func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any, - opts ...grpc.CallOption, -) error { - metaCtx, err := b.meta.Context(ctx) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return b.balancer.Invoke(metaCtx, method, args, reply, opts...) -} - -func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, - opts ...grpc.CallOption, -) (grpc.ClientStream, error) { - metaCtx, err := b.meta.Context(ctx) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - - return b.balancer.NewStream(metaCtx, desc, method, opts...) -} - -func (b *balancerWithMeta) Close(ctx context.Context) error { - return b.balancer.Close(ctx) -} diff --git a/driver.go b/driver.go index b1e8f34af..172c75ac6 100644 --- a/driver.go +++ b/driver.go @@ -21,6 +21,7 @@ import ( discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query" queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter" @@ -50,74 +51,94 @@ import ( var _ Connection = (*Driver)(nil) -// Driver type provide access to YDB service clients -type Driver struct { - ctxCancel context.CancelFunc +type ( + // Driver type provide access to YDB service clients + Driver struct { + ctxCancel context.CancelFunc - userInfo *dsn.UserInfo + userInfo *dsn.UserInfo - logger log.Logger - loggerOpts []log.Option - loggerDetails trace.Detailer + logger log.Logger + loggerOpts []log.Option + loggerDetails trace.Detailer - opts []Option + opts []Option - config *config.Config - options []config.Option + config *config.Config + options []config.Option - discovery *xsync.Once[*internalDiscovery.Client] - discoveryOptions []discoveryConfig.Option + discovery *xsync.Once[*internalDiscovery.Client] + discoveryOptions []discoveryConfig.Option - operation *xsync.Once[*operation.Client] + operation *xsync.Once[*operation.Client] - table *xsync.Once[*internalTable.Client] - tableOptions []tableConfig.Option + table *xsync.Once[*internalTable.Client] + tableOptions []tableConfig.Option - query *xsync.Once[*internalQuery.Client] - queryOptions []queryConfig.Option + query *xsync.Once[*internalQuery.Client] + queryOptions []queryConfig.Option - scripting *xsync.Once[*internalScripting.Client] - scriptingOptions []scriptingConfig.Option + scripting *xsync.Once[*internalScripting.Client] + scriptingOptions []scriptingConfig.Option - scheme *xsync.Once[*internalScheme.Client] - schemeOptions []schemeConfig.Option + scheme *xsync.Once[*internalScheme.Client] + schemeOptions []schemeConfig.Option - coordination *xsync.Once[*internalCoordination.Client] - coordinationOptions []coordinationConfig.Option + coordination *xsync.Once[*internalCoordination.Client] + coordinationOptions []coordinationConfig.Option - ratelimiter *xsync.Once[*internalRatelimiter.Client] - ratelimiterOptions []ratelimiterConfig.Option + ratelimiter *xsync.Once[*internalRatelimiter.Client] + ratelimiterOptions []ratelimiterConfig.Option - topic *xsync.Once[*topicclientinternal.Client] - topicOptions []topicoptions.TopicOption + topic *xsync.Once[*topicclientinternal.Client] + topicOptions []topicoptions.TopicOption - databaseSQLOptions []xsql.ConnectorOption + databaseSQLOptions []xsql.ConnectorOption - pool *conn.Pool + pool *conn.Pool - mtx sync.Mutex - balancer *balancerWithMeta + mtx sync.Mutex + metaBalancer *balancerWithMeta - children map[uint64]*Driver - childrenMtx xsync.Mutex - onClose []func(c *Driver) + children map[uint64]*Driver + childrenMtx xsync.Mutex + onClose []func(c *Driver) - panicCallback func(e interface{}) + panicCallback func(e interface{}) + } + balancerWithMeta struct { + balancer *balancer.Balancer + meta *meta.Meta + } +) + +func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any, + opts ...grpc.CallOption, +) error { + metaCtx, err := b.meta.Context(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return b.balancer.Invoke(metaCtx, method, args, reply, opts...) } -func (d *Driver) trace() *trace.Driver { - if d.config != nil { - return d.config.Trace() +func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, + opts ...grpc.CallOption, +) (grpc.ClientStream, error) { + metaCtx, err := b.meta.Context(ctx) + if err != nil { + return nil, xerrors.WithStackTrace(err) } - return &trace.Driver{} + return b.balancer.NewStream(metaCtx, desc, method, opts...) } // Close closes Driver and clear resources // //nolint:nonamedreturns func (d *Driver) Close(ctx context.Context) (finalErr error) { - onDone := trace.DriverOnClose(d.trace(), &ctx, + onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.(*Driver).Close"), ) defer func() { @@ -155,7 +176,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) { d.query.Close, d.topic.Close, d.discovery.Close, - d.balancer.Close, + d.metaBalancer.balancer.Close, d.pool.Release, ) @@ -263,7 +284,7 @@ func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, _ error) } onDone := trace.DriverOnInit( - d.trace(), &ctx, + d.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.Open"), d.config.Endpoint(), d.config.Database(), d.config.Secure(), ) @@ -305,7 +326,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { //nolint: } onDone := trace.DriverOnInit( - d.trace(), &ctx, + d.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.New"), d.config.Endpoint(), d.config.Database(), d.config.Secure(), ) @@ -415,17 +436,17 @@ func (d *Driver) connect(ctx context.Context) (err error) { if d.pool == nil { d.pool = conn.NewPool(ctx, d.config) } - if d.balancer == nil { + if d.metaBalancer == nil { b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...) if err != nil { return xerrors.WithStackTrace(err) } - d.balancer = newBalancerWithMeta(b, d.config.Meta()) + d.metaBalancer = &balancerWithMeta{balancer: b, meta: d.config.Meta()} } d.table = xsync.OnceValue(func() (*internalTable.Client, error) { return internalTable.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, tableConfig.New( append( // prepend common params from root config @@ -440,7 +461,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.query = xsync.OnceValue(func() (*internalQuery.Client, error) { return internalQuery.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, queryConfig.New( append( // prepend common params from root config @@ -458,7 +479,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.scheme = xsync.OnceValue(func() (*internalScheme.Client, error) { return internalScheme.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, schemeConfig.New( append( // prepend common params from root config @@ -474,7 +495,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.coordination = xsync.OnceValue(func() (*internalCoordination.Client, error) { return internalCoordination.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, coordinationConfig.New( append( // prepend common params from root config @@ -489,7 +510,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.ratelimiter = xsync.OnceValue(func() (*internalRatelimiter.Client, error) { return internalRatelimiter.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, ratelimiterConfig.New( append( // prepend common params from root config @@ -523,13 +544,13 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.operation = xsync.OnceValue(func() (*operation.Client, error) { return operation.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, ), nil }) d.scripting = xsync.OnceValue(func() (*internalScripting.Client, error) { return internalScripting.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, scriptingConfig.New( append( // prepend common params from root config @@ -544,7 +565,7 @@ func (d *Driver) connect(ctx context.Context) (err error) { d.topic = xsync.OnceValue(func() (*topicclientinternal.Client, error) { return topicclientinternal.New(xcontext.ValueOnly(ctx), - d.balancer, + d.metaBalancer, d.config.Credentials(), append( // prepend common params from root config @@ -565,5 +586,5 @@ func (d *Driver) connect(ctx context.Context) (err error) { // // Warning: for connect to driver-unsupported YDB services func GRPCConn(cc *Driver) grpc.ClientConnInterface { - return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping) + return conn.WithContextModifier(cc.metaBalancer, conn.WithoutWrapping) } diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 03376ee81..6534e0a20 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -6,16 +6,17 @@ import ( "strings" "sync/atomic" + "github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/config" balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn" "github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials" internalDiscovery "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery" discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta" "github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" @@ -28,22 +29,17 @@ import ( var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints")) -type discoveryClient interface { - closer.Closer - - Discover(ctx context.Context) ([]endpoint.Endpoint, error) -} - type Balancer struct { driverConfig *config.Config - config balancerConfig.Config + balancerConfig balancerConfig.Config + discoveryConfig *discoveryConfig.Config pool *conn.Pool - discoveryClient discoveryClient discoveryRepeater repeater.Repeater - localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) - connectionsState atomic.Pointer[connectionsState] + discover func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) + localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) + connectionsState atomic.Pointer[connectionsState] mu xsync.RWMutex onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info) } @@ -93,34 +89,32 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { address, b.driverConfig.Database(), ) - endpoints []endpoint.Endpoint - localDC string - cancel context.CancelFunc ) defer func() { onDone(err) }() if dialTimeout := b.driverConfig.DialTimeout(); dialTimeout > 0 { + var cancel context.CancelFunc ctx, cancel = xcontext.WithTimeout(ctx, dialTimeout) - } else { - ctx, cancel = xcontext.WithCancel(ctx) + defer cancel() } - defer cancel() - endpoints, err = b.discoveryClient.Discover(ctx) + endpoints, location, err := b.discover(ctx) if err != nil { return xerrors.WithStackTrace(err) } - if b.config.DetectNearestDC { - localDC, err = b.localDCDetector(ctx, endpoints) + if b.balancerConfig.DetectNearestDC { + location, err := b.localDCDetector(ctx, endpoints) if err != nil { return xerrors.WithStackTrace(err) } - } - b.applyDiscoveredEndpoints(ctx, endpoints, localDC) + b.applyDiscoveredEndpoints(ctx, endpoints, location) + } else { + b.applyDiscoveredEndpoints(ctx, endpoints, location) + } return nil } @@ -131,7 +125,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi b.driverConfig.Trace(), &ctx, stack.FunctionID( "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"), - b.config.DetectNearestDC, + b.balancerConfig.DetectNearestDC, b.driverConfig.Database(), ) previous = b.connections().All() @@ -155,7 +149,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi } info := balancerConfig.Info{SelfLocation: localDC} - state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback) + state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback) endpointsInfo := make([]endpoint.Info, len(newest)) for i, e := range newest { @@ -184,32 +178,62 @@ func (b *Balancer) Close(ctx context.Context) (err error) { b.discoveryRepeater.Stop() } - if err = b.discoveryClient.Close(ctx); err != nil { - return xerrors.WithStackTrace(err) - } - return nil } -func New( - ctx context.Context, - driverConfig *config.Config, - pool *conn.Pool, - opts ...discoveryConfig.Option, -) (b *Balancer, finalErr error) { - var ( - onDone = trace.DriverOnBalancerInit( - driverConfig.Trace(), &ctx, - stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.New"), - driverConfig.Balancer().String(), +func makeDiscoveryFunc( + driverConfig *config.Config, discoveryConfig *discoveryConfig.Config, +) func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) { + return func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) { + ctx, traceID, err := meta.TraceID(ctx) + if err != nil { + return endpoints, location, xerrors.WithStackTrace(err) + } + + ctx, err = driverConfig.Meta().Context(ctx) + if err != nil { + return endpoints, location, xerrors.WithStackTrace( + fmt.Errorf("failed to enrich context with meta, traceID %q: %w", traceID, err), + ) + } + + cc, err := grpc.DialContext(ctx, + "ydb:///"+driverConfig.Endpoint(), + append( + driverConfig.GrpcDialOptions(), + grpc.WithBlock(), + )..., ) - discoveryConfig = discoveryConfig.New(append(opts, - discoveryConfig.With(driverConfig.Common), - discoveryConfig.WithEndpoint(driverConfig.Endpoint()), - discoveryConfig.WithDatabase(driverConfig.Database()), - discoveryConfig.WithSecure(driverConfig.Secure()), - discoveryConfig.WithMeta(driverConfig.Meta()), - )...) + if err != nil { + return endpoints, location, xerrors.WithStackTrace( + fmt.Errorf("failed to dial %q, traceID %q: %w", driverConfig.Endpoint(), traceID, err), + ) + } + defer func() { + _ = cc.Close() + }() + + endpoints, location, err = internalDiscovery.Discover(ctx, + Ydb_Discovery_V1.NewDiscoveryServiceClient(cc), discoveryConfig, + ) + if err != nil { + return endpoints, location, xerrors.WithStackTrace( + fmt.Errorf("failed to discover database %q, address %q, traceID %q: %w", + driverConfig.Database(), driverConfig.Endpoint(), traceID, err, + ), + ) + } + + return endpoints, location, nil + } +} + +func New(ctx context.Context, driverConfig *config.Config, pool *conn.Pool, opts ...discoveryConfig.Option) ( + b *Balancer, finalErr error, +) { + onDone := trace.DriverOnBalancerInit(driverConfig.Trace(), &ctx, + stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.New"), + driverConfig.Balancer().String(), ) defer func() { onDone(finalErr) @@ -218,19 +242,25 @@ func New( b = &Balancer{ driverConfig: driverConfig, pool: pool, - discoveryClient: internalDiscovery.New(ctx, pool.Get( - endpoint.New(driverConfig.Endpoint()), - ), discoveryConfig), + discoveryConfig: discoveryConfig.New(append(opts, + discoveryConfig.With(driverConfig.Common), + discoveryConfig.WithEndpoint(driverConfig.Endpoint()), + discoveryConfig.WithDatabase(driverConfig.Database()), + discoveryConfig.WithSecure(driverConfig.Secure()), + discoveryConfig.WithMeta(driverConfig.Meta()), + )...), localDCDetector: detectLocalDC, } + b.discover = makeDiscoveryFunc(b.driverConfig, b.discoveryConfig) + if config := driverConfig.Balancer(); config == nil { - b.config = balancerConfig.Config{} + b.balancerConfig = balancerConfig.Config{} } else { - b.config = *config + b.balancerConfig = *config } - if b.config.SingleConn { + if b.balancerConfig.SingleConn { b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{ endpoint.New(driverConfig.Endpoint()), }, "") @@ -240,7 +270,7 @@ func New( return nil, xerrors.WithStackTrace(err) } // run background discovering - if d := discoveryConfig.Interval(); d > 0 { + if d := b.discoveryConfig.Interval(); d > 0 { b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx), d, b.clusterDiscoveryAttempt, repeater.WithName("discovery"), diff --git a/internal/balancer/local_dc_test.go b/internal/balancer/local_dc_test.go index 5fbcac1b8..c3f13d710 100644 --- a/internal/balancer/local_dc_test.go +++ b/internal/balancer/local_dc_test.go @@ -134,14 +134,16 @@ func TestLocalDCDiscovery(t *testing.T) { config.WithBalancer(balancers.PreferNearestDC(balancers.Default())), ) r := &Balancer{ - driverConfig: cfg, - config: *cfg.Balancer(), - pool: conn.NewPool(context.Background(), cfg), - discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{ - &mock.Endpoint{AddrField: "a:123", LocationField: "a"}, - &mock.Endpoint{AddrField: "b:234", LocationField: "b"}, - &mock.Endpoint{AddrField: "c:456", LocationField: "c"}, - }}, + driverConfig: cfg, + balancerConfig: *cfg.Balancer(), + pool: conn.NewPool(context.Background(), cfg), + discover: func(ctx context.Context) (endpoints []endpoint.Endpoint, location string, err error) { + return []endpoint.Endpoint{ + &mock.Endpoint{AddrField: "a:123", LocationField: "a"}, + &mock.Endpoint{AddrField: "b:234", LocationField: "b"}, + &mock.Endpoint{AddrField: "c:456", LocationField: "c"}, + }, "", nil + }, localDCDetector: func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) { return "b", nil }, diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index db477d4eb..f21fc07af 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -37,7 +37,7 @@ type Client struct { client Ydb_Discovery_V1.DiscoveryServiceClient } -func discover( +func Discover( ctx context.Context, client Ydb_Discovery_V1.DiscoveryServiceClient, config *config.Config, @@ -114,7 +114,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, f return nil, xerrors.WithStackTrace(err) } - endpoints, location, err = discover(ctx, c.client, c.config) + endpoints, location, err = Discover(ctx, c.client, c.config) if err != nil { return nil, xerrors.WithStackTrace(err) } diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go index 9bf3a8558..05303edc5 100644 --- a/internal/discovery/discovery_test.go +++ b/internal/discovery/discovery_test.go @@ -60,7 +60,7 @@ func TestDiscover(t *testing.T) { })), }, }, nil) - endpoints, location, err := discover(ctx, client, config.New( + endpoints, location, err := Discover(ctx, client, config.New( config.WithDatabase("test"), config.WithSecure(false), config.WithClock(clock), @@ -86,7 +86,7 @@ func TestDiscover(t *testing.T) { client.EXPECT().ListEndpoints(gomock.Any(), &Ydb_Discovery.ListEndpointsRequest{ Database: "test", }).Return(nil, xerrors.Transport(status.Error(grpcCodes.Unavailable, ""))) - endpoints, location, err := discover(ctx, client, config.New( + endpoints, location, err := Discover(ctx, client, config.New( config.WithDatabase("test"), )) require.Error(t, err) @@ -106,7 +106,7 @@ func TestDiscover(t *testing.T) { Status: Ydb.StatusIds_UNAVAILABLE, }, }, nil) - endpoints, location, err := discover(ctx, client, config.New( + endpoints, location, err := Discover(ctx, client, config.New( config.WithDatabase("test"), )) require.Error(t, err) @@ -141,7 +141,7 @@ func TestDiscover(t *testing.T) { })), }, }, nil) - endpoints, location, err := discover(ctx, client, config.New( + endpoints, location, err := Discover(ctx, client, config.New( config.WithDatabase("test"), config.WithAddressMutator(func(address string) string { return "u-" + address diff --git a/internal/meta/meta.go b/internal/meta/meta.go index 5d56bd4b1..8820f526d 100644 --- a/internal/meta/meta.go +++ b/internal/meta/meta.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/metadata" "github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/secret" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/version" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -116,12 +117,11 @@ func (m *Meta) meta(ctx context.Context) (_ metadata.MD, err error) { } var token string - done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/meta.(*Meta).meta"), ) defer func() { - done(token, err) + done(secret.Token(token), err) }() token, err = m.credentials.Token(ctx) diff --git a/internal/meta/trace_id.go b/internal/meta/trace_id.go index 182c57dac..e5a1547e0 100644 --- a/internal/meta/trace_id.go +++ b/internal/meta/trace_id.go @@ -17,7 +17,9 @@ func TraceID(ctx context.Context, opts ...func(opts *newTraceIDOpts)) (context.C if id, has := traceID(ctx); has { return ctx, id, nil } - options := newTraceIDOpts{newRandom: uuid.NewRandom} + options := newTraceIDOpts{ + newRandom: uuid.NewRandom, + } for _, opt := range opts { if opt != nil { opt(&options) diff --git a/log/driver.go b/log/driver.go index caa201209..4a52fc2b7 100644 --- a/log/driver.go +++ b/log/driver.go @@ -5,7 +5,6 @@ import ( "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/secret" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -490,13 +489,13 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { if info.Error == nil { l.Log(ctx, "done", kv.Latency(start), - kv.String("token", secret.Token(info.Token)), + kv.String("token", info.Token), ) } else { l.Log(WithLevel(ctx, ERROR), "done", kv.Error(info.Error), kv.Latency(start), - kv.String("token", secret.Token(info.Token)), + kv.String("token", info.Token), kv.Version(), ) } diff --git a/options.go b/options.go index 2aa9a375a..551abc642 100644 --- a/options.go +++ b/options.go @@ -34,8 +34,8 @@ import ( type Option func(ctx context.Context, d *Driver) error func WithStaticCredentials(user, password string) Option { - return func(ctx context.Context, c *Driver) error { - c.userInfo = &dsn.UserInfo{ + return func(ctx context.Context, d *Driver) error { + d.userInfo = &dsn.UserInfo{ User: user, Password: password, } @@ -48,8 +48,8 @@ func WithStaticCredentials(user, password string) Option { // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental func WithNodeAddressMutator(mutator func(address string) string) Option { - return func(ctx context.Context, c *Driver) error { - c.discoveryOptions = append(c.discoveryOptions, discoveryConfig.WithAddressMutator(mutator)) + return func(ctx context.Context, d *Driver) error { + d.discoveryOptions = append(d.discoveryOptions, discoveryConfig.WithAddressMutator(mutator)) return nil } @@ -132,8 +132,8 @@ func WithOauth2TokenExchangeCredentialsFile( // WithApplicationName add provided application name to all api requests func WithApplicationName(applicationName string) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithApplicationName(applicationName)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithApplicationName(applicationName)) return nil } @@ -145,16 +145,16 @@ func WithApplicationName(applicationName string) Option { // Will be removed after Oct 2024. // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated func WithUserAgent(userAgent string) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithApplicationName(userAgent)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithApplicationName(userAgent)) return nil } } func WithRequestsType(requestsType string) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithRequestsType(requestsType)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithRequestsType(requestsType)) return nil } @@ -168,7 +168,7 @@ func WithRequestsType(requestsType string) Option { // // (Driver string will be required string param of ydb.Open) func WithConnectionString(connectionString string) Option { - return func(ctx context.Context, c *Driver) error { + return func(ctx context.Context, d *Driver) error { if connectionString == "" { return nil } @@ -178,8 +178,8 @@ func WithConnectionString(connectionString string) Option { fmt.Errorf("parse connection string '%s' failed: %w", connectionString, err), ) } - c.options = append(c.options, info.Options...) - c.userInfo = info.UserInfo + d.options = append(d.options, info.Options...) + d.userInfo = info.UserInfo return nil } @@ -187,8 +187,8 @@ func WithConnectionString(connectionString string) Option { // WithConnectionTTL defines duration for parking idle connections func WithConnectionTTL(ttl time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithConnectionTTL(ttl)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithConnectionTTL(ttl)) return nil } @@ -202,8 +202,8 @@ func WithConnectionTTL(ttl time.Duration) Option { // // Deprecated: use dsn parameter in Open method func WithEndpoint(endpoint string) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithEndpoint(endpoint)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithEndpoint(endpoint)) return nil } @@ -217,8 +217,8 @@ func WithEndpoint(endpoint string) Option { // // Deprecated: use dsn parameter in Open method func WithDatabase(database string) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithDatabase(database)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithDatabase(database)) return nil } @@ -230,8 +230,8 @@ func WithDatabase(database string) Option { // // For making Driver string from endpoint+database+secure - use sugar.DSN() func WithSecure(secure bool) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithSecure(secure)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithSecure(secure)) return nil } @@ -241,8 +241,8 @@ func WithSecure(secure bool) Option { // // Warning: WithInsecure lost current TLS config. func WithInsecure() Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithSecure(false)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithSecure(false)) return nil } @@ -250,8 +250,8 @@ func WithInsecure() Option { // WithMinTLSVersion set minimum TLS version acceptable for connections func WithMinTLSVersion(minVersion uint16) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithMinTLSVersion(minVersion)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithMinTLSVersion(minVersion)) return nil } @@ -259,8 +259,8 @@ func WithMinTLSVersion(minVersion uint16) Option { // WithTLSSInsecureSkipVerify applies InsecureSkipVerify flag to TLS config func WithTLSSInsecureSkipVerify() Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithTLSSInsecureSkipVerify()) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithTLSSInsecureSkipVerify()) return nil } @@ -270,10 +270,10 @@ func WithTLSSInsecureSkipVerify() Option { // // See trace package documentation for details. func WithLogger(l log.Logger, details trace.Detailer, opts ...log.Option) Option { - return func(ctx context.Context, c *Driver) error { - c.logger = l - c.loggerOpts = opts - c.loggerDetails = details + return func(ctx context.Context, d *Driver) error { + d.logger = l + d.loggerOpts = opts + d.loggerDetails = details return nil } @@ -288,12 +288,12 @@ func WithAnonymousCredentials() Option { // WithCreateCredentialsFunc add callback funcion to provide requests credentials func WithCreateCredentialsFunc(createCredentials func(ctx context.Context) (credentials.Credentials, error)) Option { - return func(ctx context.Context, c *Driver) error { + return func(ctx context.Context, d *Driver) error { creds, err := createCredentials(ctx) if err != nil { return xerrors.WithStackTrace(err) } - c.options = append(c.options, config.WithCredentials(creds)) + d.options = append(d.options, config.WithCredentials(creds)) return nil } @@ -308,8 +308,8 @@ func WithCredentials(c credentials.Credentials) Option { } func WithBalancer(balancer *balancerConfig.Config) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithBalancer(balancer)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithBalancer(balancer)) return nil } @@ -319,8 +319,8 @@ func WithBalancer(balancer *balancerConfig.Config) Option { // // Default dial timeout is config.DefaultDialTimeout func WithDialTimeout(timeout time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithDialTimeout(timeout)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithDialTimeout(timeout)) return nil } @@ -330,8 +330,8 @@ func WithDialTimeout(timeout time.Duration) Option { // // This option does not replace collected option, instead it will append provided options. func With(options ...config.Option) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, options...) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, options...) return nil } @@ -339,10 +339,10 @@ func With(options ...config.Option) Option { // MergeOptions concatentaes provided options to one cumulative value. func MergeOptions(opts ...Option) Option { - return func(ctx context.Context, c *Driver) error { + return func(ctx context.Context, d *Driver) error { for _, opt := range opts { if opt != nil { - if err := opt(ctx, c); err != nil { + if err := opt(ctx, d); err != nil { return xerrors.WithStackTrace(err) } } @@ -354,8 +354,8 @@ func MergeOptions(opts ...Option) Option { // WithDiscoveryInterval sets interval between cluster discovery calls. func WithDiscoveryInterval(discoveryInterval time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.discoveryOptions = append(c.discoveryOptions, discoveryConfig.WithInterval(discoveryInterval)) + return func(ctx context.Context, d *Driver) error { + d.discoveryOptions = append(d.discoveryOptions, discoveryConfig.WithInterval(discoveryInterval)) return nil } @@ -365,8 +365,8 @@ func WithDiscoveryInterval(discoveryInterval time.Duration) Option { // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental func WithRetryBudget(b budget.Budget) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithRetryBudget(b)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithRetryBudget(b)) return nil } @@ -374,8 +374,8 @@ func WithRetryBudget(b budget.Budget) Option { // WithTraceDriver appends trace.Driver into driver traces func WithTraceDriver(t trace.Driver, opts ...trace.DriverComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithTrace(t, opts...)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithTrace(t, opts...)) return nil } @@ -383,11 +383,11 @@ func WithTraceDriver(t trace.Driver, opts ...trace.DriverComposeOption) Option { // WithTraceRetry appends trace.Retry into retry traces func WithTraceRetry(t trace.Retry, opts ...trace.RetryComposeOption) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithTraceRetry(&t, append( []trace.RetryComposeOption{ - trace.WithRetryPanicCallback(c.panicCallback), + trace.WithRetryPanicCallback(d.panicCallback), }, opts..., )...), @@ -399,8 +399,8 @@ func WithTraceRetry(t trace.Retry, opts ...trace.RetryComposeOption) Option { // WithCertificate appends certificate to TLS config root certificates func WithCertificate(cert *x509.Certificate) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithCertificate(cert)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithCertificate(cert)) return nil } @@ -420,13 +420,13 @@ func WithCertificatesFromFile(caFile string, opts ...certificates.FromFileOption caFile = file } - return func(ctx context.Context, c *Driver) error { + return func(ctx context.Context, d *Driver) error { certs, err := certificates.FromFile(caFile, opts...) if err != nil { return xerrors.WithStackTrace(err) } for _, cert := range certs { - if err := WithCertificate(cert)(ctx, c); err != nil { + if err := WithCertificate(cert)(ctx, d); err != nil { return xerrors.WithStackTrace(err) } } @@ -440,8 +440,8 @@ func WithCertificatesFromFile(caFile string, opts ...certificates.FromFileOption // Warning: all early TLS config changes (such as WithCertificate, WithCertificatesFromFile, WithCertificatesFromPem, // WithMinTLSVersion, WithTLSSInsecureSkipVerify) will be lost func WithTLSConfig(tlsConfig *tls.Config) Option { - return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithTLSConfig(tlsConfig)) + return func(ctx context.Context, d *Driver) error { + d.options = append(d.options, config.WithTLSConfig(tlsConfig)) return nil } @@ -449,13 +449,13 @@ func WithTLSConfig(tlsConfig *tls.Config) Option { // WithCertificatesFromPem appends certificates from pem-encoded data to TLS config root certificates func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) Option { - return func(ctx context.Context, c *Driver) error { + return func(ctx context.Context, d *Driver) error { certs, err := certificates.FromPem(bytes, opts...) if err != nil { return xerrors.WithStackTrace(err) } for _, cert := range certs { - _ = WithCertificate(cert)(ctx, c) + _ = WithCertificate(cert)(ctx, d) } return nil @@ -465,8 +465,8 @@ func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) O // WithTableConfigOption collects additional configuration options for table.Client. // This option does not replace collected option, instead it will appen provided options. func WithTableConfigOption(option tableConfig.Option) Option { - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append(c.tableOptions, option) + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, option) return nil } @@ -475,8 +475,8 @@ func WithTableConfigOption(option tableConfig.Option) Option { // WithQueryConfigOption collects additional configuration options for query.Client. // This option does not replace collected option, instead it will appen provided options. func WithQueryConfigOption(option queryConfig.Option) Option { - return func(ctx context.Context, c *Driver) error { - c.queryOptions = append(c.queryOptions, option) + return func(ctx context.Context, d *Driver) error { + d.queryOptions = append(d.queryOptions, option) return nil } @@ -518,10 +518,9 @@ func WithLazyTx(lazyTx bool) Option { // WithSessionPoolIdleThreshold defines interval for idle sessions func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append(c.tableOptions, tableConfig.WithIdleThreshold(idleThreshold)) - c.databaseSQLOptions = append( - c.databaseSQLOptions, + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, tableConfig.WithIdleThreshold(idleThreshold)) + d.databaseSQLOptions = append(d.databaseSQLOptions, xsql.WithIdleThreshold(idleThreshold), ) @@ -532,8 +531,8 @@ func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { // WithSessionPoolSessionIdleTimeToLive limits maximum time to live of idle session // If idleTimeToLive is less than or equal to zero then sessions will not be closed by idle func WithSessionPoolSessionIdleTimeToLive(idleThreshold time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.queryOptions = append(c.queryOptions, queryConfig.WithSessionIdleTimeToLive(idleThreshold)) + return func(ctx context.Context, d *Driver) error { + d.queryOptions = append(d.queryOptions, queryConfig.WithSessionIdleTimeToLive(idleThreshold)) return nil } @@ -541,9 +540,9 @@ func WithSessionPoolSessionIdleTimeToLive(idleThreshold time.Duration) Option { // WithSessionPoolCreateSessionTimeout set timeout for new session creation process in table.Client func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append(c.tableOptions, tableConfig.WithCreateSessionTimeout(createSessionTimeout)) - c.queryOptions = append(c.queryOptions, queryConfig.WithSessionCreateTimeout(createSessionTimeout)) + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, tableConfig.WithCreateSessionTimeout(createSessionTimeout)) + d.queryOptions = append(d.queryOptions, queryConfig.WithSessionCreateTimeout(createSessionTimeout)) return nil } @@ -551,9 +550,9 @@ func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Opt // WithSessionPoolDeleteTimeout set timeout to gracefully close deleting session in table.Client func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append(c.tableOptions, tableConfig.WithDeleteTimeout(deleteTimeout)) - c.queryOptions = append(c.queryOptions, queryConfig.WithSessionDeleteTimeout(deleteTimeout)) + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, tableConfig.WithDeleteTimeout(deleteTimeout)) + d.queryOptions = append(d.queryOptions, queryConfig.WithSessionDeleteTimeout(deleteTimeout)) return nil } @@ -565,7 +564,7 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { // Will be removed after Oct 2024. // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated func WithSessionPoolKeepAliveMinSize(keepAliveMinSize int) Option { - return func(ctx context.Context, c *Driver) error { return nil } + return func(ctx context.Context, d *Driver) error { return nil } } // WithSessionPoolKeepAliveTimeout set timeout of keep alive requests for session in table.Client @@ -574,13 +573,13 @@ func WithSessionPoolKeepAliveMinSize(keepAliveMinSize int) Option { // Will be removed after Oct 2024. // Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated func WithSessionPoolKeepAliveTimeout(keepAliveTimeout time.Duration) Option { - return func(ctx context.Context, c *Driver) error { return nil } + return func(ctx context.Context, d *Driver) error { return nil } } // WithIgnoreTruncated disables errors on truncated flag func WithIgnoreTruncated() Option { - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append(c.tableOptions, tableConfig.WithIgnoreTruncated()) + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append(d.tableOptions, tableConfig.WithIgnoreTruncated()) return nil } @@ -591,9 +590,9 @@ func WithIgnoreTruncated() Option { // (before `WithTrace{Driver,Table,Scheme,Scripting,Coordination,Ratelimiter}` and other options) // If not defined - panic would not intercept with driver func WithPanicCallback(panicCallback func(e interface{})) Option { - return func(ctx context.Context, c *Driver) error { - c.panicCallback = panicCallback - c.options = append(c.options, config.WithPanicCallback(panicCallback)) + return func(ctx context.Context, d *Driver) error { + d.panicCallback = panicCallback + d.options = append(d.options, config.WithPanicCallback(panicCallback)) return nil } @@ -601,8 +600,8 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { // WithSharedBalancer sets balancer from parent driver to child driver func WithSharedBalancer(parent *Driver) Option { - return func(ctx context.Context, c *Driver) error { - c.balancer = newBalancerWithMeta(parent.balancer.balancer, c.config.Meta()) + return func(ctx context.Context, d *Driver) error { + d.metaBalancer = &balancerWithMeta{balancer: parent.metaBalancer.balancer, meta: d.config.Meta()} return nil } @@ -610,14 +609,14 @@ func WithSharedBalancer(parent *Driver) Option { // WithTraceTable appends trace.Table into table traces func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.tableOptions = append( - c.tableOptions, + return func(ctx context.Context, d *Driver) error { + d.tableOptions = append( + d.tableOptions, tableConfig.WithTrace( &t, append( []trace.TableComposeOption{ - trace.WithTablePanicCallback(c.panicCallback), + trace.WithTablePanicCallback(d.panicCallback), }, opts..., )..., @@ -630,13 +629,12 @@ func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { // // WithTraceQuery appends trace.Query into query traces func WithTraceQuery(t trace.Query, opts ...trace.QueryComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.queryOptions = append( - c.queryOptions, + return func(ctx context.Context, d *Driver) error { + d.queryOptions = append(d.queryOptions, queryConfig.WithTrace(&t, append( []trace.QueryComposeOption{ - trace.WithQueryPanicCallback(c.panicCallback), + trace.WithQueryPanicCallback(d.panicCallback), }, opts..., )..., @@ -649,14 +647,13 @@ func WithTraceQuery(t trace.Query, opts ...trace.QueryComposeOption) Option { // // WithTraceScripting scripting trace option func WithTraceScripting(t trace.Scripting, opts ...trace.ScriptingComposeOption) Option { - return func(ctx context.Context, c *Driver) error { - c.scriptingOptions = append( - c.scriptingOptions, + return func(ctx context.Context, d *Driver) error { + d.scriptingOptions = append(d.scriptingOptions, scriptingConfig.WithTrace( t, append( []trace.ScriptingComposeOption{ - trace.WithScriptingPanicCallback(c.panicCallback), + trace.WithScriptingPanicCallback(d.panicCallback), }, opts..., )..., @@ -669,14 +666,13 @@ func WithTraceScripting(t trace.Scripting, opts ...trace.ScriptingComposeOption) // WithTraceScheme returns scheme trace option func WithTraceScheme(t trace.Scheme, opts ...trace.SchemeComposeOption) Option { - return func(ctx context.Context, c *Driver) error { - c.schemeOptions = append( - c.schemeOptions, + return func(ctx context.Context, d *Driver) error { + d.schemeOptions = append(d.schemeOptions, schemeConfig.WithTrace( t, append( []trace.SchemeComposeOption{ - trace.WithSchemePanicCallback(c.panicCallback), + trace.WithSchemePanicCallback(d.panicCallback), }, opts..., )..., @@ -689,14 +685,13 @@ func WithTraceScheme(t trace.Scheme, opts ...trace.SchemeComposeOption) Option { // WithTraceCoordination returns coordination trace option func WithTraceCoordination(t trace.Coordination, opts ...trace.CoordinationComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.coordinationOptions = append( - c.coordinationOptions, + return func(ctx context.Context, d *Driver) error { + d.coordinationOptions = append(d.coordinationOptions, coordinationConfig.WithTrace( &t, append( []trace.CoordinationComposeOption{ - trace.WithCoordinationPanicCallback(c.panicCallback), + trace.WithCoordinationPanicCallback(d.panicCallback), }, opts..., )..., @@ -709,14 +704,13 @@ func WithTraceCoordination(t trace.Coordination, opts ...trace.CoordinationCompo // WithTraceRatelimiter returns ratelimiter trace option func WithTraceRatelimiter(t trace.Ratelimiter, opts ...trace.RatelimiterComposeOption) Option { - return func(ctx context.Context, c *Driver) error { - c.ratelimiterOptions = append( - c.ratelimiterOptions, + return func(ctx context.Context, d *Driver) error { + d.ratelimiterOptions = append(d.ratelimiterOptions, ratelimiterConfig.WithTrace( t, append( []trace.RatelimiterComposeOption{ - trace.WithRatelimiterPanicCallback(c.panicCallback), + trace.WithRatelimiterPanicCallback(d.panicCallback), }, opts..., )..., @@ -729,8 +723,8 @@ func WithTraceRatelimiter(t trace.Ratelimiter, opts ...trace.RatelimiterComposeO // WithRatelimiterOptions returns reatelimiter option func WithRatelimiterOptions(opts ...ratelimiterConfig.Option) Option { - return func(ctx context.Context, c *Driver) error { - c.ratelimiterOptions = append(c.ratelimiterOptions, opts...) + return func(ctx context.Context, d *Driver) error { + d.ratelimiterOptions = append(d.ratelimiterOptions, opts...) return nil } @@ -738,14 +732,13 @@ func WithRatelimiterOptions(opts ...ratelimiterConfig.Option) Option { // WithTraceDiscovery adds configured discovery tracer to Driver func WithTraceDiscovery(t trace.Discovery, opts ...trace.DiscoveryComposeOption) Option { - return func(ctx context.Context, c *Driver) error { - c.discoveryOptions = append( - c.discoveryOptions, + return func(ctx context.Context, d *Driver) error { + d.discoveryOptions = append(d.discoveryOptions, discoveryConfig.WithTrace( t, append( []trace.DiscoveryComposeOption{ - trace.WithDiscoveryPanicCallback(c.panicCallback), + trace.WithDiscoveryPanicCallback(d.panicCallback), }, opts..., )..., @@ -758,14 +751,13 @@ func WithTraceDiscovery(t trace.Discovery, opts ...trace.DiscoveryComposeOption) // WithTraceTopic adds configured discovery tracer to Driver func WithTraceTopic(t trace.Topic, opts ...trace.TopicComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.topicOptions = append( - c.topicOptions, + return func(ctx context.Context, d *Driver) error { + d.topicOptions = append(d.topicOptions, topicoptions.WithTrace( t, append( []trace.TopicComposeOption{ - trace.WithTopicPanicCallback(c.panicCallback), + trace.WithTopicPanicCallback(d.panicCallback), }, opts..., )..., @@ -778,14 +770,13 @@ func WithTraceTopic(t trace.Topic, opts ...trace.TopicComposeOption) Option { // // WithTraceDatabaseSQL adds configured discovery tracer to Driver func WithTraceDatabaseSQL(t trace.DatabaseSQL, opts ...trace.DatabaseSQLComposeOption) Option { //nolint:gocritic - return func(ctx context.Context, c *Driver) error { - c.databaseSQLOptions = append( - c.databaseSQLOptions, + return func(ctx context.Context, d *Driver) error { + d.databaseSQLOptions = append(d.databaseSQLOptions, xsql.WithTrace( &t, append( []trace.DatabaseSQLComposeOption{ - trace.WithDatabaseSQLPanicCallback(c.panicCallback), + trace.WithDatabaseSQLPanicCallback(d.panicCallback), }, opts..., )..., @@ -799,16 +790,16 @@ func WithTraceDatabaseSQL(t trace.DatabaseSQL, opts ...trace.DatabaseSQLComposeO // Private technical options for correct copies processing func withOnClose(onClose func(c *Driver)) Option { - return func(ctx context.Context, c *Driver) error { - c.onClose = append(c.onClose, onClose) + return func(ctx context.Context, d *Driver) error { + d.onClose = append(d.onClose, onClose) return nil } } func withConnPool(pool *conn.Pool) Option { - return func(ctx context.Context, c *Driver) error { - c.pool = pool + return func(ctx context.Context, d *Driver) error { + d.pool = pool return pool.Take(ctx) } diff --git a/with.go b/with.go index b2d4c1776..1d004b81c 100644 --- a/with.go +++ b/with.go @@ -48,7 +48,7 @@ func (d *Driver) With(ctx context.Context, opts ...Option) (*Driver, error) { } onDone := trace.DriverOnWith( - d.trace(), &ctx, + d.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/ydb.(*Driver).With"), d.config.Endpoint(), d.config.Database(), d.config.Secure(), )