From c3481a2e5f3a4fe1b79109911e8c48d94de701e5 Mon Sep 17 00:00:00 2001 From: XmasApple Date: Tue, 24 Oct 2023 19:26:07 +0300 Subject: [PATCH] Basic session pool. --- src/Ydb.Sdk/src/Services/Query/QueryClient.cs | 57 +++++- src/Ydb.Sdk/src/Services/Query/Session.cs | 47 ++++- src/Ydb.Sdk/src/Services/Query/SessionPool.cs | 165 +++++++++++++++++- src/Ydb.Sdk/src/Status.cs | 3 + 4 files changed, 258 insertions(+), 14 deletions(-) diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index f6406ec4..4aa4ad07 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -284,15 +284,36 @@ internal static RollbackTransactionResponse FromProto(Ydb.Query.RollbackTransact } } +public class QueryClientConfig +{ + public SessionPoolConfig SessionPoolConfig { get; } + + public QueryClientConfig( + SessionPoolConfig? sessionPoolConfig = null) + { + SessionPoolConfig = sessionPoolConfig ?? new SessionPoolConfig(); + } +} + public class QueryClient : ClientBase, IDisposable { + private readonly ISessionPool _sessionPool; private readonly ILogger _logger; - - public QueryClient(Driver driver) : base(driver) + + public QueryClient(Driver driver, QueryClientConfig config) : base(driver) { _logger = Driver.LoggerFactory.CreateLogger(); + + _sessionPool = new SessionPool(driver, config.SessionPoolConfig); + } + + internal QueryClient(Driver driver, ISessionPool sessionPool) : base(driver) + { + _logger = driver.LoggerFactory.CreateLogger(); + + _sessionPool = sessionPool; } public async Task CreateSession(CreateSessionSettings? settings = null) @@ -475,7 +496,13 @@ public async Task ExecOnSession( RetrySettings? retrySettings = null ) { - retrySettings = new RetrySettings(); + if (_sessionPool is not SessionPool sessionPool) + { + throw new InvalidCastException( + $"{nameof(_sessionPool)} is not object of type {typeof(SessionPool).FullName}"); + } + + retrySettings ??= new RetrySettings(); IResponse response = new ClientInternalErrorResponse("SessionRetry, unexpected response value."); Session? session = null; @@ -486,7 +513,13 @@ public async Task ExecOnSession( { if (session is null) { - // TODO get session from pool + var getSessionResponse = await sessionPool.GetSession(); + if (getSessionResponse.Status.IsSuccess) + { + session = getSessionResponse.Session; + } + + response = getSessionResponse; } if (session is not null) @@ -501,16 +534,22 @@ public async Task ExecOnSession( } var retryRule = retrySettings.GetRetryRule(response.Status.StatusCode); - if (retryRule.DeleteSession && session is not null) + if (retryRule.DeleteSession) + { + _logger.LogTrace($"Retry: Session ${session?.Id} invalid, disposing"); + session?.Dispose(); + } + else if (session is not null) { - _logger.LogTrace($"Retry: Session ${session.Id} invalid, disposing"); - // TODO delete session + sessionPool.ReturnSession(session); } if (retryRule.Idempotency == Idempotency.Idempotent && retrySettings.IsIdempotent || retryRule.Idempotency == Idempotency.NonIdempotent) { - _logger.LogTrace($"Retry: idempotent error {response.Status.StatusCode}, retrying on session ${session?.Id}"); + _logger.LogTrace( + $"Retry: Session ${session?.Id}, " + + $"idempotent error {response.Status.StatusCode} retrying "); await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); } else @@ -521,7 +560,7 @@ public async Task ExecOnSession( } finally { - // TODO delete session + session?.Dispose(); } return response; diff --git a/src/Ydb.Sdk/src/Services/Query/Session.cs b/src/Ydb.Sdk/src/Services/Query/Session.cs index eb474edb..3421ffd1 100644 --- a/src/Ydb.Sdk/src/Services/Query/Session.cs +++ b/src/Ydb.Sdk/src/Services/Query/Session.cs @@ -12,12 +12,14 @@ namespace Ydb.Sdk.Services.Query; /// public class Session : ClientBase { + internal SessionPool? SessionPool; private readonly ILogger _logger; + private bool _disposed; - internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId, string? endpoint) + internal Session(Driver driver, SessionPool sessionPool, string id, long nodeId, string? endpoint) : base(driver) { - // _sessionPool = sessionPool; + SessionPool = sessionPool; _logger = Driver.LoggerFactory.CreateLogger(); Id = id; NodeId = nodeId; @@ -30,6 +32,47 @@ internal Session(Driver driver, SessionPool? sessionPool, string id, long nodeId internal string? Endpoint { get; } + private void CheckSession() + { + if (_disposed) + { + throw new ObjectDisposedException(GetType().FullName); + } + } + + public void Dispose(TimeSpan? deleteSessionTimeout = null) + { + Dispose(true, deleteSessionTimeout); + } + + protected virtual void Dispose(bool disposing, TimeSpan? deleteSessionTimeout = null) + { + if (_disposed) + { + return; + } + + if (disposing) + { + if (SessionPool is null) + { + _logger.LogTrace($"Closing detached session on dispose: {Id}"); + + var client = new QueryClient(Driver, new NoPool()); + _ = client.DeleteSession(Id, new DeleteSessionSettings + { + TransportTimeout = deleteSessionTimeout ?? TimeSpan.FromSeconds(1) + }); + } + else + { + SessionPool.DisposeSession(this); + } + } + + _disposed = true; + } + // // // public ExecuteQueryStream ExecuteQueryYql( diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 2b03a8e1..bb4b3856 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -1,7 +1,166 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk.Client; + namespace Ydb.Sdk.Services.Query; -public class SessionPool +public class SessionPoolConfig +{ + public SessionPoolConfig( + uint sizeLimit = 100) + { + SizeLimit = sizeLimit; + } + + public readonly uint SizeLimit; + public TimeSpan CreateSessionTimeout { get; set; } = TimeSpan.FromSeconds(1); + public TimeSpan DeleteSessionTimeout { get; set; } = TimeSpan.FromSeconds(1); +} + +internal class GetSessionResponse : ResponseBase +{ + public Session? Session; + + internal GetSessionResponse(Status status, Session? session = null) + : base(status) + { + Session = session; + } +} + +internal interface ISessionPool : IDisposable +{ +} + +internal class NoPool : ISessionPool { - internal uint SizeLimit { get; } - + public void Dispose() + { + } +} + +internal class SessionPool : ISessionPool +{ + private readonly Driver _driver; + private readonly SessionPoolConfig _config; + + private readonly object _lock = new(); + + private readonly ILogger _logger; + private readonly QueryClient _client; + + private readonly Dictionary _sessions = new(); + private readonly Stack _idleSessions = new(); + private uint _pendingSessions; + + private bool _disposed; + + public SessionPool(Driver driver, SessionPoolConfig config) + { + _driver = driver; + _config = config; + + _logger = driver.LoggerFactory.CreateLogger(); + _client = new QueryClient(driver, new NoPool()); + } + + public async Task GetSession() + { + var response = new GetSessionResponse(new Status(StatusCode.ClientInternalError, + $"{nameof(SessionPool)},{nameof(GetSession)}: unexpected response value.")); + + const int maxAttempts = 100; + for (var attempt = 0; attempt < maxAttempts; attempt++) + { + lock (_lock) + { + while (_idleSessions.Count > 0) + { + var sessionId = _idleSessions.Pop(); + + if (_sessions.TryGetValue(sessionId, out var session)) + { + _logger.LogTrace( + $"{nameof(SessionPool)},{nameof(GetSession)}: Session {sessionId} removed from pool"); + return new GetSessionResponse(Status.Success, session); + } + } + + if (_sessions.Count + _pendingSessions >= _config.SizeLimit) + { + _logger.LogWarning($"{nameof(SessionPool)},{nameof(GetSession)}: size limit exceeded" + + $", limit: {_config.SizeLimit}" + + $", pending sessions: {_pendingSessions}"); + + return new GetSessionResponse(new Status(StatusCode.ClientResourceExhausted, + $"{nameof(SessionPool)},{nameof(GetSession)}: max active sessions limit exceeded.")); + } + + _pendingSessions++; + } + + var createSessionResponse = await _client.CreateSession(new CreateSessionSettings + { TransportTimeout = _config.CreateSessionTimeout }); + + lock (_lock) + { + _pendingSessions--; + if (createSessionResponse.Status.IsSuccess) + { + var session = createSessionResponse.Session!; + session.SessionPool = this; + + _sessions.Add(session.Id, session); + _logger.LogTrace($"Session {session.Id} created, " + + $"endpoint: {session.Endpoint}, " + + $"nodeId: {session.NodeId}"); + return new GetSessionResponse(createSessionResponse.Status, session); + } + + _logger.LogWarning($"Failed to create session: {createSessionResponse.Status}"); + response = new GetSessionResponse(createSessionResponse.Status); + } + } + + return response; + } + + + public void ReturnSession(Session session) + { + _idleSessions.Push(session.Id); + _sessions[session.Id] = session; + _logger.LogTrace($"Session returned to pool: {session.Id}"); + } + + internal void DisposeSession(Session session) + { + _sessions.Remove(session.Id); + } + + public void Dispose() + { + Dispose(true); + } + + private void Dispose(bool disposing) + { + lock (_lock) + { + if (_disposed) + { + return; + } + + if (disposing) + { + foreach (var session in _sessions.Values) + { + session.Dispose(_config.DeleteSessionTimeout); + _logger.LogTrace($"Closing session on session pool dispose: {session.Id}"); + } + } + + _disposed = true; + } + } } \ No newline at end of file diff --git a/src/Ydb.Sdk/src/Status.cs b/src/Ydb.Sdk/src/Status.cs index 728e60f5..76003cfb 100644 --- a/src/Ydb.Sdk/src/Status.cs +++ b/src/Ydb.Sdk/src/Status.cs @@ -1,6 +1,7 @@ using System.Text; using Google.Protobuf.Collections; using Ydb.Issue; +using Ydb.Sdk.Services.Table; namespace Ydb.Sdk; @@ -185,6 +186,8 @@ public Status(StatusCode statusCode, string message) : this( { } + public static readonly Status Success = new(StatusCode.Success); + public bool IsSuccess => StatusCode == StatusCode.Success; public void EnsureSuccess()