Skip to content

Commit

Permalink
Transaction methods
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Oct 23, 2023
1 parent 4f14504 commit 8c9d588
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 165 deletions.
317 changes: 263 additions & 54 deletions src/Ydb.Sdk/src/Services/Query/QueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,64 +12,63 @@ public enum TxMode
StaleRO
}

public class RetrySettings
public enum ExecMode
{
public bool IsIdempotent;
}

// public class ExecuteQuerySettings : RequestSettings
// {
// }

public class ExecuteQueryPart : ResponseWithResultBase<ExecuteQueryPart.ResultData>
{
protected ExecuteQueryPart(Status status, ResultData? result = null) : base(status, result)
{
}
Unspecified = 0,
Parse = 10,
Validate = 20,
Explain = 30,

public class ResultData
{
public Value.ResultSet? ResultSet => throw new NotImplementedException();
}
// reserved 40; // EXEC_MODE_PREPARE
Execute = 50
}

public class ExecuteQueryStream : StreamResponse<ExecuteQueryResponse, ExecuteQueryPart>,
IAsyncEnumerable<ExecuteQueryPart>, IAsyncEnumerator<ExecuteQueryPart>
public enum Syntax
{
public object ExecStats { get; }
Unspecified = 0,

internal ExecuteQueryStream(Driver.StreamIterator<ExecuteQueryResponse> iterator) : base(iterator)
{
throw new NotImplementedException();
}

protected override ExecuteQueryPart MakeResponse(ExecuteQueryResponse protoResponse)
{
throw new NotImplementedException();
}
/// <summary>
/// YQL
/// </summary>
YqlV1 = 1,

protected override ExecuteQueryPart MakeResponse(Status status)
{
throw new NotImplementedException();
}

public IAsyncEnumerator<ExecuteQueryPart> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}
/// <summary>
/// PostgresQL
/// </summary>
Pg = 2
}

public ValueTask DisposeAsync()
{
throw new NotImplementedException();
}
public enum StatsMode
{
Unspecified = 0,

/// <summary>
/// Stats collection is disabled
/// </summary>
None = 10,

/// <summary>
/// Aggregated stats of reads, updates and deletes per table
/// </summary>
Basic = 20,

/// <summary>
/// Add execution stats and plan on top of STATS_MODE_BASIC
/// </summary>
Full = 30,

/// <summary>
/// Detailed execution stats including stats for individual tasks and channels
/// </summary>
Profile = 40
}

public ValueTask<bool> MoveNextAsync()
{
throw new NotImplementedException();
}
public class ExecuteQuerySettings : RequestSettings
{
public ExecMode ExecMode { get; set; }
public Syntax Syntax { get; set; }

public ExecuteQueryPart Current { get; }
public StatsMode StatsMode { get; set; }
}

public class CreateSessionSettings : RequestSettings
Expand All @@ -84,6 +83,18 @@ public class AttachSessionSettings : RequestSettings
{
}

public class BeginTransactionSettings : RequestSettings
{
}

public class CommitTransactionSettings : RequestSettings
{
}

public class RollbackTransactionSettings : RequestSettings
{
}

public class CreateSessionResponse : ResponseBase
{
public Session? Session { get; }
Expand All @@ -98,7 +109,7 @@ private CreateSessionResponse(Ydb.Query.CreateSessionResponse proto, Session? se
Session = session;
}

public static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver,
internal static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver,
string endpoint)
{
var session = new Session(
Expand All @@ -123,7 +134,7 @@ private DeleteSessionResponse(Ydb.Query.DeleteSessionResponse proto)
{
}

public static DeleteSessionResponse FromProto(Ydb.Query.DeleteSessionResponse proto)
internal static DeleteSessionResponse FromProto(Ydb.Query.DeleteSessionResponse proto)
{
return new DeleteSessionResponse(proto);
}
Expand All @@ -140,7 +151,7 @@ private SessionState(Ydb.Query.SessionState proto)
{
}

public static SessionState FromProto(Ydb.Query.SessionState proto)
internal static SessionState FromProto(Ydb.Query.SessionState proto)
{
return new SessionState(proto);
}
Expand All @@ -163,6 +174,115 @@ protected override SessionState MakeResponse(Status status)
}
}

public class ExecuteQueryResponsePart : ResponseBase
{
internal ExecuteQueryResponsePart(Status status) : base(status)
{
}

private ExecuteQueryResponsePart(Ydb.Query.ExecuteQueryResponsePart proto)
: base(Status.FromProto(proto.Status, proto.Issues))
{
}

internal static ExecuteQueryResponsePart FromProto(Ydb.Query.ExecuteQueryResponsePart proto)
{
return new ExecuteQueryResponsePart(proto);
}
}

public class ExecuteQueryStream : StreamResponse<Ydb.Query.ExecuteQueryResponsePart, ExecuteQueryResponsePart>
, IAsyncEnumerable<ExecuteQueryResponsePart>, IAsyncEnumerator<ExecuteQueryResponsePart>
{
public object ExecStats { get; }

internal ExecuteQueryStream(Driver.StreamIterator<Ydb.Query.ExecuteQueryResponsePart> iterator) : base(iterator)
{
}

protected override ExecuteQueryResponsePart MakeResponse(Ydb.Query.ExecuteQueryResponsePart protoResponse)
{
return ExecuteQueryResponsePart.FromProto(protoResponse);
}

protected override ExecuteQueryResponsePart MakeResponse(Status status)
{
return new ExecuteQueryResponsePart(status);
}

public IAsyncEnumerator<ExecuteQueryResponsePart> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}

