From 176b5c3f4381c06fecb360997d4815f62bf7855e Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 3 Jul 2024 07:25:44 +0200 Subject: [PATCH] Create InitialConnection for TCP initial peers 3.x & 2.14 (#4946) (#4998) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refs #20650: Add test Signed-off-by: cferreiragonz * Refs #20650: Use OpenOutputChannels for initialPeers Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit 068092b8478e34819fcb810534b4f62d48e83561) Co-authored-by: Carlos Ferreira González --- .../discovery/participant/PDPSimple.cpp | 7 +- .../rtps/transport/UDPTransportInterface.cpp | 13 ++++ .../rtps/transport/UDPTransportInterface.h | 12 ++++ .../shared_mem/SharedMemTransport.cpp | 13 ++++ .../transport/shared_mem/SharedMemTransport.h | 12 ++++ .../common/BlackboxTestsTransportTCP.cpp | 64 ++++++++++++++++++- 6 files changed, 118 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index 99cec6078ab..42cb70ba1e3 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -379,7 +379,12 @@ bool PDPSimple::create_dcps_participant_endpoints() WriterAttributes watt = create_builtin_writer_attributes(); watt.endpoint.reliabilityKind = BEST_EFFORT; - watt.endpoint.remoteLocatorList = m_discovery.initialPeersList; + if (!m_discovery.initialPeersList.empty()) + { + auto entry = LocatorSelectorEntry::create_fully_selected_entry( + m_discovery.initialPeersList, LocatorList_t()); + mp_RTPSParticipant->createSenderResources(entry); + } if (pattr.throughputController.bytesPerPeriod != UINT32_MAX && pattr.throughputController.periodMillisecs != 0) { diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 583e9b0efb3..215a7396ed2 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -432,6 +432,19 @@ bool UDPTransportInterface::OpenOutputChannel( return true; } +bool UDPTransportInterface::OpenOutputChannels( + SendResourceList& send_resource_list, + const LocatorSelectorEntry& locator_selector_entry) +{ + bool success = false; + for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) + { + size_t index = locator_selector_entry.state.unicast[i]; + success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]); + } + return success; +} + Locator UDPTransportInterface::RemoteToMainLocal( const Locator& remote) const { diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index 749cfadb7a3..43d971e6b97 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -80,6 +80,18 @@ class UDPTransportInterface : public TransportInterface SendResourceList& sender_resource_list, const Locator&) override; + /** + * Opens a socket on the locators provided by the given locator_selector_entry. + * + * @param sender_resource_list Participant's send resource list. + * @param locator_selector_entry Locator selector entry with the remote entity locators. + * + * @return true if the socket was correctly opened or if finding an already opened one. + */ + bool OpenOutputChannels( + SendResourceList& sender_resource_list, + const fastrtps::rtps::LocatorSelectorEntry& locator_selector_entry) override; + /** * Converts a given remote locator (that is, a locator referring to a remote * destination) to the main local locator whose channel can write to that diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index 065e3f61a9f..df70aba1cc6 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -388,6 +388,19 @@ bool SharedMemTransport::OpenOutputChannel( return true; } +bool SharedMemTransport::OpenOutputChannels( + SendResourceList& send_resource_list, + const LocatorSelectorEntry& locator_selector_entry) +{ + bool success = false; + for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i) + { + size_t index = locator_selector_entry.state.unicast[i]; + success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]); + } + return success; +} + Locator SharedMemTransport::RemoteToMainLocal( const Locator& remote) const { diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h index 2b2989908c9..4cf7d0b1030 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.h @@ -87,6 +87,18 @@ class SharedMemTransport : public TransportInterface SendResourceList& sender_resource_list, const Locator&) override; + /** + * Opens a socket on the locators provided by the given locator_selector_entry. + * + * @param sender_resource_list Participant's send resource list. + * @param locator_selector_entry Locator selector entry with the remote entity locators. + * + * @return true if the socket was correctly opened or if finding an already opened one. + */ + bool OpenOutputChannels( + SendResourceList& sender_resource_list, + const fastrtps::rtps::LocatorSelectorEntry& locator_selector_entry) override; + /** * Converts a given remote locator (that is, a locator referring to a remote * destination) to the main local locator whose channel can write to that diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 80897508aec..a7a3edad7a3 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -674,7 +674,7 @@ TEST(TransportTCP, Client_reconnection) delete requester; } -// Test copy constructor and copy assignment for TCPv4 +// Test zero listening port for TCPv4 TEST_P(TransportTCP, TCPv4_autofill_port) { PubSubReader p1(TEST_TOPIC_NAME); @@ -704,7 +704,7 @@ TEST_P(TransportTCP, TCPv4_autofill_port) EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port); } -// Test copy constructor and copy assignment for TCPv6 +// Test zero listening port for TCPv6 TEST_P(TransportTCP, TCPv6_autofill_port) { PubSubReader p1(TEST_TOPIC_NAME); @@ -1325,6 +1325,66 @@ TEST_P(TransportTCP, large_message_large_data_send_receive) reader.block_for_all(); } +// Test CreateInitialConnection for TCP +TEST_P(TransportTCP, TCP_initial_peers_connection) +{ + PubSubWriter p1(TEST_TOPIC_NAME); + PubSubReader p2(TEST_TOPIC_NAME); + PubSubReader p3(TEST_TOPIC_NAME); + + // Add TCP Transport with listening port + auto p1_transport = std::make_shared(); + p1_transport->add_listener_port(global_port); + auto p2_transport = std::make_shared(); + p2_transport->add_listener_port(global_port + 1); + auto p3_transport = std::make_shared(); + p3_transport->add_listener_port(global_port - 1); + + // Add initial peer to clients + Locator_t initialPeerLocator; + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + initialPeerLocator.port = global_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + + // Setup participants + p1.disable_builtin_transport() + .add_user_transport_to_pparams(p1_transport); + + p2.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p2_transport); + + p3.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p3_transport); + + // Init participants + p1.init(); + p2.init(); + p3.init(); + ASSERT_TRUE(p1.isInitialized()); + ASSERT_TRUE(p2.isInitialized()); + ASSERT_TRUE(p3.isInitialized()); + + // Wait for discovery + p1.wait_discovery(2, std::chrono::seconds(0)); + p2.wait_discovery(std::chrono::seconds(0), 1); + p3.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + auto data = default_helloworld_data_generator(); + p2.startReception(data); + p3.startReception(data); + + p1.send(data); + EXPECT_TRUE(data.empty()); + + p2.block_for_all(); + p3.block_for_all(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else