Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve rdma logging & clean up resources in destructor; issue-1020 #1920

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 51 additions & 52 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ class TClientEndpoint final
union {
const ui64 Id = RandomNumber(Max<ui64>());
struct {
ui32 RecvMagic;
ui32 SendMagic;
const ui32 RecvMagic;
const ui32 SendMagic;
};
};
ui16 Generation = Max<ui16>();
Expand Down Expand Up @@ -456,7 +456,7 @@ class TClientEndpoint final
bool CheckState(EEndpointState expectedState) const;
void ChangeState(EEndpointState expectedState, EEndpointState newState);
void ChangeState(EEndpointState newState) noexcept;
void SetError() noexcept;
void Disconnect() noexcept;
void FlushQueues() noexcept;

// called from CM thread
Expand Down Expand Up @@ -486,9 +486,6 @@ class TClientEndpoint final
bool Flushed() const;
bool FlushHanging() const;

// called from CM and external thread
TString PeerAddress() const;

private:
// called from CQ thread
void HandleQueuedRequests();
Expand Down Expand Up @@ -542,14 +539,21 @@ TClientEndpoint::TClientEndpoint(
return TStringBuilder() << "[" << Id << "] " << msg;
});

RDMA_INFO("new endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
RDMA_INFO("start endpoint " << Host
<< " [send_magic=" << Hex(SendMagic, HF_FULL)
<< " recv_magic=" << Hex(RecvMagic, HF_FULL) << "]");
}

// Destructor: logs, releases the queue pair and the connection if they have
// not been released through the regular stop path, then logs again.
//
// NOTE(review): this text comes from a rendered diff with the +/- markers
// stripped; the leading "close endpoint" log is presumably the pre-change line
// that the trailing "stop endpoint" log replaces - confirm against the file.
TClientEndpoint::~TClientEndpoint()
{
RDMA_INFO("close endpoint [send_magic=" << Hex(SendMagic, 0)
<< " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host);
// StopResult carrying no value means the endpoint was never properly
// stopped, so free its resources here. There is no need to detach the
// endpoint from the poller: if we got here, the poller is being destroyed
// as well.
if (!StopResult.HasValue()) {
DestroyQP();
Connection.reset();
}

RDMA_INFO("stop endpoint");
}

bool TClientEndpoint::CheckState(EEndpointState expectedState) const
Expand Down Expand Up @@ -635,7 +639,10 @@ void TClientEndpoint::CreateQP()
RecvWrs.resize(Config.QueueSize);

Generation++;
RDMA_INFO("new generation " << Generation);

if (Generation > 1) {
RDMA_DEBUG("new generation " << Generation);
}

ui32 i = 0;
ui64 requestMsg = SendBuffer.Address;
Expand Down Expand Up @@ -973,7 +980,7 @@ void TClientEndpoint::SendRequestCompleted(

Counters->SendRequestError();
ReportRdmaError();
SetError();
Disconnect();
return;
}

Expand Down Expand Up @@ -1022,7 +1029,7 @@ void TClientEndpoint::RecvResponseCompleted(
Counters->RecvResponseError();
RecvResponse(recv);
ReportRdmaError();
SetError();
Disconnect();
return;
}

Expand Down Expand Up @@ -1067,8 +1074,7 @@ void TClientEndpoint::RecvResponseCompleted(
// Requests endpoint shutdown and returns the future obtained from StopResult.
//
// The guarded body runs only for the caller that finds StopFlag clear
// (assuming StopFlag has std::atomic_flag semantics, i.e. test_and_set()
// returns the previous value - TODO confirm); later callers just get the
// future.
//
// NOTE(review): SetError() and Disconnect() also appear back-to-back as the
// old/new name of one definition elsewhere in this diff, so only one of the
// two calls below is expected to exist in the real file - confirm.
TFuture<void> TClientEndpoint::Stop() noexcept
{
if (!StopFlag.test_and_set()) {
RDMA_DEBUG("stop endpoint");
SetError();
Disconnect();
}
return StopResult.GetFuture();
}
Expand All @@ -1084,19 +1090,14 @@ void TClientEndpoint::SetConnection(NVerbs::TConnectionPtr connection) noexcept
Connection = std::move(connection);
}

// Returns the peer address of the current connection, formatted by
// NVerbs::PrintAddress.
//
// NOTE(review): Connection.get() is handed to rdma_get_peer_addr() without a
// null check - presumably callers only invoke this while a connection is set;
// verify against callers.
TString TClientEndpoint::PeerAddress() const
{
return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get()));
}

