Skip to content

Commit

Permalink
Fix DS servers not connecting due to ports logic (#4941) (#4951)
Browse files Browse the repository at this point in the history
* Refs #21170: Add DS servers connection test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21170: Support DS servers connection

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21170: Revision

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21170: Uncrustify

Signed-off-by: cferreiragonz <[email protected]>

* Refs #21170: Fix Windows build & comment

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
(cherry picked from commit b441560)

Co-authored-by: Carlos Ferreira González <[email protected]>
  • Loading branch information
mergify[bot] and cferreiragonz authored Jun 19, 2024
1 parent bf9bac7 commit 660e1d3
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 31 deletions.
10 changes: 0 additions & 10 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,6 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
wout->reader_data_filter(pdp_filter);
// Enable separate sending so the filter can be called for each change and reader proxy
wout->set_separate_sending(true);

if (!secure)
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
match_pdp_reader_nts_(it);
}
}
}
// Could not create PDP Writer, so return false
else
Expand Down
15 changes: 15 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ RTPSParticipantImpl::RTPSParticipantImpl(
}
});
}
for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
break;
Expand Down
19 changes: 19 additions & 0 deletions test/blackbox/api/dds-pim/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,25 @@ class PubSubParticipant
on_participant_qos_update_ = f;
}

PubSubParticipant& fill_server_qos(
eprosima::fastdds::dds::WireProtocolConfigQos& qos,
eprosima::fastrtps::rtps::GuidPrefix_t& guid,
eprosima::fastrtps::rtps::Locator_t& locator_server,
uint16_t port,
uint32_t kind)
{
qos.builtin.discovery_config.discoveryProtocol = eprosima::fastrtps::rtps::DiscoveryProtocol_t::SERVER;
qos.prefix = guid;
// Generate server's listening locator
eprosima::fastrtps::rtps::IPLocator::setIPv4(locator_server, 127, 0, 0, 1);
eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(locator_server, port);
locator_server.kind = kind;
// Leave logical port as 0 to use TCP DS default logical port
qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server);

return wire_protocol(qos);
}

private:

PubSubParticipant& operator =(
Expand Down
124 changes: 103 additions & 21 deletions test/blackbox/common/DDSBlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,34 +208,28 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP)
using namespace eprosima::fastrtps::rtps;

// TCP default DS port
std::string W_UNICAST_PORT_RANDOM_NUMBER_STR = "42100";
constexpr uint16_t W_UNICAST_PORT_RANDOM_NUMBER_STR = 42100;

/* Create first server */
PubSubParticipant<HelloWorldPubSubType> server_1(0u, 0u, 0u, 0u);
// Set participant as server
WireProtocolConfigQos server_1_qos;
server_1_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER;
// Generate random GUID prefix
srand(static_cast<unsigned>(time(nullptr)));
GuidPrefix_t server_1_prefix;
for (auto i = 0; i < 12; i++)
{
server_1_prefix.value[i] = eprosima::fastrtps::rtps::octet(rand() % 254);
}
server_1_qos.prefix = server_1_prefix;
// Generate server's listening locator
uint16_t server_1_port = W_UNICAST_PORT_RANDOM_NUMBER_STR;
Locator_t locator_server_1;
IPLocator::setIPv4(locator_server_1, 127, 0, 0, 1);
uint16_t server_1_port = static_cast<uint16_t>(stoi(W_UNICAST_PORT_RANDOM_NUMBER_STR));
IPLocator::setPhysicalPort(locator_server_1, server_1_port);
locator_server_1.kind = LOCATOR_KIND_TCPv4;
// Leave logical port as 0 to use TCP DS default logical port
server_1_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_1);
// Add TCP transport
auto descriptor_1 = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor_1->add_listener_port(server_1_port);

// Init server
ASSERT_TRUE(server_1.wire_protocol(server_1_qos)
ASSERT_TRUE(server_1.fill_server_qos(server_1_qos, server_1_prefix, locator_server_1, server_1_port,
LOCATOR_KIND_TCPv4)
.disable_builtin_transport()
.add_user_transport_to_pparams(descriptor_1)
.init_participant());
Expand All @@ -244,25 +238,17 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP)
PubSubParticipant<HelloWorldPubSubType> server_2(0u, 0u, 0u, 0u);
// Set participant as server
WireProtocolConfigQos server_2_qos;
server_2_qos.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SERVER;
// Generate random GUID prefix
GuidPrefix_t server_2_prefix = server_1_prefix;
server_2_prefix.value[11]++;
server_2_qos.prefix = server_2_prefix;
// Generate server's listening locator
Locator_t locator_server_2;
IPLocator::setIPv4(locator_server_2, 127, 0, 0, 1);
uint16_t server_2_port = server_1_port + 1;
IPLocator::setPhysicalPort(locator_server_2, server_2_port);
locator_server_2.kind = LOCATOR_KIND_TCPv4;
// Leave logical port as 0 to use TCP DS default logical port
server_2_qos.builtin.metatrafficUnicastLocatorList.push_back(locator_server_2);
// Add TCP transport
auto descriptor_2 = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor_2->add_listener_port(server_2_port);

