Skip to content

Commit

Permalink
Fix destruction data-race on participant removal in intra-process (#5034
Browse files Browse the repository at this point in the history
)

* Refs #21293: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Reinforce test to fail more frequently

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Add RefCountedPointer.hpp to utils

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Add unittests for RefCountedPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: LocalReaderPointer.hpp

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: BaseReader aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: ReaderLocator aggregates LocalReaderPointer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: RTPSDomainImpl::find_local_reader returns a sared_ptr<LocalReaderPointer> and properly calls local_actions_on_reader_removed()

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: RTPSWriters properly using LocalReaderPointer::Instance when accessing local reader

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Linter

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Fix windows warnings

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Address Miguel's review

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: Apply last comment

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21293: NIT

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit 456e45f)

# Conflicts:
#	include/fastdds/rtps/writer/ReaderLocator.h
#	include/fastdds/rtps/writer/ReaderProxy.h
#	src/cpp/rtps/RTPSDomain.cpp
#	src/cpp/rtps/RTPSDomainImpl.hpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.cpp
#	src/cpp/rtps/participant/RTPSParticipantImpl.h
#	src/cpp/rtps/reader/BaseReader.cpp
#	src/cpp/rtps/reader/BaseReader.hpp
#	src/cpp/rtps/writer/ReaderLocator.cpp
#	src/cpp/rtps/writer/StatefulWriter.cpp
#	src/cpp/rtps/writer/StatelessWriter.cpp
#	test/blackbox/common/DDSBlackboxTestsBasic.cpp
#	test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h
#	test/unittest/utils/CMakeLists.txt
  • Loading branch information
Mario-DL authored and mergify[bot] committed Oct 29, 2024
1 parent 0616997 commit 7dbf4bd
Show file tree
Hide file tree
Showing 17 changed files with 1,774 additions and 17 deletions.
13 changes: 13 additions & 0 deletions include/fastdds/rtps/writer/ReaderLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>

#include <rtps/reader/LocalReaderPointer.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {
Expand Down Expand Up @@ -69,10 +71,17 @@ class ReaderLocator : public RTPSMessageSenderInterface
return is_local_reader_;
}

<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h
RTPSReader* local_reader();

void local_reader(
RTPSReader* local_reader)
=======
LocalReaderPointer::Instance local_reader();

void local_reader(
std::shared_ptr<LocalReaderPointer> local_reader)
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp
{
local_reader_ = local_reader;
}
Expand Down Expand Up @@ -260,7 +269,11 @@ class ReaderLocator : public RTPSMessageSenderInterface
LocatorSelectorEntry async_locator_info_;
bool expects_inline_qos_;
bool is_local_reader_;
<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderLocator.h
RTPSReader* local_reader_;
=======
std::shared_ptr<LocalReaderPointer> local_reader_;
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderLocator.hpp
std::vector<GuidPrefix_t> guid_prefix_as_vector_;
std::vector<GUID_t> guid_as_vector_;
IDataSharingNotifier* datasharing_notifier_;
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,11 @@ class ReaderProxy
* Get the local reader on the same process (if any).
* @return The local reader on the same process.
*/
<<<<<<< HEAD:include/fastdds/rtps/writer/ReaderProxy.h
inline RTPSReader* local_reader()
=======
inline LocalReaderPointer::Instance local_reader()
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/writer/ReaderProxy.hpp
{
return locator_info_.local_reader();
}
Expand Down
19 changes: 18 additions & 1 deletion src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/writer/RTPSWriter.h>

<<<<<<< HEAD
=======
#include <rtps/attributes/ServerAttributes.hpp>
#include <rtps/common/GuidUtils.hpp>
#include <rtps/network/utils/external_locators.hpp>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/reader/BaseReader.hpp>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/RTPSDomainImpl.hpp>
#include <rtps/transport/TCPv4Transport.h>
#include <rtps/transport/TCPv6Transport.h>
#include <rtps/transport/test_UDPv4Transport.h>
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
#include <rtps/transport/UDPv4Transport.h>
#include <rtps/transport/UDPv6Transport.h>
#include <rtps/transport/test_UDPv4Transport.h>
Expand Down Expand Up @@ -777,7 +790,11 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
return nullptr;
}

<<<<<<< HEAD
RTPSReader* RTPSDomainImpl::find_local_reader(
=======
std::shared_ptr<LocalReaderPointer> RTPSDomainImpl::find_local_reader(
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
const GUID_t& reader_guid)
{
auto instance = get_instance();
Expand All @@ -791,7 +808,7 @@ RTPSReader* RTPSDomainImpl::find_local_reader(
}
}

return nullptr;
return std::shared_ptr<LocalReaderPointer>(nullptr);
}

