Skip to content

Commit

Permalink
Fix strict real-time in Flow Controller (#3735)
Browse files Browse the repository at this point in the history
* Refs #13803. Fix realtime in flowcontroller

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #13803. Fix error

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #13803. Update realtime tests to DDS API

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #13803. Fix blocking lock when removing from flowcontroller

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #13803. Fix compilation without STRICT_REALTIME

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Fix bug in readers

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Document tests

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Improve doxygen documentation

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Fix bug in readers

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Fix warning on windows

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19254. Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

---------

Signed-off-by: Ricardo González Moreno <[email protected]>
  • Loading branch information
richiware authored Sep 13, 2023
1 parent a8a4f42 commit a575bbb
Show file tree
Hide file tree
Showing 45 changed files with 1,414 additions and 704 deletions.
23 changes: 23 additions & 0 deletions include/fastdds/rtps/history/History.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ class History
const_iterator removal,
bool release = true);

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the CacheChange_t to remove.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @param release defaults to true and hints if the CacheChange_t should return to the pool
* @return iterator to the next CacheChange_t or end iterator.
*/
RTPS_DllAPI virtual iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true);

/**
* Remove all changes from the History
* @return True if everything was correctly removed.
Expand All @@ -159,6 +172,16 @@ class History
RTPS_DllAPI bool remove_change(
CacheChange_t* ch);

/**
* Remove a specific change from the history.
* @param ch Pointer to the CacheChange_t.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed.
*/
RTPS_DllAPI bool remove_change(
CacheChange_t* ch,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

/**
* Find a specific change in the history using the matches_change method criteria.
* @param ch Pointer to the CacheChange_t to search for.
Expand Down
13 changes: 13 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ class ReaderHistory : public History
const_iterator removal,
bool release = true) override;

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the change for removal
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @param release specifies if the change must be returned to the pool
* @return iterator to the next change if any
*/
RTPS_DllAPI iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true) override;

/**
* Criteria to search a specific CacheChange_t on history
* @param inner change to compare
Expand Down
25 changes: 25 additions & 0 deletions include/fastdds/rtps/history/WriterHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ class WriterHistory : public rtps::History
const_iterator removal,
bool release = true) override;

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the change for removal
* @param release specifies if the change should be return to the pool
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return iterator to the next change if any
*/
RTPS_DllAPI iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true) override;

/**
* Criteria to search a specific CacheChange_t on history
* @param inner change to compare
Expand All @@ -99,6 +112,10 @@ class WriterHistory : public rtps::History
RTPS_DllAPI virtual bool remove_change_g(
CacheChange_t* a_change);

RTPS_DllAPI virtual bool remove_change_g(
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

RTPS_DllAPI bool remove_change(
const SequenceNumber_t& sequence_number);

Expand All @@ -111,6 +128,14 @@ class WriterHistory : public rtps::History
*/
RTPS_DllAPI bool remove_min_change();

