Skip to content

Commit

Permalink
Graceful session shutdown support (#47)
Browse files Browse the repository at this point in the history
* Graceful session shutdown support
  • Loading branch information
XmasApple committed Nov 17, 2023
1 parent 19db82d commit b0b0dc2
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 12 deletions.
17 changes: 12 additions & 5 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,19 @@ internal async Task<UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(

try
{
var data = await callInvoker.AsyncUnaryCall(
using var call = callInvoker.AsyncUnaryCall(
method: method,
host: null,
options: GetCallOptions(settings, false),
request: request);

var data = await call.ResponseAsync;
var trailers = call.GetTrailers();

return new UnaryResponse<TResponse>(
data: data,
usedEndpoint: endpoint);
usedEndpoint: endpoint,
trailers: trailers);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -312,17 +316,20 @@ private static Status ConvertStatus(Grpc.Core.Status rpcStatus)

internal sealed class UnaryResponse<TResponse>
{
internal UnaryResponse(
TResponse data,
string usedEndpoint)
internal UnaryResponse(TResponse data,
string usedEndpoint,
Grpc.Core.Metadata? trailers)
{
Data = data;
UsedEndpoint = usedEndpoint;
Trailers = trailers;
}

public TResponse Data { get; }

public string UsedEndpoint { get; }

public Grpc.Core.Metadata? Trailers { get; }
}

internal sealed class StreamIterator<TResponse>
Expand Down
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ internal static class Metadata
public const string RpcRequestTypeHeader = "x-ydb-request-type";
public const string RpcTraceIdHeader = "x-ydb-trace-id";
public const string RpcSdkInfoHeader = "x-ydb-sdk-build-info";
public const string RpcServerHintsHeader = "x-ydb-server-hints";

public const string GracefulShutdownHint = "session-close";
}
3 changes: 1 addition & 2 deletions src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public async Task<ExecuteDataQueryResponse> ExecuteDataQuery(
request: request,
settings: settings);

ExecuteQueryResult? resultProto;
var status = UnpackOperation(response.Data.Operation, out resultProto);
var status = UnpackOperation(response.Data.Operation, out ExecuteQueryResult? resultProto);
OnResponseStatus(status);

var txState = TransactionState.Unknown;
Expand Down
25 changes: 20 additions & 5 deletions src/Ydb.Sdk/src/Services/Table/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,24 @@ private void CheckSession()

private void OnResponseStatus(Status status)
{
if (status.StatusCode == StatusCode.BadSession || status.StatusCode == StatusCode.SessionBusy)
if (status.StatusCode is StatusCode.BadSession or StatusCode.SessionBusy)
{
if (_sessionPool != null)
_sessionPool?.InvalidateSession(Id);
}
}

private void OnResponseTrailers(Grpc.Core.Metadata? trailers)
{
if (trailers is null)
{
return;
}

foreach (var hint in trailers.GetAll(Metadata.RpcServerHintsHeader))
{
if (hint.Value == Metadata.GracefulShutdownHint)
{
_sessionPool.InvalidateSession(Id);
_sessionPool?.InvalidateSession(Id);
}
}
}
Expand Down Expand Up @@ -77,18 +90,20 @@ protected virtual void Dispose(bool disposing)
_disposed = true;
}

internal Task<Driver.UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(
internal async Task<Driver.UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
RequestSettings settings)
where TRequest : class
where TResponse : class
{
return Driver.UnaryCall(
var response = await Driver.UnaryCall(
method: method,
request: request,
settings: settings,
preferredEndpoint: Endpoint
);
OnResponseTrailers(response.Trailers);
return response;
}
}
82 changes: 82 additions & 0 deletions src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using Ydb.Sdk.Services.Table;

namespace Ydb.Sdk.Tests.Table;

[Trait("Category", "Integration")]
public class TestGracefulShutdown
{
private readonly ILoggerFactory _loggerFactory;

private readonly DriverConfig _driverConfig = new(
endpoint: "grpc://localhost:2136",
database: "/local"
);

private const string ShutdownUrl = "http://localhost:8765/actors/kqp_proxy?force_shutdown=all";

public TestGracefulShutdown()
{
_loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance;
}

[Fact]
public async Task Test()
{
await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory);
using var tableClient = new TableClient(driver);


var session1 = "";
await tableClient.SessionExec(
async session =>
{
session1 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

var session2 = "";
await tableClient.SessionExec(
async session =>
{
session2 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

// control check
Assert.NotEqual("", session1);
Assert.Equal(session1, session2);

// SHUTDOWN
using var httpClient = new HttpClient();
await httpClient.GetAsync(ShutdownUrl);

// new session
var session3 = "";
await tableClient.SessionExec(
async session =>
{
session3 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

Assert.Equal(session2, session3);

var session4 = "";
await tableClient.SessionExec(
async session =>
{
session4 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

Assert.NotEqual("", session3);
Assert.NotEqual(session3, session4);
}
}

0 comments on commit b0b0dc2

Please sign in to comment.