RTPSWriter* RTPSDomainImpl::find_local_writer(
Expand Down
11 changes: 11 additions & 0 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
#include <fastrtps/rtps/RTPSDomain.h>
#include <fastrtps/rtps/writer/RTPSWriter.h>

<<<<<<< HEAD
=======
#include <rtps/reader/BaseReader.hpp>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <utils/shared_memory/BoostAtExitRegistry.hpp>
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
#include <utils/SystemInfo.hpp>

#include <utils/shared_memory/BoostAtExitRegistry.hpp>
Expand Down Expand Up @@ -173,7 +180,11 @@ class RTPSDomainImpl
*
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
<<<<<<< HEAD
static RTPSReader* find_local_reader(
=======
static std::shared_ptr<LocalReaderPointer> find_local_reader(
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
const GUID_t& reader_guid);

/**
Expand Down
28 changes: 26 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,11 @@ bool RTPSParticipantImpl::createReader(
return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
}

<<<<<<< HEAD
RTPSReader* RTPSParticipantImpl::find_local_reader(
=======
std::shared_ptr<LocalReaderPointer> RTPSParticipantImpl::find_local_reader(
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
const GUID_t& reader_guid)
{
shared_lock<shared_mutex> _(endpoints_list_mutex);
Expand All @@ -1461,11 +1465,11 @@ RTPSReader* RTPSParticipantImpl::find_local_reader(
{
if (reader->getGuid() == reader_guid)
{
return reader;
return reader->get_local_pointer();
}
}

return nullptr;
return std::shared_ptr<LocalReaderPointer>();
}

RTPSWriter* RTPSParticipantImpl::find_local_writer(
Expand Down Expand Up @@ -2099,6 +2103,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(

bool found = false, found_in_users = false;
Endpoint* p_endpoint = nullptr;
BaseReader* reader = nullptr;

if (endpoint.entityId.is_writer())
{
Expand Down Expand Up @@ -2133,6 +2138,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
{
reader = *rit;
m_userReaderList.erase(rit);
found_in_users = true;
break;
Expand All @@ -2143,6 +2149,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
{
if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
{
reader = *rit;
p_endpoint = *rit;
m_allReaderList.erase(rit);
found = true;
Expand Down Expand Up @@ -2201,6 +2208,10 @@ bool RTPSParticipantImpl::deleteUserEndpoint(
#endif // if HAVE_SECURITY
}

if (reader)
{
reader->local_actions_on_reader_removed();
}
delete(p_endpoint);
return true;
}
Expand Down Expand Up @@ -2288,6 +2299,11 @@ void RTPSParticipantImpl::deleteAllUserEndpoints()
}
#endif // if HAVE_SECURITY

if (kind == READER)
{
static_cast<BaseReader*>(endpoint)->local_actions_on_reader_removed();
}

// remove the endpoints
delete(endpoint);
}
Expand Down Expand Up @@ -2988,8 +3004,16 @@ bool RTPSParticipantImpl::register_in_reader(
}
else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId))
{
<<<<<<< HEAD
RTPSReader* reader = find_local_reader(reader_guid);
res = reader->add_statistics_listener(listener);
=======
LocalReaderPointer::Instance local_reader(find_local_reader(reader_guid));
if (local_reader)
{
res = local_reader->add_statistics_listener(listener);
}
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034))
}

return res;
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
#include <rtps/messages/SendBuffersManager.hpp>
#include <rtps/network/NetworkFactory.h>
#include <rtps/network/ReceiverResource.h>
<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h
=======
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/resources/ResourceEvent.h>
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp
#include <statistics/rtps/monitor-service/interfaces/IConnectionsObserver.hpp>
#include <statistics/rtps/monitor-service/interfaces/IConnectionsQueryable.hpp>
#include <statistics/rtps/StatisticsBase.hpp>
Expand Down Expand Up @@ -479,7 +484,11 @@ class RTPSParticipantImpl
/***
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
<<<<<<< HEAD:src/cpp/rtps/participant/RTPSParticipantImpl.h
RTPSReader* find_local_reader(
=======
std::shared_ptr<LocalReaderPointer> find_local_reader(
>>>>>>> 456e45f25 (Fix destruction data-race on participant removal in intra-process (#5034)):src/cpp/rtps/participant/RTPSParticipantImpl.hpp
const GUID_t& reader_guid);

/***
Expand Down
Loading

0 comments on commit 7dbf4bd

Please sign in to comment.