Skip to content

Commit

Permalink
Discard local SHM locators that cannot be openned (#5118)
Browse files Browse the repository at this point in the history
* Refs #19036: NetworkFactory.h -> NetworkFactory.hpp

Signed-off-by: eduponz <[email protected]>

* Refs #19036: Add NetworkFactory::is_locator_reachable API

Signed-off-by: eduponz <[email protected]>

* Refs #19036: ProxyDataFilters::filter_locators delegates locator checking to the NetworkFactory

Signed-off-by: eduponz <[email protected]>

* Refs #19036: NetworkFactory::is_locator_reachable iterates over the transports until one reports locator reachability

Signed-off-by: eduponz <[email protected]>

* Refs #19036: SharedMemTransport::is_locator_reachable discards non-shm non-local ports, and shm local ports that cannot open to write

Signed-off-by: eduponz <[email protected]>

* Refs #19036: Apply suggestions

Signed-off-by: elianalf <[email protected]>

* Refs #19036: Protect openned_ports_ collection

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #19036: Add missing protection

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: eduponz <[email protected]>
Signed-off-by: elianalf <[email protected]>
Signed-off-by: Mario Dominguez <[email protected]>
Co-authored-by: elianalf <[email protected]>
Co-authored-by: Mario Dominguez <[email protected]>
  • Loading branch information
3 people committed Aug 6, 2024
1 parent 5a75fc4 commit af94cb5
Show file tree
Hide file tree
Showing 35 changed files with 238 additions and 113 deletions.
10 changes: 10 additions & 0 deletions include/fastdds/rtps/transport/ChainingTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,16 @@ class ChainingTransport : public TransportInterface
return low_level_transport_->is_locator_allowed(locator);
}

/*!
* Call the low-level transport `is_locator_reachable()`.
* Must report whether the given locator is reachable by this transport.
*/
FASTDDS_EXPORTED_API bool is_locator_reachable(
const fastdds::rtps::Locator_t& locator) override
{
return low_level_transport_->is_locator_reachable(locator);
}

protected:

std::unique_ptr<TransportInterface> low_level_transport_;
Expand Down
11 changes: 11 additions & 0 deletions include/fastdds/rtps/transport/TransportInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ class FASTDDS_EXPORTED_API TransportInterface
virtual bool is_locator_allowed(
const Locator&) const = 0;

/**
* Must report whether the given locator is reachable by this transport.
*
* @param [in] locator @ref Locator for which the reachability is checked.
*
* @return true if the input locator is reachable by this transport, false otherwise.
*/
virtual bool is_locator_reachable(
const Locator_t& locator) = 0;


