Skip to content

Commit

Permalink
feat: fix NPE on Describe method & invoke onStatus on !Success (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov committed Aug 28, 2024
1 parent 0c6702c commit 97a05c5
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- Check status of the transport or server for an invalidated session
- Fixed NPE in DescribeTable

## v0.6.0
- ADO.NET over query-service
- Add EndpointPool
Expand Down
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
: ExecuteQuerySettings.DefaultInstance;

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(_commandText,
DbParameterCollection.YdbParameters, execSettings, Transaction?.TransactionControl), Transaction);
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(
_commandText, DbParameterCollection.YdbParameters, execSettings, Transaction?.TransactionControl),
YdbConnection.Session.OnStatus, Transaction);

YdbConnection.LastReader = ydbDataReader;
YdbConnection.LastCommand = CommandText;
Expand Down
18 changes: 15 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord
private readonly IAsyncEnumerator<ExecuteQueryResponsePart> _stream;
private readonly YdbTransaction? _ydbTransaction;
private readonly RepeatedField<IssueMessage> _issueMessagesInStream = new();
private readonly Action<Status> _onStatus;

private int _currentRowIndex = -1;
private long _resultSetIndex = -1;
Expand Down Expand Up @@ -48,17 +49,22 @@ private interface IMetadata
private Value.ResultSet.Row CurrentRow => CurrentResultSet.Rows[_currentRowIndex];
private int RowsCount => ReaderMetadata.RowsCount;

private YdbDataReader(IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream, YdbTransaction? ydbTransaction)
private YdbDataReader(
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onStatus,
YdbTransaction? ydbTransaction)
{
_stream = resultSetStream;
_onStatus = onStatus;
_ydbTransaction = ydbTransaction;
}

internal static async Task<YdbDataReader> CreateYdbDataReader(
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onStatus,
YdbTransaction? ydbTransaction = null)
{
var ydbDataReader = new YdbDataReader(resultSetStream, ydbTransaction);
var ydbDataReader = new YdbDataReader(resultSetStream, onStatus, ydbTransaction);
await ydbDataReader.Init();

return ydbDataReader;
Expand Down Expand Up @@ -474,7 +480,11 @@ private async Task<State> NextExecPart()
_issueMessagesInStream.AddRange(_stream.Current.Issues);
}

throw new YdbException(Status.FromProto(part.Status, _issueMessagesInStream));
var status = Status.FromProto(part.Status, _issueMessagesInStream);

_onStatus(status);

throw new YdbException(status);
}

_currentResultSet = part.ResultSet?.FromProto();
Expand All @@ -498,6 +508,8 @@ private async Task<State> NextExecPart()
{
OnFailReadStream();

_onStatus(e.Status);

throw new YdbException(e.Status);
}
}
Expand Down
20 changes: 14 additions & 6 deletions src/Ydb.Sdk/src/Pool/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,26 @@ internal async Task<T> ExecOnSession<T>(Func<TSession, Task<T>> onSession, Retry
}
catch (Exception e)
{
if (attempt == retrySettings.MaxAttempts - 1)
{
throw;
}

var statusErr = e switch
{
Driver.TransportException transportException => transportException.Status,
StatusUnsuccessfulException unsuccessfulException => unsuccessfulException.Status,
_ => null
};

if (attempt == retrySettings.MaxAttempts - 1)
{
if (statusErr != null)
{
session?.OnStatus(statusErr);
}

throw;
}

if (statusErr != null)
{
session?.OnStatus(statusErr);
var retryRule = retrySettings.GetRetryRule(statusErr.StatusCode);

if (retryRule.Policy == RetryPolicy.None ||
Expand All @@ -103,8 +109,10 @@ internal async Task<T> ExecOnSession<T>(Func<TSession, Task<T>> onSession, Retry
}

Logger.LogTrace(
"Retry: attempt {attempt}, Session ${session?.SessionId}, idempotent error {status} retrying",
"Retry: attempt {attempt}, Session ${session.SessionId}, idempotent error {status} retrying",
attempt, session?.SessionId, statusErr);


await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt));
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Table/DescribeTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class TableStats
public ulong StoreSize { get; }
public ulong Partitions { get; }
public DateTime CreationTime { get; }
public DateTime ModificationTime { get; }
public DateTime? ModificationTime { get; }

internal TableStats(Ydb.Table.TableStats? proto)
{
Expand All @@ -79,7 +79,7 @@ internal TableStats(Ydb.Table.TableStats? proto)
StoreSize = proto.StoreSize;
Partitions = proto.Partitions;
CreationTime = proto.CreationTime.ToDateTime();
ModificationTime = proto.ModificationTime.ToDateTime();
ModificationTime = proto.ModificationTime?.ToDateTime();
}
}

Expand Down
27 changes: 21 additions & 6 deletions src/Ydb.Sdk/tests/Ado/YdbDataReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ public class YdbDataReaderTests
[Fact]
public async Task BasedIteration_WhenNotCallMethodRead_ThrowException()
{
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess());
var statuses = new List<Status>();
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(), statuses.Add);

