diff --git a/include/fastdds/rtps/history/History.h b/include/fastdds/rtps/history/History.h index c897af10dfd..402c2cc84dc 100644 --- a/include/fastdds/rtps/history/History.h +++ b/include/fastdds/rtps/history/History.h @@ -145,6 +145,19 @@ class History const_iterator removal, bool release = true); + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the CacheChange_t to remove. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @param release defaults to true and hints if the CacheChange_t should return to the pool + * @return iterator to the next CacheChange_t or end iterator. + */ + RTPS_DllAPI virtual iterator remove_change_nts( + const_iterator removal, + const std::chrono::time_point& max_blocking_time, + bool release = true); + /** * Remove all changes from the History * @return True if everything was correctly removed. @@ -159,6 +172,16 @@ class History RTPS_DllAPI bool remove_change( CacheChange_t* ch); + /** + * Remove a specific change from the history. + * @param ch Pointer to the CacheChange_t. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return True if removed. + */ + RTPS_DllAPI bool remove_change( + CacheChange_t* ch, + const std::chrono::time_point& max_blocking_time); + /** * Find a specific change in the history using the matches_change method criteria. * @param ch Pointer to the CacheChange_t to search for. diff --git a/include/fastdds/rtps/history/ReaderHistory.h b/include/fastdds/rtps/history/ReaderHistory.h index b2729f312a6..0625915264f 100644 --- a/include/fastdds/rtps/history/ReaderHistory.h +++ b/include/fastdds/rtps/history/ReaderHistory.h @@ -156,6 +156,19 @@ class ReaderHistory : public History const_iterator removal, bool release = true) override; + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the change for removal + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @param release specifies if the change must be returned to the pool + * @return iterator to the next change if any + */ + RTPS_DllAPI iterator remove_change_nts( + const_iterator removal, + const std::chrono::time_point& max_blocking_time, + bool release = true) override; + /** * Criteria to search a specific CacheChange_t on history * @param inner change to compare diff --git a/include/fastdds/rtps/history/WriterHistory.h b/include/fastdds/rtps/history/WriterHistory.h index 9ae54df789c..74423a7f05b 100644 --- a/include/fastdds/rtps/history/WriterHistory.h +++ b/include/fastdds/rtps/history/WriterHistory.h @@ -83,6 +83,19 @@ class WriterHistory : public rtps::History const_iterator removal, bool release = true) override; + /** + * Remove a specific change from the history. + * No Thread Safe + * @param removal iterator to the change for removal + * @param release specifies if the change should be return to the pool + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return iterator to the next change if any + */ + RTPS_DllAPI iterator remove_change_nts( + const_iterator removal, + const std::chrono::time_point& max_blocking_time, + bool release = true) override; + /** * Criteria to search a specific CacheChange_t on history * @param inner change to compare @@ -99,6 +112,10 @@ class WriterHistory : public rtps::History RTPS_DllAPI virtual bool remove_change_g( CacheChange_t* a_change); + RTPS_DllAPI virtual bool remove_change_g( + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time); + RTPS_DllAPI bool remove_change( const SequenceNumber_t& sequence_number); @@ -111,6 +128,14 @@ class WriterHistory : public rtps::History */ RTPS_DllAPI bool remove_min_change(); + /** + * Remove the CacheChange_t with the minimum sequenceNumber. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return True if correctly removed. + */ + RTPS_DllAPI bool remove_min_change( + const std::chrono::time_point& max_blocking_time); + RTPS_DllAPI SequenceNumber_t next_sequence_number() const { return m_lastCacheChangeSeqNum + 1; diff --git a/include/fastdds/rtps/messages/RTPSMessageGroup.h b/include/fastdds/rtps/messages/RTPSMessageGroup.h index 6b7c97d5e96..b2e1d98dfcb 100644 --- a/include/fastdds/rtps/messages/RTPSMessageGroup.h +++ b/include/fastdds/rtps/messages/RTPSMessageGroup.h @@ -83,11 +83,14 @@ class RTPSMessageGroup * Constructs a RTPSMessageGroup allowing to allocate its own buffer. * @param participant Pointer to the participant sending data. * @param internal_buffer true indicates this object to allocate its own buffer. false indicates to get a buffer + * @param max_blocking_time_point Future time point where blocking send should end. * from the participant. */ RTPSMessageGroup( RTPSParticipantImpl* participant, - bool internal_buffer = false); + bool internal_buffer = false, + std::chrono::steady_clock::time_point max_blocking_time_point = + std::chrono::steady_clock::now() + std::chrono::hours(24)); /** * Basic constructor. @@ -313,8 +316,6 @@ class RTPSMessageGroup std::chrono::steady_clock::time_point max_blocking_time_point_; - bool max_blocking_time_is_set_ = false; - std::unique_ptr send_buffer_; bool internal_buffer_ = false; diff --git a/include/fastdds/rtps/writer/LocatorSelectorSender.hpp b/include/fastdds/rtps/writer/LocatorSelectorSender.hpp index 5bc36599310..8e691dff09f 100644 --- a/include/fastdds/rtps/writer/LocatorSelectorSender.hpp +++ b/include/fastdds/rtps/writer/LocatorSelectorSender.hpp @@ -4,8 +4,7 @@ #include #include #include - -#include +#include namespace eprosima { namespace fastrtps { @@ -95,6 +94,18 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface mutex_.unlock(); } + /*! + * Try to lock the object. + * + * This kind of object needs to be locked because could be used outside the writer's mutex. + */ + template + bool try_lock_until( + const std::chrono::time_point& abs_time) + { + return mutex_.try_lock_until(abs_time); + } + fastrtps::rtps::LocatorSelector locator_selector; ResourceLimitedVector all_remote_readers; @@ -105,7 +116,7 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface RTPSWriter& writer_; - std::recursive_mutex mutex_; + RecursiveTimedMutex mutex_; }; } // namespace rtps diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index b92c820f4b9..c3813eff195 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -535,7 +535,7 @@ class RTPSWriter /** * Add a change to the unsent list. * @param change Pointer to the change to add. - * @param max_blocking_time + * @param[in] max_blocking_time Maximum time this method has to complete the task. */ virtual void unsent_change_added_to_history( CacheChange_t* change, @@ -544,10 +544,12 @@ class RTPSWriter /** * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. * @param a_change Pointer to the change that is going to be removed. + * @param[in] max_blocking_time Maximum time this method has to complete the task. * @return True if removed correctly. */ virtual bool change_removed_by_history( - CacheChange_t* a_change) = 0; + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) = 0; bool is_datasharing_compatible_with( const ReaderProxyData& rdata) const; diff --git a/include/fastdds/rtps/writer/StatefulPersistentWriter.h b/include/fastdds/rtps/writer/StatefulPersistentWriter.h index 016ffddddf4..31a308d551b 100644 --- a/include/fastdds/rtps/writer/StatefulPersistentWriter.h +++ b/include/fastdds/rtps/writer/StatefulPersistentWriter.h @@ -83,7 +83,7 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter /** * Add a specific change to all ReaderLocators. * @param p Pointer to the change. - * @param max_blocking_time + * @param[in] max_blocking_time Maximum time this method has to complete the task. */ void unsent_change_added_to_history( CacheChange_t* p, @@ -92,10 +92,12 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter /** * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. * @param a_change Pointer to the change that is going to be removed. + * @param[in] max_blocking_time Maximum time this method has to complete the task. * @return True if removed correctly. */ bool change_removed_by_history( - CacheChange_t* a_change) override; + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) override; }; } // namespace rtps diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 3fc6bf8fa94..58c00f0fecc 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -134,7 +134,7 @@ class StatefulWriter : public RTPSWriter /** * Add a specific change to all ReaderLocators. * @param p Pointer to the change. - * @param max_blocking_time + * @param[in] max_blocking_time Maximum time this method has to complete the task. */ void unsent_change_added_to_history( CacheChange_t* p, @@ -143,10 +143,12 @@ class StatefulWriter : public RTPSWriter /** * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. * @param a_change Pointer to the change that is going to be removed. + * @param[in] max_blocking_time Maximum time this method has to complete the task. * @return True if removed correctly. */ bool change_removed_by_history( - CacheChange_t* a_change) override; + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) override; /** * Sends a change directly to a intraprocess reader. diff --git a/include/fastdds/rtps/writer/StatelessPersistentWriter.h b/include/fastdds/rtps/writer/StatelessPersistentWriter.h index c92536e0a4d..d7ef75b01b9 100644 --- a/include/fastdds/rtps/writer/StatelessPersistentWriter.h +++ b/include/fastdds/rtps/writer/StatelessPersistentWriter.h @@ -74,7 +74,7 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit /** * Add a specific change to all ReaderLocators. * @param p Pointer to the change. - * @param max_blocking_time + * @param[in] max_blocking_time Maximum time this method has to complete the task. */ void unsent_change_added_to_history( CacheChange_t* p, @@ -83,10 +83,12 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit /** * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. * @param a_change Pointer to the change that is going to be removed. + * @param[in] max_blocking_time Maximum time this method has to complete the task. * @return True if removed correctly. */ bool change_removed_by_history( - CacheChange_t* a_change) override; + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) override; }; } // namespace rtps diff --git a/include/fastdds/rtps/writer/StatelessWriter.h b/include/fastdds/rtps/writer/StatelessWriter.h index 923de00342b..168578e196f 100644 --- a/include/fastdds/rtps/writer/StatelessWriter.h +++ b/include/fastdds/rtps/writer/StatelessWriter.h @@ -83,7 +83,7 @@ class StatelessWriter : public RTPSWriter /** * Add a specific change to all ReaderLocators. * @param change Pointer to the change. - * @param max_blocking_time + * @param[in] max_blocking_time Maximum time this method has to complete the task. */ void unsent_change_added_to_history( CacheChange_t* change, @@ -92,10 +92,12 @@ class StatelessWriter : public RTPSWriter /** * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. * @param change Pointer to the change that is going to be removed. + * @param[in] max_blocking_time Maximum time this method has to complete the task. * @return True if removed correctly. */ bool change_removed_by_history( - CacheChange_t* change) override; + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) override; /** * Add a matched reader. diff --git a/include/fastrtps/attributes/LibrarySettingsAttributes.h b/include/fastrtps/attributes/LibrarySettingsAttributes.h index e0572c1117c..a967d73d5a6 100644 --- a/include/fastrtps/attributes/LibrarySettingsAttributes.h +++ b/include/fastrtps/attributes/LibrarySettingsAttributes.h @@ -38,19 +38,26 @@ class LibrarySettingsAttributes { public: - LibrarySettingsAttributes() { + LibrarySettingsAttributes() + { } - virtual ~LibrarySettingsAttributes() { + virtual ~LibrarySettingsAttributes() + { } - bool operator==( + bool operator ==( const LibrarySettingsAttributes& b) const { return (intraprocess_delivery == b.intraprocess_delivery); } - IntraprocessDeliveryType intraprocess_delivery = INTRAPROCESS_FULL; + IntraprocessDeliveryType intraprocess_delivery = +#if HAVE_STRICT_REALTIME + INTRAPROCESS_OFF; +#else + INTRAPROCESS_FULL; +#endif // if HAVE_STRICT_REALTIME }; } // namespace fastrtps diff --git a/include/fastrtps/publisher/PublisherHistory.h b/include/fastrtps/publisher/PublisherHistory.h index 5a4059fc52a..3198b650d2d 100644 --- a/include/fastrtps/publisher/PublisherHistory.h +++ b/include/fastrtps/publisher/PublisherHistory.h @@ -110,8 +110,12 @@ class PublisherHistory : public rtps::WriterHistory bool remove_change_pub( rtps::CacheChange_t* change); - virtual bool remove_change_g( - rtps::CacheChange_t* a_change); + bool remove_change_g( + rtps::CacheChange_t* a_change) override; + + bool remove_change_g( + rtps::CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) override; bool remove_instance_changes( const rtps::InstanceHandle_t& handle, diff --git a/include/fastrtps/utils/TimedConditionVariable.hpp b/include/fastrtps/utils/TimedConditionVariable.hpp index ef375b301b3..d2c4a795336 100644 --- a/include/fastrtps/utils/TimedConditionVariable.hpp +++ b/include/fastrtps/utils/TimedConditionVariable.hpp @@ -55,6 +55,7 @@ #endif // if HAVE_STRICT_REALTIME && defined(__unix__) #include +#include #include #include @@ -115,6 +116,25 @@ class TimedConditionVariable return ret_value; } + template + std::cv_status wait_for( + std::unique_lock& lock, + const std::chrono::nanoseconds& max_blocking_time) + { + auto nsecs = max_blocking_time; + struct timespec max_wait = { + 0, 0 + }; + clock_gettime(CLOCK_MONOTONIC, &max_wait); + nsecs = nsecs + std::chrono::nanoseconds(max_wait.tv_nsec); + auto secs = std::chrono::duration_cast(nsecs); + nsecs -= secs; + max_wait.tv_sec += secs.count(); + max_wait.tv_nsec = (long)nsecs.count(); + return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(), + &max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout; + } + template bool wait_until( std::unique_lock& lock, @@ -137,7 +157,7 @@ class TimedConditionVariable } template - bool wait_until( + std::cv_status wait_until( std::unique_lock& lock, const std::chrono::steady_clock::time_point& max_blocking_time) { @@ -147,7 +167,8 @@ class TimedConditionVariable struct timespec max_wait = { secs.time_since_epoch().count(), ns.count() }; - return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(), &max_wait) == 0); + return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(), + &max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout; } void notify_one() diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.cpp b/src/cpp/fastdds/publisher/DataWriterHistory.cpp index 07a6662f37c..c8d025a4257 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.cpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.cpp @@ -149,7 +149,7 @@ bool DataWriterHistory::prepare_change( } else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS) { - ret = this->remove_min_change(); + ret = this->remove_min_change(max_blocking_time); } // Notify if change has been removed unacknowledged @@ -347,6 +347,13 @@ bool DataWriterHistory::removeMinChange() bool DataWriterHistory::remove_change_pub( CacheChange_t* change) +{ + return DataWriterHistory::remove_change_pub(change, std::chrono::steady_clock::now() + std::chrono::hours(24)); +} + +bool DataWriterHistory::remove_change_pub( + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) { if (mp_writer == nullptr || mp_mutex == nullptr) { @@ -354,10 +361,20 @@ bool DataWriterHistory::remove_change_pub( return false; } +#if HAVE_STRICT_REALTIME + std::unique_lock lock(*this->mp_mutex, std::defer_lock); + if (!lock.try_lock_until(max_blocking_time)) + { + EPROSIMA_LOG_ERROR(PUBLISHER, "Cannot lock the DataWriterHistory mutex"); + return false; + } +#else std::lock_guard guard(*this->mp_mutex); +#endif // if HAVE_STRICT_REALTIME + if (topic_att_.getTopicKind() == NO_KEY) { - if (remove_change(change)) + if (remove_change(change, max_blocking_time)) { m_isHistoryFull = false; return true; @@ -378,7 +395,7 @@ bool DataWriterHistory::remove_change_pub( { if (((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID)) { - if (remove_change(change)) + if (remove_change(change, max_blocking_time)) { vit->second.cache_changes.erase(chit); m_isHistoryFull = false; @@ -394,7 +411,14 @@ bool DataWriterHistory::remove_change_pub( bool DataWriterHistory::remove_change_g( CacheChange_t* a_change) { - return remove_change_pub(a_change); + return remove_change_pub(a_change, std::chrono::steady_clock::now() + std::chrono::hours(24)); +} + +bool DataWriterHistory::remove_change_g( + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) +{ + return remove_change_pub(a_change, max_blocking_time); } bool DataWriterHistory::remove_instance_changes( diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.hpp b/src/cpp/fastdds/publisher/DataWriterHistory.hpp index 13213eb60b4..281d460417d 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.hpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.hpp @@ -174,8 +174,22 @@ class DataWriterHistory : public fastrtps::rtps::WriterHistory bool remove_change_pub( fastrtps::rtps::CacheChange_t* change); - virtual bool remove_change_g( - fastrtps::rtps::CacheChange_t* a_change); + /** + * Remove a change by the publisher History. + * @param change Pointer to the CacheChange_t. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return True if removed. + */ + bool remove_change_pub( + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time); + + bool remove_change_g( + fastrtps::rtps::CacheChange_t* a_change) override; + + bool remove_change_g( + fastrtps::rtps::CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) override; bool remove_instance_changes( const fastrtps::rtps::InstanceHandle_t& handle, diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 42bed2a9f51..7b1cafa53ce 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -454,6 +454,10 @@ ReturnCode_t DataWriterImpl::loan_sample( void*& sample, LoanInitializationKind initialization) { + // Block lowlevel writer + auto max_blocking_time = steady_clock::now() + + microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time)); + // Type should be plain and have space for the representation header if (!type_->is_plain() || SerializedPayload_t::representation_header_size > type_->m_typeSize) { @@ -466,7 +470,16 @@ ReturnCode_t DataWriterImpl::loan_sample( return ReturnCode_t::RETCODE_NOT_ENABLED; } +#if HAVE_STRICT_REALTIME + std::unique_lock lock(writer_->getMutex(), std::defer_lock); + if (!lock.try_lock_until(max_blocking_time)) + { + return ReturnCode_t::RETCODE_TIMEOUT; + } +#else + static_cast(max_blocking_time); std::lock_guard lock(writer_->getMutex()); +#endif // if HAVE_STRICT_REALTIME // Get one payload from the pool PayloadInfo_t payload; diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index e73b1043b6d..6bfa5813c9b 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -696,6 +696,14 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts( return ret_val; } +ReaderHistory::iterator DataReaderHistory::remove_change_nts( + ReaderHistory::const_iterator removal, + const std::chrono::time_point&, + bool release) +{ + return DataReaderHistory::remove_change_nts(removal, release); +} + bool DataReaderHistory::completed_change( CacheChange_t* change) { diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp index 23f5dae4ece..4feb7842685 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.hpp @@ -97,6 +97,21 @@ class DataReaderHistory : public eprosima::fastrtps::rtps::ReaderHistory const_iterator removal, bool release = true) override; + /** + * Remove a specific change from the history. + * No Thread Safe. + * + * @param removal iterator to the CacheChange_t to remove. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @param release defaults to true and hints if the CacheChange_t should return to the pool + * + * @return iterator to the next CacheChange_t or end iterator. + */ + iterator remove_change_nts( + const_iterator removal, + const std::chrono::time_point& max_blocking_time, + bool release = true) override; + /** * Check if a new change can be added to this history. * diff --git a/src/cpp/fastrtps_deprecated/publisher/PublisherHistory.cpp b/src/cpp/fastrtps_deprecated/publisher/PublisherHistory.cpp index f7fe88f2612..a327450b34d 100644 --- a/src/cpp/fastrtps_deprecated/publisher/PublisherHistory.cpp +++ b/src/cpp/fastrtps_deprecated/publisher/PublisherHistory.cpp @@ -356,6 +356,13 @@ bool PublisherHistory::remove_change_g( return remove_change_pub(a_change); } +bool PublisherHistory::remove_change_g( + CacheChange_t* a_change, + const std::chrono::time_point&) +{ + return remove_change_pub(a_change); +} + bool PublisherHistory::remove_instance_changes( const rtps::InstanceHandle_t& handle, const rtps::SequenceNumber_t& seq_up_to) diff --git a/src/cpp/rtps/flowcontrol/FlowController.hpp b/src/cpp/rtps/flowcontrol/FlowController.hpp index 934e31aaea5..0f4605956d6 100644 --- a/src/cpp/rtps/flowcontrol/FlowController.hpp +++ b/src/cpp/rtps/flowcontrol/FlowController.hpp @@ -81,10 +81,13 @@ class FlowController * If the CacheChange_t is currently managed by this object, remove it. * This method should be called whenever a CacheChange_t is removed from the writer's history. * - * @param Pointer to the change which should be removed if it is currently managed by this object. + * @param[in] change Pointer to the change which should be removed if it is currently managed by this object. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return true if the sample could be removed. false otherwise. */ - virtual void remove_change( - fastrtps::rtps::CacheChange_t* change) = 0; + virtual bool remove_change( + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) = 0; /*! * Return the maximum number of bytes can be used by the flow controller to generate a RTPS message. diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 57d8a06cff5..1060bd941aa 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -4,13 +4,13 @@ #include "FlowController.hpp" #include #include +#include +#include #include #include #include -#include #include -#include #include #include @@ -207,7 +207,7 @@ struct FlowControllerAsyncPublishMode if (running) { { - std::unique_lock lock(changes_interested_mutex); + std::unique_lock lock(changes_interested_mutex); running = false; cv.notify_one(); } @@ -222,7 +222,7 @@ struct FlowControllerAsyncPublishMode } bool wait( - std::unique_lock& lock) + std::unique_lock& lock) { cv.wait(lock); return false; @@ -242,12 +242,12 @@ struct FlowControllerAsyncPublishMode std::atomic_bool running {false}; - std::condition_variable cv; + fastrtps::TimedConditionVariable cv; fastrtps::rtps::RTPSMessageGroup group; //! Mutex for interested samples to be added. - std::mutex changes_interested_mutex; + fastrtps::TimedMutex changes_interested_mutex; //! Used to warning async thread a writer wants to remove a sample. std::atomic writers_interested_in_remove = {0}; @@ -319,7 +319,7 @@ struct FlowControllerLimitedAsyncPublishMode : public FlowControllerAsyncPublish * @return false if the condition_variable was awaken because a new change was added. true if the condition_variable was awaken because the bandwidth limitation has to be reset. */ bool wait( - std::unique_lock& lock) + std::unique_lock& lock) { auto lapse = std::chrono::steady_clock::now() - last_period_; bool reset_limit = true; @@ -960,7 +960,7 @@ class FlowControllerImpl : public FlowController void register_writer( fastrtps::rtps::RTPSWriter* writer) override { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); auto ret = writers_.insert({ writer->getGuid(), writer}); (void)ret; assert(ret.second); @@ -975,7 +975,7 @@ class FlowControllerImpl : public FlowController void unregister_writer( fastrtps::rtps::RTPSWriter* writer) override { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); writers_.erase(writer->getGuid()); unregister_writer_impl(writer); } @@ -1019,13 +1019,16 @@ class FlowControllerImpl : public FlowController * If currently the CacheChange_t is managed by this object, remove it. * This funcion should be called when a CacheChange_t is removed from the writer's history. * - * @param Pointer to the change which should be removed if it is currently managed by this object. + * @param[in] change Pointer to the change which should be removed if it is currently managed by this object. + * @param[in] max_blocking_time Maximum time this method has to complete the task. + * @return true if the sample could be removed. false otherwise. */ - void remove_change( - fastrtps::rtps::CacheChange_t* change) override + bool remove_change( + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) override { assert(nullptr != change); - remove_change_impl(change); + return remove_change_impl(change, max_blocking_time); } uint32_t get_max_payload() override @@ -1065,7 +1068,7 @@ class FlowControllerImpl : public FlowController register_writer_impl( fastrtps::rtps::RTPSWriter* writer) { - std::unique_lock in_lock(async_mode.changes_interested_mutex); + std::unique_lock in_lock(async_mode.changes_interested_mutex); sched.register_writer(writer); } @@ -1082,7 +1085,7 @@ class FlowControllerImpl : public FlowController unregister_writer_impl( fastrtps::rtps::RTPSWriter* writer) { - std::unique_lock in_lock(async_mode.changes_interested_mutex); + std::unique_lock in_lock(async_mode.changes_interested_mutex); sched.unregister_writer(writer); } @@ -1104,15 +1107,25 @@ class FlowControllerImpl : public FlowController enqueue_new_sample_impl( fastrtps::rtps::RTPSWriter* writer, fastrtps::rtps::CacheChange_t* change, - const std::chrono::time_point& /* TODO max_blocking_time*/) + const std::chrono::time_point& max_blocking_time) { + bool ret_value = false; assert(!change->writer_info.is_linked.load()); // Sync delivery failed. Try to store for asynchronous delivery. - std::unique_lock lock(async_mode.changes_interested_mutex); - sched.add_new_sample(writer, change); - async_mode.cv.notify_one(); +#if HAVE_STRICT_REALTIME + std::unique_lock lock(async_mode.changes_interested_mutex, std::defer_lock); + if (lock.try_lock_until(max_blocking_time)) +#else + static_cast(max_blocking_time); + std::unique_lock lock(async_mode.changes_interested_mutex); +#endif // if HAVE_STRICT_REALTIME{ + { + sched.add_new_sample(writer, change); + async_mode.cv.notify_one(); + ret_value = true; + } - return true; + return ret_value; } /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. @@ -1143,17 +1156,32 @@ class FlowControllerImpl : public FlowController fastrtps::rtps::CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { + bool ret_value = false; // This call should be made with writer's mutex locked. fastrtps::rtps::LocatorSelectorSender& locator_selector = writer->get_general_locator_selector(); - std::lock_guard lock(locator_selector); - fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector); - if (fastrtps::rtps::DeliveryRetCode::DELIVERED != - writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time)) +#if HAVE_STRICT_REALTIME + std::unique_lock lock(locator_selector, std::defer_lock); + if (lock.try_lock_until(max_blocking_time)) +#else + std::unique_lock lock(locator_selector); +#endif // if HAVE_STRICT_REALTIME{ { - return enqueue_new_sample_impl(writer, change, max_blocking_time); + try + { + fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector, max_blocking_time); + ret_value = true; + if (fastrtps::rtps::DeliveryRetCode::DELIVERED != + writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time)) + { + ret_value = enqueue_new_sample_impl(writer, change, max_blocking_time); + } + } + catch (fastrtps::rtps::RTPSMessageGroup::timeout&) + { + } } - return true; + return ret_value; } /*! @@ -1179,18 +1207,27 @@ class FlowControllerImpl : public FlowController add_old_sample_impl( fastrtps::rtps::RTPSWriter* writer, fastrtps::rtps::CacheChange_t* change, - const std::chrono::time_point& /* TODO max_blocking_time*/) + const std::chrono::time_point& max_blocking_time) { + bool ret_value = false; + if (!change->writer_info.is_linked.load()) { - std::unique_lock lock(async_mode.changes_interested_mutex); - sched.add_old_sample(writer, change); - async_mode.cv.notify_one(); - - return true; +#if HAVE_STRICT_REALTIME + std::unique_lock lock(async_mode.changes_interested_mutex, std::defer_lock); + if (lock.try_lock_until(max_blocking_time)) +#else + static_cast(max_blocking_time); + std::unique_lock lock(async_mode.changes_interested_mutex); +#endif // if HAVE_STRICT_REALTIME{ + { + sched.add_old_sample(writer, change); + async_mode.cv.notify_one(); + ret_value = true; + } } - return false; + return ret_value; } /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. @@ -1212,44 +1249,78 @@ class FlowControllerImpl : public FlowController * @note Before calling this function, the change's writer mutex have to be locked. */ template - typename std::enable_if::value, void>::type + typename std::enable_if::value, bool>::type remove_change_impl( - fastrtps::rtps::CacheChange_t* change) + fastrtps::rtps::CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) { + bool ret_value = true; if (change->writer_info.is_linked.load()) { ++async_mode.writers_interested_in_remove; - std::unique_lock lock(mutex_); - std::unique_lock interested_lock(async_mode.changes_interested_mutex); - - // When blocked, both pointer are different than nullptr or equal. - assert((nullptr != change->writer_info.previous && - nullptr != change->writer_info.next) || - (nullptr == change->writer_info.previous && - nullptr == change->writer_info.next)); - if (change->writer_info.is_linked.load()) +#if HAVE_STRICT_REALTIME + std::unique_lock lock(mutex_, std::defer_lock); + if (lock.try_lock_until(max_blocking_time)) +#else + static_cast(max_blocking_time); + std::unique_lock lock(mutex_); +#endif // if HAVE_STRICT_REALTIME { +#if HAVE_STRICT_REALTIME + std::unique_lock interested_lock(async_mode.changes_interested_mutex, + std::defer_lock); + if (interested_lock.try_lock_until(max_blocking_time)) +#else + std::unique_lock interested_lock(async_mode.changes_interested_mutex); +#endif // if HAVE_STRICT_REALTIME + { - // Try to join previous node and next node. - change->writer_info.previous->writer_info.next = change->writer_info.next; - change->writer_info.next->writer_info.previous = change->writer_info.previous; - change->writer_info.previous = nullptr; - change->writer_info.next = nullptr; - change->writer_info.is_linked.store(false); + // When blocked, both pointer are different than nullptr or equal. + assert((nullptr != change->writer_info.previous && + nullptr != change->writer_info.next) || + (nullptr == change->writer_info.previous && + nullptr == change->writer_info.next)); + if (change->writer_info.is_linked.load()) + { + + // Try to join previous node and next node. + change->writer_info.previous->writer_info.next = change->writer_info.next; + change->writer_info.next->writer_info.previous = change->writer_info.previous; + change->writer_info.previous = nullptr; + change->writer_info.next = nullptr; + change->writer_info.is_linked.store(false); + } + } +#if HAVE_STRICT_REALTIME + else + { + ret_value = !change->writer_info.is_linked.load(); + } +#endif // if HAVE_STRICT_REALTIME } +#if HAVE_STRICT_REALTIME + else + { + ret_value = !change->writer_info.is_linked.load(); + } +#endif // if HAVE_STRICT_REALTIME --async_mode.writers_interested_in_remove; } + + return ret_value; } /*! This function is used when PublishMode = FlowControllerPureSyncPublishMode. * In this case there is no async mechanism. */ template - typename std::enable_if::value, void>::type + typename std::enable_if::value, bool>::type remove_change_impl( - fastrtps::rtps::CacheChange_t*) const + fastrtps::rtps::CacheChange_t*, + const std::chrono::time_point&) { // Do nothing. + return true; } /*! @@ -1265,12 +1336,12 @@ class FlowControllerImpl : public FlowController continue; } - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); fastrtps::rtps::CacheChange_t* change_to_process = nullptr; //Check if we have to sleep. { - std::unique_lock in_lock(async_mode.changes_interested_mutex); + std::unique_lock in_lock(async_mode.changes_interested_mutex); // Add interested changes into the queue. sched.add_interested_changes_to_queue_nts(); @@ -1364,7 +1435,7 @@ class FlowControllerImpl : public FlowController // Add interested changes into the queue. { - std::unique_lock in_lock(async_mode.changes_interested_mutex); + std::unique_lock in_lock(async_mode.changes_interested_mutex); sched.add_interested_changes_to_queue_nts(); } @@ -1389,7 +1460,7 @@ class FlowControllerImpl : public FlowController return std::numeric_limits::max(); } - std::mutex mutex_; + fastrtps::TimedMutex mutex_; fastrtps::rtps::RTPSParticipantImpl* participant_ = nullptr; diff --git a/src/cpp/rtps/history/History.cpp b/src/cpp/rtps/history/History.cpp index a3368c164eb..1a3ee177fd5 100644 --- a/src/cpp/rtps/history/History.cpp +++ b/src/cpp/rtps/history/History.cpp @@ -94,23 +94,46 @@ History::iterator History::remove_change_nts( return m_changes.erase(removal); } +History::iterator History::remove_change_nts( + const_iterator removal, + const std::chrono::time_point&, + bool release) +{ + return History::remove_change_nts(removal, release); +} + bool History::remove_change( CacheChange_t* ch) { + return History::remove_change(ch, std::chrono::steady_clock::now() + std::chrono::hours(24)); +} + +bool History::remove_change( + CacheChange_t* ch, + const std::chrono::time_point& max_blocking_time) +{ +#if HAVE_STRICT_REALTIME + std::unique_lock lock(*mp_mutex, std::defer_lock); + if (!lock.try_lock_until(max_blocking_time)) + { + EPROSIMA_LOG_ERROR(PUBLISHER, "Cannot lock the DataWriterHistory mutex"); + return false; + } +#else std::lock_guard guard(*mp_mutex); +#endif // if HAVE_STRICT_REALTIME const_iterator it = find_change_nts(ch); + const_iterator end_it = changesEnd(); - if (it == changesEnd()) + if (it == end_it) { EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); return false; } // remove using the virtual method - remove_change_nts(it); - - return true; + return end_it != remove_change_nts(it, max_blocking_time); } bool History::remove_all_changes() diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index 8662e601770..db3400ed085 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -157,6 +157,14 @@ History::iterator ReaderHistory::remove_change_nts( return ret_val; } +History::iterator ReaderHistory::remove_change_nts( + const_iterator removal, + const std::chrono::time_point&, + bool release) +{ + return ReaderHistory::remove_change_nts(removal, release); +} + void ReaderHistory::writer_unmatched( const GUID_t& writer_guid, const SequenceNumber_t& last_notified_seq) diff --git a/src/cpp/rtps/history/WriterHistory.cpp b/src/cpp/rtps/history/WriterHistory.cpp index c5f34e1091e..2fcf0ae1e44 100644 --- a/src/cpp/rtps/history/WriterHistory.cpp +++ b/src/cpp/rtps/history/WriterHistory.cpp @@ -171,6 +171,15 @@ bool WriterHistory::matches_change( History::iterator WriterHistory::remove_change_nts( const_iterator removal, bool release) +{ + return WriterHistory::remove_change_nts(removal, std::chrono::steady_clock::now() + std::chrono::hours(24), + release); +} + +History::iterator WriterHistory::remove_change_nts( + const_iterator removal, + const std::chrono::time_point& max_blocking_time, + bool release) { if (mp_writer == nullptr || mp_mutex == nullptr) { @@ -185,27 +194,38 @@ History::iterator WriterHistory::remove_change_nts( return changesEnd(); } - // Remove from history CacheChange_t* change = *removal; - auto ret_val = m_changes.erase(removal); - m_isHistoryFull = false; // Inform writer - mp_writer->change_removed_by_history(change); - - // Release from pools - if ( release ) + if (mp_writer->change_removed_by_history(change, max_blocking_time)) { - mp_writer->release_change(change); + // Remove from history + auto ret_val = m_changes.erase(removal); + m_isHistoryFull = false; + + // Release from pools + if ( release ) + { + mp_writer->release_change(change); + } + + return ret_val; } - return ret_val; + return changesEnd(); } bool WriterHistory::remove_change_g( CacheChange_t* a_change) { - return remove_change(a_change); + return remove_change(a_change, std::chrono::steady_clock::now() + std::chrono::hours(24)); +} + +bool WriterHistory::remove_change_g( + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) +{ + return remove_change(a_change, max_blocking_time); } bool WriterHistory::remove_change( @@ -252,6 +272,12 @@ CacheChange_t* WriterHistory::remove_change_and_reuse( } bool WriterHistory::remove_min_change() +{ + return remove_min_change(std::chrono::steady_clock::now() + std::chrono::hours(24)); +} + +bool WriterHistory::remove_min_change( + const std::chrono::time_point& max_blocking_time) { if (mp_writer == nullptr || mp_mutex == nullptr) @@ -261,8 +287,18 @@ bool WriterHistory::remove_min_change() return false; } +#if HAVE_STRICT_REALTIME + std::unique_lock lock(*mp_mutex, std::defer_lock); + if (!lock.try_lock_until(max_blocking_time)) + { + EPROSIMA_LOG_ERROR(PUBLISHER, "Cannot lock the DataWriterHistory mutex"); + return false; + } +#else std::lock_guard guard(*mp_mutex); - if (m_changes.size() > 0 && remove_change_g(m_changes.front())) +#endif // if HAVE_STRICT_REALTIME + + if (m_changes.size() > 0 && remove_change_g(m_changes.front(), max_blocking_time)) { return true; } diff --git a/src/cpp/rtps/messages/RTPSMessageGroup.cpp b/src/cpp/rtps/messages/RTPSMessageGroup.cpp index 8bac73bfdba..5ca45d27cac 100644 --- a/src/cpp/rtps/messages/RTPSMessageGroup.cpp +++ b/src/cpp/rtps/messages/RTPSMessageGroup.cpp @@ -173,9 +173,11 @@ const EntityId_t& get_entity_id( RTPSMessageGroup::RTPSMessageGroup( RTPSParticipantImpl* participant, - bool internal_buffer) + bool internal_buffer, + std::chrono::steady_clock::time_point max_blocking_time_point) : participant_(participant) - , send_buffer_(!internal_buffer ? participant->get_send_buffer() : nullptr) + , max_blocking_time_point_(max_blocking_time_point) + , send_buffer_(!internal_buffer ? participant->get_send_buffer(max_blocking_time_point) : nullptr) , internal_buffer_(internal_buffer) { // Avoid warning when neither SECURITY nor DEBUG is used @@ -220,14 +222,12 @@ RTPSMessageGroup::RTPSMessageGroup( Endpoint* endpoint, RTPSMessageSenderInterface* msg_sender, std::chrono::steady_clock::time_point max_blocking_time_point) - : RTPSMessageGroup(participant) + : RTPSMessageGroup(participant, false, max_blocking_time_point) { assert(endpoint); endpoint_ = endpoint; sender_ = msg_sender; - max_blocking_time_point_ = max_blocking_time_point; - max_blocking_time_is_set_ = true; } RTPSMessageGroup::~RTPSMessageGroup() noexcept(false) @@ -300,8 +300,7 @@ void RTPSMessageGroup::send() eprosima::fastdds::statistics::rtps::add_statistics_submessage(msgToSend); if (!sender_->send(msgToSend, - max_blocking_time_is_set_ ? max_blocking_time_point_ : (std::chrono::steady_clock::now() + - std::chrono::hours(24)))) + max_blocking_time_point_)) { throw timeout(); } diff --git a/src/cpp/rtps/messages/SendBuffersManager.cpp b/src/cpp/rtps/messages/SendBuffersManager.cpp index 7b53c896993..c9d4a7e3c3a 100644 --- a/src/cpp/rtps/messages/SendBuffersManager.cpp +++ b/src/cpp/rtps/messages/SendBuffersManager.cpp @@ -18,6 +18,7 @@ #include "SendBuffersManager.hpp" #include "../participant/RTPSParticipantImpl.h" +#include namespace eprosima { namespace fastrtps { @@ -34,7 +35,7 @@ SendBuffersManager::SendBuffersManager( void SendBuffersManager::init( const RTPSParticipantImpl* participant) { - std::lock_guard guard(mutex_); + std::lock_guard guard(mutex_); if (n_created_ < pool_.capacity()) { @@ -74,9 +75,18 @@ void SendBuffersManager::init( } std::unique_ptr SendBuffersManager::get_buffer( - const RTPSParticipantImpl* participant) + const RTPSParticipantImpl* participant, + const std::chrono::steady_clock::time_point& max_blocking_time) { - std::unique_lock lock(mutex_); +#if HAVE_STRICT_REALTIME + std::unique_lock lock(mutex_, std::defer_lock); + if (!lock.try_lock_until(max_blocking_time)) + { + throw RTPSMessageGroup::timeout(); + } +#else + std::unique_lock lock(mutex_); +#endif // if HAVE_STRICT_REALTIME std::unique_ptr ret_val; @@ -89,7 +99,10 @@ std::unique_ptr SendBuffersManager::get_buffer( else { EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, "Waiting for send buffer"); - available_cv_.wait(lock); + if (std::cv_status::timeout == available_cv_.wait_until(lock, max_blocking_time)) + { + throw RTPSMessageGroup::timeout(); + } } } @@ -102,7 +115,7 @@ std::unique_ptr SendBuffersManager::get_buffer( void SendBuffersManager::return_buffer( std::unique_ptr && buffer) { - std::lock_guard guard(mutex_); + std::lock_guard guard(mutex_); pool_.push_back(std::move(buffer)); available_cv_.notify_one(); } diff --git a/src/cpp/rtps/messages/SendBuffersManager.hpp b/src/cpp/rtps/messages/SendBuffersManager.hpp index 744b64ae87f..fccc584a0a8 100644 --- a/src/cpp/rtps/messages/SendBuffersManager.hpp +++ b/src/cpp/rtps/messages/SendBuffersManager.hpp @@ -22,11 +22,11 @@ #include "RTPSMessageGroup_t.hpp" #include +#include +#include #include // std::vector #include // std::unique_ptr -#include // std::mutex -#include // std::condition_variable namespace eprosima { @@ -68,10 +68,12 @@ class SendBuffersManager /** * Get one buffer from the pool. * @param participant Pointer to the participant asking for a buffer. + * @param max_blocking_time Maximum time the function can be blocked. * @return unique pointer to a send buffer. */ std::unique_ptr get_buffer( - const RTPSParticipantImpl* participant); + const RTPSParticipantImpl* participant, + const std::chrono::steady_clock::time_point& max_blocking_time); /** * Return one buffer to the pool. @@ -86,7 +88,7 @@ class SendBuffersManager const RTPSParticipantImpl* participant); //!Protects all data - std::mutex mutex_; + TimedMutex mutex_; //!Send buffers pool std::vector> pool_; //!Raw buffer shared by the buffers created inside init() @@ -96,13 +98,13 @@ class SendBuffersManager //!Whether we allow n_created_ to grow beyond the pool_ capacity. bool allow_growing_ = true; //!To wait for a buffer to be returned to the pool. - std::condition_variable available_cv_; + TimedConditionVariable available_cv_; }; } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif // RTPS_MESSAGES_SENDBUFFERSMANAGER_HPP diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 48f00d6a449..0e801116bc8 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2278,9 +2278,10 @@ void RTPSParticipantImpl::set_check_type_function( type_check_fn_ = std::move(check_type); } -std::unique_ptr RTPSParticipantImpl::get_send_buffer() +std::unique_ptr RTPSParticipantImpl::get_send_buffer( + const std::chrono::steady_clock::time_point& max_blocking_time) { - return send_buffers_->get_buffer(this); + return send_buffers_->get_buffer(this, max_blocking_time); } void RTPSParticipantImpl::return_send_buffer( diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 03b68851747..2f343bc9cc8 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -470,7 +470,8 @@ class RTPSParticipantImpl return false; } - std::unique_ptr get_send_buffer(); + std::unique_ptr get_send_buffer( + const std::chrono::steady_clock::time_point& max_blocking_time); void return_send_buffer( std::unique_ptr && buffer); diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index f967d28f9d2..e2fde462d20 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -152,7 +152,7 @@ void RTPSWriter::deinit() std::lock_guard guard(mp_mutex); for (auto it = mp_history->changesBegin(); it != mp_history->changesEnd(); ++it) { - flow_controller_->remove_change(*it); + flow_controller_->remove_change(*it, std::chrono::steady_clock::now() + std::chrono::hours(24)); } } for (auto it = mp_history->changesBegin(); it != mp_history->changesEnd(); ++it) diff --git a/src/cpp/rtps/writer/StatefulPersistentWriter.cpp b/src/cpp/rtps/writer/StatefulPersistentWriter.cpp index a1368b7df5f..459dd28a468 100644 --- a/src/cpp/rtps/writer/StatefulPersistentWriter.cpp +++ b/src/cpp/rtps/writer/StatefulPersistentWriter.cpp @@ -88,10 +88,11 @@ void StatefulPersistentWriter::unsent_change_added_to_history( } bool StatefulPersistentWriter::change_removed_by_history( - CacheChange_t* change) + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) { remove_persistent_change(change); - return StatefulWriter::change_removed_by_history(change); + return StatefulWriter::change_removed_by_history(change, max_blocking_time); } void StatefulPersistentWriter::print_inconsistent_acknack( diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index a9b223c1b12..cd265475844 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -533,44 +533,50 @@ bool StatefulWriter::intraprocess_heartbeat( } bool StatefulWriter::change_removed_by_history( - CacheChange_t* a_change) + CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) { + bool ret_value = false; SequenceNumber_t sequence_number = a_change->sequenceNumber; std::lock_guard guard(mp_mutex); EPROSIMA_LOG_INFO(RTPS_WRITER, "Change " << sequence_number << " to be removed."); - flow_controller_->remove_change(a_change); - - // Take note of biggest removed sequence number to improve sending of gaps - if (sequence_number > biggest_removed_sequence_number_) + if (flow_controller_->remove_change(a_change, max_blocking_time)) { - biggest_removed_sequence_number_ = sequence_number; - } - // Invalidate CacheChange pointer in ReaderProxies. - for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [sequence_number](ReaderProxy* reader) - { - reader->change_has_been_removed(sequence_number); - return false; - } - ); + // Take note of biggest removed sequence number to improve sending of gaps + if (sequence_number > biggest_removed_sequence_number_) + { + biggest_removed_sequence_number_ = sequence_number; + } - // remove from datasharing pool history - if (is_datasharing_compatible()) - { - auto pool = std::dynamic_pointer_cast(payload_pool_); - assert (pool != nullptr); + // Invalidate CacheChange pointer in ReaderProxies. + for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, + [sequence_number](ReaderProxy* reader) + { + reader->change_has_been_removed(sequence_number); + return false; + } + ); - pool->remove_from_shared_history(a_change); - EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << a_change->sequenceNumber); - } + // remove from datasharing pool history + if (is_datasharing_compatible()) + { + auto pool = std::dynamic_pointer_cast(payload_pool_); + assert (pool != nullptr); - may_remove_change_ = 2; - may_remove_change_cond_.notify_one(); + pool->remove_from_shared_history(a_change); + EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << a_change->sequenceNumber); + } - return true; + may_remove_change_ = 2; + may_remove_change_cond_.notify_one(); + + ret_value = true; + } + + return ret_value; } void StatefulWriter::send_heartbeat_to_all_readers() diff --git a/src/cpp/rtps/writer/StatelessPersistentWriter.cpp b/src/cpp/rtps/writer/StatelessPersistentWriter.cpp index 8d86f62d4fc..872e9a2da12 100644 --- a/src/cpp/rtps/writer/StatelessPersistentWriter.cpp +++ b/src/cpp/rtps/writer/StatelessPersistentWriter.cpp @@ -86,10 +86,11 @@ void StatelessPersistentWriter::unsent_change_added_to_history( } bool StatelessPersistentWriter::change_removed_by_history( - CacheChange_t* change) + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) { remove_persistent_change(change); - return StatelessWriter::change_removed_by_history(change); + return StatelessWriter::change_removed_by_history(change, max_blocking_time); } } // namespace rtps diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 4f57e569e6e..c68b13efc90 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -360,29 +360,35 @@ bool StatelessWriter::intraprocess_delivery( } bool StatelessWriter::change_removed_by_history( - CacheChange_t* change) + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) { + bool ret_value = false; std::lock_guard guard(mp_mutex); - flow_controller_->remove_change(change); - - // remove from datasharing pool history - if (is_datasharing_compatible()) + if (flow_controller_->remove_change(change, max_blocking_time)) { - auto pool = std::dynamic_pointer_cast(payload_pool_); - assert (pool != nullptr); - pool->remove_from_shared_history(change); - EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << change->sequenceNumber); - } + // remove from datasharing pool history + if (is_datasharing_compatible()) + { + auto pool = std::dynamic_pointer_cast(payload_pool_); + assert (pool != nullptr); - const uint64_t sequence_number = change->sequenceNumber.to64long(); - if (sequence_number > last_sequence_number_sent_) - { - unsent_changes_cond_.notify_all(); + pool->remove_from_shared_history(change); + EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << change->sequenceNumber); + } + + const uint64_t sequence_number = change->sequenceNumber.to64long(); + if (sequence_number > last_sequence_number_sent_) + { + unsent_changes_cond_.notify_all(); + } + + ret_value = true; } - return true; + return ret_value; } bool StatelessWriter::has_been_fully_delivered( diff --git a/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h b/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h index 2cfa91dce52..6b45a53c377 100644 --- a/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h +++ b/test/mock/rtps/RTPSMessageGroup/fastdds/rtps/messages/RTPSMessageGroup.h @@ -20,6 +20,8 @@ #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ +#include + #include namespace eprosima { @@ -34,6 +36,18 @@ class RTPSMessageGroup { public: + class timeout : public std::runtime_error + { + public: + + timeout() + : std::runtime_error("timeout") + { + } + + virtual ~timeout() = default; + }; + RTPSMessageGroup( RTPSParticipantImpl*, bool) @@ -43,7 +57,8 @@ class RTPSMessageGroup RTPSMessageGroup( RTPSParticipantImpl*, Endpoint*, - const RTPSMessageSenderInterface*) + const RTPSMessageSenderInterface*, + std::chrono::steady_clock::time_point) { } diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index 5d3a479d255..e595dc7cbe5 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -155,6 +155,14 @@ class ReaderHistory return m_changes.erase(removal); } + virtual iterator remove_change_nts( + const_iterator removal, + const std::chrono::time_point&, + bool release = true) + { + return remove_change_nts(removal, release); + } + virtual void writer_unmatched( const GUID_t& /*writer_guid*/, const SequenceNumber_t& /*last_notified_seq*/) diff --git a/test/mock/rtps/WriterHistory/fastdds/rtps/history/WriterHistory.h b/test/mock/rtps/WriterHistory/fastdds/rtps/history/WriterHistory.h index b0079a61139..5fc21c0ab5b 100644 --- a/test/mock/rtps/WriterHistory/fastdds/rtps/history/WriterHistory.h +++ b/test/mock/rtps/WriterHistory/fastdds/rtps/history/WriterHistory.h @@ -50,6 +50,8 @@ class WriterHistory { } + virtual ~WriterHistory() = default; + using iterator = std::vector::iterator; // *INDENT-OFF* Uncrustify makes a mess with MOCK_METHOD macros @@ -83,10 +85,16 @@ class WriterHistory MOCK_METHOD1(remove_change_mock, bool(CacheChange_t*)); + MOCK_METHOD2(remove_change_mock, bool(CacheChange_t*, + const std::chrono::time_point& max_blocking_time)); + MOCK_METHOD0(getHistorySize, size_t()); MOCK_METHOD0(remove_min_change, bool()); + MOCK_METHOD1(remove_min_change, bool( + const std::chrono::time_point& max_blocking_time)); + MOCK_METHOD3(add_change_, bool( CacheChange_t* a_change, WriteParams &wparams, @@ -125,6 +133,28 @@ class WriterHistory return ret; } + bool remove_change( + CacheChange_t* change, + const std::chrono::time_point& max_blocking_time) + { + bool ret = remove_change_mock(change, max_blocking_time); + delete change; + return ret; + } + + virtual bool remove_change_g( + fastrtps::rtps::CacheChange_t* a_change) + { + return remove_change(a_change); + } + + virtual bool remove_change_g( + fastrtps::rtps::CacheChange_t* a_change, + const std::chrono::time_point& max_blocking_time) + { + return remove_change(a_change, max_blocking_time); + } + void wait_for_more_samples_than( unsigned int minimum) { diff --git a/test/realtime/UserThreadNonBlockedTest.cpp b/test/realtime/UserThreadNonBlockedTest.cpp index 27d691e3e83..970d9d777df 100644 --- a/test/realtime/UserThreadNonBlockedTest.cpp +++ b/test/realtime/UserThreadNonBlockedTest.cpp @@ -1,12 +1,19 @@ #include "mutex_testing_tool/TMutex.hpp" -#include -#include -#include -#include -#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include +#include #include + #include #include @@ -14,203 +21,260 @@ #include #include -class DummyType:public eprosima::fastrtps::TopicDataType +#if defined(_WIN32) +#define GET_PID _getpid +#else +#define GET_PID getpid +#endif // if defined(_WIN32) + +const char* SLOW_FLOW_NAME = "SlowFlow"; + +class DummyType : public eprosima::fastdds::dds::TopicDataType { - public: +public: - DummyType() - { - setName("DummyType"); - m_typeSize = 4 + 4 /*encapsulation*/; - m_isGetKeyDefined = false; - } + DummyType() + { + setName("DummyType"); + m_typeSize = 4 + 4 /*encapsulation*/; + m_isGetKeyDefined = false; + } - DummyType(int32_t value) : DummyType() - { - value_ = value; - } + DummyType( + int32_t value) + : DummyType() + { + value_ = value; + } - virtual ~DummyType() = default; + virtual ~DummyType() = default; - bool serialize( - void*data, - eprosima::fastrtps::rtps::SerializedPayload_t* payload) - { - DummyType* sample = reinterpret_cast(data); - // Object that manages the raw buffer. - eprosima::fastcdr::FastBuffer fastbuffer((char*)payload->data, payload->max_size); - // Object that serializes the data. - eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - payload->encapsulation = ser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; - // Serialize encapsulation - ser.serialize_encapsulation(); - //serialize the object: - ser.serialize(sample->value_); - payload->length = (uint32_t)ser.getSerializedDataLength(); - return true; - } + bool serialize( + void* data, + eprosima::fastrtps::rtps::SerializedPayload_t* payload) + { + DummyType* sample = reinterpret_cast(data); + // Object that manages the raw buffer. + eprosima::fastcdr::FastBuffer fastbuffer((char*)payload->data, payload->max_size); + // Object that serializes the data. + eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + payload->encapsulation = ser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + // Serialize encapsulation + ser.serialize_encapsulation(); + //serialize the object: + ser.serialize(sample->value_); + payload->length = (uint32_t)ser.getSerializedDataLength(); + return true; + } - bool deserialize( - eprosima::fastrtps::rtps::SerializedPayload_t* payload, - void * data) - { - DummyType* sample = reinterpret_cast(data); - // Object that manages the raw buffer. - eprosima::fastcdr::FastBuffer fastbuffer((char*)payload->data, payload->length); - // Object that serializes the data. - eprosima::fastcdr::Cdr deser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); // Object that deserializes the data. - // Deserialize encapsulation. - deser.read_encapsulation(); - payload->encapsulation = deser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; - //serialize the object: - deser.deserialize(sample->value_); - return true; - } + bool deserialize( + eprosima::fastrtps::rtps::SerializedPayload_t* payload, + void* data) + { + DummyType* sample = reinterpret_cast(data); + // Object that manages the raw buffer. + eprosima::fastcdr::FastBuffer fastbuffer((char*)payload->data, payload->length); + // Object that serializes the data. + eprosima::fastcdr::Cdr deser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); // Object that deserializes the data. + // Deserialize encapsulation. + deser.read_encapsulation(); + payload->encapsulation = deser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + //serialize the object: + deser.deserialize(sample->value_); + return true; + } - std::function getSerializedSizeProvider(void*) - { - return []() -> uint32_t { - return 4 + 4 /*encapsulation*/; - }; - } + std::function getSerializedSizeProvider( + void*) + { + return []() -> uint32_t + { + return 4 + 4 /*encapsulation*/; + }; + } - bool getKey( - void*, - eprosima::fastrtps::rtps::InstanceHandle_t*, - bool) - { - return false; - } + bool getKey( + void*, + eprosima::fastrtps::rtps::InstanceHandle_t*, + bool) + { + return false; + } - void* createData() - { - return reinterpret_cast(new DummyType()); - } + void* createData() + { + return reinterpret_cast(new DummyType()); + } - void deleteData(void* data) - { - delete(reinterpret_cast(data)); - } + void deleteData( + void* data) + { + delete(reinterpret_cast(data)); + } - private: +private: - int32_t value_; + int32_t value_; }; class UserThreadNonBlockedTest : public ::testing::Test { - protected: +protected: - virtual void SetUp() - { - // Create publisher - publisher_attr_.topic.topicDataType = type_.getName(); - publisher_attr_.topic.topicName = "Dummy"; - - // Create subscriber - subscriber_attr_.topic.topicDataType = type_.getName(); - subscriber_attr_.topic.topicName = "Dummy"; - subscriber_attr_.times.initialAcknackDelay.seconds = 10; - subscriber_attr_.times.heartbeatResponseDelay.seconds = 10; - } + virtual void SetUp() + { + // Disable SHM transport, DataSharing and Intraprocess + auto udp_transport = std::make_shared(); + participant_qos_.transport().user_transports.push_back(udp_transport); + participant_qos_.transport().use_builtin_transports = false; + datawriter_qos_.data_sharing().off(); + datareader_qos_.data_sharing().off(); + eprosima::fastrtps::LibrarySettingsAttributes library_attributes; + library_attributes.intraprocess_delivery = eprosima::fastrtps::INTRAPROCESS_OFF; + eprosima::fastrtps::xmlparser::XMLProfileManager::library_settings(library_attributes); + + datareader_qos_.reliable_reader_qos().times.initialAcknackDelay.seconds = 10; + datareader_qos_.reliable_reader_qos().times.heartbeatResponseDelay.seconds = 10; + + // Slow flow-controller used in some test + auto slow_flowcontroller = std::make_shared(); + slow_flowcontroller->name = SLOW_FLOW_NAME; + slow_flowcontroller->period_ms = 10000; + slow_flowcontroller->max_bytes_per_period = 1; + participant_qos_.flow_controllers().push_back(slow_flowcontroller); + } - virtual void TearDown() - { - assert(participant_); - eprosima::fastrtps::Domain::removeParticipant(participant_); - participant_ = nullptr; - } + virtual void TearDown() + { + assert(participant_); + participant_->delete_contained_entities(); + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_); + participant_ = nullptr; + } - void init() - { - // Create participant - participant_ = eprosima::fastrtps::Domain::createParticipant(participant_attr_); - assert(participant_); + void init() + { + // Create participant + participant_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( + GET_PID() % 230, participant_qos_); + assert(participant_); - // Register type - eprosima::fastrtps::Domain::registerType(participant_, &type_); + // Register type + type_.register_type(participant_); - publisher_ = eprosima::fastrtps::Domain::createPublisher(participant_, publisher_attr_, nullptr); - assert(publisher_); + eprosima::fastdds::dds::Publisher* publisher = participant_->create_publisher( + eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); + assert(publisher); - subscriber_ = eprosima::fastrtps::Domain::createSubscriber(participant_, subscriber_attr_, nullptr); - assert(subscriber_); - } + eprosima::fastdds::dds::Subscriber* subscriber = participant_->create_subscriber( + eprosima::fastdds::dds::SUBSCRIBER_QOS_DEFAULT); + assert(subscriber); + + eprosima::fastdds::dds::Topic* topic = participant_->create_topic("DummyType", type_->getName(), + eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); + + assert(topic); - public: + datawriter_ = publisher->create_datawriter(topic, datawriter_qos_); - UserThreadNonBlockedTest() = default; + assert(datawriter_); - eprosima::fastrtps::ParticipantAttributes participant_attr_; + datareader_ = subscriber->create_datareader(topic, datareader_qos_); - eprosima::fastrtps::Participant* participant_; + assert(datareader_); + } + +public: + + UserThreadNonBlockedTest() = default; + + eprosima::fastdds::dds::DomainParticipantQos participant_qos_; - DummyType type_; + eprosima::fastdds::dds::DomainParticipant* participant_; - eprosima::fastrtps::PublisherAttributes publisher_attr_; + eprosima::fastdds::dds::TypeSupport type_ {new DummyType()}; - eprosima::fastrtps::Publisher* publisher_; + eprosima::fastdds::dds::DataWriterQos datawriter_qos_; - eprosima::fastrtps::SubscriberAttributes subscriber_attr_; + eprosima::fastdds::dds::DataWriter* datawriter_; - eprosima::fastrtps::Subscriber* subscriber_; + eprosima::fastdds::dds::DataReaderQos datareader_qos_; + + eprosima::fastdds::dds::DataReader* datareader_; }; -TEST_F(UserThreadNonBlockedTest, write_sample_besteffort) +/*! + * @test Tests the mutexes involved in calling `DataWriter::write()` for publishing a new sample when there is another + * one in the process of being sent + * and it has to be removed. + */ +TEST_F(UserThreadNonBlockedTest, remove_previous_sample_on_history) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE; + datawriter_qos_.publish_mode().flow_controller_name = SLOW_FLOW_NAME; init(); DummyType sample{1}; + datawriter_->write(reinterpret_cast(&sample)); + // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - publisher_->write(reinterpret_cast(&sample)); + datawriter_->write(reinterpret_cast(&sample)); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(2, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(2, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = publisher_->write(reinterpret_cast(&sample)); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - publisher_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datawriter_->write(reinterpret_cast(&sample)); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count <= 3 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datawriter_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(5, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(5, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } -TEST_F(UserThreadNonBlockedTest, write_sample_reliable) +/*! + * @test Tests the mutexes involved in calling `DataWriter::write()` for publishing a new sample using best-effort + * reliability. + */ +TEST_F(UserThreadNonBlockedTest, write_sample_besteffort) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; init(); DummyType sample{1}; @@ -218,49 +282,57 @@ TEST_F(UserThreadNonBlockedTest, write_sample_reliable) // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - publisher_->write(reinterpret_cast(&sample)); + datawriter_->write(reinterpret_cast(&sample)); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < 2; ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = publisher_->write(reinterpret_cast(&sample)); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - publisher_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datawriter_->write(reinterpret_cast(&sample)); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datawriter_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(2, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(2, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } -TEST_F(UserThreadNonBlockedTest, write_async_sample_besteffort) +/*! + * @test Tests the mutexes involved in calling `DataWriter::write()` for publishing a new sample using reliable + * reliability. + */ +TEST_F(UserThreadNonBlockedTest, write_sample_reliable) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - publisher_attr_.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; init(); DummyType sample{1}; @@ -268,49 +340,117 @@ TEST_F(UserThreadNonBlockedTest, write_async_sample_besteffort) // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - publisher_->write(reinterpret_cast(&sample)); + datawriter_->write(reinterpret_cast(&sample)); eprosima::fastrtps::tmutex_stop_recording(); + for (size_t count = 0; count < 2; ++count) + { + std::cout << "Testing mutex " << count << std::endl; + // Start testing locking the mutexes. + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datawriter_->write(reinterpret_cast(&sample)); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datawriter_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } + } + ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_mutexes()); ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_timedlock_type()); +} + +/*! + * @test Tests the mutexes involved in calling `DataWriter::write()` for publishing a new sample using best-effort + * reliability and asynchronous publication mode. + */ +TEST_F(UserThreadNonBlockedTest, write_async_sample_besteffort) +{ + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + init(); + + DummyType sample{1}; - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + // Record the mutexes. + eprosima::fastrtps::tmutex_start_recording(); + + datawriter_->write(reinterpret_cast(&sample)); + + eprosima::fastrtps::tmutex_stop_recording(); + + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = publisher_->write(reinterpret_cast(&sample)); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - publisher_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datawriter_->write(reinterpret_cast(&sample)); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datawriter_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataWriter::write()` for publishing a new sample using reliable + * reliability and asynchronous publication mode. + */ TEST_F(UserThreadNonBlockedTest, write_async_sample_reliable) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - publisher_attr_.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.publish_mode().kind = eprosima::fastdds::dds::ASYNCHRONOUS_PUBLISH_MODE; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; init(); DummyType sample{1}; @@ -318,381 +458,461 @@ TEST_F(UserThreadNonBlockedTest, write_async_sample_reliable) // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - publisher_->write(reinterpret_cast(&sample)); + datawriter_->write(reinterpret_cast(&sample)); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < 2; ++count) + for (size_t count = 0; count < 2; ++count) { std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = publisher_->write(reinterpret_cast(&sample)); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - publisher_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datawriter_->write(reinterpret_cast(&sample)); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datawriter_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(3, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::read_next_sample()` for reading a new sample using + * best-effort reliability. + */ TEST_F(UserThreadNonBlockedTest, read_sample_besteffort) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->readNextData(reinterpret_cast(&read_sample), &read_info); + datareader_->read_next_sample(reinterpret_cast(&read_sample), &read_info); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->readNextData(reinterpret_cast(&read_sample), &read_info); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + eprosima::fastrtps::types::ReturnCode_t returned_value = + datareader_->read_next_sample(reinterpret_cast(&read_sample), &read_info); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit(std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? eprosima::fastrtps::types::ReturnCode_t::RETCODE_NO_DATA : + eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::read_next_sample()` for reading a new sample using + * reliable reliability. + */ TEST_F(UserThreadNonBlockedTest, read_sample_reliable) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->readNextData(reinterpret_cast(&read_sample), &read_info); + datareader_->read_next_sample(reinterpret_cast(&read_sample), &read_info); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->readNextData(reinterpret_cast(&read_sample), &read_info); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + eprosima::fastrtps::types::ReturnCode_t returned_value = + datareader_->read_next_sample(reinterpret_cast(&read_sample), &read_info); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ( + count == 0 ? eprosima::fastrtps::types::ReturnCode_t::RETCODE_NO_DATA : eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::take_next_sample()` for taking a new sample using + * best-effort reliability. + */ TEST_F(UserThreadNonBlockedTest, take_sample_besteffort) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->takeNextData(reinterpret_cast(&read_sample), &read_info); + datareader_->take_next_sample(reinterpret_cast(&read_sample), &read_info); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->takeNextData(reinterpret_cast(&read_sample), &read_info); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + eprosima::fastrtps::types::ReturnCode_t returned_value = + datareader_->take_next_sample(reinterpret_cast(&read_sample), &read_info); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ( + count == 0 ? eprosima::fastrtps::types::ReturnCode_t::RETCODE_NO_DATA : eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::take_next_sample()` for taking a new sample using + * reliable reliability. + */ TEST_F(UserThreadNonBlockedTest, take_sample_reliable) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->takeNextData(reinterpret_cast(&read_sample), &read_info); + datareader_->take_next_sample(reinterpret_cast(&read_sample), &read_info); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->takeNextData(reinterpret_cast(&read_sample), &read_info); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + eprosima::fastrtps::types::ReturnCode_t returned_value = + datareader_->take_next_sample(reinterpret_cast(&read_sample), &read_info); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ( + count == 0 ? eprosima::fastrtps::types::ReturnCode_t::RETCODE_NO_DATA : eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, + returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::wait_for_unread_message()` for waiting a new sample using + * best-effort reliability. + */ TEST_F(UserThreadNonBlockedTest, wait_for_sample_besteffort) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->wait_for_unread_samples({0, 100000000}); + datareader_->wait_for_unread_message({0, 100000000}); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->wait_for_unread_samples({0, 100000000}); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datareader_->wait_for_unread_message({0, 100000000}); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } +/*! + * @test Tests the mutexes involved in calling `DataReader::wait_for_unread_message()` for waiting a new sample using + * reliable reliability. + */ TEST_F(UserThreadNonBlockedTest, wait_for_sample_reliable) { - publisher_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - subscriber_attr_.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + datawriter_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; + datareader_qos_.reliability().kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; init(); DummyType sample{1}, read_sample; - eprosima::fastrtps::SampleInfo_t read_info; + eprosima::fastdds::dds::SampleInfo read_info; - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); // Record the mutexes. eprosima::fastrtps::tmutex_start_recording(); - subscriber_->wait_for_unread_samples({0, 100000000}); + datareader_->wait_for_unread_message({0, 100000000}); eprosima::fastrtps::tmutex_stop_recording(); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); - ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); - ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); - - for(size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) + for (size_t count = 0; count < eprosima::fastrtps::tmutex_get_num_mutexes(); ++count) { - publisher_->write(reinterpret_cast(&sample)); - publisher_->wait_for_all_acked({0, 100000000}); + datawriter_->write(reinterpret_cast(&sample)); + datawriter_->wait_for_acknowledgments({0, 100000000}); std::cout << "Testing mutex " << count << std::endl; // Start testing locking the mutexes. - eprosima::fastrtps::tmutex_lock_mutex(count); - - std::promise> promise; - std::future> future = promise.get_future(); - std::thread([&] - { - auto now = std::chrono::steady_clock::now(); - bool returned_value = subscriber_->wait_for_unread_samples({0, 100000000}); - auto end = std::chrono::steady_clock::now(); - promise.set_value_at_thread_exit( std::pair(returned_value, - std::chrono::duration_cast(end - now))); - }).detach(); - future.wait(); - auto returned_value = future.get(); - // If main mutex cannot be taken, the write fails. - // But for the rest the information is stored and it is as if the samples was sent. - ASSERT_EQ(count == 0 ? false : true, returned_value.first); - std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( - subscriber_attr_.qos.m_reliability.max_blocking_time)); - ASSERT_GE(returned_value.second, max_w); - ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); - - eprosima::fastrtps::tmutex_unlock_mutex(count); + if (eprosima::fastrtps::tmutex_lock_mutex(count)) + { + std::promise> promise; + std::future> future = promise.get_future(); + std::thread([&] + { + auto now = std::chrono::steady_clock::now(); + bool returned_value = datareader_->wait_for_unread_message({0, 100000000}); + auto end = std::chrono::steady_clock::now(); + promise.set_value_at_thread_exit( std::pair(returned_value, + std::chrono::duration_cast(end - now))); + }).detach(); + future.wait(); + auto returned_value = future.get(); + // If main mutex cannot be taken, the write fails. + // But for the rest the information is stored and it is as if the samples was sent. + ASSERT_EQ(count == 0 ? false : true, returned_value.first); + std::chrono::microseconds max_w(eprosima::fastrtps::rtps::TimeConv::Time_t2MicroSecondsInt64( + datareader_qos_.reliability().max_blocking_time)); + ASSERT_GE(returned_value.second, max_w); + ASSERT_LE(returned_value.second - max_w, std::chrono::milliseconds(1)); + + eprosima::fastrtps::tmutex_unlock_mutex(count); + } + else + { + std::cout << "Mutex " << count << " is not a timed lock. Pass.." << std::endl; + } } + + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_mutexes()); + ASSERT_EQ(0, eprosima::fastrtps::tmutex_get_num_lock_type()); + ASSERT_EQ(1, eprosima::fastrtps::tmutex_get_num_timedlock_type()); } -int main(int argc, char** argv) +int main( + int argc, + char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/test/realtime/mutex_testing_tool/Mutex.cpp b/test/realtime/mutex_testing_tool/Mutex.cpp index 751c8be574f..4420474d456 100644 --- a/test/realtime/mutex_testing_tool/Mutex.cpp +++ b/test/realtime/mutex_testing_tool/Mutex.cpp @@ -23,7 +23,8 @@ #include #include -int pthread_mutex_lock(pthread_mutex_t* mutex) +int pthread_mutex_lock( + pthread_mutex_t* mutex) { pid_t pid = eprosima::fastrtps::g_tmutex_thread_pid; @@ -35,15 +36,17 @@ int pthread_mutex_lock(pthread_mutex_t* mutex) } } - if(eprosima::fastrtps::g_origin_lock_func == nullptr) + if (eprosima::fastrtps::g_origin_lock_func == nullptr) { - eprosima::fastrtps::g_origin_lock_func = (int(*)(pthread_mutex_t*))dlsym(RTLD_NEXT, "pthread_mutex_lock"); + eprosima::fastrtps::g_origin_lock_func = (int (*)(pthread_mutex_t*))dlsym(RTLD_NEXT, "pthread_mutex_lock"); } return (*eprosima::fastrtps::g_origin_lock_func)(mutex); } -int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec* abs_timeout) +int pthread_mutex_timedlock( + pthread_mutex_t* mutex, + const struct timespec* abs_timeout) { pid_t pid = eprosima::fastrtps::g_tmutex_thread_pid; @@ -55,11 +58,36 @@ int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec* abs_t } } - if(eprosima::fastrtps::g_origin_timedlock_func == nullptr) + if (eprosima::fastrtps::g_origin_timedlock_func == nullptr) { eprosima::fastrtps::g_origin_timedlock_func = - (int(*)(pthread_mutex_t*, const struct timespec*))dlsym(RTLD_NEXT, "pthread_mutex_timedlock"); + (int (*)(pthread_mutex_t*, const struct timespec*))dlsym(RTLD_NEXT, "pthread_mutex_timedlock"); } return (*eprosima::fastrtps::g_origin_timedlock_func)(mutex, abs_timeout); } + +int pthread_mutex_clocklock( + pthread_mutex_t* mutex, + clockid_t clock, + const struct timespec* abs_timeout) +{ + pid_t pid = eprosima::fastrtps::g_tmutex_thread_pid; + + if (0 != pid) + { + if (pid == GET_TID()) + { + eprosima::fastrtps::tmutex_record_mutex_(eprosima::fastrtps::LockType::TIMED_LOCK, mutex); + } + } + + if (eprosima::fastrtps::g_origin_clocklock_func == nullptr) + { + eprosima::fastrtps::g_origin_clocklock_func = + (int (*)(pthread_mutex_t*, clockid_t, const struct timespec*))dlsym(RTLD_NEXT, + "pthread_mutex_clocklock"); + } + + return (*eprosima::fastrtps::g_origin_clocklock_func)(mutex, clock, abs_timeout); +} diff --git a/test/realtime/mutex_testing_tool/Mutex.hpp b/test/realtime/mutex_testing_tool/Mutex.hpp index 3dedd0ca247..0cdb8f62f79 100644 --- a/test/realtime/mutex_testing_tool/Mutex.hpp +++ b/test/realtime/mutex_testing_tool/Mutex.hpp @@ -21,9 +21,18 @@ #define __TEST_REALTIME_MUTEX_HPP__ #include +#include -extern "C" int pthread_mutex_lock(pthread_mutex_t* mutex); +extern "C" int pthread_mutex_lock( + pthread_mutex_t* mutex); -extern "C" int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec* abs_timeout); +extern "C" int pthread_mutex_timedlock( + pthread_mutex_t* mutex, + const struct timespec* abs_timeout); + +extern "C" int pthread_mutex_clocklock( + pthread_mutex_t* mutex, + clockid_t clock, + const struct timespec* abs_timeout); #endif // __TEST_REALTIME_MUTEX_HPP__ diff --git a/test/realtime/mutex_testing_tool/TMutex.cpp b/test/realtime/mutex_testing_tool/TMutex.cpp index 823a5856ff0..3d701dbffbb 100644 --- a/test/realtime/mutex_testing_tool/TMutex.cpp +++ b/test/realtime/mutex_testing_tool/TMutex.cpp @@ -22,6 +22,7 @@ #include #include #include +#include // TODO contar que solo bloquea una vez y nunca mas despues de timeout. // TODO si se bloquea el dos, que no se bloqueen los posteriores @@ -32,8 +33,11 @@ namespace eprosima { namespace fastrtps { std::atomic g_tmutex_thread_pid(0); +// *INDENT-OFF* Uncrustify parse this as a function declaration instead of a function pointer. int (*g_origin_lock_func)(pthread_mutex_t*){nullptr}; int (*g_origin_timedlock_func)(pthread_mutex_t*, const struct timespec*){nullptr}; +int (*g_origin_clocklock_func)(pthread_mutex_t*, clockid_t, const struct timespec*){nullptr}; +// *INDENT-ON* typedef struct { @@ -46,11 +50,12 @@ constexpr size_t g_tmutex_records_max_length = 30; std::array g_tmutex_records{{{LockType::LOCK, nullptr, 0}}}; int32_t g_tmutex_records_end = -1; -int32_t tmutex_find_record(pthread_mutex_t* mutex) +int32_t tmutex_find_record( + pthread_mutex_t* mutex) { int32_t returned_position = -1; - for(int32_t position = 0; position <= g_tmutex_records_end; ++position) + for (int32_t position = 0; position <= g_tmutex_records_end; ++position) { if (mutex == g_tmutex_records[position].mutex) { @@ -79,7 +84,9 @@ void eprosima::fastrtps::tmutex_stop_recording() g_tmutex_thread_pid = 0; } -void eprosima::fastrtps::tmutex_record_mutex_(LockType type, pthread_mutex_t* mutex) +void eprosima::fastrtps::tmutex_record_mutex_( + LockType type, + pthread_mutex_t* mutex) { assert(0 < g_tmutex_thread_pid); @@ -107,12 +114,12 @@ size_t eprosima::fastrtps::tmutex_get_num_lock_type() { size_t counter = 0; - if(-1 < g_tmutex_records_end) + if (-1 < g_tmutex_records_end) { std::for_each(g_tmutex_records.begin(), g_tmutex_records.begin() + g_tmutex_records_end + 1, [&](const tmutex_record& record) { - if(record.type == LockType::LOCK) + if (record.type == LockType::LOCK) { ++counter; } @@ -126,12 +133,12 @@ size_t eprosima::fastrtps::tmutex_get_num_timedlock_type() { size_t counter = 0; - if(-1 < g_tmutex_records_end) + if (-1 < g_tmutex_records_end) { std::for_each(g_tmutex_records.begin(), g_tmutex_records.begin() + g_tmutex_records_end + 1, [&](const tmutex_record& record) { - if(record.type == LockType::TIMED_LOCK) + if (record.type == LockType::TIMED_LOCK) { ++counter; } @@ -141,23 +148,33 @@ size_t eprosima::fastrtps::tmutex_get_num_timedlock_type() return counter; } -pthread_mutex_t* eprosima::fastrtps::tmutex_get_mutex(const size_t index) +pthread_mutex_t* eprosima::fastrtps::tmutex_get_mutex( + const size_t index) { assert(index <= size_t(g_tmutex_records_end)); return g_tmutex_records[index].mutex; } -void eprosima::fastrtps::tmutex_lock_mutex(const size_t index) +bool eprosima::fastrtps::tmutex_lock_mutex( + const size_t index) { assert(index <= size_t(g_tmutex_records_end)); - - if(g_origin_lock_func != nullptr) + if (LockType::TIMED_LOCK == g_tmutex_records[index].type) { - (*g_origin_lock_func)(g_tmutex_records[index].mutex); + + if (g_origin_lock_func != nullptr) + { + (*g_origin_lock_func)(g_tmutex_records[index].mutex); + } + + return true; } + + return false; } -void eprosima::fastrtps::tmutex_unlock_mutex(const size_t index) +void eprosima::fastrtps::tmutex_unlock_mutex( + const size_t index) { assert(index <= size_t(g_tmutex_records_end)); pthread_mutex_unlock(g_tmutex_records[index].mutex); diff --git a/test/realtime/mutex_testing_tool/TMutex.hpp b/test/realtime/mutex_testing_tool/TMutex.hpp index f4a81914dd0..75fa62da571 100644 --- a/test/realtime/mutex_testing_tool/TMutex.hpp +++ b/test/realtime/mutex_testing_tool/TMutex.hpp @@ -36,11 +36,12 @@ using pid_t = int; #define GET_TID() syscall(SYS_gettid) #else #error "SYS_gettid unavailable on this system" -#endif +#endif // ifdef SYS_gettid #endif //_WIN32 #include +#include namespace eprosima { namespace fastrtps { @@ -55,10 +56,19 @@ enum LockType : int32_t extern std::atomic g_tmutex_thread_pid; //! Store the original pthread_mutex_lock function -extern int (*g_origin_lock_func)(pthread_mutex_t*); +extern int (* g_origin_lock_func)( + pthread_mutex_t*); //! Store the original pthread_mutex_timedlock function -extern int (*g_origin_timedlock_func)(pthread_mutex_t*, const struct timespec*); +extern int (* g_origin_timedlock_func)( + pthread_mutex_t*, + const struct timespec*); + +//! Store the original pthread_mutex_clocklock function +extern int (* g_origin_clocklock_func)( + pthread_mutex_t*, + clockid_t, + const struct timespec*); /*! * @brief Records all mutexes used by the thread that calls this function. @@ -75,26 +85,32 @@ void tmutex_stop_recording(); * @param[in] type Type of the lock used by the mutex. * @param[in] mutex Pointer of the mutex to be recorded. */ -void tmutex_record_mutex_(LockType type, pthread_mutex_t* mutex); +void tmutex_record_mutex_( + LockType type, + pthread_mutex_t* mutex); /*! * @brief Gets the pointer of the selected recorded mutex. * @param[in] index Position of the recorded mutex. * @return Pointer to the selected mutex. */ -pthread_mutex_t* tmutex_get_mutex(size_t index); +pthread_mutex_t* tmutex_get_mutex( + size_t index); /*! * @brief Locks the selected recorded mutex. * @param[in] index Position of the recorded mutex. + * @return true if locking a timed mutex. false otherwise */ -void tmutex_lock_mutex(size_t index); +bool tmutex_lock_mutex( + size_t index); /*! * @brief Unlocks the selected recorded mutex. * @param[in] index Position of the recorded mutex. */ -void tmutex_unlock_mutex(size_t index); +void tmutex_unlock_mutex( + size_t index); /*! * @brief Returns the number of mutex recorded. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp index 4050360c81c..a29e881cfde 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp @@ -268,31 +268,31 @@ TYPED_TEST(FlowControllerPublishModes, async_publish_mode) nullptr != change_writer10.writer_info.previous); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer10); + async.remove_change(&change_writer10, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer9); + async.remove_change(&change_writer9, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer8); + async.remove_change(&change_writer8, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer7); + async.remove_change(&change_writer7, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer6); + async.remove_change(&change_writer6, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer5); + async.remove_change(&change_writer5, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer4); + async.remove_change(&change_writer4, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer3); + async.remove_change(&change_writer3, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer2); + async.remove_change(&change_writer2, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); ASSERT_TRUE(nullptr == change_writer2.writer_info.next && nullptr == change_writer2.writer_info.previous); @@ -356,31 +356,31 @@ TYPED_TEST(FlowControllerPublishModes, async_publish_mode) nullptr != change_writer10.writer_info.previous); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer10); + async.remove_change(&change_writer10, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer9); + async.remove_change(&change_writer9, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer8); + async.remove_change(&change_writer8, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer7); + async.remove_change(&change_writer7, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer6); + async.remove_change(&change_writer6, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer5); + async.remove_change(&change_writer5, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer4); + async.remove_change(&change_writer4, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer3); + async.remove_change(&change_writer3, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer2); + async.remove_change(&change_writer2, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); ASSERT_TRUE(nullptr == change_writer2.writer_info.next && nullptr == change_writer2.writer_info.previous); diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp index 8508d952d84..75eb1b9264e 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp @@ -301,31 +301,31 @@ TYPED_TEST(FlowControllerPublishModes, limited_async_publish_mode) nullptr != change_writer10.writer_info.previous); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer10); + async.remove_change(&change_writer10, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer9); + async.remove_change(&change_writer9, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer8); + async.remove_change(&change_writer8, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer7); + async.remove_change(&change_writer7, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer6); + async.remove_change(&change_writer6, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer5); + async.remove_change(&change_writer5, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer4); + async.remove_change(&change_writer4, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer3); + async.remove_change(&change_writer3, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer2); + async.remove_change(&change_writer2, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); ASSERT_TRUE(nullptr == change_writer2.writer_info.next && nullptr == change_writer2.writer_info.previous); @@ -389,31 +389,31 @@ TYPED_TEST(FlowControllerPublishModes, limited_async_publish_mode) nullptr != change_writer10.writer_info.previous); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer10); + async.remove_change(&change_writer10, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer9); + async.remove_change(&change_writer9, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer8); + async.remove_change(&change_writer8, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer7); + async.remove_change(&change_writer7, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer6); + async.remove_change(&change_writer6, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer5); + async.remove_change(&change_writer5, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer4); + async.remove_change(&change_writer4, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer3); + async.remove_change(&change_writer3, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - async.remove_change(&change_writer2); + async.remove_change(&change_writer2, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); ASSERT_TRUE(nullptr == change_writer2.writer_info.next && nullptr == change_writer2.writer_info.previous); diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp index d8438a25ae6..3a805c0946a 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp @@ -216,31 +216,31 @@ TYPED_TEST(FlowControllerPublishModes, sync_publish_mode) nullptr != change_writer10.writer_info.previous); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer10); + sync.remove_change(&change_writer10, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer9); + sync.remove_change(&change_writer9, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer8); + sync.remove_change(&change_writer8, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer7); + sync.remove_change(&change_writer7, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer6); + sync.remove_change(&change_writer6, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer5); + sync.remove_change(&change_writer5, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer4); + sync.remove_change(&change_writer4, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer3); + sync.remove_change(&change_writer3, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); writer1.getMutex().lock(); - sync.remove_change(&change_writer2); + sync.remove_change(&change_writer2, std::chrono::steady_clock::now() + std::chrono::hours(24)); writer1.getMutex().unlock(); ASSERT_TRUE(nullptr == change_writer2.writer_info.next && nullptr == change_writer2.writer_info.previous);