// Init server
ASSERT_TRUE(server_2.wire_protocol(server_2_qos)
ASSERT_TRUE(server_2.fill_server_qos(server_2_qos, server_2_prefix, locator_server_2, server_2_port,
LOCATOR_KIND_TCPv4)
.disable_builtin_transport()
.add_user_transport_to_pparams(descriptor_2)
.init_participant());
Expand Down Expand Up @@ -331,6 +317,102 @@ TEST(DDSDiscovery, AddDiscoveryServerToListTCP)
server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows client1 and server1
}

TEST(DDSDiscovery, ServersConnectionTCP)
{
using namespace eprosima;
using namespace eprosima::fastdds::dds;
using namespace eprosima::fastrtps::rtps;

// TCP default DS port
constexpr uint16_t W_UNICAST_PORT_RANDOM_NUMBER_STR = 41100;

/* Create first server */
PubSubParticipant<HelloWorldPubSubType> server_1(0u, 0u, 0u, 0u);
// Set participant as server
WireProtocolConfigQos server_1_qos;
// Generate random GUID prefix
srand(static_cast<unsigned>(time(nullptr)));
GuidPrefix_t server_1_prefix;
for (auto i = 0; i < 12; i++)
{
server_1_prefix.value[i] = eprosima::fastrtps::rtps::octet(rand() % 254);
}
Locator_t locator_server_1;
uint16_t server_1_port = W_UNICAST_PORT_RANDOM_NUMBER_STR;
// Add TCP transport
auto descriptor_1 = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor_1->add_listener_port(server_1_port);
// Init server
ASSERT_TRUE(server_1.fill_server_qos(server_1_qos, server_1_prefix, locator_server_1, server_1_port,
LOCATOR_KIND_TCPv4)
.disable_builtin_transport()
.add_user_transport_to_pparams(descriptor_1)
.init_participant());

/* Create second server */
PubSubParticipant<HelloWorldPubSubType> server_2(0u, 0u, 0u, 0u);
// Set participant as server
WireProtocolConfigQos server_2_qos;
GuidPrefix_t server_2_prefix = server_1_prefix;
server_2_prefix.value[11]++;
Locator_t locator_server_2;
uint16_t server_2_port = server_1_port + 1;
// Add TCP transport
auto descriptor_2 = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor_2->add_listener_port(server_2_port);

// Connect to first server
RemoteServerAttributes server_1_att;
server_1_att.guidPrefix = server_1_prefix;
server_1_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_1));
server_2_qos.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att);

// Init server
ASSERT_TRUE(server_2.fill_server_qos(server_2_qos, server_2_prefix, locator_server_2, server_2_port,
LOCATOR_KIND_TCPv4)
.disable_builtin_transport()
.add_user_transport_to_pparams(descriptor_2)
.init_participant());

/* Create third server */
PubSubParticipant<HelloWorldPubSubType> server_3(0u, 0u, 0u, 0u);
// Set participant as server
WireProtocolConfigQos server_3_qos;
GuidPrefix_t server_3_prefix = server_1_prefix;
server_3_prefix.value[11]--;
Locator_t locator_server_3;
uint16_t server_3_port = server_1_port - 1;
// Add TCP transport
auto descriptor_3 = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
descriptor_3->add_listener_port(server_3_port);
// Connect to first server
server_3_qos.builtin.discovery_config.m_DiscoveryServers.push_back(server_1_att);

// Init server
ASSERT_TRUE(server_3.fill_server_qos(server_3_qos, server_3_prefix, locator_server_3, server_3_port,
LOCATOR_KIND_TCPv4)
.disable_builtin_transport()
.add_user_transport_to_pparams(descriptor_3)
.init_participant());

// Check adding servers before initialization
server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server2 and server3
server_2.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1
server_3.wait_discovery(std::chrono::seconds::zero(), 1, true); // Knows server1

/* Add server_3 to server_2 */
RemoteServerAttributes server_3_att;
server_3_att.guidPrefix = server_3_prefix;
server_3_att.metatrafficUnicastLocatorList.push_back(Locator_t(locator_server_3));
server_2_qos.builtin.discovery_config.m_DiscoveryServers.push_back(server_3_att);
ASSERT_TRUE(server_2.update_wire_protocol(server_2_qos));

// Check adding servers after initialization
server_1.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server2 and server3
server_2.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server3
server_3.wait_discovery(std::chrono::seconds::zero(), 2, true); // Knows server1 and server2
}

/**
* This test checks the addition of network interfaces at run-time.
*
Expand Down

0 comments on commit 660e1d3

Please sign in to comment.