diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index dab11980..a582f653 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -34,10 +34,10 @@ private protected override async Task CreateSession() nodeId: createSessionResponse.Result.Session.NodeId, endpoint: createSessionResponse.Result.Session.Endpoint); + Sessions.Add(session.Id, new SessionState(session)); _ = Task.Run(() => AttachAndMonitor(session.Id)); - Sessions.Add(session.Id, new SessionState(session)); Logger.LogTrace($"Session {session.Id} created, " + $"endpoint: {session.Endpoint}, " + @@ -50,30 +50,57 @@ private protected override async Task CreateSession() } } - private async Task AttachAndMonitor(string id) + private async Task AttachAndMonitor(string sessionId) { - var stream = Client.AttachSession(id); + var stream = Client.AttachSession(sessionId); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(Config.CreateSessionTimeout); + + var firstPartTask = Task.Run(async () => + { + if (await stream.Next()) + { + return stream.Response; + } + + return null; + }, cts.Token); + + var firstPart = await firstPartTask; + + if (firstPartTask.IsCanceled || firstPart is null) + { + InvalidateSession(sessionId); + return; + } + + CheckPart(firstPart, sessionId); while (await stream.Next()) { var part = stream.Response; + CheckPart(part, sessionId); + } + } - if (part.Status.IsSuccess) - { - Logger.LogTrace($"Successful stream response for session: {id}"); + private void CheckPart(Query.SessionState part, string sessionId) + { + if (part.Status.IsSuccess) + { + Logger.LogTrace($"Successful stream response for session: {sessionId}"); - lock (Lock) + lock (Lock) + { + if (Sessions.TryGetValue(sessionId, out var sessionState)) { - if (Sessions.TryGetValue(id, out var sessionState)) - { - sessionState.LastAccessTime = DateTime.Now; - } + sessionState.LastAccessTime = DateTime.Now; } } - else - { - InvalidateSession(id); - } + } + else + { + InvalidateSession(sessionId); } }