Skip to content

Commit

Permalink
wrapped error BAD_SESSION in query session calls
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jun 17, 2024
1 parent 96864d9 commit 8ee1b6a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 4 deletions.
1 change: 1 addition & 0 deletions internal/query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func defaults() *Config {
sessionCreateTimeout: DefaultSessionCreateTimeout,
sessionDeleteTimeout: DefaultSessionDeleteTimeout,
trace: &trace.Query{},
useSessionPool: true,
}
}

Expand Down
19 changes: 19 additions & 0 deletions internal/query/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
Expand Down Expand Up @@ -78,6 +79,16 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
cfg: cfg,
grpcClient: client,
statusCode: statusUnknown,
checks: []func(s *Session) bool{
func(s *Session) bool {
switch s.status() {
case statusClosed, statusClosing, statusError:
return false
default:
return true
}
},
},
}
defer func() {
if finalErr != nil && s != nil {
Expand Down Expand Up @@ -263,6 +274,10 @@ func (s *Session) Begin(

tx, err = begin(ctx, s.grpcClient, s, txSettings)
if err != nil {
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
s.setStatus(statusClosed)
}

return nil, xerrors.WithStackTrace(err)
}
tx.s = s
Expand Down Expand Up @@ -301,6 +316,10 @@ func (s *Session) Execute(

tx, r, err := execute(ctx, s, s.grpcClient, q, options.ExecuteSettings(opts...))
if err != nil {
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
s.setStatus(statusClosed)
}

return nil, nil, xerrors.WithStackTrace(err)
}

Expand Down
31 changes: 27 additions & 4 deletions internal/query/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
Expand Down Expand Up @@ -80,6 +81,10 @@ func (tx transaction) Execute(ctx context.Context, q string, opts ...options.TxE

_, res, err := execute(ctx, tx.s, tx.s.grpcClient, q, options.TxExecuteSettings(tx.ID(), opts...).ExecuteSettings)
if err != nil {
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
tx.s.setStatus(statusClosed)
}

return nil, xerrors.WithStackTrace(err)
}

Expand All @@ -98,8 +103,17 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
return nil
}

func (tx transaction) CommitTx(ctx context.Context) (err error) {
return commitTx(ctx, tx.s.grpcClient, tx.s.id, tx.ID())
func (tx transaction) CommitTx(ctx context.Context) error {
err := commitTx(ctx, tx.s.grpcClient, tx.s.id, tx.ID())
if err != nil {
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
tx.s.setStatus(statusClosed)
}

return xerrors.WithStackTrace(err)
}

return nil
}

func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID, txID string) error {
Expand All @@ -114,6 +128,15 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
return nil
}

func (tx transaction) Rollback(ctx context.Context) (err error) {
return rollback(ctx, tx.s.grpcClient, tx.s.id, tx.ID())
func (tx transaction) Rollback(ctx context.Context) error {
err := rollback(ctx, tx.s.grpcClient, tx.s.id, tx.ID())
if err != nil {
if xerrors.IsOperationError(err, Ydb.StatusIds_BAD_SESSION) {
tx.s.setStatus(statusClosed)
}

return xerrors.WithStackTrace(err)
}

return nil
}

0 comments on commit 8ee1b6a

Please sign in to comment.