// Read first metadata
Assert.True(reader.HasRows);
Expand All @@ -36,19 +37,25 @@ public async Task BasedIteration_WhenNotCallMethodRead_ThrowException()

Assert.Equal("The reader is closed",
Assert.Throws<InvalidOperationException>(() => reader.GetValue(0)).Message);
Assert.Empty(statuses);
}

[Fact]
public void CreateYdbDataReader_WhenAbortedStatus_ThrowException()
{
var statuses = new List<Status>();
Assert.Equal("Status: Aborted", Assert.Throws<YdbException>(
() => YdbDataReader.CreateYdbDataReader(SingleEnumeratorFailed).GetAwaiter().GetResult()).Message);
() => YdbDataReader.CreateYdbDataReader(SingleEnumeratorFailed, statuses.Add).GetAwaiter().GetResult())
.Message);
Assert.Single(statuses);
Assert.Equal(StatusCode.Aborted, statuses[0].StatusCode);
}

[Fact]
public async Task NextResult_WhenNextResultSkipResultSet_ReturnNextResultSet()
{
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2));
var statuses = new List<Status>();
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), statuses.Add);

Assert.True(reader.NextResult());
Assert.True(reader.NextResult());
Expand All @@ -57,12 +64,14 @@ public async Task NextResult_WhenNextResultSkipResultSet_ReturnNextResultSet()

Assert.False(reader.Read());
Assert.False(reader.NextResult());
Assert.Empty(statuses);
}

[Fact]
public async Task NextResult_WhenFirstRead_ReturnResultSet()
{
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2));
var statuses = new List<Status>();
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), statuses.Add);

Assert.True(reader.Read());
Assert.True((bool)reader.GetValue(0));
Expand All @@ -74,12 +83,14 @@ public async Task NextResult_WhenFirstRead_ReturnResultSet()

Assert.False(reader.NextResult());
Assert.False(reader.Read());
Assert.Empty(statuses);
}

[Fact]
public async Task NextResult_WhenLongResultSet_ReturnResultSet()
{
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2, true));
var statuses = new List<Status>();
var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2, true), statuses.Add);

Assert.True(reader.Read());
Assert.True((bool)reader.GetValue(0));
Expand All @@ -93,11 +104,13 @@ public async Task NextResult_WhenLongResultSet_ReturnResultSet()

Assert.False(reader.NextResult());
Assert.False(reader.Read());
Assert.Empty(statuses);
}

[Fact]
public async Task Read_WhenReadAsyncThrowException_AggregateIssuesBeforeErrorAndAfter()
{
var statuses = new List<Status>();
var result = ResultSet.Parser.ParseJson(
"{ \"columns\": [ { \"name\": \"column0\", " +
"\"type\": { \"typeId\": \"BOOL\" } } ], " +
Expand All @@ -116,7 +129,7 @@ public async Task Read_WhenReadAsyncThrowException_AggregateIssuesBeforeErrorAnd
nextFailPart.Issues.Add(new IssueMessage { Message = "Some message 3" });

var reader = await YdbDataReader.CreateYdbDataReader(new MockAsyncEnumerator<ExecuteQueryResponsePart>(
new List<ExecuteQueryResponsePart> { successPart, failPart, nextFailPart }));
new List<ExecuteQueryResponsePart> { successPart, failPart, nextFailPart }), statuses.Add);

Assert.True(reader.Read());
Assert.Equal(@"Status: Aborted, Issues:
Expand All @@ -125,6 +138,8 @@ public async Task Read_WhenReadAsyncThrowException_AggregateIssuesBeforeErrorAnd
[0] Fatal: Some message 2
[0] Fatal: Some message 3
", Assert.Throws<YdbException>(() => reader.Read()).Message);
Assert.Single(statuses);
Assert.Equal(StatusCode.Aborted, statuses[0].StatusCode);
}

private static MockAsyncEnumerator<ExecuteQueryResponsePart> EnumeratorSuccess(int size = 1,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using Xunit;
using Ydb.Sdk.Services.Table;
using Ydb.Sdk.Tests.Fixture;

namespace Ydb.Sdk.Tests.Table;
namespace Ydb.Sdk.Tests;

[Trait("Category", "Integration")]
public class DescribeTableTests : IClassFixture<TableClientFixture>
Expand Down Expand Up @@ -29,7 +30,10 @@ public async Task CreateAndDescribe()

await Utils.CreateSimpleTable(_tableClientFixture.TableClient, tablePath, columnName);

var describeResponse = await _tableClientFixture.TableClient.DescribeTable(tablePath);
var describeResponse = await _tableClientFixture.TableClient.DescribeTable(tablePath, new DescribeTableSettings
{
OperationTimeout = TimeSpan.FromSeconds(5)
}.WithTableStats());
describeResponse.Status.EnsureSuccess();
Assert.True(describeResponse.Result.PrimaryKey.SequenceEqual(new[] { columnName }));

Expand Down

0 comments on commit 97a05c5

Please sign in to comment.