Skip to content

Commit

Permalink
feat: impl GetChars, GetBytes, GetChar, GetEnumerator in YdbDataReader (
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov authored Aug 8, 2024
1 parent 3fe3e0f commit fe1ede4
Show file tree
Hide file tree
Showing 3 changed files with 378 additions and 12 deletions.
87 changes: 75 additions & 12 deletions src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Collections;
using System.Data.Common;
using Google.Protobuf.Collections;
using Ydb.Issue;
Expand All @@ -7,7 +6,7 @@

namespace Ydb.Sdk.Ado;

public sealed class YdbDataReader : DbDataReader
public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord>
{
private readonly IAsyncEnumerator<ExecuteQueryResponsePart> _stream;
private readonly YdbTransaction? _ydbTransaction;
Expand Down Expand Up @@ -60,19 +59,72 @@ public sbyte GetSByte(int ordinal)
return GetFieldYdbValue(ordinal).GetInt8();
}

// ReSharper disable once MemberCanBePrivate.Global
public byte[] GetBytes(int ordinal)
{
return GetFieldYdbValue(ordinal).GetString();
}

public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
throw new NotImplementedException();
var bytes = GetBytes(ordinal);

CheckOffsets(dataOffset, buffer, bufferOffset, length);

if (buffer == null)
{
return 0;
}

var copyCount = Math.Min(bytes.Length - dataOffset, length);
Array.Copy(bytes, (int)dataOffset, buffer, bufferOffset, copyCount);

return copyCount;
}

public override char GetChar(int ordinal)
{
throw new NotImplementedException();
return GetString(ordinal)[0];
}

public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
throw new NotImplementedException();
var chars = GetString(ordinal).ToCharArray();

CheckOffsets(dataOffset, buffer, bufferOffset, length);

if (buffer == null)
{
return 0;
}

var copyCount = Math.Min(chars.Length - dataOffset, length);
Array.Copy(chars, (int)dataOffset, buffer, bufferOffset, copyCount);

return copyCount;
}

private static void CheckOffsets<T>(long dataOffset, T[]? buffer, int bufferOffset, int length)
{
if (dataOffset is < 0 or > int.MaxValue)
{
throw new IndexOutOfRangeException($"dataOffset must be between 0 and {int.MaxValue}");
}

if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length))
{
throw new IndexOutOfRangeException($"bufferOffset must be between 0 and {buffer.Length}");
}

if (buffer != null && length < 0)
{
throw new IndexOutOfRangeException($"length must be between 0 and {buffer.Length}");
}

if (buffer != null && length > buffer.Length - bufferOffset)
{
throw new IndexOutOfRangeException($"bufferOffset must be between 0 and {buffer.Length - length}");
}
}

public override string GetDataTypeName(int ordinal)
Expand Down Expand Up @@ -124,7 +176,7 @@ public override float GetFloat(int ordinal)

public override Guid GetGuid(int ordinal)
{
throw new NotImplementedException();
throw new YdbException("Ydb does not supported Guid");
}

public override short GetInt16(int ordinal)
Expand Down Expand Up @@ -267,9 +319,12 @@ public override async Task<bool> ReadAsync(CancellationToken cancellationToken)

public override int Depth => 0;

public override IEnumerator GetEnumerator()
public override IEnumerator<YdbDataRecord> GetEnumerator()
{
throw new NotImplementedException();
while (Read())
{
yield return new YdbDataRecord(this);
}
}

public override async Task CloseAsync()
Expand Down Expand Up @@ -360,9 +415,8 @@ private async Task<State> NextExecPart()

if (part.Status != StatusIds.Types.StatusCode.Success)
{
CompleteTransaction();
OnFailReadStream();

ReaderState = State.Closed;
while (await _stream.MoveNextAsync())
{
_issueMessagesInStream.AddRange(_stream.Current.Issues);
Expand All @@ -389,14 +443,15 @@ private async Task<State> NextExecPart()
}
catch (Driver.TransportException e)
{
CompleteTransaction();
OnFailReadStream();

throw new YdbException(e.Status);
}
}