// Returns the integer handle of the reconnect timer (Reconnect.Timer).
int TClientEndpoint::ReconnectTimerHandle() const
{
return Reconnect.Timer.Handle();
}

void TClientEndpoint::FlushQueues() noexcept
{
STORAGE_INFO("flush queues");
RDMA_DEBUG("flush queues");

try {
ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR};
Expand All @@ -1108,7 +1109,7 @@ void TClientEndpoint::FlushQueues() noexcept
}
}

void TClientEndpoint::SetError() noexcept
void TClientEndpoint::Disconnect() noexcept
{
Status = E_RDMA_UNAVAILABLE;

Expand All @@ -1126,6 +1127,8 @@ void TClientEndpoint::SetError() noexcept

// flush queues, signal the poller and schedule reconnect
case EEndpointState::Connected:
RDMA_INFO("disconnect");

ChangeState(
EEndpointState::Connected,
EEndpointState::Disconnecting);
Expand Down Expand Up @@ -1457,7 +1460,7 @@ class TCompletionPoller final
}
} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1492,7 +1495,7 @@ class TCompletionPoller final
}
} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1534,8 +1537,8 @@ class TCompletionPoller final
continue;
}
RDMA_ERROR(endpoint->Log, "flush timeout "
<< "(send_queue.size=" << endpoint->SendQueue.Size()
<< " recv_queue.size=" << endpoint->RecvQueue.Size() << ")");
<< "[send_queue.size=" << endpoint->SendQueue.Size()
<< " recv_queue.size=" << endpoint->RecvQueue.Size() << "]");
}

