Skip to content

Commit

Permalink
Graceful session shutdown support
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Oct 10, 2023
1 parent b7d2cd6 commit 0cf8b5c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
18 changes: 13 additions & 5 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Reflection;
using Grpc.Core;
using Grpc.Net.Client;

Check warning on line 3 in src/Ydb.Sdk/src/Driver.cs

View workflow job for this annotation

GitHub Actions / Inspection

"[RedundantUsingDirective] Using directive is not required by the code and can be safely removed" on /home/runner/work/ydb-dotnet-sdk/ydb-dotnet-sdk/src/Ydb.Sdk/src/Driver.cs(3,42)
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Ydb.Discovery;
Expand Down Expand Up @@ -117,15 +118,19 @@ internal async Task<UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(

try
{
var data = await callInvoker.AsyncUnaryCall(
using var call = callInvoker.AsyncUnaryCall(
method: method,
host: null,
options: GetCallOptions(settings, false),
request: request);

var data = await call.ResponseAsync;
var trailers = call.GetTrailers();

return new UnaryResponse<TResponse>(
data: data,
usedEndpoint: endpoint);
usedEndpoint: endpoint,
trailers: trailers);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -312,17 +317,20 @@ private static Status ConvertStatus(Grpc.Core.Status rpcStatus)

internal sealed class UnaryResponse<TResponse>
{
internal UnaryResponse(
TResponse data,
string usedEndpoint)
internal UnaryResponse(TResponse data,
string usedEndpoint,
Grpc.Core.Metadata? trailers)
{
Data = data;
UsedEndpoint = usedEndpoint;
Trailers = trailers;
}

public TResponse Data { get; }

public string UsedEndpoint { get; }

public Grpc.Core.Metadata? Trailers { get; }
}

internal sealed class StreamIterator<TResponse>
Expand Down
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ internal static class Metadata
public const string RpcRequestTypeHeader = "x-ydb-request-type";
public const string RpcTraceIdHeader = "x-ydb-trace-id";
public const string RpcSdkInfoHeader = "x-ydb-sdk-build-info";
public const string RpcServerHintsHeader = "x-ydb-server-hints";

public const string GracefulShutdownHint = "session-close";
}
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ public async Task<ExecuteDataQueryResponse> ExecuteDataQuery(
request: request,
settings: settings);

ExecuteQueryResult? resultProto;
var status = UnpackOperation(response.Data.Operation, out resultProto);
var status = UnpackOperation(response.Data.Operation, out ExecuteQueryResult? resultProto);
OnResponseStatus(status);
OnResponseTrailers(response.Trailers);

var txState = TransactionState.Unknown;
Transaction? tx = null;
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public async Task<ExecuteSchemeQueryResponse> ExecuteSchemeQuery(

var status = UnpackOperation(response.Data.Operation);
OnResponseStatus(status);
OnResponseTrailers(response.Trailers);

return new ExecuteSchemeQueryResponse(status);
}
Expand Down
19 changes: 16 additions & 3 deletions src/Ydb.Sdk/src/Services/Table/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,24 @@ private void CheckSession()

private void OnResponseStatus(Status status)
{
if (status.StatusCode == StatusCode.BadSession || status.StatusCode == StatusCode.SessionBusy)
if (status.StatusCode is StatusCode.BadSession or StatusCode.SessionBusy)
{
if (_sessionPool != null)
_sessionPool?.InvalidateSession(Id);
}
}

private void OnResponseTrailers(Grpc.Core.Metadata? trailers)
{
if (trailers is null)
{
return;
}

foreach (var hint in trailers.GetAll(Metadata.RpcServerHintsHeader))
{
if (hint.Value == Metadata.GracefulShutdownHint)
{
_sessionPool.InvalidateSession(Id);
_sessionPool?.InvalidateSession(Id);
}
}
}
Expand Down

0 comments on commit 0cf8b5c

Please sign in to comment.