Skip to content

Commit

Permalink
Fix Discovery Server over TCP (#4584)
Browse files Browse the repository at this point in the history
* Refs #20628: Update OpenOutputChannel

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Refactor PDPClient to handle initial TPC connections

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Configuration to use logical port 0 as default in DS

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Refactor PDPServer to handle initial TPC connections

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Add TCP DS blackbox test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check transport in function

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check interface changes before creating new send resources

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Minor corrections

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check loc.kind and methods in RTPSPartImpl

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Check loc.kind for default logical port

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20628: Fix windows

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
(cherry picked from commit 9ff962c)
  • Loading branch information
cferreiragonz committed Apr 10, 2024
1 parent 407a93b commit 266cea9
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 24 deletions.
22 changes: 22 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp>
#include <rtps/builtin/discovery/participant/timedevent/DSClientEvent.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>
#include <utils/SystemInfo.hpp>
#include <vector>

Expand Down Expand Up @@ -443,6 +444,15 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
if (mp_RTPSParticipant->has_tcp_transports())
{
for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
Expand Down Expand Up @@ -841,8 +851,20 @@ void PDPClient::update_remote_servers_list()
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
bool set_logicals = mp_RTPSParticipant->has_tcp_transports();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fastdds/rtps/participant/RTPSParticipantListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/writer/StatefulWriter.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>

#include <fastdds/rtps/history/WriterHistory.h>
#include <fastdds/rtps/history/ReaderHistory.h>
Expand Down Expand Up @@ -512,6 +513,15 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
if (mp_RTPSParticipant->has_tcp_transports())
{
for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
Expand Down Expand Up @@ -1183,8 +1193,20 @@ void PDPServer::update_remote_servers_list()

eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
bool set_logicals = mp_RTPSParticipant->has_tcp_transports();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand All @@ -1200,6 +1222,9 @@ void PDPServer::update_remote_servers_list()
{
discovery_db_.add_server(server.guidPrefix);
}

// Need to reactivate the server thread to send the DATA(p) to the new servers
awake_server_thread();
}

bool PDPServer::process_writers_acknowledgements()
Expand Down
146 changes: 134 additions & 12 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,68 @@ RTPSParticipantImpl::RTPSParticipantImpl(
switch (m_att.builtin.discovery_config.discoveryProtocol)
{
case DiscoveryProtocol::BACKUP:
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SERVER:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP server for discovery server without providing a proper listening port.");
break;
}
if (!m_att.builtin.metatrafficUnicastLocatorList.empty())
{
std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(),
m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
break;
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SUPER_CLIENT:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT && pT->listening_ports.empty())
if (pT)
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to use discovery server over TCP without providing a proper listening port.");
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP client for discovery server without providing a proper listening port." <<
" No TCP participants will be able to connect to this participant, but it will be able make connections.");
}
for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
default:
Expand Down Expand Up @@ -1299,8 +1349,37 @@ void RTPSParticipantImpl::update_attributes(
auto pdp = mp_builtinProtocols->mp_PDP;
bool update_pdp = false;

// Check if discovery servers need to be updated
eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers =
patt.builtin.discovery_config.m_DiscoveryServers;
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers)
{
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
}

// Check if there are changes
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers
if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers
|| patt.userData != m_att.userData
|| local_interfaces_changed)
{
Expand Down Expand Up @@ -1337,7 +1416,7 @@ void RTPSParticipantImpl::update_attributes(
for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers)
{
bool contained = false;
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
for (auto incoming_server : converted_discovery_servers)
{
if (existing_server.guidPrefix == incoming_server.guidPrefix)
{
Expand Down Expand Up @@ -1396,9 +1475,12 @@ void RTPSParticipantImpl::update_attributes(
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
}

createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
if (local_interfaces_changed)
{
createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
}
if (!modified_locators.empty())
{
createSenderResources(modified_locators);
Expand All @@ -1410,8 +1492,8 @@ void RTPSParticipantImpl::update_attributes(
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER ||
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
{
// Add incoming servers iff we don't know about them already or the listening locator has been modified
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
// Add incoming servers if we don't know about them already or the listening locator has been modified
for (auto incoming_server : converted_discovery_servers)
{
eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it;
for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin();
Expand Down Expand Up @@ -2226,6 +2308,38 @@ fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manage
return mp_builtinProtocols->tlm_;
}

bool RTPSParticipantImpl::has_tcp_transports()
{
const RTPSParticipantAttributes& pattr = getRTPSParticipantAttributes();
bool has_tcp_transports = false;
for (auto& transportDescriptor : pattr.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
has_tcp_transports = true;
break;
}
}

return has_tcp_transports;
}

void RTPSParticipantImpl::create_tcp_connections(
const LocatorList_t& locators)
{
for (const Locator_t& loc : locators)
{
if (loc.kind == LOCATOR_KIND_TCPv4 || loc.kind == LOCATOR_KIND_TCPv6)
{
// Set logical port to 0 and call createSenderResources to allow opening a TCP CONNECT channel in the transport
Locator_t loc_with_logical_zero = loc;
IPLocator::setLogicalPort(loc_with_logical_zero, 0);
createSenderResources(loc_with_logical_zero);
}
}
}

IPersistenceService* RTPSParticipantImpl::get_persistence_service(
const EndpointAttributes& param)
{
Expand Down Expand Up @@ -2427,9 +2541,17 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta(
case LOCATOR_KIND_TCPv4:
set_wan_address(ret);
IPLocator::setPhysicalPort(ret, Tcp4ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
break;
case LOCATOR_KIND_TCPv6:
IPLocator::setPhysicalPort(ret, Tcp6ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
break;
}
return ret;
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,19 @@ class RTPSParticipantImpl
return has_shm_transport_;
}

//! Check if the participant has at least one TCP transport
bool has_tcp_transports();

/**
* This method creates the needed sender resources for a locator list, but forces
* each logical port to be zero. It is used to enforce the proper creation of a
* CONNECT channel in TCP scenarios.
*
* @param locators List of unicast locators.
*/
void create_tcp_connections(
const LocatorList_t& locators);

uint32_t get_min_network_send_buffer_size()
{
return m_network_Factory.get_min_send_buffer_size();
Expand Down
35 changes: 25 additions & 10 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,14 @@ bool TCPTransportInterface::OpenOutputChannel(
return false;
}

bool always_connect = false;
uint16_t logical_port = IPLocator::getLogicalPort(locator);
if (0 == logical_port)
{
return false;
// During builtin endpoints setup, a logical port equal to 0 indicates that the locator belongs
// to discovery server remote server. A connect channel is always needed.
// Should only be called once to avoid adding a logical port equal to 0.
always_connect = true;
}

Locator physical_locator = IPLocator::toPhysicalLocator(locator);
Expand All @@ -701,6 +705,8 @@ bool TCPTransportInterface::OpenOutputChannel(
IPLocator::WanToLanLocator(physical_locator) ==
tcp_sender_resource->locator())))
{

// If missing, logical port will be added in first send()
// Add logical port to channel if it's not there yet
auto channel_resource = channel_resources_.find(physical_locator);

Expand All @@ -711,14 +717,17 @@ bool TCPTransportInterface::OpenOutputChannel(
channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator));
}

if (channel_resource != channel_resources_.end())
if (logical_port != 0)
{
channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get());
}
else
{
std::lock_guard<std::mutex> channelPendingLock(channel_pending_logical_ports_mutex_);
channel_pending_logical_ports_[physical_locator].insert(logical_port);
if (channel_resource != channel_resources_.end())
{
channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get());
}
else
{
std::lock_guard<std::mutex> channelPendingLock(channel_pending_logical_ports_mutex_);
channel_pending_logical_ports_[physical_locator].insert(logical_port);
}
}

statistics_info_.add_entry(locator);
Expand Down Expand Up @@ -792,7 +801,9 @@ bool TCPTransportInterface::OpenOutputChannel(
// If the remote physical port is higher than our listening port, a new CONNECT channel needs to be created and connected
// and the locator added to the send_resource_list.
// If the remote physical port is lower than our listening port, only the locator needs to be added to the send_resource_list.
if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface)
// If the ports are equal, the CONNECT channel is created if the local interface is lower.
// If this locator belong to a DS server, a CONNECT channel is always needed.
if (always_connect || IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface)
{
// Client side (either Server-Client or LARGE_DATA)
EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: "
Expand All @@ -814,7 +825,11 @@ bool TCPTransportInterface::OpenOutputChannel(

channel_resources_[physical_locator] = channel;
channel->connect(channel_resources_[physical_locator]);
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
// Add logical port only if it's not 0
if (!always_connect)
{
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
}
}
else
{
Expand Down
Loading

0 comments on commit 266cea9

Please sign in to comment.