//! Returns the locator describing the main (most general) channel that can write to the provided remote locator.
virtual Locator RemoteToMainLocal(
const Locator& remote) const = 0;
Expand Down
18 changes: 8 additions & 10 deletions src/cpp/rtps/builtin/data/ParticipantProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/builtin/discovery/participant/PDPSimple.h>
#include <rtps/network/NetworkFactory.h>
#include <rtps/network/NetworkFactory.hpp>
#include <rtps/resources/TimedEvent.h>
#include <rtps/transport/shared_mem/SHMLocator.hpp>
#include <utils/TimeConversion.hpp>
Expand Down Expand Up @@ -446,16 +446,15 @@ bool ParticipantProxyData::writeToCDRMessage(
bool ParticipantProxyData::readFromCDRMessage(
CDRMessage_t* msg,
bool use_encapsulation,
const NetworkFactory& network,
bool is_shm_transport_available,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id)
{
auto param_process =
[this, &network, &is_shm_transport_available, &should_filter_locators, source_vendor_id](
[this, &network, &should_filter_locators, source_vendor_id](
CDRMessage_t* msg, const ParameterId_t& pid, uint16_t plength)
{
static_cast<void>(source_vendor_id);
m_VendorId = source_vendor_id;
switch (pid){
case fastdds::dds::PID_KEY_HASH:
{
Expand Down Expand Up @@ -503,7 +502,6 @@ bool ParticipantProxyData::readFromCDRMessage(

m_VendorId[0] = p.vendorId[0];
m_VendorId[1] = p.vendorId[1];
is_shm_transport_available &= (m_VendorId == c_VendorId_eProsima);
break;
}
case fastdds::dds::PID_PRODUCT_VERSION:
Expand Down Expand Up @@ -613,7 +611,7 @@ bool ParticipantProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
metatraffic_locators,
temp_locator,
false);
Expand Down Expand Up @@ -643,7 +641,7 @@ bool ParticipantProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
metatraffic_locators,
temp_locator,
true);
Expand Down Expand Up @@ -673,7 +671,7 @@ bool ParticipantProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
default_locators,
temp_locator,
true);
Expand Down Expand Up @@ -703,7 +701,7 @@ bool ParticipantProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
default_locators,
temp_locator,
false);
Expand Down
7 changes: 4 additions & 3 deletions src/cpp/rtps/builtin/data/ParticipantProxyData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
#include <fastdds/rtps/builtin/data/BuiltinEndpoints.hpp>
#include <fastdds/rtps/common/ProductVersion_t.hpp>
#include <fastdds/rtps/common/RemoteLocators.hpp>
#include <fastdds/rtps/common/Token.hpp>
#include <fastdds/rtps/common/ProductVersion_t.hpp>
#include <fastdds/rtps/common/VendorId_t.hpp>

#include <rtps/network/NetworkFactory.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {
Expand Down Expand Up @@ -157,8 +159,7 @@ class ParticipantProxyData
bool readFromCDRMessage(
CDRMessage_t* msg,
bool use_encapsulation,
const NetworkFactory& network,
bool is_shm_transport_available,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id = c_VendorId_eProsima);

Expand Down
25 changes: 10 additions & 15 deletions src/cpp/rtps/builtin/data/ProxyDataFilters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_BUILTIN_DATA_PROXYDATAFILTERS_H_
#define _FASTDDS_RTPS_BUILTIN_DATA_PROXYDATAFILTERS_H_
#ifndef FASTDDS_RTPS_BUILTIN_DATA__PROXYDATAFILTERS_HPP
#define FASTDDS_RTPS_BUILTIN_DATA__PROXYDATAFILTERS_HPP

#include <fastdds/rtps/common/RemoteLocators.hpp>

#include <rtps/network/NetworkFactory.hpp>
#include <rtps/transport/shared_mem/SHMLocator.hpp>

namespace eprosima {
Expand All @@ -30,27 +32,20 @@ class ProxyDataFilters
public:

/**
* This function filters out SHM locators when they cannot be used for communication on the local host.
* @param [in] is_shm_transport_available Indicates whether the participant has SHM transport enabled.
* @brief This function filters out unreachable locators.
*
* @param [in] network_factory Reference to the @ref NetworkFactory
* @param [in,out] target_locators_list List where parsed locators are stored
* @param [in] temp_locator New locator to parse
* @param [in] is_unicast true if temp_locator is unicast, false if it is multicast
*/
static void filter_locators(
bool is_shm_transport_available,
NetworkFactory& network_factory,
RemoteLocatorList& target_locators_list,
const Locator_t& temp_locator,
bool is_unicast)
{
using SHMLocator = eprosima::fastdds::rtps::SHMLocator;

bool can_use_locator = LOCATOR_KIND_SHM != temp_locator.kind;
if (!can_use_locator)
{
can_use_locator = is_shm_transport_available && SHMLocator::is_shm_and_from_this_host(temp_locator);
}

if (can_use_locator)
if (network_factory.is_locator_reachable(temp_locator))
{
if (is_unicast)
{
Expand All @@ -69,4 +64,4 @@ class ProxyDataFilters
} /* namespace fastdds */
} /* namespace eprosima */

#endif // _FASTDDS_RTPS_BUILTIN_DATA_PROXYDATAFILTERS_H_
#endif // FASTDDS_RTPS_BUILTIN_DATA__PROXYDATAFILTERS_HPP
14 changes: 6 additions & 8 deletions src/cpp/rtps/builtin/data/ReaderProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <fastdds/rtps/common/CDRMessage_t.hpp>
#include <fastdds/rtps/common/VendorId_t.hpp>

#include <rtps/network/NetworkFactory.h>
#include <rtps/network/NetworkFactory.hpp>

#include "ProxyDataFilters.hpp"

Expand Down Expand Up @@ -647,15 +647,14 @@ bool ReaderProxyData::writeToCDRMessage(

bool ReaderProxyData::readFromCDRMessage(
CDRMessage_t* msg,
const NetworkFactory& network,
bool is_shm_transport_available,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id)
{
auto param_process = [this, &network, &is_shm_transport_available, &should_filter_locators, source_vendor_id](
auto param_process = [this, &network, &should_filter_locators, source_vendor_id](
CDRMessage_t* msg, const ParameterId_t& pid, uint16_t plength)
{
VendorId_t vendor_id = c_VendorId_Unknown;
VendorId_t vendor_id = source_vendor_id;

switch (pid)
{
Expand All @@ -668,7 +667,6 @@ bool ReaderProxyData::readFromCDRMessage(
return false;
}

is_shm_transport_available &= (p.vendorId == c_VendorId_eProsima);
vendor_id = p.vendorId;
break;
}
Expand Down Expand Up @@ -902,7 +900,7 @@ bool ReaderProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
remote_locators_,
temp_locator,
true);
Expand Down Expand Up @@ -930,7 +928,7 @@ bool ReaderProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
remote_locators_,
temp_locator,
false);
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/rtps/builtin/data/ReaderProxyData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,13 @@ class ReaderProxyData
* parameter list.
* @param msg Pointer to the message.
* @param network Reference to network factory for locator validation and transformation
* @param is_shm_transport_available Indicates whether the Reader is reachable by SHM.
* @param should_filter_locators Whether to retrieve the locators before the external locators filtering
* @param source_vendor_id VendorId of the source participant from which the message was received
* @return true on success
*/
bool readFromCDRMessage(
CDRMessage_t* msg,
const NetworkFactory& network,
bool is_shm_transport_available,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id = c_VendorId_eProsima);

Expand Down
14 changes: 6 additions & 8 deletions src/cpp/rtps/builtin/data/WriterProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <fastdds/rtps/common/VendorId_t.hpp>

#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/network/NetworkFactory.h>
#include <rtps/network/NetworkFactory.hpp>

#include "ProxyDataFilters.hpp"

Expand Down Expand Up @@ -614,15 +614,14 @@ bool WriterProxyData::writeToCDRMessage(

bool WriterProxyData::readFromCDRMessage(
CDRMessage_t* msg,
const NetworkFactory& network,
bool is_shm_transport_available,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id)
{
auto param_process = [this, &network, &is_shm_transport_available, &should_filter_locators, source_vendor_id](
auto param_process = [this, &network, &should_filter_locators, source_vendor_id](
CDRMessage_t* msg, const ParameterId_t& pid, uint16_t plength)
{
VendorId_t vendor_id = c_VendorId_Unknown;
VendorId_t vendor_id = source_vendor_id;

switch (pid)
{
Expand All @@ -635,7 +634,6 @@ bool WriterProxyData::readFromCDRMessage(
return false;
}

is_shm_transport_available &= (p.vendorId == c_VendorId_eProsima);
vendor_id = p.vendorId;
break;
}
Expand Down Expand Up @@ -899,7 +897,7 @@ bool WriterProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
remote_locators_,
temp_locator,
true);
Expand All @@ -926,7 +924,7 @@ bool WriterProxyData::readFromCDRMessage(
m_guid.is_from_this_host()))
{
ProxyDataFilters::filter_locators(
is_shm_transport_available,
network,
remote_locators_,
temp_locator,
false);
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/rtps/builtin/data/WriterProxyData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ class WriterProxyData
//!Read a parameter list from a CDRMessage_t.
bool readFromCDRMessage(
CDRMessage_t* msg,
const NetworkFactory& network,
bool is_shm_transport_possible,
NetworkFactory& network,
bool should_filter_locators,
fastdds::rtps::VendorId_t source_vendor_id = c_VendorId_eProsima);

Expand Down
12 changes: 5 additions & 7 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimpleListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <rtps/builtin/data/WriterProxyData.hpp>
#include <rtps/builtin/discovery/endpoint/EDPSimple.h>
#include <rtps/builtin/discovery/participant/PDPSimple.h>
#include <rtps/network/NetworkFactory.h>
#include <rtps/network/NetworkFactory.hpp>
#include <rtps/reader/StatefulReader.hpp>
#include <rtps/writer/StatefulWriter.hpp>

Expand Down Expand Up @@ -70,13 +70,12 @@ void EDPBasePUBListener::add_writer_from_change(
const EndpointAddedCallback& writer_added_callback /* = nullptr*/)
{
//LOAD INFORMATION IN DESTINATION WRITER PROXY DATA
const NetworkFactory& network = edp->mp_RTPSParticipant->network_factory();
NetworkFactory& network = edp->mp_RTPSParticipant->network_factory();
CDRMessage_t tempMsg(change->serializedPayload);
auto temp_writer_data = edp->get_temporary_writer_proxies_pool().get();
const auto type_server = change->writerGUID;

if (temp_writer_data->readFromCDRMessage(&tempMsg, network,
edp->mp_RTPSParticipant->has_shm_transport(), true, change->vendor_id))
if (temp_writer_data->readFromCDRMessage(&tempMsg, network, true, change->vendor_id))
{
if (temp_writer_data->guid().guidPrefix == edp->mp_RTPSParticipant->getGuid().guidPrefix)
{
Expand Down Expand Up @@ -215,13 +214,12 @@ void EDPBaseSUBListener::add_reader_from_change(
const EndpointAddedCallback& reader_added_callback /* = nullptr*/)
{
//LOAD INFORMATION IN TEMPORAL READER PROXY DATA
const NetworkFactory& network = edp->mp_RTPSParticipant->network_factory();
NetworkFactory& network = edp->mp_RTPSParticipant->network_factory();
CDRMessage_t tempMsg(change->serializedPayload);
auto temp_reader_data = edp->get_temporary_reader_proxies_pool().get();
const auto type_server = change->writerGUID;

if (temp_reader_data->readFromCDRMessage(&tempMsg, network,
edp->mp_RTPSParticipant->has_shm_transport(), true, change->vendor_id))
if (temp_reader_data->readFromCDRMessage(&tempMsg, network, true, change->vendor_id))
{
if (temp_reader_data->guid().guidPrefix == edp->mp_RTPSParticipant->getGuid().guidPrefix)
{
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void PDPListener::on_new_cache_change_added(
CDRMessage_t msg(change->serializedPayload);
temp_participant_data_.clear();
if (temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory(),
parent_pdp_->getRTPSParticipant()->has_shm_transport(), true, change_in->vendor_id))
true, change_in->vendor_id))
{
// After correctly reading it
change->instanceHandle = temp_participant_data_.m_key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ void PDPServerListener::on_new_cache_change_added(
&msg,
true,
pdp_server()->getRTPSParticipant()->network_factory(),
pdp_server()->getRTPSParticipant()->has_shm_transport(),
true,
change_in->vendor_id))
{
Expand Down
Loading

0 comments on commit af94cb5

Please sign in to comment.