private void CompleteTransaction()
private void OnFailReadStream()
{
ReaderState = State.Closed;
if (_ydbTransaction != null)
{
_ydbTransaction.Completed = true;
Expand All @@ -407,4 +462,12 @@ public override async ValueTask DisposeAsync()
{
await CloseAsync();
}

public async IAsyncEnumerator<YdbDataRecord> GetAsyncEnumerator(CancellationToken cancellationToken = new())
{
while (await ReadAsync(cancellationToken))
{
yield return new YdbDataRecord(this);
}
}
}
164 changes: 164 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbDataRecord.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using System.Data.Common;

namespace Ydb.Sdk.Ado;

public class YdbDataRecord : DbDataRecord
{
private readonly YdbDataReader _ydbDataReader;

internal YdbDataRecord(YdbDataReader ydbDataReader)
{
_ydbDataReader = ydbDataReader;
}

public override bool GetBoolean(int i)
{
return _ydbDataReader.GetBoolean(i);
}

public override byte GetByte(int i)
{
return _ydbDataReader.GetByte(i);
}

public override long GetBytes(int i, long dataIndex, byte[]? buffer, int bufferIndex, int length)
{
return _ydbDataReader.GetBytes(i, dataIndex, buffer, bufferIndex, length);
}

public override char GetChar(int i)
{
return _ydbDataReader.GetChar(i);
}

public override long GetChars(int i, long dataIndex, char[]? buffer, int bufferIndex, int length)
{
return _ydbDataReader.GetChars(i, dataIndex, buffer, bufferIndex, length);
}

public override string GetDataTypeName(int i)
{
return _ydbDataReader.GetDataTypeName(i);
}

public override DateTime GetDateTime(int i)
{
return _ydbDataReader.GetDateTime(i);
}

public override decimal GetDecimal(int i)
{
return _ydbDataReader.GetDecimal(i);
}

public override double GetDouble(int i)
{
return _ydbDataReader.GetDouble(i);
}

public override System.Type GetFieldType(int i)
{
return _ydbDataReader.GetFieldType(i);
}

public override float GetFloat(int i)
{
return _ydbDataReader.GetFloat(i);
}

public override Guid GetGuid(int i)
{
return _ydbDataReader.GetGuid(i);
}

public override short GetInt16(int i)
{
return _ydbDataReader.GetInt16(i);
}

public override int GetInt32(int i)
{
return _ydbDataReader.GetInt32(i);
}

public override long GetInt64(int i)
{
return _ydbDataReader.GetInt64(i);
}

public override string GetName(int i)
{
return _ydbDataReader.GetName(i);
}

public override int GetOrdinal(string name)
{
return _ydbDataReader.GetOrdinal(name);
}

public override string GetString(int i)
{
return _ydbDataReader.GetString(i);
}

public override object GetValue(int i)
{
return _ydbDataReader.GetValue(i);
}

public override int GetValues(object[] values)
{
return _ydbDataReader.GetValues(values);
}

public override bool IsDBNull(int i)
{
return _ydbDataReader.IsDBNull(i);
}

public override int FieldCount => _ydbDataReader.FieldCount;

public override object this[int i] => _ydbDataReader[i];

public override object this[string name] => _ydbDataReader[name];

public byte[] GetBytes(int i)
{
return _ydbDataReader.GetBytes(i);
}

public sbyte GetSByte(int i)
{
return _ydbDataReader.GetSByte(i);
}

public ulong GetUint16(int i)
{
return _ydbDataReader.GetUint16(i);
}

public ulong GetUint32(int i)
{
return _ydbDataReader.GetUint32(i);
}

public ulong GetUint64(int i)
{
return _ydbDataReader.GetUint64(i);
}

public string GetJson(int i)
{
return _ydbDataReader.GetJson(i);
}

public string GetJsonDocument(int i)
{
return _ydbDataReader.GetJsonDocument(i);
}

public TimeSpan GetInterval(int i)
{
return _ydbDataReader.GetInterval(i);
}
}
Loading

0 comments on commit fe1ede4

Please sign in to comment.