Skip to content

Commit

Permalink
feat: added first integration tests.yml
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov committed Nov 15, 2024
1 parent 253b08c commit b24c825
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 84 deletions.
42 changes: 40 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ jobs:
run: |
cd src
dotnet test --filter "Category=Unit" -f ${{ matrix.dotnet-target-framework }}
ado-net-tests:
runs-on: ubuntu-22.04
strategy:
Expand Down Expand Up @@ -74,7 +73,46 @@ jobs:
docker cp ydb-local:/ydb_certs/ca.pem ~/
cd src
dotnet test --filter "(FullyQualifiedName~Ado) | (FullyQualifiedName~Dapper)" -l "console;verbosity=normal"
topic-tests:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
ydb-version: [ 'trunk' ]
dotnet-version: [ 6.0.x, 7.0.x ]
include:
- dotnet-version: 6.0.x
dotnet-target-framework: net6.0
- dotnet-version: 7.0.x
dotnet-target-framework: net7.0
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:${{ matrix.ydb-version }}
ports:
- 2135:2135
- 2136:2136
- 8765:8765
env:
YDB_LOCAL_SURVIVE_RESTART: true
YDB_USE_IN_MEMORY_PDISKS: true
options: '--name ydb-local -h localhost'
env:
OS: ubuntu-22.04
YDB_VERSION: ${{ matrix.ydb-version }}
YDB_CONNECTION_STRING: grpc://localhost:2136/local
YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Dotnet
uses: actions/setup-dotnet@v4
with:
dotnet-version: ${{ matrix.dotnet-version }}
- name: Run ADO.NET tests
run: |
docker cp ydb-local:/ydb_certs/ca.pem ~/
cd src
dotnet test --filter "FullyQualifiedName~Topic" -l "console;verbosity=normal"
core-integration-tests:
runs-on: ubuntu-22.04
strategy:
Expand Down
20 changes: 13 additions & 7 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
namespace Ydb.Sdk.Services.Topic;

public class YdbTopicException : Exception
public class YdbWriterException : Exception
{
protected YdbTopicException(string message) : base(message)
public YdbWriterException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}
}

public class YdbWriterException : YdbTopicException
{
public YdbWriterException(string message) : base(message)
public YdbWriterException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public YdbWriterException(string message, Driver.TransportException e) : base(message, e)
{
Status = e.Status;
}

public Status Status { get; }
}

public class YdbReaderException : YdbTopicException
public class YdbReaderException : Exception
{
protected YdbReaderException(string message) : base(message)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace Ydb.Sdk.Services.Topic;

public class TopicClient
{
private readonly Driver _driver;
private readonly IDriver _driver;

public TopicClient(Driver driver)
public TopicClient(IDriver driver)
{
_driver = driver;
}
Expand Down
21 changes: 2 additions & 19 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
protected readonly string SessionId;

private int _isActive = 1;
private bool _disposed;

protected TopicSession(BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
string sessionId, Func<Task> initialize)
Expand All @@ -32,28 +31,12 @@ protected async void ReconnectSession()
}

Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

while (!_disposed)
{
try
{
await _initialize();
break;
}
catch (Exception e)
{
Logger.LogError(e, "Unable to reconnect the session due to the following error");
}
}

await _initialize();
}

public void Dispose()
{
lock (this)
{
_disposed = true;
}

Stream.Dispose();
}
}
130 changes: 84 additions & 46 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly ILogger<Writer<TValue>> _logger;
private readonly ISerializer<TValue> _serializer;
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
private readonly CancellationTokenSource _disposeTokenSource = new();

private volatile TaskCompletionSource _taskWakeUpCompletionSource = new();
private volatile IWriteSession _session = null!;
private volatile bool _disposed;
private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!");

private int _limitBufferMaxSize;

Expand Down Expand Up @@ -55,7 +55,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message)
var messageData = new MessageData
{
Data = ByteString.CopyFrom(data),
CreatedAt = Timestamp.FromDateTime(message.Timestamp),
CreatedAt = Timestamp.FromDateTime(message.Timestamp.ToUniversalTime()),
UncompressedSize = data.Length
};