endpoint->ChangeState(
Expand Down Expand Up @@ -1737,7 +1740,7 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept
{
TClientEndpoint* endpoint = TClientEndpoint::FromEvent(event.get());

RDMA_INFO(endpoint->Log, "received " << NVerbs::GetEventName(event->event));
RDMA_DEBUG(endpoint->Log, "received " << NVerbs::GetEventName(event->event));

switch (event->event) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
Expand Down Expand Up @@ -1790,6 +1793,8 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept

void TClient::StopEndpoint(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "detach pollers and close connection");

ConnectionPoller->Detach(endpoint);
endpoint->Poller->Detach(endpoint);
endpoint->DestroyQP();
Expand All @@ -1815,6 +1820,8 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
if (endpoint->Reconnect.Hanging()) {
// if this is our first connection, fail over to IC
if (endpoint->StartResult.Initialized()) {
RDMA_ERROR(endpoint->Log, "connection timeout");

auto startResult = std::move(endpoint->StartResult);
startResult.SetException(std::make_exception_ptr(TServiceError(
MakeError(endpoint->Status, "connection timeout"))));
Expand All @@ -1823,6 +1830,7 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept
return;
}
// otherwise keep trying
RDMA_WARN(endpoint->Log, "connection is hanging");
}

RDMA_DEBUG(endpoint->Log, "reconnect timer hit in "
Expand Down Expand Up @@ -1866,15 +1874,7 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept
auto addrinfo = Verbs->GetAddressInfo(
endpoint->Host, endpoint->Port, &hints);

if (addrinfo->ai_src_addr) {
RDMA_INFO(endpoint->Log, "resolve source address "
<< NVerbs::PrintAddress(addrinfo->ai_src_addr));
}

if (addrinfo->ai_dst_addr) {
RDMA_INFO(endpoint->Log, "resolve destination address "
<< NVerbs::PrintAddress(addrinfo->ai_dst_addr));
}
RDMA_DEBUG(endpoint->Log, "resolve rdma address");

endpoint->ChangeState(
EEndpointState::Disconnected,
Expand All @@ -1885,14 +1885,13 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept
{
RDMA_INFO(endpoint->Log, "resolve route to " << endpoint->Host << " "
<< endpoint->PeerAddress());
RDMA_DEBUG(endpoint->Log, "resolve route");

endpoint->ChangeState(
EEndpointState::ResolvingAddress,
Expand All @@ -1903,7 +1902,7 @@ void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand Down Expand Up @@ -1936,15 +1935,14 @@ void TClient::BeginConnect(TClientEndpoint* endpoint) noexcept
.rnr_retry_count = 7,
};

RDMA_INFO(endpoint->Log, "connect to "
<< endpoint->PeerAddress() << " "
RDMA_INFO(endpoint->Log, "connect "
<< NVerbs::PrintConnectionParams(&param));

Verbs->Connect(endpoint->Connection.get(), &param);

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
}
}

Expand All @@ -1954,16 +1952,15 @@ void TClient::HandleConnected(
{
const rdma_conn_param* param = &event->param.conn;

RDMA_INFO(endpoint->Log, "validate connection from "
<< endpoint->PeerAddress() << " "
RDMA_DEBUG(endpoint->Log, "validate "
<< NVerbs::PrintConnectionParams(param));

if (param->private_data == nullptr ||
param->private_data_len < sizeof(TAcceptMessage) ||
ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION)
{
RDMA_ERROR(endpoint->Log, "unable to parse accept message");
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -1979,7 +1976,7 @@ void TClient::HandleConnected(

} catch (const TServiceError& e) {
RDMA_ERROR(endpoint->Log, e.what());
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -1999,7 +1996,7 @@ void TClient::HandleRejected(
param->private_data_len < sizeof(TRejectMessage) ||
ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION)
{
endpoint->SetError();
endpoint->Disconnect();
return;
}

Expand All @@ -2009,28 +2006,28 @@ void TClient::HandleRejected(
if (msg->Status == RDMA_PROTO_CONFIG_MISMATCH) {
if (endpoint->Config.QueueSize > msg->QueueSize) {
RDMA_INFO(endpoint->Log, "set QueueSize=" << msg->QueueSize
<< " supported by server " << endpoint->PeerAddress());
<< " supported by " << endpoint->Host);

endpoint->Config.QueueSize = msg->QueueSize;
}

if (endpoint->Config.MaxBufferSize > msg->MaxBufferSize) {
RDMA_INFO(endpoint->Log, "set MaxBufferSize=" << msg->MaxBufferSize
<< " supported by server " << endpoint->PeerAddress());
<< " supported by " << endpoint->Host);

endpoint->Config.MaxBufferSize = msg->MaxBufferSize;
}
}

endpoint->SetError();
endpoint->Disconnect();
}

// Handles a disconnect of the given endpoint: flags its config for a
// deferred reset and then moves the endpoint into the disconnect path.
//
// NOTE(review): SetError() and Disconnect() look like the pre-/post-rename
// lines of the same call in this rendered diff - confirm which one is
// current.
void TClient::HandleDisconnected(TClientEndpoint* endpoint) noexcept
{
// We can't reset the config right away, because disconnect needs to know
// the queue size to reap flushed WRs; only mark it for reset here.
endpoint->ResetConfig = true;
endpoint->SetError();
endpoint->Disconnect();
}

TCompletionPoller& TClient::PickPoller() noexcept
Expand Down Expand Up @@ -2080,6 +2077,7 @@ void TClient::DumpHtml(IOutputStream& out) const
TABLEHEAD() {
TABLER() {
TABLEH() { out << "Poller"; }
TABLEH() { out << "Id"; }
TABLEH() { out << "Host"; }
TABLEH() { out << "Port"; }
TABLEH() { out << "Magic"; }
Expand All @@ -2093,6 +2091,7 @@ void TClient::DumpHtml(IOutputStream& out) const
for (auto& ep: *endpoints) {
TABLER() {
TABLED() { out << i; }
TABLED() { out << ep->Id; }
TABLED() { out << ep->Host; }
TABLED() { out << ep->Port; }
TABLED()
Expand Down
4 changes: 1 addition & 3 deletions cloud/blockstore/libs/rdma/impl/client_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest)
client->Stop();
};

auto clientEndpoint = client->StartEndpoint(
"::",
10020);
auto clientEndpoint = client->StartEndpoint("::", 10020);

Y_UNUSED(clientEndpoint);
}
Expand Down
Loading
Loading