diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index a582f653..08ef870c 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -8,6 +8,8 @@ namespace Ydb.Sdk.Services.Query; internal class SessionPool : SessionPool { + private readonly Dictionary _attachedSessions = new(); + public SessionPool(Driver driver, SessionPoolConfig config) : base( driver: driver, @@ -77,6 +79,23 @@ private async Task AttachAndMonitor(string sessionId) CheckPart(firstPart, sessionId); + cts = new CancellationTokenSource(); + + var monitorTask = Task.Run(async () => await Monitor(sessionId, stream), cts.Token); + lock (Lock) + { + _attachedSessions.Add(sessionId, cts); + } + + await monitorTask; + lock (Lock) + { + _attachedSessions.Remove(sessionId); + } + } + + private async Task Monitor(string sessionId, SessionStateStream stream) + { while (await stream.Next()) { var part = stream.Response; @@ -104,6 +123,22 @@ private void CheckPart(Query.SessionState part, string sessionId) } } + internal new void InvalidateSession(string id) + { + DetachSession(id); + base.InvalidateSession(id); + } + + private void DetachSession(string id) + { + lock (Lock) + { + _attachedSessions.Remove(id, out var cts); + cts?.Cancel(); + Logger.LogInformation($"Session detached: {id}"); + } + } + private protected override Session CopySession(Session other) { return new Session( @@ -116,9 +151,11 @@ private protected override Session CopySession(Session other) private protected override void DeleteSession(string id) { + DetachSession(id); + _ = Client.DeleteSession(id, new DeleteSessionSettings { TransportTimeout = Shared.Session.DeleteSessionTimeout }); } -} \ No newline at end of file +}