Skip to content

Commit

Permalink
improve rdma logging & clean up resources in destructor; issue-1020 (#1920)
Browse files Browse the repository at this point in the history

* improve rdma logging; issue-1020 (#1912)

* add more debug points to differentiate between states
* move non-essential messages to debug
* use RDMA_ logging macros in server
* resolve client's hostname

* issue-1020: fixing heap-buffer-overflow after #1912 (#1926)

---------

Co-authored-by: Andrei Strelkovskii <[email protected]>
  • Loading branch information
tpashkin and qkrorlqr authored Sep 2, 2024
1 parent c79ea7c commit 63d2ea4
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 120 deletions.
103 changes: 51 additions & 52 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ class TClientEndpoint final
union {
const ui64 Id = RandomNumber(Max<ui64>());
struct {
ui32 RecvMagic;
ui32 SendMagic;
const ui32 RecvMagic;
const ui32 SendMagic;
};
};
ui16 Generation = Max<ui16>();
Expand Down Expand Up @@ -456,7 +456,7 @@ class TClientEndpoint final
bool CheckState(EEndpointState expectedState) const;
void ChangeState(EEndpointState expectedState, EEndpointState newState);
void ChangeState(EEndpointState newState) noexcept;
void SetError() noexcept;
void Disconnect() noexcept;
void FlushQueues() noexcept;

// called from CM thread
Expand Down Expand Up @@ -486,9 +486,6 @@ class TClientEndpoint final
bool Flushed() const;
bool FlushHanging() const;

// called from CM and external thread
TString PeerAddress() const;

private:
// called from CQ thread
void HandleQueuedRequests();
Expand Down Expand Up @@ -542,14 +539,21 @@ TClientEndpoint::TClientEndpoint(
return TStringBuilder() << "[" << Id << "] " << msg;
});

RDMA_INFO("new endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
RDMA_INFO("start endpoint " << Host
<< " [send_magic=" << Hex(SendMagic, HF_FULL)
<< " recv_magic=" << Hex(RecvMagic, HF_FULL) << "]");
}

TClientEndpoint::~TClientEndpoint()
{
RDMA_INFO("close endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
// Free resources if the endpoint wasn't properly stopped. We don't need to
// detach it, because if we got here, the poller is also being destroyed
if (!StopResult.HasValue()) {
DestroyQP();
Connection.reset();
}

RDMA_INFO("stop endpoint");
}

bool TClientEndpoint::CheckState(EEndpointState expectedState) const
Expand Down Expand Up @@ -635,7 +639,10 @@ void TClientEndpoint::CreateQP()
RecvWrs.resize(Config.QueueSize);

Generation++;
RDMA_INFO("new generation " << Generation);

if (Generation > 1) {
RDMA_DEBUG("new generation " << Generation);
}

ui32 i = 0;
ui64 requestMsg = SendBuffer.Address;
Expand Down Expand Up @@ -973,7 +980,7 @@ void TClientEndpoint::SendRequestCompleted(

Counters->SendRequestError();
ReportRdmaError();
SetError();
Disconnect();
return;
}

Expand Down Expand Up @@ -1022,7 +1029,7 @@ void TClientEndpoint::RecvResponseCompleted(
Counters->RecvResponseError();
RecvResponse(recv);
ReportRdmaError();
SetError();
Disconnect();
return;
}

Expand Down Expand Up @@ -1067,8 +1074,7 @@ void TClientEndpoint::RecvResponseCompleted(
TFuture<void> TClientEndpoint::Stop() noexcept
{
if (!StopFlag.test_and_set()) {
RDMA_DEBUG("stop endpoint");
SetError();
Disconnect();
}
return StopResult.GetFuture();
}
Expand All @@ -1084,19 +1090,14 @@ void TClientEndpoint::SetConnection(NVerbs::TConnectionPtr connection) noexcept
Connection = std::move(connection);
}

TString TClientEndpoint::PeerAddress() const
{
return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get()));
}

// Returns the handle of the endpoint's reconnect timer, i.e. the value of
// Reconnect.Timer.Handle() converted to int. Does not modify the endpoint.
// NOTE(review): presumably a file descriptor that a poller can wait on —
// confirm against the timer implementation.
int TClientEndpoint::ReconnectTimerHandle() const
{
return Reconnect.Timer.Handle();
}

void TClientEndpoint::FlushQueues() noexcept
{
STORAGE_INFO("flush queues");
RDMA_DEBUG("flush queues");

try {
ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Expand All @@ -1108,7 +1109,7 @@ void TClientEndpoint::FlushQueues() noexcept
}
}

void TClientEndpoint::SetError() noexcept
void TClientEndpoint::Disconnect() noexcept
{
Status = E_RDMA_UNAVAILABLE;

Expand All @@ -1126,6 +1127,8 @@ void TClientEndpoint::SetError() noexcept

// flush queues, signal the poller and schedule reconnect
case EEndpointState::Connected:
RDMA_INFO("disconnect");

ChangeState(
EEndpointState::Connected,
EEndpointState::Disconnecting);
Expand Down Expand Up @@ -1457,7 +1460,7 @@ class TCompletionPoller final
}
} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1492,7 +1495,7 @@ class TCompletionPoller final
}
} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1534,8 +1537,8 @@ class TCompletionPoller final
continue;
}
RDMA_ERROR(endpoint->Log, "flush timeout "
<< "(send_queue.size=" << endpoint->SendQueue.Size()
<< " recv_queue.size=" << endpoint->RecvQueue.Size() << ")");
<< "[send_queue.size=" << endpoint->SendQueue.Size()
<< " recv_queue.size=" << endpoint->RecvQueue.Size() << "]");
}

