Skip to content

Commit

Permalink
QueryService: not use begin/commit/rollback transaction methods from …
Browse files Browse the repository at this point in the history
…TableService
  • Loading branch information
XmasApple committed Nov 15, 2023
1 parent ca2b754 commit baeb93c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 183 deletions.
180 changes: 7 additions & 173 deletions src/Ydb.Sdk/src/Services/Query/QueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ internal BeginTransactionResponse(Status status) : base(status)
private BeginTransactionResponse(Ydb.Query.BeginTransactionResponse proto) : base(
Status.FromProto(proto.Status, proto.Issues))
{
Tx.TxId = proto.TxMeta.Id;
var txId = proto.TxMeta.Id;
var tx = new Tx(new TransactionControl { TxId = txId });
Tx = tx;
}

internal static BeginTransactionResponse FromProto(Ydb.Query.BeginTransactionResponse proto)
Expand Down Expand Up @@ -291,66 +293,6 @@ internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransact
}
}

// TODO remove crutch calls
// ===========================================================================
// WARNING: temporary structs just for testing should be removed
// ===========================================================================
public class BeginCrutchTxSettings : OperationRequestSettings
{
}

public class CommitCrutchTxSettings : OperationRequestSettings
{
}

public class RollbackCrutchTxSettings : OperationRequestSettings
{
}

internal class BeginCrutchTxResponse : ResponseWithResultBase<BeginCrutchTxResponse.ResultData>
{
internal BeginCrutchTxResponse(Status status, ResultData? result = null) : base(status, result)
{
}

public class ResultData
{
public Tx? Tx { get; }

public ResultData(Tx? tx)
{
Tx = tx;
}

internal static ResultData FromProto(Ydb.Table.BeginTransactionResult resultProto)
{
var txId = resultProto.TxMeta.Id;
// var tx = new Tx { TxId = resultProto.TxMeta.Id };
var tx = new Tx(new TransactionControl { TxId = txId });
var result = new ResultData(tx);

return result;
}
}
}

internal class CommitCrutchTxResponse : ResponseBase
{
internal CommitCrutchTxResponse(Status status) : base(status)
{
}
}

internal class RollbackCrutchTxResponse : ResponseBase
{
internal RollbackCrutchTxResponse(Status status) : base(status)
{
}
}
// ===========================================================================
// END WARNING: temporary structs just for testing should be removed
// ===========================================================================

