Skip to content

Commit

Permalink
stop rdma endpoints; issue-1020
Browse files Browse the repository at this point in the history
  • Loading branch information
tpashkin authored Jul 17, 2024
1 parent 16c590c commit 8271163
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 54 deletions.
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct IClientEndpoint
virtual void SendRequest(
TClientRequestPtr req,
TCallContextPtr callContext) = 0;

virtual NThreading::TFuture<void> Stop() = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
135 changes: 88 additions & 47 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,13 @@ class TClientEndpoint final

std::atomic<EEndpointState> State = EEndpointState::Disconnected;
std::atomic<ui32> Status = S_OK;
std::atomic_flag StopFlag = ATOMIC_FLAG_INIT;

NVerbs::TCompletionChannelPtr CompletionChannel = NVerbs::NullPtr;
NVerbs::TCompletionQueuePtr CompletionQueue = NVerbs::NullPtr;

TPromise<IClientEndpointPtr> StartResult = NewPromise<IClientEndpointPtr>();
TPromise<void> StopResult = NewPromise<void>();

ui64 FlushStartCycles = 0;

Expand Down Expand Up @@ -455,14 +457,15 @@ class TClientEndpoint final
void ChangeState(EEndpointState expectedState, EEndpointState newState);
void ChangeState(EEndpointState newState) noexcept;
void SetError() noexcept;
void FlushQueues() noexcept;

// called from CM thread
void CreateQP();
void DestroyQP() noexcept;
void StartReceive();
void ResetConnection(NVerbs::TConnectionPtr connection) noexcept;
TString PeerAddress() const;
void SetConnection(NVerbs::TConnectionPtr connection) noexcept;
int ReconnectTimerHandle() const;
bool ShouldStop() const;

// called from external thread
TResultOrError<TClientRequestPtr> AllocateRequest(
Expand All @@ -473,6 +476,7 @@ class TClientEndpoint final
void SendRequest(
TClientRequestPtr creq,
TCallContextPtr callContext) noexcept override;
TFuture<void> Stop() noexcept override;

// called from CQ thread
void HandleCompletionEvent(ibv_wc* wc) override;
Expand All @@ -482,6 +486,9 @@ class TClientEndpoint final
bool Flushed() const;
bool FlushHanging() const;

// called from CM and external thread
TString PeerAddress() const;

private:
// called from CQ thread
void HandleQueuedRequests();
Expand Down Expand Up @@ -535,23 +542,14 @@ TClientEndpoint::TClientEndpoint(
return TStringBuilder() << "[" << Id << "] " << msg;
});

RDMA_INFO("new session [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "]");
RDMA_INFO("new endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
}

