Skip to content

Commit

Permalink
better timeouts for AttachSession
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Nov 2, 2023
1 parent f2e47f7 commit 9b53dff
Showing 1 changed file with 42 additions and 15 deletions.
57 changes: 42 additions & 15 deletions src/Ydb.Sdk/src/Services/Query/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ private protected override async Task<GetSessionResponse> CreateSession()
nodeId: createSessionResponse.Result.Session.NodeId,
endpoint: createSessionResponse.Result.Session.Endpoint);

Sessions.Add(session.Id, new SessionState(session));

_ = Task.Run(() => AttachAndMonitor(session.Id));

Sessions.Add(session.Id, new SessionState(session));

Logger.LogTrace($"Session {session.Id} created, " +
$"endpoint: {session.Endpoint}, " +
Expand All @@ -50,30 +50,57 @@ private protected override async Task<GetSessionResponse> CreateSession()
}
}

private async Task AttachAndMonitor(string id)
private async Task AttachAndMonitor(string sessionId)
{
var stream = Client.AttachSession(id);
var stream = Client.AttachSession(sessionId);

var cts = new CancellationTokenSource();
cts.CancelAfter(Config.CreateSessionTimeout);

var firstPartTask = Task.Run(async () =>
{
if (await stream.Next())
{
return stream.Response;
}
return null;
}, cts.Token);

var firstPart = await firstPartTask;

if (firstPartTask.IsCanceled || firstPart is null)
{
InvalidateSession(sessionId);
return;
}

CheckPart(firstPart, sessionId);

while (await stream.Next())
{
var part = stream.Response;
CheckPart(part, sessionId);
}
}

if (part.Status.IsSuccess)
{
Logger.LogTrace($"Successful stream response for session: {id}");
private void CheckPart(Query.SessionState part, string sessionId)
{
if (part.Status.IsSuccess)
{
Logger.LogTrace($"Successful stream response for session: {sessionId}");

lock (Lock)
lock (Lock)
{
if (Sessions.TryGetValue(sessionId, out var sessionState))
{
if (Sessions.TryGetValue(id, out var sessionState))
{
sessionState.LastAccessTime = DateTime.Now;
}
sessionState.LastAccessTime = DateTime.Now;
}
}
else
{
InvalidateSession(id);
}
}
else
{
InvalidateSession(sessionId);
}
}

Expand Down

0 comments on commit 9b53dff

Please sign in to comment.