From 8c9d588bfe96a21ecf6818fac2d7b2196afde595 Mon Sep 17 00:00:00 2001 From: XmasApple Date: Mon, 23 Oct 2023 19:38:25 +0300 Subject: [PATCH] Transaction methods --- src/Ydb.Sdk/src/Services/Query/QueryClient.cs | 317 +++++++++++++++--- src/Ydb.Sdk/src/Services/Query/Session.cs | 50 --- src/Ydb.Sdk/src/Services/Query/Tx.cs | 147 ++++---- src/Ydb.Sdk/tests/Query/TestSession.cs | 8 +- src/Ydb.Sdk/tests/Query/TestTransactions.cs | 25 ++ 5 files changed, 382 insertions(+), 165 deletions(-) create mode 100644 src/Ydb.Sdk/tests/Query/TestTransactions.cs diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index a358316d..a62bcbf8 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -12,64 +12,63 @@ public enum TxMode StaleRO } -public class RetrySettings +public enum ExecMode { - public bool IsIdempotent; -} - -// public class ExecuteQuerySettings : RequestSettings -// { -// } - -public class ExecuteQueryPart : ResponseWithResultBase -{ - protected ExecuteQueryPart(Status status, ResultData? result = null) : base(status, result) - { - } + Unspecified = 0, + Parse = 10, + Validate = 20, + Explain = 30, - public class ResultData - { - public Value.ResultSet? ResultSet => throw new NotImplementedException(); - } + // reserved 40; // EXEC_MODE_PREPARE + Execute = 50 } -public class ExecuteQueryStream : StreamResponse, - IAsyncEnumerable, IAsyncEnumerator +public enum Syntax { - public object ExecStats { get; } + Unspecified = 0, - internal ExecuteQueryStream(Driver.StreamIterator iterator) : base(iterator) - { - throw new NotImplementedException(); - } - - protected override ExecuteQueryPart MakeResponse(ExecuteQueryResponse protoResponse) - { - throw new NotImplementedException(); - } + /// + /// YQL + /// + YqlV1 = 1, - protected override ExecuteQueryPart MakeResponse(Status status) - { - throw new NotImplementedException(); - } - - public IAsyncEnumerator GetAsyncEnumerator( - CancellationToken cancellationToken = new CancellationToken()) - { - throw new NotImplementedException(); - } + /// + /// PostgresQL + /// + Pg = 2 +} - public ValueTask DisposeAsync() - { - throw new NotImplementedException(); - } +public enum StatsMode +{ + Unspecified = 0, + + /// + /// Stats collection is disabled + /// + None = 10, + + /// + /// Aggregated stats of reads, updates and deletes per table + /// + Basic = 20, + + /// + /// Add execution stats and plan on top of STATS_MODE_BASIC + /// + Full = 30, + + /// + /// Detailed execution stats including stats for individual tasks and channels + /// + Profile = 40 +} - public ValueTask MoveNextAsync() - { - throw new NotImplementedException(); - } +public class ExecuteQuerySettings : RequestSettings +{ + public ExecMode ExecMode { get; set; } + public Syntax Syntax { get; set; } - public ExecuteQueryPart Current { get; } + public StatsMode StatsMode { get; set; } } public class CreateSessionSettings : RequestSettings @@ -84,6 +83,18 @@ public class AttachSessionSettings : RequestSettings { } +public class BeginTransactionSettings : RequestSettings +{ +} + +public class CommitTransactionSettings : RequestSettings +{ +} + +public class RollbackTransactionSettings : RequestSettings +{ +} + public class CreateSessionResponse : ResponseBase { public Session? Session { get; } @@ -98,7 +109,7 @@ private CreateSessionResponse(Ydb.Query.CreateSessionResponse proto, Session? se Session = session; } - public static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver, + internal static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver, string endpoint) { var session = new Session( @@ -123,7 +134,7 @@ private DeleteSessionResponse(Ydb.Query.DeleteSessionResponse proto) { } - public static DeleteSessionResponse FromProto(Ydb.Query.DeleteSessionResponse proto) + internal static DeleteSessionResponse FromProto(Ydb.Query.DeleteSessionResponse proto) { return new DeleteSessionResponse(proto); } @@ -140,7 +151,7 @@ private SessionState(Ydb.Query.SessionState proto) { } - public static SessionState FromProto(Ydb.Query.SessionState proto) + internal static SessionState FromProto(Ydb.Query.SessionState proto) { return new SessionState(proto); } @@ -163,6 +174,115 @@ protected override SessionState MakeResponse(Status status) } } +public class ExecuteQueryResponsePart : ResponseBase +{ + internal ExecuteQueryResponsePart(Status status) : base(status) + { + } + + private ExecuteQueryResponsePart(Ydb.Query.ExecuteQueryResponsePart proto) + : base(Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static ExecuteQueryResponsePart FromProto(Ydb.Query.ExecuteQueryResponsePart proto) + { + return new ExecuteQueryResponsePart(proto); + } +} + +public class ExecuteQueryStream : StreamResponse + , IAsyncEnumerable, IAsyncEnumerator +{ + public object ExecStats { get; } + + internal ExecuteQueryStream(Driver.StreamIterator iterator) : base(iterator) + { + } + + protected override ExecuteQueryResponsePart MakeResponse(Ydb.Query.ExecuteQueryResponsePart protoResponse) + { + return ExecuteQueryResponsePart.FromProto(protoResponse); + } + + protected override ExecuteQueryResponsePart MakeResponse(Status status) + { + return new ExecuteQueryResponsePart(status); + } + + public IAsyncEnumerator GetAsyncEnumerator( + CancellationToken cancellationToken = new CancellationToken()) + { + throw new NotImplementedException(); + } + + public ValueTask DisposeAsync() + { + throw new NotImplementedException(); + } + + public ValueTask MoveNextAsync() + { + throw new NotImplementedException(); + } + + public ExecuteQueryResponsePart Current { get; } +} + +public class BeginTransactionResponse : ResponseBase +{ + internal BeginTransactionResponse(Status status) : base(status) + { + } + + public Tx Tx { get; } = new(); + + private BeginTransactionResponse(Ydb.Query.BeginTransactionResponse proto) : base( + Status.FromProto(proto.Status, proto.Issues)) + { + Tx.TxId = proto.TxMeta.Id; + } + + internal static BeginTransactionResponse FromProto(Ydb.Query.BeginTransactionResponse proto) + { + return new BeginTransactionResponse(proto); + } +} + +public class CommitTransactionResponse : ResponseBase +{ + internal CommitTransactionResponse(Status status) : base(status) + { + } + + private CommitTransactionResponse(Ydb.Query.CommitTransactionResponse proto) : base( + Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static CommitTransactionResponse FromProto(Ydb.Query.CommitTransactionResponse proto) + { + return new CommitTransactionResponse(proto); + } +} + +public class RollbackTransactionResponse : ResponseBase +{ + internal RollbackTransactionResponse(Status status) : base(status) + { + } + + private RollbackTransactionResponse(Ydb.Query.RollbackTransactionResponse proto) : base( + Status.FromProto(proto.Status, proto.Issues)) + { + } + + internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransactionResponse proto) + { + return new RollbackTransactionResponse(proto); + } +} + public class QueryClient : ClientBase, IDisposable @@ -230,18 +350,107 @@ public SessionStateStream AttachSession(string sessionId, AttachSessionSettings? return new SessionStateStream(streamIterator); } - internal async Task BeginTransaction() + public async Task BeginTransaction( + string sessionId, + Tx tx, + BeginTransactionSettings? settings = null) + { + settings ??= new BeginTransactionSettings(); + + var request = new BeginTransactionRequest { SessionId = sessionId, TxSettings = tx.ToProto().BeginTx }; + try + { + var response = await Driver.UnaryCall( + QueryService.BeginTransactionMethod, + request: request, + settings: settings + ); + return BeginTransactionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new BeginTransactionResponse(e.Status); + } + } + + public async Task CommitTransaction( + string sessionId, + Tx tx, + CommitTransactionSettings? settings = null) { + settings ??= new CommitTransactionSettings(); + + var request = new CommitTransactionRequest { SessionId = sessionId, TxId = tx.TxId }; + + try + { + var response = await Driver.UnaryCall( + QueryService.CommitTransactionMethod, + request: request, + settings: settings + ); + return CommitTransactionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new CommitTransactionResponse(e.Status); + } } - internal async Task CommitTransaction() + public async Task RollbackTransaction( + string sessionId, + Tx tx, + RollbackTransactionSettings? settings = null) { + settings ??= new RollbackTransactionSettings(); + + var request = new RollbackTransactionRequest { SessionId = sessionId, TxId = tx.TxId }; + try + { + var response = await Driver.UnaryCall( + QueryService.RollbackTransactionMethod, + request: request, + settings: settings + ); + return RollbackTransactionResponse.FromProto(response.Data); + } + catch (Driver.TransportException e) + { + return new RollbackTransactionResponse(e.Status); + } } - internal async Task RollbackTransaction() + + public ExecuteQueryStream ExecuteQuery( + string sessionId, + string queryString, + Tx tx, + IReadOnlyDictionary? parameters, + ExecuteQuerySettings? settings = null) { + settings ??= new ExecuteQuerySettings(); + parameters ??= new Dictionary(); + + var request = new ExecuteQueryRequest + { + SessionId = sessionId, + ExecMode = (Ydb.Query.ExecMode)settings.ExecMode, + TxControl = tx.ToProto(), + QueryContent = new QueryContent { Syntax = (Ydb.Query.Syntax)settings.Syntax, Text = queryString }, + StatsMode = (Ydb.Query.StatsMode)settings.StatsMode + }; + + request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); + + var streamIterator = Driver.StreamCall( + method: QueryService.ExecuteQueryMethod, + request: request, + settings: settings); + + return new ExecuteQueryStream(streamIterator); } + public async Task Query(string query, Dictionary parameters, Func func, TxMode txMode = TxMode.SerializableRW) { diff --git a/src/Ydb.Sdk/src/Services/Query/Session.cs b/src/Ydb.Sdk/src/Services/Query/Session.cs index 9a963852..eb474edb 100644 --- a/src/Ydb.Sdk/src/Services/Query/Session.cs +++ b/src/Ydb.Sdk/src/Services/Query/Session.cs @@ -3,30 +3,6 @@ namespace Ydb.Sdk.Services.Query; -public enum QueryExecMode -{ - Unspecified = 0, - Parse = 10, - Validate = 20, - Explain = 30, - - // reserved 40; // EXEC_MODE_PREPARE - Execute = 50 -} - -public enum QuerySyntax -{ - Unspecified = 0, - YqlV1 = 1, // YQL - Pg = 2 // PostgresQL -} - -public class ExecuteQueryResponse : IResponse -{ - // Заглушка, надо вытянуть из Ydb.Protos - public Status Status => throw new NotImplementedException(); -} - /// /// Sessions are basic primitives for communicating with YDB Query Service. The are similar to /// connections for classic relational DBs. Sessions serve three main purposes: @@ -54,32 +30,6 @@ internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId internal string? Endpoint { get; } - - // public ExecuteQueryStream ExecuteQuery( - // string query, - // Tx tx, - // IReadOnlyDictionary? parameters, - // QuerySyntax syntax, - // ExecuteQuerySettings? settings = null) - // { - // // settings ??= new ExecuteQuerySettings(); - // // parameters ??= new Dictionary(); - // - // // var request = new ExecuteQueryRequest - // // { - // // SessionId = Id, - // // ... - // // }; - // - // // var streamIterator = Driver.StreamCall( - // // method: QueryService.ExecuteQuery, - // // request: request, - // // settings: settings); - // // - // // return new ExecuteQueryStream(streamIterator); - // - // throw new NotImplementedException(); - // } // // // public ExecuteQueryStream ExecuteQueryYql( diff --git a/src/Ydb.Sdk/src/Services/Query/Tx.cs b/src/Ydb.Sdk/src/Services/Query/Tx.cs index 1c435b31..2bf497d3 100644 --- a/src/Ydb.Sdk/src/Services/Query/Tx.cs +++ b/src/Ydb.Sdk/src/Services/Query/Tx.cs @@ -1,69 +1,104 @@ +using Ydb.Query; using Ydb.Sdk.Value; namespace Ydb.Sdk.Services.Query; -// public class CommitTxResponse -// { -// public void EnsureSuccess() -// { -// throw new NotImplementedException(); -// } -// } - public class RollbackTxResponse { } +public interface ITxModeSettings +{ +} + +public class SerializableModeSettings : ITxModeSettings +{ +} + +public class OnlineModeSettings : ITxModeSettings +{ + public OnlineModeSettings(bool allowInconsistentReads = false) + { + AllowInconsistentReads = allowInconsistentReads; + } + + public bool AllowInconsistentReads { get; } +} + +public class StaleModeSettings : ITxModeSettings +{ +} + +public class SnapshotModeSettings : ITxModeSettings +{ +} + public class Tx - // : IDisposable, IAsyncDisposable { - // public object TxSettings; - // - // private Tx() - // { - // throw new NotImplementedException(); - // } - // - // public static Tx Begin() - // { - // throw new NotImplementedException(); - // } - - // public Tx WithCommit() - // { - // return this; - // } - // - // // calls rpc CommitTransaction(Query.CommitTransactionRequest) returns (Query.CommitTransactionResponse); - // public CommitTxResponse Commit() - // { - // throw new NotImplementedException(); - // } - // - // // calls rpc RollbackTransaction(Query.RollbackTransactionRequest) returns (Query.RollbackTransactionResponse); - // public RollbackTxResponse Rollback() - // { - // throw new NotImplementedException(); - // } - // - // public void Dispose() - // { - // throw new NotImplementedException(); - // } - // - // public ValueTask DisposeAsync() - // { - // throw new NotImplementedException(); - // } - // - // - // public ExecuteQueryStream ExecuteQueryYql( - // string query, - // IReadOnlyDictionary? parameters = null, - // ExecuteQuerySettings? settings = null) - // { - // throw new NotImplementedException(); - // } + public string? TxId + { + get => _proto.TxId; + set + { + _proto.TxId = value; + if (!string.IsNullOrEmpty(value)) + _proto.BeginTx = null; + } + } + + private TransactionControl _proto; + + public Tx() + { + } + + private Tx(TransactionControl proto) + { + _proto = proto; + } + + internal TransactionControl ToProto() + { + return _proto; + } + + public static Tx Begin(ITxModeSettings? txModeSettings = null, bool commit = true) + { + txModeSettings ??= new SerializableModeSettings(); + + var txSettings = GetTransactionSettings(txModeSettings); + + var tx = new Tx(new TransactionControl { BeginTx = txSettings, CommitTx = commit }); + return tx; + } + + private static TransactionSettings GetTransactionSettings(ITxModeSettings txModeSettings) + { + var txSettings = txModeSettings switch + { + SerializableModeSettings => new TransactionSettings + { + SerializableReadWrite = new Ydb.Query.SerializableModeSettings() + }, + OnlineModeSettings onlineModeSettings => new TransactionSettings + { + OnlineReadOnly = new Ydb.Query.OnlineModeSettings + { + AllowInconsistentReads = onlineModeSettings.AllowInconsistentReads + } + }, + StaleModeSettings => new TransactionSettings + { + StaleReadOnly = new Ydb.Query.StaleModeSettings() + }, + SnapshotModeSettings => new TransactionSettings + { + SnapshotReadOnly = new Ydb.Query.SnapshotModeSettings() + }, + _ => throw new InvalidCastException(nameof(txModeSettings)) + }; + return txSettings; + } public async Task Query(string query, Dictionary parameters) { diff --git a/src/Ydb.Sdk/tests/Query/TestSession.cs b/src/Ydb.Sdk/tests/Query/TestSession.cs index d8f94e98..a8da29ba 100644 --- a/src/Ydb.Sdk/tests/Query/TestSession.cs +++ b/src/Ydb.Sdk/tests/Query/TestSession.cs @@ -9,7 +9,6 @@ namespace Ydb.Sdk.Tests.Query; [Trait("Category", "Integration")] public class TestSession { - private readonly ITestOutputHelper _testOutputHelper; private readonly ILoggerFactory _loggerFactory; private readonly DriverConfig _driverConfig = new( @@ -17,9 +16,8 @@ public class TestSession database: "/local" ); - public TestSession(ITestOutputHelper testOutputHelper) + public TestSession() { - _testOutputHelper = testOutputHelper; _loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance; _loggerFactory.CreateLogger(); } @@ -68,8 +66,8 @@ public async Task Attach() while (await sessionStateStream.Next()) { - sessionStateStream.Response.EnsureSuccess(); - _testOutputHelper.WriteLine(1.ToString()); + Assert.True(sessionStateStream.Response.Status.IsSuccess); + break; } } } \ No newline at end of file diff --git a/src/Ydb.Sdk/tests/Query/TestTransactions.cs b/src/Ydb.Sdk/tests/Query/TestTransactions.cs new file mode 100644 index 00000000..676898d1 --- /dev/null +++ b/src/Ydb.Sdk/tests/Query/TestTransactions.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using Xunit.Abstractions; +using Ydb.Sdk.Services.Query; + +namespace Ydb.Sdk.Tests.Query; + +[Trait("Category", "Integration")] +public class TestTransactions +{ + private readonly ILoggerFactory _loggerFactory; + + private readonly DriverConfig _driverConfig = new( + endpoint: "grpc://localhost:2136", + database: "/local" + ); + + + public TestTransactions() + { + _loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance; + _loggerFactory.CreateLogger(); + } +} \ No newline at end of file