diff --git a/include/fastdds/rtps/history/History.h b/include/fastdds/rtps/history/History.h index 402c2cc84dc..2270cc8802d 100644 --- a/include/fastdds/rtps/history/History.h +++ b/include/fastdds/rtps/history/History.h @@ -322,6 +322,18 @@ class History RTPS_DllAPI virtual void do_release_cache( CacheChange_t* ch) = 0; + /** + * @brief Removes the constness of a const_iterator to obtain a regular iterator. + * + * This function takes a const_iterator as input and returns a regular iterator by removing the constness. + * + * @param c_it The const_iterator to remove constness from. + * + * @return An iterator with the same position as the input const_iterator. + */ + History::iterator remove_iterator_constness( + const_iterator c_it); + }; } // namespace rtps diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index 6bfa5813c9b..a1edc24ef5a 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -469,45 +469,51 @@ bool DataReaderHistory::remove_change_sub( return false; } + CacheChange_t dummy_change; + dummy_change.instanceHandle = change->instanceHandle; + dummy_change.isRead = change->isRead; + dummy_change.sequenceNumber = change->sequenceNumber; + dummy_change.writerGUID = change->writerGUID; + std::lock_guard guard(*getMutex()); - bool found = false; - InstanceCollection::iterator vit; - if (find_key(change->instanceHandle, vit)) + + const_iterator chit = find_change_nts(change); + if (chit == changesEnd()) { - for (auto chit = vit->second->cache_changes.begin(); chit != vit->second->cache_changes.end(); ++chit) + EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); + return false; + } + + auto new_it = ReaderHistory::remove_change_nts(chit); + + if (new_it == changesEnd() || !matches_change(&dummy_change, *new_it)) // Change was successfully removed. + { + InstanceCollection::iterator vit; + if (find_key(dummy_change.instanceHandle, vit)) { - if ((*chit)->sequenceNumber == change->sequenceNumber && - (*chit)->writerGUID == change->writerGUID) - { - assert(it == chit); - it = vit->second->cache_changes.erase(chit); - found = true; + auto in_it = std::find(vit->second->cache_changes.begin(), vit->second->cache_changes.end(), change); - if (change->isRead) + if (vit->second->cache_changes.end() != in_it) + { + assert(it == in_it); + it = vit->second->cache_changes.erase(in_it); + if (dummy_change.isRead) { --counters_.samples_read; } - break; + } + else + { + EPROSIMA_LOG_ERROR(SUBSCRIBER, "Change not found on this key, something is wrong"); } } - } - if (!found) - { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Change not found on this key, something is wrong"); - } - const_iterator chit = find_change_nts(change); - if (chit == changesEnd()) - { - EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); - return false; + m_isHistoryFull = false; + counters_.samples_unread = mp_reader->get_unread_count(); + return true; } - m_isHistoryFull = false; - ReaderHistory::remove_change_nts(chit); - - counters_.samples_unread = mp_reader->get_unread_count(); - return true; + return false; } bool DataReaderHistory::set_next_deadline( @@ -667,33 +673,47 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts( { if (removal != changesEnd()) { - CacheChange_t* p_sample = *removal; + CacheChange_t* change_ptr = *removal; + CacheChange_t dummy_change; + bool is_fully_assembled = (*removal)->is_fully_assembled(); + dummy_change.instanceHandle = (*removal)->instanceHandle; + dummy_change.isRead = (*removal)->isRead; + dummy_change.sequenceNumber = (*removal)->sequenceNumber; + dummy_change.writerGUID = (*removal)->writerGUID; + + // call the base class + auto ret_val = ReaderHistory::remove_change_nts(removal, release); - if (!has_keys_ || p_sample->is_fully_assembled()) + if (ret_val == changesEnd() || !matches_change(&dummy_change, *ret_val)) // Change was successfully removed. { - // clean any references to this CacheChange in the key state collection - auto it = instances_.find(p_sample->instanceHandle); - - // if keyed and in history must be in the map - // There is a case when the sample could not be in the keyed map. The first received fragment of a - // fragmented sample is stored in the history, and when it is completed it is stored in the keyed map. - // But it can occur it is rejected when the sample is completed and removed without being stored in the - // keyed map. - if (it != instances_.end()) + if (!has_keys_ || is_fully_assembled) { - it->second->cache_changes.remove(p_sample); - if (p_sample->isRead) + // clean any references to this CacheChange in the key state collection + auto it = instances_.find(dummy_change.instanceHandle); + + // if keyed and in history must be in the map + // There is a case when the sample could not be in the keyed map. The first received fragment of a + // fragmented sample is stored in the history, and when it is completed it is stored in the keyed map. + // But it can occur it is rejected when the sample is completed and removed without being stored in the + // keyed map. + if (it != instances_.end()) { - --counters_.samples_read; + it->second->cache_changes.remove(change_ptr); + if (dummy_change.isRead) + { + --counters_.samples_read; + } } } + + counters_.samples_unread = mp_reader->get_unread_count(); + return ret_val; } + + return remove_iterator_constness(removal); } - // call the base class - auto ret_val = ReaderHistory::remove_change_nts(removal, release); - counters_.samples_unread = mp_reader->get_unread_count(); - return ret_val; + return changesEnd(); } ReaderHistory::iterator DataReaderHistory::remove_change_nts( diff --git a/src/cpp/rtps/history/History.cpp b/src/cpp/rtps/history/History.cpp index 1a3ee177fd5..f2b12f971f7 100644 --- a/src/cpp/rtps/history/History.cpp +++ b/src/cpp/rtps/history/History.cpp @@ -74,7 +74,7 @@ History::iterator History::remove_change_nts( { if (nullptr == mp_mutex) { - return changesEnd(); + return remove_iterator_constness(removal); } if (removal == changesEnd()) @@ -124,16 +124,30 @@ bool History::remove_change( #endif // if HAVE_STRICT_REALTIME const_iterator it = find_change_nts(ch); - const_iterator end_it = changesEnd(); - if (it == end_it) + if (it == changesEnd()) { EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history"); return false; } - // remove using the virtual method - return end_it != remove_change_nts(it, max_blocking_time); + // Dummy change just used to compare original change with change returned from remove_change_nts function + CacheChange_t dummy_change; + dummy_change.writerGUID = (*it)->writerGUID; + dummy_change.sequenceNumber = (*it)->sequenceNumber; + + // Remove using the virtual method + History::iterator history_it = remove_change_nts(it, max_blocking_time); + + // If remove_change_nts returns a valid iterator (not end()) and this is the same iterator means that it + // could not remove it so this function should fail + if (history_it != changesEnd() && matches_change(&dummy_change, *history_it)) + { + EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Failed to remove a change from history"); + return false; + } + + return true; } bool History::remove_all_changes() @@ -246,6 +260,14 @@ bool History::get_earliest_change( return true; } +History::iterator History::remove_iterator_constness( + const_iterator c_it) +{ + History::iterator it = changesBegin(); + std::advance(it, std::distance(m_changes.cbegin(), c_it)); + return it; +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/history/ReaderHistory.cpp b/src/cpp/rtps/history/ReaderHistory.cpp index db3400ed085..274d07f5485 100644 --- a/src/cpp/rtps/history/ReaderHistory.cpp +++ b/src/cpp/rtps/history/ReaderHistory.cpp @@ -131,14 +131,14 @@ History::iterator ReaderHistory::remove_change_nts( const_iterator removal, bool release) { - if ( mp_reader == nullptr || mp_mutex == nullptr) + if (mp_reader == nullptr || mp_mutex == nullptr) { EPROSIMA_LOG_ERROR(RTPS_WRITER_HISTORY, "You need to create a Writer with this History before removing any changes"); - return changesEnd(); + return remove_iterator_constness(removal); } - if ( removal == changesEnd()) + if (removal == changesEnd()) { EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove without a proper CacheChange_t referenced"); return changesEnd(); diff --git a/src/cpp/rtps/history/WriterHistory.cpp b/src/cpp/rtps/history/WriterHistory.cpp index 2fcf0ae1e44..1553d35881a 100644 --- a/src/cpp/rtps/history/WriterHistory.cpp +++ b/src/cpp/rtps/history/WriterHistory.cpp @@ -185,10 +185,10 @@ History::iterator WriterHistory::remove_change_nts( { EPROSIMA_LOG_ERROR(RTPS_WRITER_HISTORY, "You need to create a Writer with this History before removing any changes"); - return changesEnd(); + return remove_iterator_constness(removal); } - if ( removal == changesEnd()) + if (removal == changesEnd()) { EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove without a proper CacheChange_t referenced"); return changesEnd(); @@ -212,7 +212,9 @@ History::iterator WriterHistory::remove_change_nts( return ret_val; } - return changesEnd(); + EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, + "Failed to inform the writer that a change is going to be removed by the history"); + return remove_iterator_constness(removal); } bool WriterHistory::remove_change_g( diff --git a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp index a7a26d77b94..29317c463dd 100644 --- a/test/blackbox/common/BlackboxTestsPubSubHistory.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubHistory.cpp @@ -1058,6 +1058,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb) { PubSubReader reader(TEST_TOPIC_NAME); PubSubWriter writer(TEST_TOPIC_NAME); + auto transport = std::make_shared(); auto data = default_data300kb_data_generator(); auto reader_data = data; @@ -1066,6 +1067,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb) .history_depth(static_cast(data.size())) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .disable_builtin_transport().add_user_transport_to_pparams(transport) .mem_policy(mem_policy_).init(); ASSERT_TRUE(writer.isInitialized()); @@ -1080,6 +1082,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb) .history_depth(1) .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .disable_builtin_transport().add_user_transport_to_pparams(transport) .mem_policy(mem_policy_).init(); ASSERT_TRUE(reader.isInitialized()); diff --git a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h index e595dc7cbe5..1fd5f344dd0 100644 --- a/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h +++ b/test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h @@ -147,6 +147,11 @@ class ReaderHistory return m_changes.cend(); } + iterator changesEnd() + { + return m_changes.end(); + } + virtual iterator remove_change_nts( const_iterator removal, bool release = true) @@ -177,6 +182,22 @@ class ReaderHistory static_cast(ownership_strength); } + bool matches_change( + const CacheChange_t* inner_change, + CacheChange_t* outer_change) + { + return inner_change->sequenceNumber == outer_change->sequenceNumber && + inner_change->writerGUID == outer_change->writerGUID; + } + + iterator remove_iterator_constness( + const_iterator c_it) + { + iterator it = m_changes.begin(); + std::advance(it, std::distance(m_changes.cbegin(), c_it)); + return it; + } + HistoryAttributes m_att; protected: