Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop rdma endpoints; issue-1020 #1628

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/rdma/iface/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ struct IClientEndpoint
virtual void SendRequest(
TClientRequestPtr req,
TCallContextPtr callContext) = 0;

virtual NThreading::TFuture<void> Stop() = 0;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
135 changes: 88 additions & 47 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,13 @@ class TClientEndpoint final

std::atomic<EEndpointState> State = EEndpointState::Disconnected;
std::atomic<ui32> Status = S_OK;
std::atomic_flag StopFlag = ATOMIC_FLAG_INIT;

NVerbs::TCompletionChannelPtr CompletionChannel = NVerbs::NullPtr;
NVerbs::TCompletionQueuePtr CompletionQueue = NVerbs::NullPtr;

TPromise<IClientEndpointPtr> StartResult = NewPromise<IClientEndpointPtr>();
TPromise<void> StopResult = NewPromise<void>();

ui64 FlushStartCycles = 0;

Expand Down Expand Up @@ -455,14 +457,15 @@ class TClientEndpoint final
void ChangeState(EEndpointState expectedState, EEndpointState newState);
void ChangeState(EEndpointState newState) noexcept;
void SetError() noexcept;
void FlushQueues() noexcept;

// called from CM thread
void CreateQP();
void DestroyQP() noexcept;
void StartReceive();
void ResetConnection(NVerbs::TConnectionPtr connection) noexcept;
TString PeerAddress() const;
void SetConnection(NVerbs::TConnectionPtr connection) noexcept;
int ReconnectTimerHandle() const;
bool ShouldStop() const;

// called from external thread
TResultOrError<TClientRequestPtr> AllocateRequest(
Expand All @@ -473,6 +476,7 @@ class TClientEndpoint final
void SendRequest(
TClientRequestPtr creq,
TCallContextPtr callContext) noexcept override;
TFuture<void> Stop() noexcept override;

// called from CQ thread
void HandleCompletionEvent(ibv_wc* wc) override;
Expand All @@ -482,6 +486,9 @@ class TClientEndpoint final
bool Flushed() const;
bool FlushHanging() const;

// called from CM and external thread
TString PeerAddress() const;

private:
// called from CQ thread
void HandleQueuedRequests();
Expand Down Expand Up @@ -535,23 +542,14 @@ TClientEndpoint::TClientEndpoint(
return TStringBuilder() << "[" << Id << "] " << msg;
});

RDMA_INFO("new session [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "]");
RDMA_INFO("new endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
}

