Skip to content

Commit

Permalink
feat: Init MVP Writer implementation (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov authored Oct 25, 2024
1 parent d0d22de commit 5a964cd
Show file tree
Hide file tree
Showing 13 changed files with 462 additions and 185 deletions.
14 changes: 3 additions & 11 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,7 @@ public async ValueTask<bool> MoveNextAsync()
}
}

internal sealed class BidirectionalStream<TRequest, TResponse> : IAsyncEnumerator<TResponse>,
IAsyncEnumerable<TResponse>
internal sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _bidirectionalStream;
private readonly Action _rpcErrorAction;
Expand All @@ -394,13 +393,6 @@ public async Task Write(TRequest request)
}
}

public ValueTask DisposeAsync()
{
_bidirectionalStream.Dispose();

return default;
}

public async ValueTask<bool> MoveNextAsync()
{
try
Expand All @@ -417,9 +409,9 @@ public async ValueTask<bool> MoveNextAsync()

public TResponse Current => _bidirectionalStream.ResponseStream.Current;

public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new())
public void Dispose()
{
return this;
_bidirectionalStream.Dispose();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message)
}
}

public class YdbProducerException : YdbTopicException
public class YdbWriterException : YdbTopicException
{
public YdbProducerException(string message) : base(message)
public YdbWriterException(string message) : base(message)
{
}
}
8 changes: 0 additions & 8 deletions src/Ydb.Sdk/src/Services/Topic/IProducer.cs

This file was deleted.

8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue>
{
public Task<WriteResult> WriteAsync(TValue data);

public Task<WriteResult> WriteAsync(Message<TValue> message);
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Ydb.Sdk.Services.Topic;

public class Message<TValue>
{
public Message(TValue data)
{
Data = data;
}

public DateTime Timestamp { get; set; } = DateTime.Now;

public TValue Data { get; }

public List<Metadata> Metadata { get; } = new();
}

public record Metadata(string Key, byte[] Value);
88 changes: 0 additions & 88 deletions src/Ydb.Sdk/src/Services/Topic/Producer.cs

This file was deleted.

60 changes: 0 additions & 60 deletions src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs

This file was deleted.

16 changes: 0 additions & 16 deletions src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs

This file was deleted.

59 changes: 59 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using Microsoft.Extensions.Logging;

namespace Ydb.Sdk.Services.Topic;

internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
{
private readonly Func<Task> _initialize;

protected readonly Driver.BidirectionalStream<TFromClient, TFromServer> Stream;
protected readonly ILogger Logger;
protected readonly string SessionId;

private int _isActive = 1;
private bool _disposed;

protected TopicSession(Driver.BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
string sessionId, Func<Task> initialize)
{
Stream = stream;
Logger = logger;
SessionId = sessionId;
_initialize = initialize;
}

protected async void ReconnectSession()
{
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
{
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");

return;
}

Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

while (!_disposed)
{
try
{
await _initialize();
break;
}
catch (Exception e)
{
Logger.LogError(e, "Unable to reconnect the session due to the following error");
}
}
}

public void Dispose()
{
lock (this)
{
_disposed = true;
}

Stream.Dispose();
}
}
40 changes: 40 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/WriteResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic;

public class WriteResult
{
private readonly long _offset;

internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
{
switch (ack.MessageWriteStatusCase)
{
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written:
Status = PersistenceStatus.Written;
_offset = ack.Written.Offset;
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped:
Status = PersistenceStatus.AlreadyWritten;
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
default:
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
}
}

public PersistenceStatus Status { get; }

public bool TryGetOffset(out long offset)
{
offset = _offset;

return Status == PersistenceStatus.Written;
}
}

public enum PersistenceStatus
{
Written,
AlreadyWritten
}
Loading

0 comments on commit 5a964cd

Please sign in to comment.