endpoint->ChangeState(
Expand Down Expand Up @@ -1737,7 +1740,7 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept
{
TClientEndpoint* endpoint = TClientEndpoint::FromEvent(event.get());

RDMA_INFO(endpoint->Log, "received " << NVerbs::GetEventName(event->event));
RDMA_DEBUG(endpoint->Log, "received " << NVerbs::GetEventName(event->event));

switch (event->event) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
Expand Down Expand Up @@ -1790,6 +1793,8 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept

void TClient::StopEndpoint(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "detach pollers and close connection");

ConnectionPoller->Detach(endpoint);
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
Expand All @@ -1815,6 +1820,8 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
if (endpoint->Reconnect.Hanging()) {
// if this is our first connection, fail over to IC
if (endpoint->StartResult.Initialized()) {
RDMA_ERROR(endpoint->Log, "connection timeout");

auto startResult = std::move(endpoint->StartResult);
startResult.SetException(std::make_exception_ptr(TServiceError(
MakeError(endpoint->Status, "connection timeout"))));
Expand All @@ -1823,6 +1830,7 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
return;
}
// otherwise keep trying
RDMA_WARN(endpoint->Log, "connection is hanging");
}

RDMA_DEBUG(endpoint->Log, "reconnect timer hit in "
Expand Down Expand Up @@ -1866,15 +1874,7 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept
auto addrinfo = Verbs->GetAddressInfo(
endpoint->Host, endpoint->Port, &hints);

if (addrinfo->ai_src_addr) {
RDMA_INFO(endpoint->Log, "resolve source address "
<< NVerbs::PrintAddress(addrinfo->ai_src_addr));
}

if (addrinfo->ai_dst_addr) {
RDMA_INFO(endpoint->Log, "resolve destination address "
<< NVerbs::PrintAddress(addrinfo->ai_dst_addr));
}
RDMA_DEBUG(endpoint->Log, "resolve rdma address");

endpoint->ChangeState(
EEndpointState::Disconnected,
Expand All @@ -1885,14 +1885,13 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "resolve route to " << endpoint->Host << " "
<< endpoint->PeerAddress());
RDMA_DEBUG(endpoint->Log, "resolve route");

endpoint->ChangeState(
EEndpointState::ResolvingAddress,
Expand All @@ -1903,7 +1902,7 @@ void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1936,15 +1935,14 @@ void TClient::BeginConnect(TClientEndpoint* endpoint) noexcept
.rnr_retry_count = 7,
};

RDMA_INFO(endpoint->Log, "connect to "
<< endpoint->PeerAddress() << " "
RDMA_INFO(endpoint->Log, "connect "
<< NVerbs::PrintConnectionParams(&param));

Verbs->Connect(endpoint->Connection.get(), &param);

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand All @@ -1954,16 +1952,15 @@ void TClient::HandleConnected(
{
const rdma_conn_param* param = &event->param.conn;

RDMA_INFO(endpoint->Log, "validate connection from "
<< endpoint->PeerAddress() << " "
RDMA_DEBUG(endpoint->Log, "validate "
<< NVerbs::PrintConnectionParams(param));

if (param->private_data == nullptr ||
param->private_data_len < sizeof(TAcceptMessage) ||
ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION)
{
RDMA_ERROR(endpoint->Log, "unable to parse accept message");
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -1979,7 +1976,7 @@ void TClient::HandleConnected(

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -1999,7 +1996,7 @@ void TClient::HandleRejected(
param->private_data_len < sizeof(TRejectMessage) ||
ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION)
{
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -2009,28 +2006,28 @@ void TClient::HandleRejected(
if (msg->Status == RDMA_PROTO_CONFIG_MISMATCH) {
if (endpoint->Config.QueueSize > msg->QueueSize) {
RDMA_INFO(endpoint->Log, "set QueueSize=" << msg->QueueSize
<< " supported by server " << endpoint->PeerAddress());
<< " supported by " << endpoint->Host);

endpoint->Config.QueueSize = msg->QueueSize;
}

if (endpoint->Config.MaxBufferSize > msg->MaxBufferSize) {
RDMA_INFO(endpoint->Log, "set MaxBufferSize=" << msg->MaxBufferSize
<< " supported by server " << endpoint->PeerAddress());
<< " supported by " << endpoint->Host);

endpoint->Config.MaxBufferSize = msg->MaxBufferSize;
}
}

endpoint->SetError();
endpoint->Disconnect();
}

void TClient::HandleDisconnected(TClientEndpoint* endpoint) noexcept
{
// we can't reset config right away, because disconnect needs to know queue
// size to reap flushed WRs
endpoint->ResetConfig = true;
endpoint->SetError();
endpoint->Disconnect();
}

TCompletionPoller& TClient::PickPoller() noexcept
Expand Down Expand Up @@ -2080,6 +2077,7 @@ void TClient::DumpHtml(IOutputStream& out) const
TABLEHEAD() {
TABLER() {
TABLEH() { out << "Poller"; }
TABLEH() { out << "Id"; }
TABLEH() { out << "Host"; }
TABLEH() { out << "Port"; }
TABLEH() { out << "Magic"; }
Expand All @@ -2093,6 +2091,7 @@ void TClient::DumpHtml(IOutputStream& out) const
for (auto& ep: *endpoints) {
TABLER() {
TABLED() { out << i; }
TABLED() { out << ep->Id; }
TABLED() { out << ep->Host; }
TABLED() { out << ep->Port; }
TABLED()
Expand Down
4 changes: 1 addition & 3 deletions cloud/blockstore/libs/rdma/impl/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
client->Stop();
};

auto clientEndpoint = client->StartEndpoint(
"::",
10020);
auto clientEndpoint = client->StartEndpoint("::", 10020);

Y_UNUSED(clientEndpoint);
}
Expand Down
Loading

0 comments on commit 63d2ea4

Please sign in to comment.