Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful session shutdown support #47

Merged
merged 6 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,19 @@ internal async Task<UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(

try
{
var data = await callInvoker.AsyncUnaryCall(
using var call = callInvoker.AsyncUnaryCall(
method: method,
host: null,
options: GetCallOptions(settings, false),
request: request);

var data = await call.ResponseAsync;
var trailers = call.GetTrailers();

return new UnaryResponse<TResponse>(
data: data,
usedEndpoint: endpoint);
usedEndpoint: endpoint,
trailers: trailers);
}
catch (RpcException e)
{
Expand Down Expand Up @@ -312,17 +316,20 @@ private static Status ConvertStatus(Grpc.Core.Status rpcStatus)

internal sealed class UnaryResponse<TResponse>
{
internal UnaryResponse(
TResponse data,
string usedEndpoint)
internal UnaryResponse(TResponse data,
string usedEndpoint,
Grpc.Core.Metadata? trailers)
{
Data = data;
UsedEndpoint = usedEndpoint;
Trailers = trailers;
}

public TResponse Data { get; }

public string UsedEndpoint { get; }

public Grpc.Core.Metadata? Trailers { get; }
}

internal sealed class StreamIterator<TResponse>
Expand Down
3 changes: 3 additions & 0 deletions src/Ydb.Sdk/src/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ internal static class Metadata
public const string RpcRequestTypeHeader = "x-ydb-request-type";
public const string RpcTraceIdHeader = "x-ydb-trace-id";
public const string RpcSdkInfoHeader = "x-ydb-sdk-build-info";
public const string RpcServerHintsHeader = "x-ydb-server-hints";

public const string GracefulShutdownHint = "session-close";
}
3 changes: 1 addition & 2 deletions src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public async Task<ExecuteDataQueryResponse> ExecuteDataQuery(
request: request,
settings: settings);

ExecuteQueryResult? resultProto;
var status = UnpackOperation(response.Data.Operation, out resultProto);
var status = UnpackOperation(response.Data.Operation, out ExecuteQueryResult? resultProto);
OnResponseStatus(status);

var txState = TransactionState.Unknown;
Expand Down
25 changes: 20 additions & 5 deletions src/Ydb.Sdk/src/Services/Table/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,24 @@ private void CheckSession()

private void OnResponseStatus(Status status)
{
if (status.StatusCode == StatusCode.BadSession || status.StatusCode == StatusCode.SessionBusy)
if (status.StatusCode is StatusCode.BadSession or StatusCode.SessionBusy)
{
if (_sessionPool != null)
_sessionPool?.InvalidateSession(Id);
}
}

private void OnResponseTrailers(Grpc.Core.Metadata? trailers)
{
if (trailers is null)
{
return;
}

foreach (var hint in trailers.GetAll(Metadata.RpcServerHintsHeader))
{
if (hint.Value == Metadata.GracefulShutdownHint)
{
_sessionPool.InvalidateSession(Id);
_sessionPool?.InvalidateSession(Id);
}
}
}
Expand Down Expand Up @@ -77,18 +90,20 @@ protected virtual void Dispose(bool disposing)
_disposed = true;
}

internal Task<Driver.UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(
internal async Task<Driver.UnaryResponse<TResponse>> UnaryCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
RequestSettings settings)
where TRequest : class
where TResponse : class
{
return Driver.UnaryCall(
var response = await Driver.UnaryCall(
method: method,
request: request,
settings: settings,
preferredEndpoint: Endpoint
);
OnResponseTrailers(response.Trailers);
return response;
}
}
82 changes: 82 additions & 0 deletions src/Ydb.Sdk/tests/Table/TestGracefulShutdown.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using Ydb.Sdk.Services.Table;

namespace Ydb.Sdk.Tests.Table;

[Trait("Category", "Integration")]
public class TestGracefulShutdown
{
private readonly ILoggerFactory _loggerFactory;

private readonly DriverConfig _driverConfig = new(
endpoint: "grpc://localhost:2136",
database: "/local"
);

private const string ShutdownUrl = "http://localhost:8765/actors/kqp_proxy?force_shutdown=all";

public TestGracefulShutdown()
{
_loggerFactory = Utils.GetLoggerFactory() ?? NullLoggerFactory.Instance;
}

[Fact]
public async Task Test()
{
await using var driver = await Driver.CreateInitialized(_driverConfig, _loggerFactory);
using var tableClient = new TableClient(driver);


var session1 = "";
await tableClient.SessionExec(
async session =>
{
session1 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

var session2 = "";
await tableClient.SessionExec(
async session =>
{
session2 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

// control check
Assert.NotEqual("", session1);
Assert.Equal(session1, session2);

// SHUTDOWN
using var httpClient = new HttpClient();
await httpClient.GetAsync(ShutdownUrl);

// new session
var session3 = "";
await tableClient.SessionExec(
async session =>
{
session3 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

Assert.Equal(session2, session3);

var session4 = "";
await tableClient.SessionExec(
async session =>
{
session4 = session.Id;
return await session.ExecuteDataQuery("SELECT 1", TxControl.BeginSerializableRW().Commit());
}
);

Assert.NotEqual("", session3);
Assert.NotEqual(session3, session4);
}
}