Skip to content

Commit

Permalink
Refs #21293: Approach rework v2
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL committed Oct 14, 2024
1 parent 7fff624 commit 91ac4b7
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 125 deletions.
4 changes: 2 additions & 2 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
return nullptr;
}

BaseReader* RTPSDomainImpl::find_local_reader(
LocalReaderPointer RTPSDomainImpl::find_local_reader(
const GUID_t& reader_guid)
{
auto instance = get_instance();
Expand All @@ -704,7 +704,7 @@ BaseReader* RTPSDomainImpl::find_local_reader(
}
}

return nullptr;
return LocalReaderPointer();
}

BaseWriter* RTPSDomainImpl::find_local_writer(
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class RTPSDomainImpl
*
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
static BaseReader* find_local_reader(
static LocalReaderPointer find_local_reader(
const GUID_t& reader_guid);

/**
Expand Down
13 changes: 8 additions & 5 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ bool RTPSParticipantImpl::createReader(
return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
}

BaseReader* RTPSParticipantImpl::find_local_reader(
LocalReaderPointer RTPSParticipantImpl::find_local_reader(
const GUID_t& reader_guid)
{
shared_lock<shared_mutex> _(endpoints_list_mutex);
Expand All @@ -1359,11 +1359,11 @@ BaseReader* RTPSParticipantImpl::find_local_reader(
{
if (reader->getGuid() == reader_guid)
{
return reader;
return reader->get_local_pointer();
}
}

return nullptr;
return LocalReaderPointer();
}

BaseWriter* RTPSParticipantImpl::find_local_writer(
Expand Down Expand Up @@ -2840,8 +2840,11 @@ bool RTPSParticipantImpl::register_in_reader(
}
else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId))
{
BaseReader* reader = find_local_reader(reader_guid);
res = reader->add_statistics_listener(listener);
LocalReaderPointer reader = find_local_reader(reader_guid);
if (reader)
{
res = reader->add_statistics_listener(listener);
}
}

return res;
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ namespace rtps {

class BaseReader;
class BaseWriter;
class LocalReaderPointer;

} // namespace rtps

Expand Down Expand Up @@ -477,7 +478,7 @@ class RTPSParticipantImpl
/***
* @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
*/
BaseReader* find_local_reader(
LocalReaderPointer find_local_reader(
const GUID_t& reader_guid);

/***
Expand Down
9 changes: 8 additions & 1 deletion src/cpp/rtps/reader/BaseReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/LocalReaderPointer.hpp>
#include <rtps/reader/LocalReaderView.hpp>
#include <rtps/reader/ReaderHistoryState.hpp>
#include <statistics/rtps/StatisticsBase.hpp>
Expand Down Expand Up @@ -110,7 +111,7 @@ BaseReader::BaseReader(

void BaseReader::local_actions_on_reader_removed()
{
local_view_->wait_for_references_and_set_status(0, LocalReaderViewStatus::INACTIVE);
local_view_->deactivate();
}

BaseReader::~BaseReader()
Expand Down Expand Up @@ -278,6 +279,12 @@ void BaseReader::allow_unknown_writers()
accept_messages_from_unkown_writers_ = true;
}

LocalReaderPointer BaseReader::get_local_pointer()
{
std::lock_guard<decltype(mp_mutex)> guard(mp_mutex);
return LocalReaderPointer(this, local_view_);
}

bool BaseReader::reserve_cache(
uint32_t cdr_payload_size,
fastdds::rtps::CacheChange_t*& change)
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/rtps/reader/BaseReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace rtps {
struct CacheChange_t;
class IDataSharingListener;
class LocalReaderView;
class LocalReaderPointer;
struct ReaderHistoryState;
class ReaderListener;
class RTPSParticipantImpl;
Expand Down Expand Up @@ -164,6 +165,14 @@ class BaseReader
return datasharing_listener_;
}

/**
* @brief Retrieves the local pointer to this reader
* to be used by other local entities.
*
* @return Local pointer to this reader.
*/
LocalReaderPointer get_local_pointer();

/**
* @brief Reserve a CacheChange_t.
*
Expand Down Expand Up @@ -297,11 +306,6 @@ class BaseReader
const fastdds::rtps::SequenceNumberSet_t& gapList,
VendorId_t origin_vendor_id = c_VendorId_Unknown) = 0;

inline std::shared_ptr<LocalReaderView> get_local_view()
{
return local_view_;
}

void local_actions_on_reader_removed();

#ifdef FASTDDS_STATISTICS
Expand Down
123 changes: 97 additions & 26 deletions src/cpp/rtps/reader/LocalReaderPointer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,142 @@
* @file LocalReaderPointer.cpp
*/

#include <rtps/reader/BaseReader.hpp>
#include <rtps/reader/LocalReaderPointer.hpp>

#include <cassert>

#include <rtps/reader/LocalReaderView.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {


LocalReaderPointer::LocalReaderPointer()
WeakLocalReaderPointer::WeakLocalReaderPointer()
: local_reader_(nullptr)
, already_referenced_(false)
, view_(nullptr)
{
}

LocalReaderPointer::LocalReaderPointer(
WeakLocalReaderPointer::WeakLocalReaderPointer(
BaseReader* reader,
std::shared_ptr<LocalReaderView> view)
: local_reader_(reader)
, view_(view)
, already_referenced_(false)
{
}

BaseReader* WeakLocalReaderPointer::operator ->()
{
assert(nullptr != local_reader_);
return local_reader_;
}

LocalReaderPointer::~LocalReaderPointer()
WeakLocalReaderPointer::operator bool() const
{
auto view = view_.lock();
if (already_referenced_ && view)
bool ret = false;

if (nullptr != local_reader_ &&
nullptr != view_)
{
view->dereference();
ret = true;
}
return ret;
}

BaseReader* LocalReaderPointer::reader()
void WeakLocalReaderPointer::reset()
{
local_reader_ = nullptr;
view_.reset();
}

LocalReaderPointer::LocalReaderPointer()
: WeakLocalReaderPointer()
{
BaseReader* ret = nullptr;
auto view = view_.lock();
if (!already_referenced_ && view)
}

LocalReaderPointer::LocalReaderPointer(
BaseReader* reader,
std::shared_ptr<LocalReaderView> view)
: WeakLocalReaderPointer(reader, view)
{
if (nullptr != view_)
{
view->add_reference();
ret = local_reader_;
already_referenced_ = true;
view_->add_reference();
}
}

return ret;
LocalReaderPointer::LocalReaderPointer(
const LocalReaderPointer& other)
: WeakLocalReaderPointer(other.local_reader_, other.view_)
{
if (nullptr != view_)
{
view_->add_reference();
}
}

LocalReaderPointer::LocalReaderPointer(
const WeakLocalReaderPointer& weak_local_reader_ptr)
: WeakLocalReaderPointer(weak_local_reader_ptr.local_reader_, weak_local_reader_ptr.view_)
{
if (nullptr != view_)
{
view_->add_reference();
}
}

LocalReaderPointer::~LocalReaderPointer()
{
if (nullptr != view_)
{
view_->dereference();
}
}

BaseReader* LocalReaderPointer::operator ->()
{
return reader();
assert(nullptr != local_reader_);
return local_reader_;
}

bool LocalReaderPointer::is_valid()
LocalReaderPointer& LocalReaderPointer::operator =(
const LocalReaderPointer& other)
{
bool ret = false;
local_reader_ = other.local_reader_;
view_ = other.view_;

if (nullptr != view_)
{
view_->add_reference();
}

return *this;
}

if (local_reader_)
LocalReaderPointer& LocalReaderPointer::operator =(
const WeakLocalReaderPointer& other)
{
local_reader_ = other.local_reader_;
view_ = other.view_;

if (nullptr != view_)
{
auto view = view_.lock();
view_->add_reference();
}

if (view && view->get_status() != LocalReaderViewStatus::INACTIVE)
{
ret = true;
}
return *this;
}

LocalReaderPointer::operator bool() const
{
bool ret = false;

if (nullptr != local_reader_ &&
nullptr != view_ &&
view_->get_status() != LocalReaderViewStatus::INACTIVE)
{
ret = true;
}

return ret;
Expand Down
Loading

0 comments on commit 91ac4b7

Please sign in to comment.