public ValueTask DisposeAsync()
{
throw new NotImplementedException();
}

public ValueTask<bool> MoveNextAsync()
{
throw new NotImplementedException();
}

public ExecuteQueryResponsePart Current { get; }
}

public class BeginTransactionResponse : ResponseBase
{
internal BeginTransactionResponse(Status status) : base(status)
{
}

public Tx Tx { get; } = new();

private BeginTransactionResponse(Ydb.Query.BeginTransactionResponse proto) : base(
Status.FromProto(proto.Status, proto.Issues))
{
Tx.TxId = proto.TxMeta.Id;
}

internal static BeginTransactionResponse FromProto(Ydb.Query.BeginTransactionResponse proto)
{
return new BeginTransactionResponse(proto);
}
}

public class CommitTransactionResponse : ResponseBase
{
internal CommitTransactionResponse(Status status) : base(status)
{
}

private CommitTransactionResponse(Ydb.Query.CommitTransactionResponse proto) : base(
Status.FromProto(proto.Status, proto.Issues))
{
}

internal static CommitTransactionResponse FromProto(Ydb.Query.CommitTransactionResponse proto)
{
return new CommitTransactionResponse(proto);
}
}

public class RollbackTransactionResponse : ResponseBase
{
internal RollbackTransactionResponse(Status status) : base(status)
{
}

private RollbackTransactionResponse(Ydb.Query.RollbackTransactionResponse proto) : base(
Status.FromProto(proto.Status, proto.Issues))
{
}

internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransactionResponse proto)
{
return new RollbackTransactionResponse(proto);
}
}

public class QueryClient :
ClientBase,
IDisposable
Expand Down Expand Up @@ -230,18 +350,107 @@ public SessionStateStream AttachSession(string sessionId, AttachSessionSettings?
return new SessionStateStream(streamIterator);
}

internal async Task BeginTransaction()
public async Task<BeginTransactionResponse> BeginTransaction(
string sessionId,
Tx tx,
BeginTransactionSettings? settings = null)
{
settings ??= new BeginTransactionSettings();

var request = new BeginTransactionRequest { SessionId = sessionId, TxSettings = tx.ToProto().BeginTx };
try
{
var response = await Driver.UnaryCall(
QueryService.BeginTransactionMethod,
request: request,
settings: settings
);
return BeginTransactionResponse.FromProto(response.Data);
}
catch (Driver.TransportException e)
{
return new BeginTransactionResponse(e.Status);
}
}

public async Task<CommitTransactionResponse> CommitTransaction(
string sessionId,
Tx tx,
CommitTransactionSettings? settings = null)
{
settings ??= new CommitTransactionSettings();

var request = new CommitTransactionRequest { SessionId = sessionId, TxId = tx.TxId };

try
{
var response = await Driver.UnaryCall(
QueryService.CommitTransactionMethod,
request: request,
settings: settings
);
return CommitTransactionResponse.FromProto(response.Data);
}
catch (Driver.TransportException e)
{
return new CommitTransactionResponse(e.Status);
}
}

internal async Task CommitTransaction()
public async Task<RollbackTransactionResponse> RollbackTransaction(
string sessionId,
Tx tx,
RollbackTransactionSettings? settings = null)
{
settings ??= new RollbackTransactionSettings();

var request = new RollbackTransactionRequest { SessionId = sessionId, TxId = tx.TxId };
try
{
var response = await Driver.UnaryCall(
QueryService.RollbackTransactionMethod,
request: request,
settings: settings
);
return RollbackTransactionResponse.FromProto(response.Data);
}
catch (Driver.TransportException e)
{
return new RollbackTransactionResponse(e.Status);
}
}

internal async Task RollbackTransaction()

public ExecuteQueryStream ExecuteQuery(
string sessionId,
string queryString,
Tx tx,
IReadOnlyDictionary<string, YdbValue>? parameters,
ExecuteQuerySettings? settings = null)
{
settings ??= new ExecuteQuerySettings();
parameters ??= new Dictionary<string, YdbValue>();

var request = new ExecuteQueryRequest
{
SessionId = sessionId,
ExecMode = (Ydb.Query.ExecMode)settings.ExecMode,
TxControl = tx.ToProto(),
QueryContent = new QueryContent { Syntax = (Ydb.Query.Syntax)settings.Syntax, Text = queryString },
StatsMode = (Ydb.Query.StatsMode)settings.StatsMode
};

request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

var streamIterator = Driver.StreamCall(
method: QueryService.ExecuteQueryMethod,
request: request,
settings: settings);

return new ExecuteQueryStream(streamIterator);
}


public async Task<T> Query<T>(string query, Dictionary<string, YdbValue> parameters,
Func<ExecuteQueryStream, T> func, TxMode txMode = TxMode.SerializableRW)
{
Expand Down
Loading

0 comments on commit 8c9d588

Please sign in to comment.