public class QueryClientConfig
{
public SessionPoolConfig SessionPoolConfig { get; }
Expand Down Expand Up @@ -638,12 +580,10 @@ public async Task<QueryResponse> Exec(string queryString,

private async Task<QueryResponseWithResult<T>> Rollback<T>(Session session, Tx tx, Status status)
{
// TODO remove crutch calls
_logger.LogTrace($"Transaction {tx.TxId} not committed, try to rollback");
try
{
// var rollbackResponse = await RollbackTransaction(session.Id, tx);
var rollbackResponse = await RollbackCrutchTx(session.Id, tx);
var rollbackResponse = await RollbackTransaction(session.Id, tx);
rollbackResponse.EnsureSuccess();
}
catch (StatusUnsuccessfulException e)
Expand All @@ -659,19 +599,12 @@ public async Task<QueryResponseWithResult<T>> DoTx<T>(Func<Tx, Task<T>> func,
ITxModeSettings? txModeSettings = null,
RetrySettings? retrySettings = null)
{
// TODO remove crutch calls
var response = await ExecOnSession(
async session =>
{
// var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings));
// beginTransactionResponse.EnsureSuccess();
// var tx = beginTransactionResponse.Tx;
// tx.QueryClient = this;
// tx.SessionId = session.Id;
var beginTransactionResponse = await BeginCrutchTx(session.Id, Tx.Begin(txModeSettings));
var beginTransactionResponse = await BeginTransaction(session.Id, Tx.Begin(txModeSettings));
beginTransactionResponse.EnsureSuccess();
var tx = beginTransactionResponse.Result.Tx!;
var tx = beginTransactionResponse.Tx;
tx.QueryClient = this;
tx.SessionId = session.Id;
Expand All @@ -694,8 +627,7 @@ public async Task<QueryResponseWithResult<T>> DoTx<T>(Func<Tx, Task<T>> func,
return rollbackResponse;
}
// var commitResponse = await CommitTransaction(session.Id, tx);
var commitResponse = await CommitCrutchTx(session.Id, tx);
var commitResponse = await CommitTransaction(session.Id, tx);
if (!commitResponse.Status.IsSuccess)
{
return await Rollback<T>(session, tx, commitResponse.Status);
Expand Down Expand Up @@ -755,104 +687,6 @@ private void Dispose(bool disposing)

_disposed = true;
}


// ===========================================================================
// WARNING: temporary methods just for testing should be removed
// ===========================================================================
internal async Task<BeginCrutchTxResponse> BeginCrutchTx(string sessionId,
Tx tx, BeginCrutchTxSettings? settings = null)
{
settings ??= new BeginCrutchTxSettings();
var request = new Ydb.Table.BeginTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxSettings = new Ydb.Table.TransactionSettings
{ SerializableReadWrite = new Ydb.Table.SerializableModeSettings() }
};
try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.BeginTransactionMethod,
request: request,
settings: settings
);
var status = UnpackOperation(response.Data.Operation, out Ydb.Table.BeginTransactionResult? resultProto);
BeginCrutchTxResponse.ResultData? result = null;

if (status.IsSuccess && resultProto is not null)
{
result = BeginCrutchTxResponse.ResultData.FromProto(resultProto);
}

return new BeginCrutchTxResponse(status, result);
}
catch (Driver.TransportException e)
{
return new BeginCrutchTxResponse(e.Status);
}
}

internal async Task<CommitCrutchTxResponse> CommitCrutchTx(string sessionId, Tx tx,
CommitCrutchTxSettings? settings = null)
{
settings ??= new CommitCrutchTxSettings();
var request = new Ydb.Table.CommitTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxId = tx.TxId
};
try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.CommitTransactionMethod,
request: request,
settings: settings
);
var status = UnpackOperation(response.Data.Operation);

return new CommitCrutchTxResponse(status);
}
catch (Driver.TransportException e)
{
return new CommitCrutchTxResponse(e.Status);
}
}

internal async Task<RollbackCrutchTxResponse> RollbackCrutchTx(string sessionId, Tx tx,
RollbackCrutchTxSettings? settings = null)
{
settings ??= new RollbackCrutchTxSettings();
var request = new Ydb.Table.RollbackTransactionRequest
{
SessionId = sessionId,
OperationParams = MakeOperationParams(settings),
TxId = tx.TxId
};

try
{
var response = await Driver.UnaryCall(
method: Ydb.Table.V1.TableService.RollbackTransactionMethod,
request: request,
settings: settings
);

var status = UnpackOperation(response.Data.Operation);

return new RollbackCrutchTxResponse(status);
}
catch (Driver.TransportException e)
{
return new RollbackCrutchTxResponse(e.Status);
}
}

// ===========================================================================
// END WARNING: temporary methods just for testing should be removed
// ===========================================================================
}

public class QueryResponse : ResponseBase
Expand Down
11 changes: 1 addition & 10 deletions src/Ydb.Sdk/src/Services/Query/Tx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,7 @@ public class Tx
internal QueryClient? QueryClient { private get; set; }
internal string? SessionId { private get; set; }

public string? TxId
{
get => _proto.TxId;
set
{
_proto.TxId = value;
if (!string.IsNullOrEmpty(value))
_proto.BeginTx = null;
}
}
public string? TxId => _proto.TxId;

private readonly TransactionControl _proto;

Expand Down

0 comments on commit baeb93c

Please sign in to comment.