Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Init MVP Writer implementation #206

Merged
merged 6 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,7 @@ public async ValueTask<bool> MoveNextAsync()
}
}

internal sealed class BidirectionalStream<TRequest, TResponse> : IAsyncEnumerator<TResponse>,
IAsyncEnumerable<TResponse>
internal sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _bidirectionalStream;
private readonly Action _rpcErrorAction;
Expand All @@ -394,13 +393,6 @@ public async Task Write(TRequest request)
}
}

public ValueTask DisposeAsync()
{
_bidirectionalStream.Dispose();

return default;
}

public async ValueTask<bool> MoveNextAsync()
{
try
Expand All @@ -417,9 +409,9 @@ public async ValueTask<bool> MoveNextAsync()

public TResponse Current => _bidirectionalStream.ResponseStream.Current;

public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new())
public void Dispose()
{
return this;
_bidirectionalStream.Dispose();
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message)
}
}

public class YdbProducerException : YdbTopicException
public class YdbWriterException : YdbTopicException
{
public YdbProducerException(string message) : base(message)
public YdbWriterException(string message) : base(message)
{
}
}
8 changes: 0 additions & 8 deletions src/Ydb.Sdk/src/Services/Topic/IProducer.cs

This file was deleted.

8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue>
{
public Task<WriteResult> WriteAsync(TValue data);

public Task<WriteResult> WriteAsync(Message<TValue> message);
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Ydb.Sdk.Services.Topic;

public class Message<TValue>
{
public Message(TValue data)
{
Data = data;
}

public DateTime Timestamp { get; set; } = DateTime.Now;

public TValue Data { get; }

public List<Metadata> Metadata { get; } = new();
}

public record Metadata(string Key, byte[] Value);
88 changes: 0 additions & 88 deletions src/Ydb.Sdk/src/Services/Topic/Producer.cs

This file was deleted.

60 changes: 0 additions & 60 deletions src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs

This file was deleted.

16 changes: 0 additions & 16 deletions src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs

This file was deleted.

59 changes: 59 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using Microsoft.Extensions.Logging;

namespace Ydb.Sdk.Services.Topic;

internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
{
private readonly Func<Task> _initialize;

protected readonly Driver.BidirectionalStream<TFromClient, TFromServer> Stream;
protected readonly ILogger Logger;
protected readonly string SessionId;

private int _isActive = 1;
private bool _disposed;

protected TopicSession(Driver.BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
string sessionId, Func<Task> initialize)
{
Stream = stream;
Logger = logger;
SessionId = sessionId;
_initialize = initialize;
}

protected async void ReconnectSession()
{
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
{
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");

return;
}

Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

while (!_disposed)
{
try
{
await _initialize();
break;
}
catch (Exception e)
{
Logger.LogError(e, "Unable to reconnect the session due to the following error");
}
}
}

public void Dispose()
{
lock (this)
{
_disposed = true;
}

Stream.Dispose();
}
}
40 changes: 40 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/WriteResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic;

public class WriteResult
{
private readonly long _offset;

internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
{
switch (ack.MessageWriteStatusCase)
{
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written:
Status = PersistenceStatus.Written;
_offset = ack.Written.Offset;
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped:
Status = PersistenceStatus.AlreadyWritten;
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
default:
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
}
}

public PersistenceStatus Status { get; }

public bool TryGetOffset(out long offset)
{
offset = _offset;

return Status == PersistenceStatus.Written;
}
}

public enum PersistenceStatus
{
Written,
AlreadyWritten
}
Loading
Loading