TClientEndpoint::~TClientEndpoint()
{
RDMA_INFO("close session");

if (SendBuffers.Initialized()) {
SendBuffers.ReleaseBuffer(SendBuffer);
}

if (RecvBuffers.Initialized()) {
RecvBuffers.ReleaseBuffer(RecvBuffer);
}

// TODO detach pollers
RDMA_INFO("close endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
}

bool TClientEndpoint::CheckState(EEndpointState expectedState) const
Expand All @@ -571,16 +569,15 @@ void TClientEndpoint::ChangeState(
GetEndpointStateName(expectedState),
GetEndpointStateName(actualState));

// FIXME change back to DEBUG when NBS-4644 is resolved
RDMA_INFO(GetEndpointStateName(expectedState)
RDMA_DEBUG(GetEndpointStateName(expectedState)
<< " -> " << GetEndpointStateName(newState));
}

void TClientEndpoint::ChangeState(EEndpointState newState) noexcept
{
auto currentState = State.exchange(newState);

RDMA_INFO(GetEndpointStateName(currentState)
RDMA_DEBUG(GetEndpointStateName(currentState)
<< " -> " << GetEndpointStateName(newState));
}

Expand Down Expand Up @@ -1067,8 +1064,21 @@ void TClientEndpoint::RecvResponseCompleted(
responseBytes);
}

void TClientEndpoint::ResetConnection(
NVerbs::TConnectionPtr connection) noexcept
TFuture<void> TClientEndpoint::Stop() noexcept
{
if (!StopFlag.test_and_set()) {
RDMA_DEBUG("stop endpoint");
SetError();
}
return StopResult.GetFuture();
}

bool TClientEndpoint::ShouldStop() const
{
return StopFlag.test();
}

void TClientEndpoint::SetConnection(NVerbs::TConnectionPtr connection) noexcept
{
connection->context = this;
Connection = std::move(connection);
Expand All @@ -1084,6 +1094,20 @@ int TClientEndpoint::ReconnectTimerHandle() const
return Reconnect.Timer.Handle();
}

void TClientEndpoint::FlushQueues() noexcept
{
STORAGE_INFO("flush queues");

try {
ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Verbs->ModifyQP(Connection->qp, &attr, IBV_QP_STATE);
FlushStartCycles = GetCycleCount();

} catch (const TServiceError& e) {
RDMA_ERROR("flush error: " << e.what());
}
}

void TClientEndpoint::SetError() noexcept
{
Status = E_RDMA_UNAVAILABLE;
Expand All @@ -1100,20 +1124,13 @@ void TClientEndpoint::SetError() noexcept
case EEndpointState::ResolvingRoute:
break;

// flush queues and schedule reconnect
// flush queues, signal the poller and schedule reconnect
case EEndpointState::Connected:
ChangeState(
EEndpointState::Connected,
EEndpointState::Disconnecting);

try {
struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Verbs->ModifyQP(Connection->qp, &attr, IBV_QP_STATE);
FlushStartCycles = GetCycleCount();

} catch (const TServiceError& e) {
RDMA_ERROR("flush error: " << e.what());
}
FlushQueues();

if (WaitMode == EWaitMode::Poll) {
DisconnectEvent.Set();
Expand Down Expand Up @@ -1342,12 +1359,20 @@ class TCompletionPoller final
Join();
}

void Add(TClientEndpointPtr endpoint)
void Acquire(TClientEndpointPtr endpoint)
{
endpoint->Poller = this;
Endpoints.Add(std::move(endpoint));
}

void Release(TClientEndpoint* endpoint)
{
endpoint->Poller = nullptr;
Endpoints.Delete([=](auto x) {
return endpoint == x.get();
});
}

void Attach(TClientEndpoint* endpoint)
{
if (Config->WaitMode == EWaitMode::Poll) {
Expand Down Expand Up @@ -1603,6 +1628,7 @@ class TClient final
TClientEndpoint* endpoint,
NVerbs::TConnectionEventPtr event) noexcept;
TCompletionPoller& PickPoller() noexcept;
void StopEndpoint(TClientEndpoint* endpoint) noexcept;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1695,7 +1721,7 @@ TFuture<IClientEndpointPtr> TClient::StartEndpoint(
auto future = endpoint->StartResult.GetFuture();

ConnectionPoller->Attach(endpoint.get());
PickPoller().Add(endpoint);
PickPoller().Acquire(endpoint);
BeginResolveAddress(endpoint.get());
return future;

Expand Down Expand Up @@ -1762,15 +1788,38 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept
}
}

void TClient::StopEndpoint(TClientEndpoint* endpoint) noexcept
{
ConnectionPoller->Detach(endpoint);
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->Connection.reset();
endpoint->StopResult.SetValue();
endpoint->Poller->Release(endpoint);
}

// implements IConnectionEventHandler
void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
{
if (endpoint->ShouldStop()) {
if (endpoint->CheckState(EEndpointState::Disconnecting)) {
// wait for completion poller to flush WRs
endpoint->Reconnect.Schedule();
} else {
// detach pollers and close connection
StopEndpoint(endpoint);
}
return;
}

if (endpoint->Reconnect.Hanging()) {
// if this is our first connection, fail over to IC
if (endpoint->StartResult.Initialized()) {
auto startResult = std::move(endpoint->StartResult);
startResult.SetException(std::make_exception_ptr(TServiceError(
MakeError(endpoint->Status, "connection timeout"))));

StopEndpoint(endpoint);
return;
}
// otherwise keep trying
Expand All @@ -1790,25 +1839,16 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
case EEndpointState::ResolvingRoute:
break;

// reset connection and try again
// create new connection and try again
case EEndpointState::Connecting:
case EEndpointState::Disconnected:
try {
auto connection = ConnectionPoller->CreateConnection();
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->ResetConnection(std::move(connection));

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->Reconnect.Schedule();
return;
}
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->SetConnection(ConnectionPoller->CreateConnection());
break;

// shouldn't happen
// reconnect timer hit at the same time connection was established
case EEndpointState::Connected:
RDMA_WARN(endpoint->Log, "attempting to reconnect in Connected state");
return;
}

Expand Down Expand Up @@ -1851,7 +1891,7 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept

void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "resolve route to "
RDMA_INFO(endpoint->Log, "resolve route to " << endpoint->Host << " "
<< endpoint->PeerAddress());

endpoint->ChangeState(
Expand Down Expand Up @@ -2049,7 +2089,8 @@ void TClient::DumpHtml(IOutputStream& out) const
for (size_t i = 0; i < CompletionPollers.size(); ++i) {
auto& poller = CompletionPollers[i];
auto endpoints = poller->GetEndpoints();
for (auto& ep : *endpoints) {

for (auto& ep: *endpoints) {
TABLER() {
TABLED() { out << i; }
TABLED() { out << ep->Host; }
Expand Down
35 changes: 35 additions & 0 deletions cloud/blockstore/libs/rdma/impl/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
Y_UNUSED(clientEndpoint);
}

Y_UNIT_TEST(ShouldDetachFromPoller)
{
auto testContext = MakeIntrusive<NVerbs::TTestContext>();
testContext->AllowConnect = true;

auto verbs = NVerbs::CreateTestVerbs(testContext);
auto monitoring = CreateMonitoringServiceStub();
auto clientConfig = std::make_shared<TClientConfig>();

auto logging = CreateLoggingService(
"console",
TLogSettings{TLOG_RESOURCES});

auto client = CreateClient(
verbs,
logging,
monitoring,
clientConfig);

client->Start();
Y_DEFER {
client->Stop();
};

auto shared = client->StartEndpoint("::", 10020).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(2, shared.use_count());

shared->Stop().Wait();
auto deadline = GetCycleCount() + DurationToCycles(TDuration::Seconds(10));
while (deadline > GetCycleCount() && shared.use_count() > 1) {
SpinLockPause();
}
UNIT_ASSERT_VALUES_EQUAL(1, shared.use_count());
}

Y_UNIT_TEST(ShouldReturnErrorUponStartEndpointTimeout)
{
auto verbs =
Expand Down
10 changes: 5 additions & 5 deletions cloud/blockstore/libs/rdma/impl/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class TServerSession final

// called from external thread
void EnqueueRequest(TRequestPtr req) noexcept;
TString GetAddress() const;
TString PeerAddress() const;

// called from CQ thread
void HandleCompletionEvent(ibv_wc* wc) override;
Expand Down Expand Up @@ -456,7 +456,7 @@ TServerSession::~TServerSession()
STORAGE_INFO("close session [send_magic=%X recv_magic=%X] to %s",
SendMagic,
RecvMagic,
GetAddress().c_str());
PeerAddress().c_str());

Verbs->DestroyQP(Connection.get());

Expand Down Expand Up @@ -502,7 +502,7 @@ void TServerSession::EnqueueRequest(TRequestPtr req) noexcept
}
}

TString TServerSession::GetAddress() const
TString TServerSession::PeerAddress() const
{
return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get()));
}
Expand Down Expand Up @@ -1372,7 +1372,7 @@ class TCompletionPoller final

for (const auto& session: *sessions) {
if (session->IsFlushed()) {
session->CompletionPoller->Release(session.get());
Release(session.get());
}
}
}
Expand Down Expand Up @@ -1661,7 +1661,7 @@ void TServer::DumpHtml(IOutputStream& out) const

for (auto& session: *sessions) {
TABLER() {
TABLED() { out << session->GetAddress(); }
TABLED() { out << session->PeerAddress(); }
TABLED()
{
Printf(
Expand Down
Loading

0 comments on commit 8271163

Please sign in to comment.