Skip to content

Commit

Permalink
Create InitialConnection for TCP initial peers 3.x & 2.14 (#4946)
Browse files Browse the repository at this point in the history
* Refs #20650: Add test

Signed-off-by: cferreiragonz <[email protected]>

* Refs #20650: Use OpenOutputChannels for initialPeers

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz authored Jun 25, 2024
1 parent 43510c3 commit 068092b
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 3 deletions.
7 changes: 6 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,12 @@ bool PDPSimple::create_dcps_participant_endpoints()

WriterAttributes watt = create_builtin_writer_attributes();
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.remoteLocatorList = m_discovery.initialPeersList;
if (!m_discovery.initialPeersList.empty())
{
auto entry = LocatorSelectorEntry::create_fully_selected_entry(
m_discovery.initialPeersList, LocatorList_t());
mp_RTPSParticipant->createSenderResources(entry);
}

// We assume that if we have at least one flow controller defined, we use async flow controller
if (!pattr.flow_controllers.empty())
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,19 @@ bool UDPTransportInterface::OpenOutputChannel(
return true;
}

bool UDPTransportInterface::OpenOutputChannels(
SendResourceList& send_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool success = false;
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]);
}
return success;
}

Locator UDPTransportInterface::RemoteToMainLocal(
const Locator& remote) const
{
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ class UDPTransportInterface : public TransportInterface
SendResourceList& sender_resource_list,
const Locator&) override;

/**
* Opens a socket on the locators provided by the given locator_selector_entry.
*
* @param sender_resource_list Participant's send resource list.
* @param locator_selector_entry Locator selector entry with the remote entity locators.
*
* @return true if the socket was correctly opened or if finding an already opened one.
*/
bool OpenOutputChannels(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry) override;

/**
* Converts a given remote locator (that is, a locator referring to a remote
* destination) to the main local locator whose channel can write to that
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,19 @@ bool SharedMemTransport::OpenOutputChannel(
return true;
}

bool SharedMemTransport::OpenOutputChannels(
SendResourceList& send_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool success = false;
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]);
}
return success;
}

Locator SharedMemTransport::RemoteToMainLocal(
const Locator& remote) const
{
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ class SharedMemTransport : public TransportInterface
SendResourceList& sender_resource_list,
const Locator&) override;

/**
* Opens a socket on the locators provided by the given locator_selector_entry.
*
* @param sender_resource_list Participant's send resource list.
* @param locator_selector_entry Locator selector entry with the remote entity locators.
*
* @return true if the socket was correctly opened or if finding an already opened one.
*/
bool OpenOutputChannels(
SendResourceList& sender_resource_list,
const LocatorSelectorEntry& locator_selector_entry) override;

/**
* Converts a given remote locator (that is, a locator referring to a remote
* destination) to the main local locator whose channel can write to that
Expand Down
64 changes: 62 additions & 2 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ TEST(TransportTCP, Client_reconnection)
delete requester;
}

// Test copy constructor and copy assignment for TCPv4
// Test zero listening port for TCPv4
TEST_P(TransportTCP, TCPv4_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -704,7 +704,7 @@ TEST_P(TransportTCP, TCPv4_autofill_port)
EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port);
}

// Test copy constructor and copy assignment for TCPv6
// Test zero listening port for TCPv6
TEST_P(TransportTCP, TCPv6_autofill_port)
{
PubSubReader<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
Expand Down Expand Up @@ -1325,6 +1325,66 @@ TEST_P(TransportTCP, large_message_large_data_send_receive)
reader.block_for_all();
}

// Test CreateInitialConnection for TCP
TEST_P(TransportTCP, TCP_initial_peers_connection)
{
PubSubWriter<HelloWorldPubSubType> p1(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p2(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> p3(TEST_TOPIC_NAME);

// Add TCP Transport with listening port
auto p1_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p1_transport->add_listener_port(global_port);
auto p2_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p2_transport->add_listener_port(global_port + 1);
auto p3_transport = std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();
p3_transport->add_listener_port(global_port - 1);

// Add initial peer to clients
Locator_t initialPeerLocator;
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1);
initialPeerLocator.port = global_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);

// Setup participants
p1.disable_builtin_transport()
.add_user_transport_to_pparams(p1_transport);

p2.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p2_transport);

p3.disable_builtin_transport()
.initial_peers(initial_peer_list)
.add_user_transport_to_pparams(p3_transport);

// Init participants
p1.init();
p2.init();
p3.init();
ASSERT_TRUE(p1.isInitialized());
ASSERT_TRUE(p2.isInitialized());
ASSERT_TRUE(p3.isInitialized());

// Wait for discovery
p1.wait_discovery(2, std::chrono::seconds(0));
p2.wait_discovery(std::chrono::seconds(0), 1);
p3.wait_discovery(std::chrono::seconds(0), 1);

// Send and receive data
auto data = default_helloworld_data_generator();
p2.startReception(data);
p3.startReception(data);

p1.send(data);
EXPECT_TRUE(data.empty());

p2.block_for_all();
p3.block_for_all();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 068092b

Please sign in to comment.