Skip to content

Commit

Permalink
Crutch DoTx with TableService methods for transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Nov 2, 2023
1 parent 4c43f81 commit f2e47f7
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 30 deletions.
198 changes: 170 additions & 28 deletions src/Ydb.Sdk/src/Services/Query/QueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,31 +99,11 @@ public class RollbackTransactionSettings : RequestSettings

public class CreateSessionResponse : ResponseWithResultBase<CreateSessionResponse.ResultData>
{
// public Session? Session { get; }

// private CreateSessionResponse(Ydb.Query.CreateSessionResponse proto, Session? session = null)
// : base(Status.FromProto(proto.Status, proto.Issues))
// {
// Session = session;
// }
internal CreateSessionResponse(Status status, ResultData? result = null)
: base(status, result)
{
}

// internal static CreateSessionResponse FromProto(Ydb.Query.CreateSessionResponse proto, Driver driver,
// string endpoint)
// {
// var session = new Session(
// driver: driver,
// sessionPool: null,
// id: proto.SessionId,
// nodeId: proto.NodeId,
// endpoint: endpoint
// );
// return new CreateSessionResponse(proto, session);
// }

public class ResultData
{
private ResultData(Session session)
Expand Down Expand Up @@ -201,10 +181,6 @@ protected override SessionState MakeResponse(Status status)
}
}

public class QueryStats
{
}

public class ExecuteQueryResponsePart : ResponseBase
{
public long? ResultSetIndex;
Expand Down Expand Up @@ -322,6 +298,65 @@ internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransact
}
}

// ===========================================================================
// WARNING: temporary structs just for testing should be removed
// ===========================================================================
public class BeginCrutchTxSettings : OperationRequestSettings
{
}

public class CommitCrutchTxSettings : OperationRequestSettings
{
}

public class RollbackCrutchTxSettings : OperationRequestSettings
{
}

internal class BeginCrutchTxResponse : ResponseWithResultBase<BeginCrutchTxResponse.ResultData>
{
internal BeginCrutchTxResponse(Status status, ResultData? result = null) : base(status, result)
{
}

public class ResultData
{
public Tx? Tx { get; }

public ResultData(Tx? tx)
{
Tx = tx;
}

internal static ResultData FromProto(Ydb.Table.BeginTransactionResult resultProto)
{
var txId = resultProto.TxMeta.Id;
// var tx = new Tx { TxId = resultProto.TxMeta.Id };
var tx = new Tx(new TransactionControl { TxId = txId });
var result = new ResultData(tx);

return result;
}
}
}

internal class CommitCrutchTxResponse : ResponseBase
{
internal CommitCrutchTxResponse(Status status) : base(status)
{
}
}

internal class RollbackCrutchTxResponse : ResponseBase
{
internal RollbackCrutchTxResponse(Status status) : base(status)
{
}
}
// ===========================================================================
// END WARNING: temporary structs just for testing should be removed
// ===========================================================================

