diff --git a/cloud/blockstore/libs/rdma/impl/client.cpp b/cloud/blockstore/libs/rdma/impl/client.cpp index 9ed3ac95fed..78656bd784e 100644 --- a/cloud/blockstore/libs/rdma/impl/client.cpp +++ b/cloud/blockstore/libs/rdma/impl/client.cpp @@ -422,8 +422,8 @@ class TClientEndpoint final union { const ui64 Id = RandomNumber(Max()); struct { - ui32 RecvMagic; - ui32 SendMagic; + const ui32 RecvMagic; + const ui32 SendMagic; }; }; ui16 Generation = Max(); @@ -456,7 +456,7 @@ class TClientEndpoint final bool CheckState(EEndpointState expectedState) const; void ChangeState(EEndpointState expectedState, EEndpointState newState); void ChangeState(EEndpointState newState) noexcept; - void SetError() noexcept; + void Disconnect() noexcept; void FlushQueues() noexcept; // called from CM thread @@ -486,9 +486,6 @@ class TClientEndpoint final bool Flushed() const; bool FlushHanging() const; - // called from CM and external thread - TString PeerAddress() const; - private: // called from CQ thread void HandleQueuedRequests(); @@ -542,14 +539,21 @@ TClientEndpoint::TClientEndpoint( return TStringBuilder() << "[" << Id << "] " << msg; }); - RDMA_INFO("new endpoint [send_magic=" << Hex(SendMagic, 0) - << " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host); + RDMA_INFO("start endpoint " << Host + << " [send_magic=" << Hex(SendMagic, HF_FULL) + << " recv_magic=" << Hex(RecvMagic, HF_FULL) << "]"); } TClientEndpoint::~TClientEndpoint() { - RDMA_INFO("close endpoint [send_magic=" << Hex(SendMagic, 0) - << " recv_magic=" << Hex(RecvMagic, 0) << "] to " << Host); + // Free resources if endpoint wasn't properly stopped. We don't need to + // detach it, because if we got there, poller is also getting destroyed + if (!StopResult.HasValue()) { + DestroyQP(); + Connection.reset(); + } + + RDMA_INFO("stop endpoint"); } bool TClientEndpoint::CheckState(EEndpointState expectedState) const @@ -635,7 +639,10 @@ void TClientEndpoint::CreateQP() RecvWrs.resize(Config.QueueSize); Generation++; - RDMA_INFO("new generation " << Generation); + + if (Generation > 1) { + RDMA_DEBUG("new generation " << Generation); + } ui32 i = 0; ui64 requestMsg = SendBuffer.Address; @@ -973,7 +980,7 @@ void TClientEndpoint::SendRequestCompleted( Counters->SendRequestError(); ReportRdmaError(); - SetError(); + Disconnect(); return; } @@ -1022,7 +1029,7 @@ void TClientEndpoint::RecvResponseCompleted( Counters->RecvResponseError(); RecvResponse(recv); ReportRdmaError(); - SetError(); + Disconnect(); return; } @@ -1067,8 +1074,7 @@ void TClientEndpoint::RecvResponseCompleted( TFuture TClientEndpoint::Stop() noexcept { if (!StopFlag.test_and_set()) { - RDMA_DEBUG("stop endpoint"); - SetError(); + Disconnect(); } return StopResult.GetFuture(); } @@ -1084,11 +1090,6 @@ void TClientEndpoint::SetConnection(NVerbs::TConnectionPtr connection) noexcept Connection = std::move(connection); } -TString TClientEndpoint::PeerAddress() const -{ - return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get())); -} - int TClientEndpoint::ReconnectTimerHandle() const { return Reconnect.Timer.Handle(); @@ -1096,7 +1097,7 @@ int TClientEndpoint::ReconnectTimerHandle() const void TClientEndpoint::FlushQueues() noexcept { - STORAGE_INFO("flush queues"); + RDMA_DEBUG("flush queues"); try { ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR}; @@ -1108,7 +1109,7 @@ void TClientEndpoint::FlushQueues() noexcept } } -void TClientEndpoint::SetError() noexcept +void TClientEndpoint::Disconnect() noexcept { Status = E_RDMA_UNAVAILABLE; @@ -1126,6 +1127,8 @@ void TClientEndpoint::SetError() noexcept // flush queues, signal the poller and schedule reconnect case EEndpointState::Connected: + RDMA_INFO("disconnect"); + ChangeState( EEndpointState::Connected, EEndpointState::Disconnecting); @@ -1457,7 +1460,7 @@ class TCompletionPoller final } } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); } } @@ -1492,7 +1495,7 @@ class TCompletionPoller final } } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); } } @@ -1534,8 +1537,8 @@ class TCompletionPoller final continue; } RDMA_ERROR(endpoint->Log, "flush timeout " - << "(send_queue.size=" << endpoint->SendQueue.Size() - << " recv_queue.size=" << endpoint->RecvQueue.Size() << ")"); + << "[send_queue.size=" << endpoint->SendQueue.Size() + << " recv_queue.size=" << endpoint->RecvQueue.Size() << "]"); } endpoint->ChangeState( @@ -1737,7 +1740,7 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept { TClientEndpoint* endpoint = TClientEndpoint::FromEvent(event.get()); - RDMA_INFO(endpoint->Log, "received " << NVerbs::GetEventName(event->event)); + RDMA_DEBUG(endpoint->Log, "received " << NVerbs::GetEventName(event->event)); switch (event->event) { case RDMA_CM_EVENT_CONNECT_REQUEST: @@ -1790,6 +1793,8 @@ void TClient::HandleConnectionEvent(NVerbs::TConnectionEventPtr event) noexcept void TClient::StopEndpoint(TClientEndpoint* endpoint) noexcept { + RDMA_INFO(endpoint->Log, "detach pollers and close connection"); + ConnectionPoller->Detach(endpoint); endpoint->Poller->Detach(endpoint); endpoint->DestroyQP(); @@ -1815,6 +1820,8 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept if (endpoint->Reconnect.Hanging()) { // if this is our first connection, fail over to IC if (endpoint->StartResult.Initialized()) { + RDMA_ERROR(endpoint->Log, "connection timeout"); + auto startResult = std::move(endpoint->StartResult); startResult.SetException(std::make_exception_ptr(TServiceError( MakeError(endpoint->Status, "connection timeout")))); @@ -1823,6 +1830,7 @@ void TClient::Reconnect(TClientEndpoint* endpoint) noexcept return; } // otherwise keep trying + RDMA_WARN(endpoint->Log, "connection is hanging"); } RDMA_DEBUG(endpoint->Log, "reconnect timer hit in " @@ -1866,15 +1874,7 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept auto addrinfo = Verbs->GetAddressInfo( endpoint->Host, endpoint->Port, &hints); - if (addrinfo->ai_src_addr) { - RDMA_INFO(endpoint->Log, "resolve source address " - << NVerbs::PrintAddress(addrinfo->ai_src_addr)); - } - - if (addrinfo->ai_dst_addr) { - RDMA_INFO(endpoint->Log, "resolve destination address " - << NVerbs::PrintAddress(addrinfo->ai_dst_addr)); - } + RDMA_DEBUG(endpoint->Log, "resolve rdma address"); endpoint->ChangeState( EEndpointState::Disconnected, @@ -1885,14 +1885,13 @@ void TClient::BeginResolveAddress(TClientEndpoint* endpoint) noexcept } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); } } void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept { - RDMA_INFO(endpoint->Log, "resolve route to " << endpoint->Host << " " - << endpoint->PeerAddress()); + RDMA_DEBUG(endpoint->Log, "resolve route"); endpoint->ChangeState( EEndpointState::ResolvingAddress, @@ -1903,7 +1902,7 @@ void TClient::BeginResolveRoute(TClientEndpoint* endpoint) noexcept } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); } } @@ -1936,15 +1935,14 @@ void TClient::BeginConnect(TClientEndpoint* endpoint) noexcept .rnr_retry_count = 7, }; - RDMA_INFO(endpoint->Log, "connect to " - << endpoint->PeerAddress() << " " + RDMA_INFO(endpoint->Log, "connect " << NVerbs::PrintConnectionParams(¶m)); Verbs->Connect(endpoint->Connection.get(), ¶m); } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); } } @@ -1954,8 +1952,7 @@ void TClient::HandleConnected( { const rdma_conn_param* param = &event->param.conn; - RDMA_INFO(endpoint->Log, "validate connection from " - << endpoint->PeerAddress() << " " + RDMA_DEBUG(endpoint->Log, "validate " << NVerbs::PrintConnectionParams(param)); if (param->private_data == nullptr || @@ -1963,7 +1960,7 @@ void TClient::HandleConnected( ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION) { RDMA_ERROR(endpoint->Log, "unable to parse accept message"); - endpoint->SetError(); + endpoint->Disconnect(); return; } @@ -1979,7 +1976,7 @@ void TClient::HandleConnected( } catch (const TServiceError& e) { RDMA_ERROR(endpoint->Log, e.what()); - endpoint->SetError(); + endpoint->Disconnect(); return; } @@ -1999,7 +1996,7 @@ void TClient::HandleRejected( param->private_data_len < sizeof(TRejectMessage) || ParseMessageHeader(param->private_data) != RDMA_PROTO_VERSION) { - endpoint->SetError(); + endpoint->Disconnect(); return; } @@ -2009,20 +2006,20 @@ void TClient::HandleRejected( if (msg->Status == RDMA_PROTO_CONFIG_MISMATCH) { if (endpoint->Config.QueueSize > msg->QueueSize) { RDMA_INFO(endpoint->Log, "set QueueSize=" << msg->QueueSize - << " supported by server " << endpoint->PeerAddress()); + << " supported by " << endpoint->Host); endpoint->Config.QueueSize = msg->QueueSize; } if (endpoint->Config.MaxBufferSize > msg->MaxBufferSize) { RDMA_INFO(endpoint->Log, "set MaxBufferSize=" << msg->MaxBufferSize - << " supported by server " << endpoint->PeerAddress()); + << " supported by " << endpoint->Host); endpoint->Config.MaxBufferSize = msg->MaxBufferSize; } } - endpoint->SetError(); + endpoint->Disconnect(); } void TClient::HandleDisconnected(TClientEndpoint* endpoint) noexcept @@ -2030,7 +2027,7 @@ void TClient::HandleDisconnected(TClientEndpoint* endpoint) noexcept // we can't reset config right away, because disconnect needs to know queue // size to reap flushed WRs endpoint->ResetConfig = true; - endpoint->SetError(); + endpoint->Disconnect(); } TCompletionPoller& TClient::PickPoller() noexcept @@ -2080,6 +2077,7 @@ void TClient::DumpHtml(IOutputStream& out) const TABLEHEAD() { TABLER() { TABLEH() { out << "Poller"; } + TABLEH() { out << "Id"; } TABLEH() { out << "Host"; } TABLEH() { out << "Port"; } TABLEH() { out << "Magic"; } @@ -2093,6 +2091,7 @@ void TClient::DumpHtml(IOutputStream& out) const for (auto& ep: *endpoints) { TABLER() { TABLED() { out << i; } + TABLED() { out << ep->Id; } TABLED() { out << ep->Host; } TABLED() { out << ep->Port; } TABLED() diff --git a/cloud/blockstore/libs/rdma/impl/client_ut.cpp b/cloud/blockstore/libs/rdma/impl/client_ut.cpp index 66f0f0f88fe..ad3203ca5de 100644 --- a/cloud/blockstore/libs/rdma/impl/client_ut.cpp +++ b/cloud/blockstore/libs/rdma/impl/client_ut.cpp @@ -73,9 +73,7 @@ Y_UNIT_TEST_SUITE(TRdmaClientTest) client->Stop(); }; - auto clientEndpoint = client->StartEndpoint( - "::", - 10020); + auto clientEndpoint = client->StartEndpoint("::", 10020); Y_UNUSED(clientEndpoint); } diff --git a/cloud/blockstore/libs/rdma/impl/server.cpp b/cloud/blockstore/libs/rdma/impl/server.cpp index fdadc30ce8a..1ddd58d9bf2 100644 --- a/cloud/blockstore/libs/rdma/impl/server.cpp +++ b/cloud/blockstore/libs/rdma/impl/server.cpp @@ -5,6 +5,7 @@ #include "list.h" #include "poll.h" #include "rcu.h" +#include "log.h" #include "utils.h" #include "verbs.h" #include "work_queue.h" @@ -287,8 +288,13 @@ class TServerSession final TWorkQueue SendQueue; TWorkQueue RecvQueue; - const ui32 SendMagic = RandomNumber(Max()); - const ui32 RecvMagic = RandomNumber(Max()); + union { + const ui64 Id = RandomNumber(Max()); + struct { + const ui32 RecvMagic; + const ui32 SendMagic; + }; + }; TLockFreeList InputRequests; TSimpleList QueuedRequests; @@ -320,7 +326,6 @@ class TServerSession final // called from external thread void EnqueueRequest(TRequestPtr req) noexcept; - TString PeerAddress() const; // called from CQ thread void HandleCompletionEvent(ibv_wc* wc) override; @@ -367,8 +372,14 @@ TServerSession::TServerSession( { Connection->context = this; - STORAGE_INFO("new session [send_magic=%X recv_magic=%X]", - SendMagic, RecvMagic); + Log.SetFormatter([=](ELogPriority p, TStringBuf msg) { + Y_UNUSED(p); + return TStringBuilder() << "[" << Id << "] " << msg; + }); + + RDMA_INFO("start session " << Verbs->GetPeer(Connection.get()) + << " [send_magic=" << Hex(SendMagic, HF_FULL) + << " recv_magic=" << Hex(RecvMagic, HF_FULL) << "]"); CompletionChannel = Verbs->CreateCompletionChannel(Connection->verbs); SetNonBlock(CompletionChannel->fd, true); @@ -453,10 +464,7 @@ TServerSession::TServerSession( TServerSession::~TServerSession() { - STORAGE_INFO("close session [send_magic=%X recv_magic=%X] to %s", - SendMagic, - RecvMagic, - PeerAddress().c_str()); + RDMA_INFO("stop session"); Verbs->DestroyQP(Connection.get()); @@ -488,7 +496,7 @@ void TServerSession::Stop() noexcept Verbs->Disconnect(Connection.get()); } catch (const TServiceError &e) { - STORAGE_ERROR("unable to disconnect session"); + RDMA_ERROR("unable to disconnect session"); } } @@ -502,11 +510,6 @@ void TServerSession::EnqueueRequest(TRequestPtr req) noexcept } } -TString TServerSession::PeerAddress() const -{ - return NVerbs::PrintAddress(rdma_get_peer_addr(Connection.get())); -} - bool TServerSession::HandleInputRequests() { if (Config->WaitMode == EWaitMode::Poll) { @@ -578,8 +581,7 @@ bool TServerSession::HandleCompletionEvents() void TServerSession::Flush() { - STORAGE_INFO("flush session [send_magic=%X recv_magic=%X]", - SendMagic, RecvMagic); + RDMA_DEBUG("flush queues"); struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR}; Verbs->ModifyQP(Connection->qp, &attr, IBV_QP_STATE); @@ -622,11 +624,11 @@ void TServerSession::HandleCompletionEvent(ibv_wc* wc) { auto id = TWorkRequestId(wc->wr_id); - STORAGE_TRACE(NVerbs::GetOpcodeName(wc->opcode) << " " << id + RDMA_TRACE(NVerbs::GetOpcodeName(wc->opcode) << " " << id << " completed with " << NVerbs::GetStatusString(wc->status)); if (!IsWorkRequestValid(id)) { - STORAGE_ERROR_T(LogThrottler.Unexpected, "unexpected completion " + RDMA_ERROR(LogThrottler.Unexpected, Log, "unexpected completion " << NVerbs::PrintCompletion(wc)); Counters->UnexpectedCompletion(); @@ -657,7 +659,7 @@ void TServerSession::HandleCompletionEvent(ibv_wc* wc) break; default: - STORAGE_ERROR_T(LogThrottler.Unexpected, "unexpected completion " + RDMA_ERROR(LogThrottler.Unexpected, Log, "unexpected completion " << NVerbs::PrintCompletion(wc)); Counters->UnexpectedCompletion(); @@ -669,7 +671,7 @@ void TServerSession::RecvRequest(TRecvWr* recv) auto* requestMsg = recv->Message(); Zero(*requestMsg); - STORAGE_TRACE("RECV " << TWorkRequestId(recv->wr.wr_id)); + RDMA_TRACE("RECV " << TWorkRequestId(recv->wr.wr_id)); Verbs->PostRecv(Connection->qp, &recv->wr); Counters->RecvRequestStarted(); } @@ -686,7 +688,7 @@ void TServerSession::FreeRequest(TRequestPtr req, TSendWr* send) noexcept void TServerSession::RecvRequestCompleted(TRecvWr* recv, ibv_wc_status status) { if (status != IBV_WC_SUCCESS) { - STORAGE_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) << ": " + RDMA_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) << ": " << NVerbs::GetStatusString(status)); Counters->RecvRequestError(); @@ -699,7 +701,7 @@ void TServerSession::RecvRequestCompleted(TRecvWr* recv, ibv_wc_status status) const int version = ParseMessageHeader(msg); if (version != RDMA_PROTO_VERSION) { - STORAGE_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) + RDMA_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) << ": incompatible protocol version " << version << " != "<< int(RDMA_PROTO_VERSION)); @@ -721,7 +723,7 @@ void TServerSession::RecvRequestCompleted(TRecvWr* recv, ibv_wc_status status) RecvRequest(recv); // should always be posted if (req->In.Length > Config->MaxBufferSize) { - STORAGE_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) + RDMA_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) << ": request exceeds maximum supported size " << req->In.Length << " > " << Config->MaxBufferSize); @@ -730,7 +732,7 @@ void TServerSession::RecvRequestCompleted(TRecvWr* recv, ibv_wc_status status) } if (req->Out.Length > Config->MaxBufferSize) { - STORAGE_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) + RDMA_ERROR("RECV " << TWorkRequestId(recv->wr.wr_id) << ": request exceeds maximum supported size " << req->Out.Length << " > " << Config->MaxBufferSize); @@ -744,7 +746,7 @@ void TServerSession::RecvRequestCompleted(TRecvWr* recv, ibv_wc_status status) req->CallContext->RequestId); if (MaxInflightBytes < req->In.Length + req->Out.Length) { - STORAGE_INFO_T(LogThrottler.Inflight, "reached inflight limit, " + RDMA_INFO(LogThrottler.Inflight, Log, "reached inflight limit, " << MaxInflightBytes << "/" << Config->MaxInflightBytes << " bytes available"); @@ -819,7 +821,7 @@ void TServerSession::ReadRequestData(TRequestPtr req, TSendWr* send) wr.wr.rdma.rkey = req->In.Key; wr.wr.rdma.remote_addr = req->In.Address; - STORAGE_TRACE("READ " << TWorkRequestId(wr.wr_id)); + RDMA_TRACE("READ " << TWorkRequestId(wr.wr_id)); Verbs->PostSend(Connection->qp, &wr); Counters->ReadRequestStarted(); @@ -838,13 +840,13 @@ void TServerSession::ReadRequestDataCompleted( auto req = ExtractRequest(send); if (req == nullptr) { - STORAGE_WARN("READ " << TWorkRequestId(send->wr.wr_id) + RDMA_WARN("READ " << TWorkRequestId(send->wr.wr_id) << ": request is empty"); return; } if (status != IBV_WC_SUCCESS) { - STORAGE_ERROR("READ " << TWorkRequestId(send->wr.wr_id) << ": " + RDMA_ERROR("READ " << TWorkRequestId(send->wr.wr_id) << ": " << NVerbs::GetStatusString(status)); Counters->ReadRequestError(); @@ -910,7 +912,7 @@ void TServerSession::WriteResponseData(TRequestPtr req, TSendWr* send) wr.wr.rdma.rkey = req->Out.Key; wr.wr.rdma.remote_addr = req->Out.Address; - STORAGE_TRACE("WRITE " << TWorkRequestId(wr.wr_id)); + RDMA_TRACE("WRITE " << TWorkRequestId(wr.wr_id)); Verbs->PostSend(Connection->qp, &wr); Counters->WriteResponseStarted(); @@ -929,14 +931,14 @@ void TServerSession::WriteResponseDataCompleted( auto req = ExtractRequest(send); if (req == nullptr) { - STORAGE_WARN("WRITE " << TWorkRequestId(send->wr.wr_id) + RDMA_WARN("WRITE " << TWorkRequestId(send->wr.wr_id) << ": request is empty"); return; } if (status != IBV_WC_SUCCESS) { if (status != IBV_WC_SUCCESS) { - STORAGE_ERROR("WRITE " << TWorkRequestId(send->wr.wr_id) + RDMA_ERROR("WRITE " << TWorkRequestId(send->wr.wr_id) << ": " << NVerbs::GetStatusString(status)); } @@ -971,7 +973,7 @@ void TServerSession::SendResponse(TRequestPtr req, TSendWr* send) responseMsg->Status = req->Status; responseMsg->ResponseBytes = req->ResponseBytes; - STORAGE_TRACE("SEND " << TWorkRequestId(send->wr.wr_id)); + RDMA_TRACE("SEND " << TWorkRequestId(send->wr.wr_id)); Verbs->PostSend(Connection->qp, &send->wr); Counters->SendResponseStarted(); @@ -988,13 +990,13 @@ void TServerSession::SendResponseCompleted(TSendWr* send, ibv_wc_status status) auto req = ExtractRequest(send); if (req == nullptr) { - STORAGE_WARN("SEND " << TWorkRequestId(send->wr.wr_id) + RDMA_WARN("SEND " << TWorkRequestId(send->wr.wr_id) << ": request is empty"); return; } if (status != IBV_WC_SUCCESS) { - STORAGE_ERROR("SEND " << TWorkRequestId(send->wr.wr_id) + RDMA_ERROR("SEND " << TWorkRequestId(send->wr.wr_id) << ": " << NVerbs::GetStatusString(status)); Counters->SendResponseError(); @@ -1179,7 +1181,7 @@ class TConnectionPoller final return Verbs->GetConnectionEvent(EventChannel.get()); } catch (const TServiceError &e) { - STORAGE_ERROR(e.what()); + RDMA_ERROR(e.what()); return NVerbs::NullPtr; } } @@ -1330,7 +1332,7 @@ class TCompletionPoller final break; } } catch (const TServiceError& e) { - STORAGE_ERROR(e.what()); + RDMA_ERROR(e.what()); } } @@ -1359,7 +1361,7 @@ class TCompletionPoller final hasWork |= session->HandleCompletionEvents(); } catch (const TServiceError& e) { - STORAGE_ERROR(e.what()); + RDMA_ERROR(e.what()); } } @@ -1482,7 +1484,7 @@ void TServer::Start() { Log = Logging->CreateLog("BLOCKSTORE_RDMA"); - STORAGE_DEBUG("start server"); + RDMA_DEBUG("start server"); auto counters = Monitoring->GetCounters(); auto rootGroup = counters->GetSubgroup("counters", "blockstore"); @@ -1502,14 +1504,14 @@ void TServer::Start() ConnectionPoller->Start(); } catch (const TServiceError &e) { - STORAGE_ERROR("unable to start server. " << e.what()); + RDMA_ERROR("unable to start server: " << e.what()); Stop(); } } void TServer::Stop() { - STORAGE_DEBUG("stop server"); + RDMA_DEBUG("stop server"); if (ConnectionPoller) { ConnectionPoller->Stop(); @@ -1530,8 +1532,7 @@ IServerEndpointPtr TServer::StartEndpoint( IServerHandlerPtr handler) { if (ConnectionPoller == nullptr) { - STORAGE_ERROR("unable to create rdma endpoint. " - << "connection poller is down"); + RDMA_ERROR("unable to create rdma endpoint: connection poller is down"); return nullptr; } @@ -1552,7 +1553,7 @@ IServerEndpointPtr TServer::StartEndpoint( return endpoint; } catch (const TServiceError& e) { - STORAGE_ERROR("unable to create rdma endpoint. " << e.what()); + RDMA_ERROR("unable to create rdma endpoint: " << e.what()); return nullptr; } } @@ -1569,7 +1570,7 @@ void TServer::Listen(TServerEndpoint* endpoint) endpoint->Port, &hints); - STORAGE_INFO("LISTEN address " + RDMA_INFO("listen on address " << NVerbs::PrintAddress(addrinfo->ai_src_addr)); Verbs->BindAddress(endpoint->Connection.get(), addrinfo->ai_src_addr); @@ -1654,6 +1655,7 @@ void TServer::DumpHtml(IOutputStream& out) const TABLE_SORTABLE_CLASS("table table-bordered") { TABLEHEAD() { TABLER() { + TABLEH() { out << "Id"; } TABLEH() { out << "Address"; } TABLEH() { out << "Magic"; } } @@ -1661,9 +1663,11 @@ void TServer::DumpHtml(IOutputStream& out) const for (auto& session: *sessions) { TABLER() { - TABLED() { out << session->PeerAddress(); } - TABLED() - { + TABLED() { out << session->Id; } + TABLED() { + out << Verbs->GetPeer(session->Connection.get());; + } + TABLED() { Printf( out, "%08X:%08X", @@ -1682,7 +1686,7 @@ void TServer::DumpHtml(IOutputStream& out) const // implements IConnectionEventHandler void TServer::HandleConnectionEvent(rdma_cm_event* event) noexcept { - STORAGE_INFO(NVerbs::GetEventName(event->event) << " received"); + RDMA_DEBUG(NVerbs::GetEventName(event->event) << " received"); switch (event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: @@ -1730,9 +1734,8 @@ void TServer::HandleConnectRequest( { const rdma_conn_param* connectParams = &event->param.conn; - STORAGE_INFO("validate connection from " - << NVerbs::PrintAddress(rdma_get_peer_addr(event->id)) - << " " << NVerbs::PrintConnectionParams(connectParams)); + RDMA_DEBUG("validate " << Verbs->GetPeer(event->id) << " " + << NVerbs::PrintConnectionParams(connectParams)); if (connectParams->private_data == nullptr || connectParams->private_data_len < sizeof(TConnectMessage) || @@ -1782,9 +1785,8 @@ void TServer::Accept(TServerEndpoint* endpoint, rdma_cm_event* event) noexcept .rnr_retry_count = 7, }; - STORAGE_INFO("accept connection from " - << NVerbs::PrintAddress(rdma_get_peer_addr(event->id)) - << " " << NVerbs::PrintConnectionParams(&acceptParams)); + RDMA_INFO(session->Log, "accept " + << NVerbs::PrintConnectionParams(&acceptParams)); Verbs->Accept(event->id, &acceptParams); @@ -1792,7 +1794,7 @@ void TServer::Accept(TServerEndpoint* endpoint, rdma_cm_event* event) noexcept session->CompletionPoller->Acquire(std::move(session)); } catch (const TServiceError& e) { - STORAGE_ERROR(e.what()) + RDMA_ERROR(e.what()) Reject(event->id, RDMA_PROTO_FAIL); } } @@ -1804,19 +1806,20 @@ void TServer::HandleConnected(TServerSession* session) noexcept session->CompletionPoller->Attach(session); } catch (const TServiceError& e) { - STORAGE_ERROR(e.what()); + RDMA_ERROR(e.what()); session->Stop(); } } void TServer::HandleDisconnected(TServerSession* session) noexcept { + RDMA_INFO(session->Log, "disconnect") session->Flush(); } void TServer::Reject(rdma_cm_id* id, int status) noexcept { - STORAGE_INFO("reject with status " << status); + RDMA_INFO("reject " << Verbs->GetPeer(id) << " with status " << status); TRejectMessage rejectMsg = { .Status = SafeCast(status), @@ -1829,7 +1832,7 @@ void TServer::Reject(rdma_cm_id* id, int status) noexcept Verbs->Reject(id, &rejectMsg, sizeof(TRejectMessage)); } catch (const TServiceError& e) { - STORAGE_ERROR(e.what()); + RDMA_ERROR(e.what()); } } diff --git a/cloud/blockstore/libs/rdma/impl/test_verbs.cpp b/cloud/blockstore/libs/rdma/impl/test_verbs.cpp index 99ded90de25..e5eb2b0948b 100644 --- a/cloud/blockstore/libs/rdma/impl/test_verbs.cpp +++ b/cloud/blockstore/libs/rdma/impl/test_verbs.cpp @@ -355,6 +355,8 @@ struct TTestVerbs struct TConnection : rdma_cm_id { + TString Peer; + TConnection( rdma_event_channel* channel, void* context, @@ -405,7 +407,8 @@ struct TTestVerbs Y_UNUSED(dst_addr); Y_UNUSED(timeout); - id->route.addr.dst_addr.sa_family = AF_INET6; + memcpy(&id->route.addr.src_storage, src_addr, sizeof(sockaddr_storage)); + memcpy(&id->route.addr.dst_storage, dst_addr, sizeof(sockaddr_storage)); EnqueueConnectionEvent(TestContext, RDMA_CM_EVENT_ADDR_RESOLVED, id); } @@ -417,6 +420,16 @@ struct TTestVerbs EnqueueConnectionEvent(TestContext, RDMA_CM_EVENT_ROUTE_RESOLVED, id); } + TString GetPeer(rdma_cm_id *id) override + { + auto* addr = &id->route.addr.dst_addr; + + if (addr->sa_family) { + return PrintAddress(addr); + } + return "unknown"; + } + void Listen(rdma_cm_id* id, int backlog) override { if (TestContext->Listen) { diff --git a/cloud/blockstore/libs/rdma/impl/verbs.cpp b/cloud/blockstore/libs/rdma/impl/verbs.cpp index 8fcac4857c3..9dd564d2454 100644 --- a/cloud/blockstore/libs/rdma/impl/verbs.cpp +++ b/cloud/blockstore/libs/rdma/impl/verbs.cpp @@ -11,6 +11,8 @@ #include #include +#include + namespace NCloud::NBlockStore::NRdma::NVerbs { //////////////////////////////////////////////////////////////////////////////// @@ -265,6 +267,26 @@ struct TVerbs } } + TString GetPeer(rdma_cm_id *id) override + { + auto* addr = rdma_get_peer_addr(id); + char host[NI_MAXHOST]; + + int err = getnameinfo( + addr, + sizeof(sockaddr_storage), + host, + sizeof(host), + NULL, // serv + 0, // servlen + 0); // flags + + if (err) { + return PrintAddress(addr); + } + return TString(host); + } + void Listen(rdma_cm_id* id, int backlog) override { int res = rdma_listen(id, backlog); @@ -442,7 +464,7 @@ const char* GetEventName(rdma_cm_event_type event) TString PrintAddress(const sockaddr* addr) { - return NAddr::PrintHostAndPort(NAddr::TOpaqueAddr(addr)); + return NAddr::PrintHost(NAddr::TOpaqueAddr(addr)); } TString PrintConnectionParams(const rdma_conn_param* conn) diff --git a/cloud/blockstore/libs/rdma/impl/verbs.h b/cloud/blockstore/libs/rdma/impl/verbs.h index 1b5f0c3eef7..8c2b86ab67f 100644 --- a/cloud/blockstore/libs/rdma/impl/verbs.h +++ b/cloud/blockstore/libs/rdma/impl/verbs.h @@ -109,6 +109,7 @@ struct IVerbs sockaddr* dst, TDuration timeout) = 0; virtual void ResolveRoute(rdma_cm_id* id, TDuration timeout) = 0; + virtual TString GetPeer(rdma_cm_id* id) = 0; virtual void Listen(rdma_cm_id* id, int backlog) = 0; virtual void Connect(rdma_cm_id* id, rdma_conn_param* param) = 0;