Expand Down Expand Up @@ -111,7 +111,7 @@ private async void StartWriteWorker()
{
await Initialize();

while (!_disposed)
while (!_disposeTokenSource.Token.IsCancellationRequested)
{
await _taskWakeUpCompletionSource.Task;
_taskWakeUpCompletionSource = new TaskCompletionSource();
Expand All @@ -127,76 +127,100 @@ private void WakeUpWorker()

private async Task Initialize()
{
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
try
{
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);

var stream = _driver.BidirectionalStreamCall(
TopicService.StreamWriteMethod,
GrpcRequestSettings.DefaultInstance
);
var stream = _driver.BidirectionalStreamCall(
TopicService.StreamWriteMethod,
GrpcRequestSettings.DefaultInstance
);

var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
{
initRequest.ProducerId = _config.ProducerId;
}
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
{
initRequest.ProducerId = _config.ProducerId;
}

if (_config.MessageGroupId != null)
{
initRequest.MessageGroupId = _config.MessageGroupId;
}
if (_config.MessageGroupId != null)
{
initRequest.MessageGroupId = _config.MessageGroupId;
}

_logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest);
_logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest);

await stream.Write(new MessageFromClient { InitRequest = initRequest });
if (!await stream.MoveNextAsync())
{
_session = new NotStartedWriterSession(
$"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}");
await stream.Write(new MessageFromClient { InitRequest = initRequest });
if (!await stream.MoveNextAsync())
{
_session = new NotStartedWriterSession(
$"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}");

_ = Task.Run(Initialize);
_ = Task.Run(Initialize, _disposeTokenSource.Token);

return;
}
return;
}

var receivedInitMessage = stream.Current;
var receivedInitMessage = stream.Current;

var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues);
var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues);

if (status.IsNotSuccess)
{
_session = new NotStartedWriterSession(status.ToString());
if (status.IsNotSuccess)
{
_session = new NotStartedWriterSession("Initialization failed", status);

_ = Task.Run(Initialize);
if (status.StatusCode != StatusCode.SchemeError)
{
_ = Task.Run(Initialize, _disposeTokenSource.Token);
}

return;
}
return;
}

var initResponse = receivedInitMessage.InitResponse;
var initResponse = receivedInitMessage.InitResponse;

_logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}",
initResponse);
_logger.LogDebug("Received a response for the initialization request on the writer stream: {InitResponse}",
initResponse);

if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
if (initResponse.SupportedCodecs != null &&
!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
{
_logger.LogCritical("Topic[{TopicPath}] is not supported codec: {Codec}", _config.TopicPath,
_config.Codec);

_session = new NotStartedWriterSession(
$"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}");
return;
}

_session = new WriterSession(_config, stream, initResponse, Initialize, _logger);
}
catch (Driver.TransportException e)
{
_logger.LogCritical("Topic[{TopicPath}] is not supported codec: {Codec}", _config.TopicPath, _config.Codec);
_logger.LogError(e, "Unable to connect the session");

_session = new NotStartedWriterSession(
$"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}");
return;
new YdbWriterException("Transport error on creating write session", e));
}

_session = new WriterSession(_config, stream, initResponse, Initialize, _logger);
}

public void Dispose()
{
_disposed = true;
try
{
_disposeTokenSource.Cancel();

_session.Dispose();
}
finally
{
_disposeTokenSource.Dispose();
}
}
}

internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> TaskCompletionSource);

internal interface IWriteSession
internal interface IWriteSession : IDisposable
{
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
}
Expand All @@ -210,6 +234,16 @@ public NotStartedWriterSession(string reasonExceptionMessage)
_reasonException = new YdbWriterException(reasonExceptionMessage);
}

public NotStartedWriterSession(string reasonExceptionMessage, Status status)
{
_reasonException = new YdbWriterException(reasonExceptionMessage, status);
}

public NotStartedWriterSession(YdbWriterException reasonException)
{
_reasonException = reasonException;
}

public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
{
foreach (var messageSending in toSendBuffer)
Expand All @@ -219,6 +253,10 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)

return Task.CompletedTask;
}

public void Dispose()
{
}
}

// No thread safe
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ namespace Ydb.Sdk.Services.Topic.Writer;
public class WriterBuilder<TValue>
{
private readonly WriterConfig _config;
private readonly Driver _driver;
private readonly IDriver _driver;

public WriterBuilder(Driver driver, WriterConfig config)
public WriterBuilder(IDriver driver, WriterConfig config)
{
_driver = driver;
_config = config;
Expand Down
12 changes: 7 additions & 5 deletions src/Ydb.Sdk/tests/Fixture/DriverFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

namespace Ydb.Sdk.Tests.Fixture;

public abstract class DriverFixture : IAsyncLifetime
public class DriverFixture : IAsyncLifetime
{
protected readonly Driver Driver;
public Driver Driver { get; }

protected DriverFixture(DriverConfig? driverConfig = null)
public DriverFixture()
{
driverConfig ??= new DriverConfig(
var driverConfig = new DriverConfig(
endpoint: "grpc://localhost:2136",
database: "/local"
);

Driver = new Driver(driverConfig, Utils.GetLoggerFactory());
}

protected abstract void ClientDispose();
protected virtual void ClientDispose()
{
}

public Task InitializeAsync()
{
Expand Down
Loading

0 comments on commit b24c825

Please sign in to comment.