diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index 38533cb9..09443a0c 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -99,31 +99,11 @@ public class RollbackTransactionSettings : RequestSettings public class CreateSessionResponse : ResponseWithResultBase { - // public Session? Session { get; } - - // private CreateSessionResponse(Ydb.Query.CreateSessionResponse proto, Session? session = null) - // : base(Status.FromProto(proto.Status, proto.Issues)) - // { - // Session = session; - // } internal CreateSessionResponse(Status status, ResultData? result = null) : base(status, result) { } - // internal static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver, - // string endpoint) - // { - // var session = new Session( - // driver: driver, - // sessionPool: null, - // id: proto.SessionId, - // nodeId: proto.NodeId, - // endpoint: endpoint - // ); - // return new CreateSessionResponse(proto, session); - // } - public class ResultData { private ResultData(Session session) @@ -201,10 +181,6 @@ protected override SessionState MakeResponse(Status status) } } -public class QueryStats -{ -} - public class ExecuteQueryResponsePart : ResponseBase { public long? ResultSetIndex; @@ -322,6 +298,65 @@ internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransact } } +// =========================================================================== +// WARNING: temporary structs just for testing should be removed +// =========================================================================== +public class BeginCrutchTxSettings : OperationRequestSettings +{ +} + +public class CommitCrutchTxSettings : OperationRequestSettings +{ +} + +public class RollbackCrutchTxSettings : OperationRequestSettings +{ +} + +internal class BeginCrutchTxResponse : ResponseWithResultBase +{ + internal BeginCrutchTxResponse(Status status, ResultData? result = null) : base(status, result) + { + } + + public class ResultData + { + public Tx? Tx { get; } + + public ResultData(Tx? tx) + { + Tx = tx; + } + + internal static ResultData FromProto(Ydb.Table.BeginTransactionResult resultProto) + { + var txId = resultProto.TxMeta.Id; + // var tx = new Tx { TxId = resultProto.TxMeta.Id }; + var tx = new Tx(new TransactionControl { TxId = txId }); + var result = new ResultData(tx); + + return result; + } + } +} + +internal class CommitCrutchTxResponse : ResponseBase +{ + internal CommitCrutchTxResponse(Status status) : base(status) + { + } +} + +internal class RollbackCrutchTxResponse : ResponseBase +{ + internal RollbackCrutchTxResponse(Status status) : base(status) + { + } +} +// =========================================================================== +// END WARNING: temporary structs just for testing should be removed +// =========================================================================== + public class QueryClientConfig { public SessionPoolConfig SessionPoolConfig { get; } @@ -601,10 +636,12 @@ public async Task Query(string queryString, Func> Rollback(Session session, Tx tx, Status status) { + // TODO remove crutch calls _logger.LogTrace($"Transaction {tx.TxId} not committed, try to rollback"); try { - var rollbackResponse = await RollbackTransaction(session.Id, tx); + // var rollbackResponse = await RollbackTransaction(session.Id, tx); + var rollbackResponse = await RollbackCrutchTx(session.Id, tx); rollbackResponse.EnsureSuccess(); } catch (StatusUnsuccessfulException e) @@ -623,9 +660,15 @@ public async Task> DoTx(Func> func, var response = await ExecOnSession( async session => { - var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings)); + // var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings)); + // beginTransactionResponse.EnsureSuccess(); + // var tx = beginTransactionResponse.Tx; + // tx.QueryClient = this; + // tx.SessionId = session.Id; + + var beginTransactionResponse = await BeginCrutchTx(session.Id, Tx.Begin(txModeSettings)); beginTransactionResponse.EnsureSuccess(); - var tx = beginTransactionResponse.Tx; + var tx = beginTransactionResponse.Result.Tx!; tx.QueryClient = this; tx.SessionId = session.Id; @@ -644,7 +687,8 @@ public async Task> DoTx(Func> func, new Status(StatusCode.InternalError, $"Failed to execute lambda on tx {tx.TxId}: {e.Message}")); } - var commitResponse = await CommitTransaction(session.Id, tx); + // var commitResponse = await CommitTransaction(session.Id, tx); + var commitResponse = await CommitCrutchTx(session.Id, tx); if (!commitResponse.Status.IsSuccess) { return await Rollback(session, tx, commitResponse.Status); @@ -704,6 +748,104 @@ private void Dispose(bool disposing) _disposed = true; } + + +// =========================================================================== +// WARNING: temporary methods just for testing should be removed +// =========================================================================== + internal async Task BeginCrutchTx(string sessionId, + Tx tx, BeginCrutchTxSettings? settings = null) + { + settings ??= new BeginCrutchTxSettings(); + var request = new Ydb.Table.BeginTransactionRequest + { + SessionId = sessionId, + OperationParams = MakeOperationParams(settings), + TxSettings = new Ydb.Table.TransactionSettings + { SerializableReadWrite = new Ydb.Table.SerializableModeSettings() } + }; + try + { + var response = await Driver.UnaryCall( + method: Ydb.Table.V1.TableService.BeginTransactionMethod, + request: request, + settings: settings + ); + var status = UnpackOperation(response.Data.Operation, out Ydb.Table.BeginTransactionResult? resultProto); + BeginCrutchTxResponse.ResultData? result = null; + + if (status.IsSuccess && resultProto is not null) + { + result = BeginCrutchTxResponse.ResultData.FromProto(resultProto); + } + + return new BeginCrutchTxResponse(status, result); + } + catch (Driver.TransportException e) + { + return new BeginCrutchTxResponse(e.Status); + } + } + + internal async Task CommitCrutchTx(string sessionId, Tx tx, + CommitCrutchTxSettings? settings = null) + { + settings ??= new CommitCrutchTxSettings(); + var request = new Ydb.Table.CommitTransactionRequest + { + SessionId = sessionId, + OperationParams = MakeOperationParams(settings), + TxId = tx.TxId + }; + try + { + var response = await Driver.UnaryCall( + method: Ydb.Table.V1.TableService.CommitTransactionMethod, + request: request, + settings: settings + ); + var status = UnpackOperation(response.Data.Operation); + + return new CommitCrutchTxResponse(status); + } + catch (Driver.TransportException e) + { + return new CommitCrutchTxResponse(e.Status); + } + } + + internal async Task RollbackCrutchTx(string sessionId, Tx tx, + RollbackCrutchTxSettings? settings = null) + { + settings ??= new RollbackCrutchTxSettings(); + var request = new Ydb.Table.RollbackTransactionRequest + { + SessionId = sessionId, + OperationParams = MakeOperationParams(settings), + TxId = tx.TxId + }; + + try + { + var response = await Driver.UnaryCall( + method: Ydb.Table.V1.TableService.RollbackTransactionMethod, + request: request, + settings: settings + ); + + var status = UnpackOperation(response.Data.Operation); + + return new RollbackCrutchTxResponse(status); + } + catch (Driver.TransportException e) + { + return new RollbackCrutchTxResponse(e.Status); + } + } + +// =========================================================================== +// END WARNING: temporary methods just for testing should be removed +// =========================================================================== } public class QueryResponse : ResponseBase diff --git a/src/Ydb.Sdk/src/Services/Query/Tx.cs b/src/Ydb.Sdk/src/Services/Query/Tx.cs index fab97b6a..195745fb 100644 --- a/src/Ydb.Sdk/src/Services/Query/Tx.cs +++ b/src/Ydb.Sdk/src/Services/Query/Tx.cs @@ -49,7 +49,7 @@ public Tx() _proto = new TransactionControl(); } - private Tx(TransactionControl proto) + internal Tx(TransactionControl proto) { _proto = proto; } diff --git a/src/Ydb.Sdk/tests/Query/TestExecuteQuery.cs b/src/Ydb.Sdk/tests/Query/TestExecuteQuery.cs index 051cdcd8..f6d02535 100644 --- a/src/Ydb.Sdk/tests/Query/TestExecuteQuery.cs +++ b/src/Ydb.Sdk/tests/Query/TestExecuteQuery.cs @@ -112,7 +112,7 @@ public async Task Tx() var resultSet = part.ResultSet; if (resultSet is not null) { - titles.AddRange(resultSet.Rows.Select(row => row["title"].GetUtf8())); + titles.AddRange(resultSet.Rows.Select(row => row["title"].GetOptionalUtf8()!)); } }