public class QueryClientConfig
{
public SessionPoolConfig SessionPoolConfig { get; }
Expand Down Expand Up @@ -601,10 +636,12 @@ public async Task<QueryResponse> Query(string queryString, Func<ExecuteQueryStre

private async Task<QueryResponseWithResult<T>> Rollback<T>(Session session, Tx tx, Status status)
{
// TODO remove crutch calls
_logger.LogTrace($"Transaction {tx.TxId} not committed, try to rollback");
try
{
var rollbackResponse = await RollbackTransaction(session.Id, tx);
// var rollbackResponse = await RollbackTransaction(session.Id, tx);
var rollbackResponse = await RollbackCrutchTx(session.Id, tx);
rollbackResponse.EnsureSuccess();
}
catch (StatusUnsuccessfulException e)
Expand All @@ -623,9 +660,15 @@ public async Task<QueryResponseWithResult<T>> DoTx<T>(Func<Tx, Task<T>> func,
var response = await ExecOnSession(
async session =>
{
var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings));
// var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings));
// beginTransactionResponse.EnsureSuccess();
// var tx = beginTransactionResponse.Tx;
// tx.QueryClient = this;
// tx.SessionId = session.Id;
var beginTransactionResponse = await BeginCrutchTx(session.Id, Tx.Begin(txModeSettings));
beginTransactionResponse.EnsureSuccess();
var tx = beginTransactionResponse.Tx;
var tx = beginTransactionResponse.Result.Tx!;
tx.QueryClient = this;
tx.SessionId = session.Id;
Expand All @@ -644,7 +687,8 @@ public async Task<QueryResponseWithResult<T>> DoTx<T>(Func<Tx, Task<T>> func,
new Status(StatusCode.InternalError, $"Failed to execute lambda on tx {tx.TxId}: {e.Message}"));
}
var commitResponse = await CommitTransaction(session.Id, tx);
// var commitResponse = await CommitTransaction(session.Id, tx);
var commitResponse = await CommitCrutchTx(session.Id, tx);
if (!commitResponse.Status.IsSuccess)
{
return await Rollback<T>(session, tx, commitResponse.Status);
Expand Down Expand Up @@ -704,6 +748,104 @@ private void Dispose(bool disposing)

_disposed = true;
}


// ===========================================================================
// WARNING: temporary methods just for testing should be removed
// ===========================================================================
internal async Task<BeginCrutchTxResponse> BeginCrutchTx(string sessionId,
Tx tx, BeginCrutchTxSettings? settings = null)
{
settings ??= new BeginCrutchTxSettings();
var request = new Ydb.Table.BeginTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxSettings = new Ydb.Table.TransactionSettings
{ SerializableReadWrite = new Ydb.Table.SerializableModeSettings() }
};
try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.BeginTransactionMethod,
request: request,
settings: settings
);
var status = UnpackOperation(response.Data.Operation, out Ydb.Table.BeginTransactionResult? resultProto);
BeginCrutchTxResponse.ResultData? result = null;

if (status.IsSuccess && resultProto is not null)
{
result = BeginCrutchTxResponse.ResultData.FromProto(resultProto);
}

return new BeginCrutchTxResponse(status, result);
}
catch (Driver.TransportException e)
{
return new BeginCrutchTxResponse(e.Status);
}
}

internal async Task<CommitCrutchTxResponse> CommitCrutchTx(string sessionId, Tx tx,
CommitCrutchTxSettings? settings = null)
{
settings ??= new CommitCrutchTxSettings();
var request = new Ydb.Table.CommitTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxId = tx.TxId
};
try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.CommitTransactionMethod,
request: request,
settings: settings
);
var status = UnpackOperation(response.Data.Operation);

return new CommitCrutchTxResponse(status);
}
catch (Driver.TransportException e)
{
return new CommitCrutchTxResponse(e.Status);
}
}

internal async Task<RollbackCrutchTxResponse> RollbackCrutchTx(string sessionId, Tx tx,
RollbackCrutchTxSettings? settings = null)
{
settings ??= new RollbackCrutchTxSettings();
var request = new Ydb.Table.RollbackTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxId = tx.TxId
};

try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.RollbackTransactionMethod,
request: request,
settings: settings
);

var status = UnpackOperation(response.Data.Operation);

return new RollbackCrutchTxResponse(status);
}
catch (Driver.TransportException e)
{
return new RollbackCrutchTxResponse(e.Status);
}
}

// ===========================================================================
// END WARNING: temporary methods just for testing should be removed
// ===========================================================================
}

public class QueryResponse : ResponseBase
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Query/Tx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Tx()
_proto = new TransactionControl();
}

private Tx(TransactionControl proto)
internal Tx(TransactionControl proto)
{
_proto = proto;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/tests/Query/TestExecuteQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public async Task Tx()
var resultSet = part.ResultSet;
if (resultSet is not null)
{
titles.AddRange(resultSet.Rows.Select(row => row["title"].GetUtf8()));
titles.AddRange(resultSet.Rows.Select(row => row["title"].GetOptionalUtf8()!));
}
}
Expand Down

0 comments on commit f2e47f7

Please sign in to comment.