From b0b0dc22c1199224ea16b0265a0b01bc05dc0d22 Mon Sep 17 00:00:00 2001 From: XmasApple <86735308+XmasApple@users.noreply.github.com> Date: Fri, 17 Nov 2023 13:39:46 +0300 Subject: [PATCH] Graceful session shutdown support (#47) * Graceful session shutdown support --- src/Ydb.Sdk/src/Driver.cs | 17 ++-- src/Ydb.Sdk/src/Metadata.cs | 3 + .../src/Services/Table/ExecuteDataQuery.cs | 3 +- src/Ydb.Sdk/src/Services/Table/Session.cs | 25 ++++-- .../tests/Table/TestGracefulShutdown.cs | 82 +++++++++++++++++++ 5 files changed, 118 insertions(+), 12 deletions(-) create mode 100644 src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index ad505d59..a9a04344 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -117,15 +117,19 @@ internal async Task> UnaryCall( try { - var data = await callInvoker.AsyncUnaryCall( + using var call = callInvoker.AsyncUnaryCall( method: method, host: null, options: GetCallOptions(settings, false), request: request); + var data = await call.ResponseAsync; + var trailers = call.GetTrailers(); + return new UnaryResponse( data: data, - usedEndpoint: endpoint); + usedEndpoint: endpoint, + trailers: trailers); } catch (RpcException e) { @@ -312,17 +316,20 @@ private static Status ConvertStatus(Grpc.Core.Status rpcStatus) internal sealed class UnaryResponse { - internal UnaryResponse( - TResponse data, - string usedEndpoint) + internal UnaryResponse(TResponse data, + string usedEndpoint, + Grpc.Core.Metadata? trailers) { Data = data; UsedEndpoint = usedEndpoint; + Trailers = trailers; } public TResponse Data { get; } public string UsedEndpoint { get; } + + public Grpc.Core.Metadata? Trailers { get; } } internal sealed class StreamIterator diff --git a/src/Ydb.Sdk/src/Metadata.cs b/src/Ydb.Sdk/src/Metadata.cs index b49b8600..0b30c802 100644 --- a/src/Ydb.Sdk/src/Metadata.cs +++ b/src/Ydb.Sdk/src/Metadata.cs @@ -7,4 +7,7 @@ internal static class Metadata public const string RpcRequestTypeHeader = "x-ydb-request-type"; public const string RpcTraceIdHeader = "x-ydb-trace-id"; public const string RpcSdkInfoHeader = "x-ydb-sdk-build-info"; + public const string RpcServerHintsHeader = "x-ydb-server-hints"; + + public const string GracefulShutdownHint = "session-close"; } diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs index b0b5f380..a01681a5 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs @@ -85,8 +85,7 @@ public async Task ExecuteDataQuery( request: request, settings: settings); - ExecuteQueryResult? resultProto; - var status = UnpackOperation(response.Data.Operation, out resultProto); + var status = UnpackOperation(response.Data.Operation, out ExecuteQueryResult? resultProto); OnResponseStatus(status); var txState = TransactionState.Unknown; diff --git a/src/Ydb.Sdk/src/Services/Table/Session.cs b/src/Ydb.Sdk/src/Services/Table/Session.cs index ffee604c..2e3b1706 100644 --- a/src/Ydb.Sdk/src/Services/Table/Session.cs +++ b/src/Ydb.Sdk/src/Services/Table/Session.cs @@ -35,11 +35,24 @@ private void CheckSession() private void OnResponseStatus(Status status) { - if (status.StatusCode == StatusCode.BadSession || status.StatusCode == StatusCode.SessionBusy) + if (status.StatusCode is StatusCode.BadSession or StatusCode.SessionBusy) { - if (_sessionPool != null) + _sessionPool?.InvalidateSession(Id); + } + } + + private void OnResponseTrailers(Grpc.Core.Metadata? trailers) + { + if (trailers is null) + { + return; + } + + foreach (var hint in trailers.GetAll(Metadata.RpcServerHintsHeader)) + { + if (hint.Value == Metadata.GracefulShutdownHint) { - _sessionPool.InvalidateSession(Id); + _sessionPool?.InvalidateSession(Id); } } } @@ -77,18 +90,20 @@ protected virtual void Dispose(bool disposing) _disposed = true; } - internal Task> UnaryCall( + internal async Task> UnaryCall( Method method, TRequest request, RequestSettings settings) where TRequest : class where TResponse : class { - return Driver.UnaryCall( + var response = await Driver.UnaryCall( method: method, request: request, settings: settings, preferredEndpoint: Endpoint ); + OnResponseTrailers(response.Trailers); + return response; } } diff --git a/src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs b/src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs new file mode 100644 index 00000000..ccd9950d --- /dev/null +++ b/src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs @@ -0,0 +1,82 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using Ydb.Sdk.Services.Table; + +namespace Ydb.Sdk.Tests.Table; + +[Trait("Category", "Integration")] +public class TestGracefulShutdown +{ + private readonly ILoggerFactory _loggerFactory; + + private readonly DriverConfig _driverConfig = new( + endpoint: "grpc://localhost:2136", + database: "/local" + ); + + private const string ShutdownUrl = "http://localhost:8765/actors/kqp_proxy?force_shutdown=all"; + + public TestGracefulShutdown() + { + _loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance; + } + + [Fact] + public async Task Test() + { + await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory); + using var tableClient = new TableClient(driver); + + + var session1 = ""; + await tableClient.SessionExec( + async session => + { + session1 = session.Id; + return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit()); + } + ); + + var session2 = ""; + await tableClient.SessionExec( + async session => + { + session2 = session.Id; + return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit()); + } + ); + + // control check + Assert.NotEqual("", session1); + Assert.Equal(session1, session2); + + // SHUTDOWN + using var httpClient = new HttpClient(); + await httpClient.GetAsync(ShutdownUrl); + + // new session + var session3 = ""; + await tableClient.SessionExec( + async session => + { + session3 = session.Id; + return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit()); + } + ); + + Assert.Equal(session2, session3); + + var session4 = ""; + await tableClient.SessionExec( + async session => + { + session4 = session.Id; + return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit()); + } + ); + + Assert.NotEqual("", session3); + Assert.NotEqual(session3, session4); + } +}