Skip to content

Commit

Permalink
Basic session pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Oct 24, 2023
1 parent a2c110b commit c3481a2
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 14 deletions.
57 changes: 48 additions & 9 deletions src/Ydb.Sdk/src/Services/Query/QueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,36 @@ internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransact
}
}

public class QueryClientConfig
{
public SessionPoolConfig SessionPoolConfig { get; }

public QueryClientConfig(
SessionPoolConfig? sessionPoolConfig = null)
{
SessionPoolConfig = sessionPoolConfig ?? new SessionPoolConfig();
}
}

public class QueryClient :
ClientBase,
IDisposable
{
private readonly ISessionPool _sessionPool;
private readonly ILogger _logger;
public QueryClient(Driver driver) : base(driver)

public QueryClient(Driver driver, QueryClientConfig config) : base(driver)
{
_logger = Driver.LoggerFactory.CreateLogger<QueryClient>();

_sessionPool = new SessionPool(driver, config.SessionPoolConfig);
}

internal QueryClient(Driver driver, ISessionPool sessionPool) : base(driver)
{
_logger = driver.LoggerFactory.CreateLogger<QueryClient>();

_sessionPool = sessionPool;
}

public async Task<CreateSessionResponse> CreateSession(CreateSessionSettings? settings = null)
Expand Down Expand Up @@ -475,7 +496,13 @@ public async Task<IResponse> ExecOnSession(
RetrySettings? retrySettings = null
)
{
retrySettings = new RetrySettings();
if (_sessionPool is not SessionPool sessionPool)
{
throw new InvalidCastException(
$"{nameof(_sessionPool)} is not object of type {typeof(SessionPool).FullName}");
}

retrySettings ??= new RetrySettings();

IResponse response = new ClientInternalErrorResponse("SessionRetry, unexpected response value.");
Session? session = null;
Expand All @@ -486,7 +513,13 @@ public async Task<IResponse> ExecOnSession(
{
if (session is null)
{
// TODO get session from pool
var getSessionResponse = await sessionPool.GetSession();
if (getSessionResponse.Status.IsSuccess)
{
session = getSessionResponse.Session;
}

response = getSessionResponse;
}

if (session is not null)
Expand All @@ -501,16 +534,22 @@ public async Task<IResponse> ExecOnSession(
}

var retryRule = retrySettings.GetRetryRule(response.Status.StatusCode);
if (retryRule.DeleteSession && session is not null)
if (retryRule.DeleteSession)
{
_logger.LogTrace($"Retry: Session ${session?.Id} invalid, disposing");
session?.Dispose();
}
else if (session is not null)
{
_logger.LogTrace($"Retry: Session ${session.Id} invalid, disposing");
// TODO delete session
sessionPool.ReturnSession(session);
}

if (retryRule.Idempotency == Idempotency.Idempotent && retrySettings.IsIdempotent ||
retryRule.Idempotency == Idempotency.NonIdempotent)
{
_logger.LogTrace($"Retry: idempotent error {response.Status.StatusCode}, retrying on session ${session?.Id}");
_logger.LogTrace(
$"Retry: Session ${session?.Id}, " +
$"idempotent error {response.Status.StatusCode} retrying ");
await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt));
}
else
Expand All @@ -521,7 +560,7 @@ public async Task<IResponse> ExecOnSession(
}
finally
{
// TODO delete session
session?.Dispose();
}

return response;
Expand Down
47 changes: 45 additions & 2 deletions src/Ydb.Sdk/src/Services/Query/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ namespace Ydb.Sdk.Services.Query;
/// </summary>
public class Session : ClientBase
{
internal SessionPool? SessionPool;
private readonly ILogger _logger;
private bool _disposed;

internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId, string? endpoint)
internal Session(Driver driver, SessionPool sessionPool, string id, long nodeId, string? endpoint)
: base(driver)
{
// _sessionPool = sessionPool;
SessionPool = sessionPool;
_logger = Driver.LoggerFactory.CreateLogger<Session>();
Id = id;
NodeId = nodeId;
Expand All @@ -30,6 +32,47 @@ internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId

internal string? Endpoint { get; }

private void CheckSession()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
}

public void Dispose(TimeSpan? deleteSessionTimeout = null)
{
Dispose(true, deleteSessionTimeout);
}

protected virtual void Dispose(bool disposing, TimeSpan? deleteSessionTimeout = null)
{
if (_disposed)
{
return;
}

if (disposing)
{
if (SessionPool is null)
{
_logger.LogTrace($"Closing detached session on dispose: {Id}");

var client = new QueryClient(Driver, new NoPool());
_ = client.DeleteSession(Id, new DeleteSessionSettings
{
TransportTimeout = deleteSessionTimeout ?? TimeSpan.FromSeconds(1)
});
}
else
{
SessionPool.DisposeSession(this);
}
}

_disposed = true;
}

//
//
// public ExecuteQueryStream ExecuteQueryYql(
Expand Down
165 changes: 162 additions & 3 deletions src/Ydb.Sdk/src/Services/Query/SessionPool.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,166 @@
using Microsoft.Extensions.Logging;
using Ydb.Sdk.Client;

namespace Ydb.Sdk.Services.Query;

public class SessionPool
public class SessionPoolConfig
{
public SessionPoolConfig(
uint sizeLimit = 100)
{
SizeLimit = sizeLimit;
}

public readonly uint SizeLimit;
public TimeSpan CreateSessionTimeout { get; set; } = TimeSpan.FromSeconds(1);
public TimeSpan DeleteSessionTimeout { get; set; } = TimeSpan.FromSeconds(1);
}

internal class GetSessionResponse : ResponseBase
{
public Session? Session;

internal GetSessionResponse(Status status, Session? session = null)
: base(status)
{
Session = session;
}
}

internal interface ISessionPool : IDisposable
{
}

internal class NoPool : ISessionPool
{
internal uint SizeLimit { get; }

public void Dispose()
{
}
}

internal class SessionPool : ISessionPool
{
private readonly Driver _driver;
private readonly SessionPoolConfig _config;

private readonly object _lock = new();

private readonly ILogger _logger;
private readonly QueryClient _client;

private readonly Dictionary<string, Session> _sessions = new();
private readonly Stack<string> _idleSessions = new();
private uint _pendingSessions;

private bool _disposed;

public SessionPool(Driver driver, SessionPoolConfig config)
{
_driver = driver;
_config = config;

_logger = driver.LoggerFactory.CreateLogger<SessionPool>();
_client = new QueryClient(driver, new NoPool());
}

public async Task<GetSessionResponse> GetSession()
{
var response = new GetSessionResponse(new Status(StatusCode.ClientInternalError,
$"{nameof(SessionPool)},{nameof(GetSession)}: unexpected response value."));

const int maxAttempts = 100;
for (var attempt = 0; attempt < maxAttempts; attempt++)
{
lock (_lock)
{
while (_idleSessions.Count > 0)
{
var sessionId = _idleSessions.Pop();

if (_sessions.TryGetValue(sessionId, out var session))
{
_logger.LogTrace(
$"{nameof(SessionPool)},{nameof(GetSession)}: Session {sessionId} removed from pool");
return new GetSessionResponse(Status.Success, session);
}
}

if (_sessions.Count + _pendingSessions >= _config.SizeLimit)
{
_logger.LogWarning($"{nameof(SessionPool)},{nameof(GetSession)}: size limit exceeded" +
$", limit: {_config.SizeLimit}" +
$", pending sessions: {_pendingSessions}");

return new GetSessionResponse(new Status(StatusCode.ClientResourceExhausted,
$"{nameof(SessionPool)},{nameof(GetSession)}: max active sessions limit exceeded."));
}

_pendingSessions++;
}

var createSessionResponse = await _client.CreateSession(new CreateSessionSettings
{ TransportTimeout = _config.CreateSessionTimeout });

lock (_lock)
{
_pendingSessions--;
if (createSessionResponse.Status.IsSuccess)
{
var session = createSessionResponse.Session!;
session.SessionPool = this;

_sessions.Add(session.Id, session);
_logger.LogTrace($"Session {session.Id} created, " +
$"endpoint: {session.Endpoint}, " +
$"nodeId: {session.NodeId}");
return new GetSessionResponse(createSessionResponse.Status, session);
}

_logger.LogWarning($"Failed to create session: {createSessionResponse.Status}");
response = new GetSessionResponse(createSessionResponse.Status);
}
}

return response;
}


public void ReturnSession(Session session)
{
_idleSessions.Push(session.Id);
_sessions[session.Id] = session;
_logger.LogTrace($"Session returned to pool: {session.Id}");
}

internal void DisposeSession(Session session)
{
_sessions.Remove(session.Id);
}

public void Dispose()
{
Dispose(true);
}

private void Dispose(bool disposing)
{
lock (_lock)
{
if (_disposed)
{
return;
}

if (disposing)
{
foreach (var session in _sessions.Values)
{
session.Dispose(_config.DeleteSessionTimeout);
_logger.LogTrace($"Closing session on session pool dispose: {session.Id}");
}
}

_disposed = true;
}
}
}
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Status.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Text;
using Google.Protobuf.Collections;
using Ydb.Issue;
using Ydb.Sdk.Services.Table;

namespace Ydb.Sdk;

Expand Down Expand Up @@ -185,6 +186,8 @@ public Status(StatusCode statusCode, string message) : this(
{
}

public static readonly Status Success = new(StatusCode.Success);

public bool IsSuccess => StatusCode == StatusCode.Success;

public void EnsureSuccess()
Expand Down

0 comments on commit c3481a2

Please sign in to comment.