Skip to content

Commit

Permalink
Refs #21293: Another temporary fix that IDK why it is not working
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL committed Jul 10, 2024
1 parent 18b7f62 commit f435614
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 1 deletion.
7 changes: 7 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ class RTPSWriter
return false;
}

/**
* Check if a specific change has been acknowledged by all Readers.
* Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.
* @return True if acknowledged by all.
*/
FASTDDS_EXPORTED_API virtual bool reset_intraprocess_references() = 0;

/**
* Waits until all changes were acknowledged or max_wait.
* @return True if all were acknowledged.
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,9 @@ bool EDPSimple::removeLocalWriter(

if (writer->first != nullptr)
{
//! reset local pointer in intraprocess readers
writer->first->reset_intraprocess_references();

InstanceHandle_t iH;
iH = W->getGuid();
CacheChange_t* change = EDPUtils::create_change(*writer, NOT_ALIVE_DISPOSED_UNREGISTERED, iH,
Expand Down Expand Up @@ -703,6 +706,9 @@ bool EDPSimple::removeLocalReader(

if (writer->first != nullptr)
{
//! reset local pointer in intraprocess readers
writer->first->reset_intraprocess_references();

InstanceHandle_t iH;
iH = (R->getGuid());
CacheChange_t* change = EDPUtils::create_change(*writer, NOT_ALIVE_DISPOSED_UNREGISTERED, iH,
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/rtps/writer/ReaderProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ class ReaderProxy
return locator_info_.local_reader(force_update);
}

/**
* Get the local reader on the same process (if any).
* @return The local reader on the same process.
*/
inline void local_reader(BaseReader* reader)
{
locator_info_.local_reader(reader);
}

/**
* Called when an ACKNACK is received to set a new value for the minimum count accepted for following received
* ACKNACKs.
Expand Down
16 changes: 15 additions & 1 deletion src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ bool StatefulWriter::intraprocess_delivery(
CacheChange_t* change,
ReaderProxy* reader_proxy)
{
BaseReader* reader = reader_proxy->local_reader(true);
BaseReader* reader = reader_proxy->local_reader();
if (reader)
{
if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
Expand Down Expand Up @@ -1379,6 +1379,20 @@ bool StatefulWriter::all_readers_updated()
);
}

bool StatefulWriter::reset_intraprocess_references()
{
if (!matched_local_readers_.empty())
{
for_matched_readers(matched_local_readers_, [](ReaderProxy* reader)
{
reader->local_reader(nullptr);
return true;
});
}

return true;
}

bool StatefulWriter::wait_for_all_acked(
const Duration_t& max_wait)
{
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatefulWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class StatefulWriter : public RTPSWriter

bool all_readers_updated();

bool reset_intraprocess_references() override;

/**
* Remove the change with the minimum SequenceNumber
* @return True if removed.
Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ bool StatelessWriter::change_removed_by_history(
return ret_value;
}

bool StatelessWriter::reset_intraprocess_references()
{
if (!matched_local_readers_.empty())
{
for_matched_readers(matched_local_readers_, [](ReaderLocator& reader)
{
reader.local_reader(nullptr);
return true;
});
}

return true;
}

bool StatelessWriter::has_been_fully_delivered(
const SequenceNumber_t& seq_num) const
{
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/writer/StatelessWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class StatelessWriter : public RTPSWriter
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

bool reset_intraprocess_references() override;

/**
* Add a matched reader.
* @param data Pointer to the ReaderProxyData object added.
Expand Down

0 comments on commit f435614

Please sign in to comment.