From 0cf8b5c0b330223d2230cc846c1e3adef6a8c20b Mon Sep 17 00:00:00 2001 From: XmasApple Date: Tue, 10 Oct 2023 21:35:54 +0300 Subject: [PATCH] Graceful session shutdown support --- src/Ydb.Sdk/src/Driver.cs | 18 +++++++++++++----- src/Ydb.Sdk/src/Metadata.cs | 3 +++ .../src/Services/Table/ExecuteDataQuery.cs | 4 ++-- .../src/Services/Table/ExecuteSchemeQuery.cs | 1 + src/Ydb.Sdk/src/Services/Table/Session.cs | 19 ++++++++++++++++--- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index f818d30f..cb995750 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -1,5 +1,6 @@ using System.Reflection; using Grpc.Core; +using Grpc.Net.Client; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Ydb.Discovery; @@ -117,15 +118,19 @@ internal async Task> UnaryCall( try { - var data = await callInvoker.AsyncUnaryCall( + using var call = callInvoker.AsyncUnaryCall( method: method, host: null, options: GetCallOptions(settings, false), request: request); + var data = await call.ResponseAsync; + var trailers = call.GetTrailers(); + return new UnaryResponse( data: data, - usedEndpoint: endpoint); + usedEndpoint: endpoint, + trailers: trailers); } catch (RpcException e) { @@ -312,17 +317,20 @@ private static Status ConvertStatus(Grpc.Core.Status rpcStatus) internal sealed class UnaryResponse { - internal UnaryResponse( - TResponse data, - string usedEndpoint) + internal UnaryResponse(TResponse data, + string usedEndpoint, + Grpc.Core.Metadata? trailers) { Data = data; UsedEndpoint = usedEndpoint; + Trailers = trailers; } public TResponse Data { get; } public string UsedEndpoint { get; } + + public Grpc.Core.Metadata? Trailers { get; } } internal sealed class StreamIterator diff --git a/src/Ydb.Sdk/src/Metadata.cs b/src/Ydb.Sdk/src/Metadata.cs index 6b820ea2..ae528f95 100644 --- a/src/Ydb.Sdk/src/Metadata.cs +++ b/src/Ydb.Sdk/src/Metadata.cs @@ -7,4 +7,7 @@ internal static class Metadata public const string RpcRequestTypeHeader = "x-ydb-request-type"; public const string RpcTraceIdHeader = "x-ydb-trace-id"; public const string RpcSdkInfoHeader = "x-ydb-sdk-build-info"; + public const string RpcServerHintsHeader = "x-ydb-server-hints"; + + public const string GracefulShutdownHint = "session-close"; } \ No newline at end of file diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs index 7ecb7f1d..f2362ae2 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs @@ -84,9 +84,9 @@ public async Task ExecuteDataQuery( request: request, settings: settings); - ExecuteQueryResult? resultProto; - var status = UnpackOperation(response.Data.Operation, out resultProto); + var status = UnpackOperation(response.Data.Operation, out ExecuteQueryResult? resultProto); OnResponseStatus(status); + OnResponseTrailers(response.Trailers); var txState = TransactionState.Unknown; Transaction? tx = null; diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs index 0bf08691..9f198f12 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs @@ -41,6 +41,7 @@ public async Task ExecuteSchemeQuery( var status = UnpackOperation(response.Data.Operation); OnResponseStatus(status); + OnResponseTrailers(response.Trailers); return new ExecuteSchemeQueryResponse(status); } diff --git a/src/Ydb.Sdk/src/Services/Table/Session.cs b/src/Ydb.Sdk/src/Services/Table/Session.cs index 0e12cf92..5cf8d221 100644 --- a/src/Ydb.Sdk/src/Services/Table/Session.cs +++ b/src/Ydb.Sdk/src/Services/Table/Session.cs @@ -35,11 +35,24 @@ private void CheckSession() private void OnResponseStatus(Status status) { - if (status.StatusCode == StatusCode.BadSession || status.StatusCode == StatusCode.SessionBusy) + if (status.StatusCode is StatusCode.BadSession or StatusCode.SessionBusy) { - if (_sessionPool != null) + _sessionPool?.InvalidateSession(Id); + } + } + + private void OnResponseTrailers(Grpc.Core.Metadata? trailers) + { + if (trailers is null) + { + return; + } + + foreach (var hint in trailers.GetAll(Metadata.RpcServerHintsHeader)) + { + if (hint.Value == Metadata.GracefulShutdownHint) { - _sessionPool.InvalidateSession(Id); + _sessionPool?.InvalidateSession(Id); } } }