/**
* Remove the CacheChange_t with the minimum sequenceNumber.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if correctly removed.
*/
RTPS_DllAPI bool remove_min_change(
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

RTPS_DllAPI SequenceNumber_t next_sequence_number() const
{
return m_lastCacheChangeSeqNum + 1;
Expand Down
7 changes: 4 additions & 3 deletions include/fastdds/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ class RTPSMessageGroup
* Constructs a RTPSMessageGroup allowing to allocate its own buffer.
* @param participant Pointer to the participant sending data.
* @param internal_buffer true indicates this object to allocate its own buffer. false indicates to get a buffer
* @param max_blocking_time_point Future time point where blocking send should end.
* from the participant.
*/
RTPSMessageGroup(
RTPSParticipantImpl* participant,
bool internal_buffer = false);
bool internal_buffer = false,
std::chrono::steady_clock::time_point max_blocking_time_point =
std::chrono::steady_clock::now() + std::chrono::hours(24));

/**
* Basic constructor.
Expand Down Expand Up @@ -313,8 +316,6 @@ class RTPSMessageGroup

std::chrono::steady_clock::time_point max_blocking_time_point_;

bool max_blocking_time_is_set_ = false;

std::unique_ptr<RTPSMessageGroup_t> send_buffer_;

bool internal_buffer_ = false;
Expand Down
17 changes: 14 additions & 3 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <mutex>
#include <fastrtps/utils/TimedMutex.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -95,6 +94,18 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
mutex_.unlock();
}

/*!
* Try to lock the object.
*
* This kind of object needs to be locked because could be used outside the writer's mutex.
*/
template <class Clock, class Duration>
bool try_lock_until(
const std::chrono::time_point<Clock, Duration>& abs_time)
{
return mutex_.try_lock_until(abs_time);
}

fastrtps::rtps::LocatorSelector locator_selector;

ResourceLimitedVector<GUID_t> all_remote_readers;
Expand All @@ -105,7 +116,7 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface

RTPSWriter& writer_;

std::recursive_mutex mutex_;
RecursiveTimedMutex mutex_;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ class RTPSWriter
/**
* Add a change to the unsent list.
* @param change Pointer to the change to add.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
virtual void unsent_change_added_to_history(
CacheChange_t* change,
Expand All @@ -544,10 +544,12 @@ class RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
virtual bool change_removed_by_history(
CacheChange_t* a_change) = 0;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

bool is_datasharing_compatible_with(
const ReaderProxyData& rdata) const;
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatefulPersistentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -92,10 +92,12 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class StatefulWriter : public RTPSWriter
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -143,10 +143,12 @@ class StatefulWriter : public RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

/**
* Sends a change directly to a intraprocess reader.
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatelessPersistentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -83,10 +83,12 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StatelessWriter : public RTPSWriter
/**
* Add a specific change to all ReaderLocators.
* @param change Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* change,
Expand All @@ -92,10 +92,12 @@ class StatelessWriter : public RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* change) override;
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

/**
* Add a matched reader.
Expand Down
15 changes: 11 additions & 4 deletions include/fastrtps/attributes/LibrarySettingsAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ class LibrarySettingsAttributes
{
public:

LibrarySettingsAttributes() {
LibrarySettingsAttributes()
{
}

virtual ~LibrarySettingsAttributes() {
virtual ~LibrarySettingsAttributes()
{
}

bool operator==(
bool operator ==(
const LibrarySettingsAttributes& b) const
{
return (intraprocess_delivery == b.intraprocess_delivery);
}

IntraprocessDeliveryType intraprocess_delivery = INTRAPROCESS_FULL;
IntraprocessDeliveryType intraprocess_delivery =
#if HAVE_STRICT_REALTIME
INTRAPROCESS_OFF;
#else
INTRAPROCESS_FULL;
#endif // if HAVE_STRICT_REALTIME
};

} // namespace fastrtps
Expand Down
8 changes: 6 additions & 2 deletions include/fastrtps/publisher/PublisherHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ class PublisherHistory : public rtps::WriterHistory
bool remove_change_pub(
rtps::CacheChange_t* change);

virtual bool remove_change_g(
rtps::CacheChange_t* a_change);
bool remove_change_g(
rtps::CacheChange_t* a_change) override;

bool remove_change_g(
rtps::CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

bool remove_instance_changes(
const rtps::InstanceHandle_t& handle,
Expand Down
25 changes: 23 additions & 2 deletions include/fastrtps/utils/TimedConditionVariable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#endif // if HAVE_STRICT_REALTIME && defined(__unix__)

#include <mutex>
#include <condition_variable>
#include <chrono>
#include <functional>

Expand Down Expand Up @@ -115,6 +116,25 @@ class TimedConditionVariable
return ret_value;
}

template<typename Mutex>
std::cv_status wait_for(
std::unique_lock<Mutex>& lock,
const std::chrono::nanoseconds& max_blocking_time)
{
auto nsecs = max_blocking_time;
struct timespec max_wait = {
0, 0
};
clock_gettime(CLOCK_MONOTONIC, &max_wait);
nsecs = nsecs + std::chrono::nanoseconds(max_wait.tv_nsec);
auto secs = std::chrono::duration_cast<std::chrono::seconds>(nsecs);
nsecs -= secs;
max_wait.tv_sec += secs.count();
max_wait.tv_nsec = (long)nsecs.count();
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(),
&max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout;
}

template<typename Mutex>
bool wait_until(
std::unique_lock<Mutex>& lock,
Expand All @@ -137,7 +157,7 @@ class TimedConditionVariable
}

template<typename Mutex>
bool wait_until(
std::cv_status wait_until(
std::unique_lock<Mutex>& lock,
const std::chrono::steady_clock::time_point& max_blocking_time)
{
Expand All @@ -147,7 +167,8 @@ class TimedConditionVariable
struct timespec max_wait = {
secs.time_since_epoch().count(), ns.count()
};
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(), &max_wait) == 0);
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(),
&max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout;
}

void notify_one()
Expand Down
Loading

0 comments on commit a575bbb

Please sign in to comment.