TClientEndpoint::~TClientEndpoint()
{
RDMA_INFO("close session");

if (SendBuffers.Initialized()) {
SendBuffers.ReleaseBuffer(SendBuffer);
}

if (RecvBuffers.Initialized()) {
RecvBuffers.ReleaseBuffer(RecvBuffer);
}

// TODO detach pollers
RDMA_INFO("close endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
}

bool TClientEndpoint::CheckState(EEndpointState expectedState) const
Expand All @@ -571,16 +569,15 @@ void TClientEndpoint::ChangeState(
GetEndpointStateName(expectedState),
GetEndpointStateName(actualState));

// FIXME change back to DEBUG when NBS-4644 is resolved
RDMA_INFO(GetEndpointStateName(expectedState)
RDMA_DEBUG(GetEndpointStateName(expectedState)
<< " -> " << GetEndpointStateName(newState));
}

void TClientEndpoint::ChangeState(EEndpointState newState) noexcept
{
auto currentState = State.exchange(newState);

RDMA_INFO(GetEndpointStateName(currentState)
RDMA_DEBUG(GetEndpointStateName(currentState)
<< " -> " << GetEndpointStateName(newState));
}

Expand Down Expand Up @@ -1067,8 +1064,21 @@ void TClientEndpoint::RecvResponseCompleted(
responseBytes);
}

void TClientEndpoint::ResetConnection(
NVerbs::TConnectionPtr connection) noexcept
TFuture<void> TClientEndpoint::Stop() noexcept
{
if (!StopFlag.test_and_set()) {
RDMA_DEBUG("stop endpoint");
SetError();
}
return StopResult.GetFuture();
}

bool TClientEndpoint::ShouldStop() const
{
return StopFlag.test();
}

void TClientEndpoint::SetConnection(NVerbs::TConnectionPtr connection) noexcept
{
connection->context = this;
Connection = std::move(connection);
Expand All @@ -1084,6 +1094,20 @@ int TClientEndpoint::ReconnectTimerHandle() const
return Reconnect.Timer.Handle();
}

void TClientEndpoint::FlushQueues() noexcept
{
STORAGE_INFO("flush queues");

try {
ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Verbs->ModifyQP(Connection->qp, &attr, IBV_QP_STATE);
FlushStartCycles = GetCycleCount();

} catch (const TServiceError& e) {
RDMA_ERROR("flush error: " << e.what());
}
}

void TClientEndpoint::SetError() noexcept
{
Status = E_RDMA_UNAVAILABLE;
Expand All @@ -1100,20 +1124,13 @@ void TClientEndpoint::SetError() noexcept
case EEndpointState::ResolvingRoute:
break;

// flush queues and schedule reconnect
// flush queues, signal the poller and schedule reconnect
case EEndpointState::Connected:
ChangeState(
EEndpointState::Connected,
EEndpointState::Disconnecting);

try {
struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Verbs->ModifyQP(Connection->qp, &attr, IBV_QP_STATE);
FlushStartCycles = GetCycleCount();

} catch (const TServiceError& e) {
RDMA_ERROR("flush error: " << e.what());
}
FlushQueues();

if (WaitMode == EWaitMode::Poll) {
DisconnectEvent.Set();
Expand Down Expand Up @@ -1342,12 +1359,20 @@ class TCompletionPoller final
Join();
}

void Add(TClientEndpointPtr endpoint)
void Acquire(TClientEndpointPtr endpoint)
{
endpoint->Poller = this;
Endpoints.Add(std::move(endpoint));
}

void Release(TClientEndpoint* endpoint)
{
endpoint->Poller = nullptr;
Endpoints.Delete([=](auto x) {
return endpoint == x.get();
});
}

void Attach(TClientEndpoint* endpoint)
{
if (Config->WaitMode == EWaitMode::Poll) {
Expand Down Expand Up @@ -1603,6 +1628,7 @@ class TClient final
TClientEndpoint* endpoint,
NVerbs::TConnectionEventPtr event) noexcept;
TCompletionPoller& PickPoller() noexcept;
void StopEndpoint(TClientEndpoint* endpoint) noexcept;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1695,7 +1721,7 @@ TFuture<IClientEndpointPtr> TClient::StartEndpoint(
auto future = endpoint->StartResult.GetFuture();

ConnectionPoller->Attach(endpoint.get());
PickPoller().Add(endpoint);
PickPoller().Acquire(endpoint);
BeginResolveAddress(endpoint.get());
return future;

Expand Down Expand Up @@ -1762,15 +1788,38 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept
}
}

void TClient::StopEndpoint(TClientEndpoint* endpoint) noexcept
{
ConnectionPoller->Detach(endpoint);
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->Connection.reset();
endpoint->StopResult.SetValue();
endpoint->Poller->Release(endpoint);
}

// implements IConnectionEventHandler
void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
{
if (endpoint->ShouldStop()) {
if (endpoint->CheckState(EEndpointState::Disconnecting)) {
// wait for completion poller to flush WRs
endpoint->Reconnect.Schedule();
} else {
// detach pollers and close connection
StopEndpoint(endpoint);
}
return;
}

if (endpoint->Reconnect.Hanging()) {
// if this is our first connection, fail over to IC
if (endpoint->StartResult.Initialized()) {
auto startResult = std::move(endpoint->StartResult);
startResult.SetException(std::make_exception_ptr(TServiceError(
MakeError(endpoint->Status, "connection timeout"))));

StopEndpoint(endpoint);
return;
}
// otherwise keep trying
Expand All @@ -1790,25 +1839,16 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
case EEndpointState::ResolvingRoute:
break;

// reset connection and try again
// create new connection and try again
case EEndpointState::Connecting:
case EEndpointState::Disconnected:
try {
auto connection = ConnectionPoller->CreateConnection();
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->ResetConnection(std::move(connection));

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->Reconnect.Schedule();
return;
}
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
endpoint->SetConnection(ConnectionPoller->CreateConnection());
break;

// shouldn't happen
// reconnect timer hit at the same time connection was established
case EEndpointState::Connected:
RDMA_WARN(endpoint->Log, "attempting to reconnect in Connected state");
return;
}

Expand Down Expand Up @@ -1851,7 +1891,7 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept

void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "resolve route to "
RDMA_INFO(endpoint->Log, "resolve route to " << endpoint->Host << " "
<< endpoint->PeerAddress());

endpoint->ChangeState(
Expand Down Expand Up @@ -2049,7 +2089,8 @@ void TClient::DumpHtml(IOutputStream& out) const
for (size_t i = 0; i < CompletionPollers.size(); ++i) {
auto& poller = CompletionPollers[i];
auto endpoints = poller->GetEndpoints();
for (auto& ep : *endpoints) {

for (auto& ep: *endpoints) {
TABLER() {
TABLED() { out << i; }
TABLED() { out << ep->Host; }
Expand Down
35 changes: 35 additions & 0 deletions cloud/blockstore/libs/rdma/impl/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
Y_UNUSED(clientEndpoint);
}

Y_UNIT_TEST(ShouldDetachFromPoller)
{
auto testContext = MakeIntrusive<NVerbs::TTestContext>();
testContext->AllowConnect = true;

auto verbs = NVerbs::CreateTestVerbs(testContext);
auto monitoring = CreateMonitoringServiceStub();
auto clientConfig = std::make_shared<TClientConfig>();

auto logging = CreateLoggingService(
"console",
TLogSettings{TLOG_RESOURCES});

auto client = CreateClient(
verbs,
logging,
monitoring,
clientConfig);

client->Start();
Y_DEFER {
client->Stop();
};

auto shared = client->StartEndpoint("::", 10020).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(2, shared.use_count());

shared->Stop().Wait();
auto deadline = GetCycleCount() + DurationToCycles(TDuration::Seconds(10));
while (deadline > GetCycleCount() && shared.use_count() > 1) {
SpinLockPause();
}
UNIT_ASSERT_VALUES_EQUAL(1, shared.use_count());
}

Y_UNIT_TEST(ShouldReturnErrorUponStartEndpointTimeout)
{
auto verbs =
Expand Down
10 changes: 5 additions & 5 deletions cloud/blockstore/libs/rdma/impl/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class TServerSession final

// called from external thread
void EnqueueRequest(TRequestPtr req) noexcept;
TString GetAddress() const;
TString PeerAddress() const;

// called from CQ thread
void HandleCompletionEvent(ibv_wc* wc) override;
Expand Down Expand Up @@ -456,7 +456,7 @@ TServerSession::~TServerSession()
STORAGE_INFO("close session [send_magic=%X recv_magic=%X] to %s",
SendMagic,
RecvMagic,
GetAddress().c_str());
PeerAddress().c_str());

Verbs->DestroyQP(Connection.get());

Expand Down Expand Up @@ -502,7 +502,7 @@ void TServerSession::EnqueueRequest(TRequestPtr req) noexcept
}
}

TString TServerSession::GetAddress() const
TString TServerSession::PeerAddress() const
{
return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get()));
}
Expand Down Expand Up @@ -1372,7 +1372,7 @@ class TCompletionPoller final

for (const auto& session: *sessions) {
if (session->IsFlushed()) {
session->CompletionPoller->Release(session.get());
Release(session.get());
}
}
}
Expand Down Expand Up @@ -1661,7 +1661,7 @@ void TServer::DumpHtml(IOutputStream& out) const

for (auto& session: *sessions) {
TABLER() {
TABLED() { out << session->GetAddress(); }
TABLED() { out << session->PeerAddress(); }
TABLED()
{
Printf(
Expand Down
Loading
Loading