diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4ca1164c..d8c271fa 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,7 +33,6 @@ jobs: run: | cd src dotnet test --filter "Category=Unit" -f ${{ matrix.dotnet-target-framework }} - ado-net-tests: runs-on: ubuntu-22.04 strategy: @@ -74,7 +73,46 @@ jobs: docker cp ydb-local:/ydb_certs/ca.pem ~/ cd src dotnet test --filter "(FullyQualifiedName~Ado) | (FullyQualifiedName~Dapper)" -l "console;verbosity=normal" - + topic-tests: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + ydb-version: [ 'trunk' ] + dotnet-version: [ 6.0.x, 7.0.x ] + include: + - dotnet-version: 6.0.x + dotnet-target-framework: net6.0 + - dotnet-version: 7.0.x + dotnet-target-framework: net7.0 + services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb:${{ matrix.ydb-version }} + ports: + - 2135:2135 + - 2136:2136 + - 8765:8765 + env: + YDB_LOCAL_SURVIVE_RESTART: true + YDB_USE_IN_MEMORY_PDISKS: true + options: '--name ydb-local -h localhost' + env: + OS: ubuntu-22.04 + YDB_VERSION: ${{ matrix.ydb-version }} + YDB_CONNECTION_STRING: grpc://localhost:2136/local + YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Install Dotnet + uses: actions/setup-dotnet@v4 + with: + dotnet-version: ${{ matrix.dotnet-version }} + - name: Run ADO.NET tests + run: | + docker cp ydb-local:/ydb_certs/ca.pem ~/ + cd src + dotnet test --filter "FullyQualifiedName~Topic" -l "console;verbosity=normal" core-integration-tests: runs-on: ubuntu-22.04 strategy: diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 84648c71..8e8d102b 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -1,20 +1,26 @@ namespace Ydb.Sdk.Services.Topic; -public class YdbTopicException : Exception +public class YdbWriterException : Exception { - protected YdbTopicException(string message) : base(message) + public YdbWriterException(string message) : base(message) { + Status = new Status(StatusCode.Unspecified); } -} -public class YdbWriterException : YdbTopicException -{ - public YdbWriterException(string message) : base(message) + public YdbWriterException(string message, Status status) : base(message + ": " + status) { + Status = status; } + + public YdbWriterException(string message, Driver.TransportException e) : base(message, e) + { + Status = e.Status; + } + + public Status Status { get; } } -public class YdbReaderException : YdbTopicException +public class YdbReaderException : Exception { protected YdbReaderException(string message) : base(message) { diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs b/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs index 265814df..4f70dcfd 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs @@ -6,9 +6,9 @@ namespace Ydb.Sdk.Services.Topic; public class TopicClient { - private readonly Driver _driver; + private readonly IDriver _driver; - public TopicClient(Driver driver) + public TopicClient(IDriver driver) { _driver = driver; } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 0d425331..ff81376e 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -11,7 +11,6 @@ internal abstract class TopicSession : IDisposable protected readonly string SessionId; private int _isActive = 1; - private bool _disposed; protected TopicSession(BidirectionalStream stream, ILogger logger, string sessionId, Func initialize) @@ -32,28 +31,12 @@ protected async void ReconnectSession() } Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); - - while (!_disposed) - { - try - { - await _initialize(); - break; - } - catch (Exception e) - { - Logger.LogError(e, "Unable to reconnect the session due to the following error"); - } - } + + await _initialize(); } public void Dispose() { - lock (this) - { - _disposed = true; - } - Stream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index e1d0c7ee..3b2e6553 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -24,10 +24,10 @@ internal class Writer : IWriter private readonly ILogger> _logger; private readonly ISerializer _serializer; private readonly ConcurrentQueue _toSendBuffer = new(); + private readonly CancellationTokenSource _disposeTokenSource = new(); private volatile TaskCompletionSource _taskWakeUpCompletionSource = new(); - private volatile IWriteSession _session = null!; - private volatile bool _disposed; + private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!"); private int _limitBufferMaxSize; @@ -55,7 +55,7 @@ public async Task WriteAsync(Message message) var messageData = new MessageData { Data = ByteString.CopyFrom(data), - CreatedAt = Timestamp.FromDateTime(message.Timestamp), + CreatedAt = Timestamp.FromDateTime(message.Timestamp.ToUniversalTime()), UncompressedSize = data.Length }; @@ -111,7 +111,7 @@ private async void StartWriteWorker() { await Initialize(); - while (!_disposed) + while (!_disposeTokenSource.Token.IsCancellationRequested) { await _taskWakeUpCompletionSource.Task; _taskWakeUpCompletionSource = new TaskCompletionSource(); @@ -127,76 +127,100 @@ private void WakeUpWorker() private async Task Initialize() { - _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); + try + { + _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); - var stream = _driver.BidirectionalStreamCall( - TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance - ); + var stream = _driver.BidirectionalStreamCall( + TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance + ); - var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; - if (_config.ProducerId != null) - { - initRequest.ProducerId = _config.ProducerId; - } + var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; + if (_config.ProducerId != null) + { + initRequest.ProducerId = _config.ProducerId; + } - if (_config.MessageGroupId != null) - { - initRequest.MessageGroupId = _config.MessageGroupId; - } + if (_config.MessageGroupId != null) + { + initRequest.MessageGroupId = _config.MessageGroupId; + } - _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); + _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); - await stream.Write(new MessageFromClient { InitRequest = initRequest }); - if (!await stream.MoveNextAsync()) - { - _session = new NotStartedWriterSession( - $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); + await stream.Write(new MessageFromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + _session = new NotStartedWriterSession( + $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); - _ = Task.Run(Initialize); + _ = Task.Run(Initialize, _disposeTokenSource.Token); - return; - } + return; + } - var receivedInitMessage = stream.Current; + var receivedInitMessage = stream.Current; - var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); + var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); - if (status.IsNotSuccess) - { - _session = new NotStartedWriterSession(status.ToString()); + if (status.IsNotSuccess) + { + _session = new NotStartedWriterSession("Initialization failed", status); - _ = Task.Run(Initialize); + if (status.StatusCode != StatusCode.SchemeError) + { + _ = Task.Run(Initialize, _disposeTokenSource.Token); + } - return; - } + return; + } - var initResponse = receivedInitMessage.InitResponse; + var initResponse = receivedInitMessage.InitResponse; - _logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}", - initResponse); + _logger.LogDebug("Received a response for the initialization request on the writer stream: {InitResponse}", + initResponse); - if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + if (initResponse.SupportedCodecs != null && + !initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + { + _logger.LogCritical("Topic[{TopicPath}] is not supported codec: {Codec}", _config.TopicPath, + _config.Codec); + + _session = new NotStartedWriterSession( + $"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}"); + return; + } + + _session = new WriterSession(_config, stream, initResponse, Initialize, _logger); + } + catch (Driver.TransportException e) { - _logger.LogCritical("Topic[{TopicPath}] is not supported codec: {Codec}", _config.TopicPath, _config.Codec); + _logger.LogError(e, "Unable to connect the session"); _session = new NotStartedWriterSession( - $"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}"); - return; + new YdbWriterException("Transport error on creating write session", e)); } - - _session = new WriterSession(_config, stream, initResponse, Initialize, _logger); } public void Dispose() { - _disposed = true; + try + { + _disposeTokenSource.Cancel(); + + _session.Dispose(); + } + finally + { + _disposeTokenSource.Dispose(); + } } } internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); -internal interface IWriteSession +internal interface IWriteSession : IDisposable { Task Write(ConcurrentQueue toSendBuffer); } @@ -210,6 +234,16 @@ public NotStartedWriterSession(string reasonExceptionMessage) _reasonException = new YdbWriterException(reasonExceptionMessage); } + public NotStartedWriterSession(string reasonExceptionMessage, Status status) + { + _reasonException = new YdbWriterException(reasonExceptionMessage, status); + } + + public NotStartedWriterSession(YdbWriterException reasonException) + { + _reasonException = reasonException; + } + public Task Write(ConcurrentQueue toSendBuffer) { foreach (var messageSending in toSendBuffer) @@ -219,6 +253,10 @@ public Task Write(ConcurrentQueue toSendBuffer) return Task.CompletedTask; } + + public void Dispose() + { + } } // No thread safe diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs index b144c91f..e3015c65 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs @@ -3,9 +3,9 @@ namespace Ydb.Sdk.Services.Topic.Writer; public class WriterBuilder { private readonly WriterConfig _config; - private readonly Driver _driver; + private readonly IDriver _driver; - public WriterBuilder(Driver driver, WriterConfig config) + public WriterBuilder(IDriver driver, WriterConfig config) { _driver = driver; _config = config; diff --git a/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs b/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs index e98b628f..450de26d 100644 --- a/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs +++ b/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs @@ -2,13 +2,13 @@ namespace Ydb.Sdk.Tests.Fixture; -public abstract class DriverFixture : IAsyncLifetime +public class DriverFixture : IAsyncLifetime { - protected readonly Driver Driver; + public Driver Driver { get; } - protected DriverFixture(DriverConfig? driverConfig = null) + public DriverFixture() { - driverConfig ??= new DriverConfig( + var driverConfig = new DriverConfig( endpoint: "grpc://localhost:2136", database: "/local" ); @@ -16,7 +16,9 @@ protected DriverFixture(DriverConfig? driverConfig = null) Driver = new Driver(driverConfig, Utils.GetLoggerFactory()); } - protected abstract void ClientDispose(); + protected virtual void ClientDispose() + { + } public Task InitializeAsync() { diff --git a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs new file mode 100644 index 00000000..2a96a537 --- /dev/null +++ b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs @@ -0,0 +1,50 @@ +using Xunit; +using Ydb.Sdk.Services.Topic; +using Ydb.Sdk.Services.Topic.Writer; +using Ydb.Sdk.Tests.Fixture; + +namespace Ydb.Sdk.Tests.Topic; + +public class WriterIntegrationTests : IClassFixture +{ + private readonly IDriver _driver; + private readonly string _topicName; + + public WriterIntegrationTests(DriverFixture driverFixture) + { + _driver = driverFixture.Driver; + _topicName = "topic_" + Utils.Net; + } + + [Fact] + public async Task WriteAsync_WhenOneMessage_ReturnWritten() + { + var topicClient = new TopicClient(_driver); + var topicSettings = new CreateTopicSettings + { + Path = _topicName + }; + await topicClient.CreateTopic(topicSettings); + + using var writer = new WriterBuilder(_driver, + new WriterConfig(_topicName) { ProducerId = "producerId" }) + .Build(); + + var result = await writer.WriteAsync("abacaba"); + + Assert.Equal(PersistenceStatus.Written, result.Status); + + await topicClient.DropTopic(new DropTopicSettings { Path = _topicName }); + } + + [Fact] + public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException() + { + using var writer = new WriterBuilder(_driver, + new WriterConfig(_topicName + "_not_found") { ProducerId = "producerId" }) + .Build(); + + Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync( + () => writer.WriteAsync("hello world"))).Status.StatusCode); + } +} diff --git a/src/Ydb.Sdk/tests/Utils.cs b/src/Ydb.Sdk/tests/Utils.cs index 4da30819..825cad9c 100644 --- a/src/Ydb.Sdk/tests/Utils.cs +++ b/src/Ydb.Sdk/tests/Utils.cs @@ -47,7 +47,7 @@ public static async Task ExecuteSchemeQuery( internal static ILoggerFactory GetLoggerFactory() { return new ServiceCollection() - .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information)) + .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Debug)) .BuildServiceProvider() .GetService() ?? NullLoggerFactory.Instance; }