Skip to content

Commit

Permalink
Change history remove change return statement (#3858)
Browse files Browse the repository at this point in the history
* Change history remove change return statement

Signed-off-by: Raul Sanchez-Mateos <[email protected]>

* Apply review changes

Signed-off-by: Raul Sanchez-Mateos <[email protected]>

* Refs #19590. Update DataReaderHistory

Signed-off-by: Ricardo González Moreno <[email protected]>

* Refs #19590: Apply suggested changes

Signed-off-by: Raul Sanchez-Mateos <[email protected]>

---------

Signed-off-by: Raul Sanchez-Mateos <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>
Co-authored-by: Ricardo González Moreno <[email protected]>
  • Loading branch information
rsanchez15 and richiware committed Sep 25, 2023
1 parent 8b50a00 commit b913df0
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 56 deletions.
12 changes: 12 additions & 0 deletions include/fastdds/rtps/history/History.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,18 @@ class History
RTPS_DllAPI virtual void do_release_cache(
CacheChange_t* ch) = 0;

/**
* @brief Removes the constness of a const_iterator to obtain a regular iterator.
*
* This function takes a const_iterator as input and returns a regular iterator by removing the constness.
*
* @param c_it The const_iterator to remove constness from.
*
* @return An iterator with the same position as the input const_iterator.
*/
History::iterator remove_iterator_constness(
const_iterator c_it);

};

} // namespace rtps
Expand Down
110 changes: 65 additions & 45 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,45 +469,51 @@ bool DataReaderHistory::remove_change_sub(
return false;
}

CacheChange_t dummy_change;
dummy_change.instanceHandle = change->instanceHandle;
dummy_change.isRead = change->isRead;
dummy_change.sequenceNumber = change->sequenceNumber;
dummy_change.writerGUID = change->writerGUID;

std::lock_guard<RecursiveTimedMutex> guard(*getMutex());
bool found = false;
InstanceCollection::iterator vit;
if (find_key(change->instanceHandle, vit))

const_iterator chit = find_change_nts(change);
if (chit == changesEnd())
{
for (auto chit = vit->second->cache_changes.begin(); chit != vit->second->cache_changes.end(); ++chit)
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history");
return false;
}

auto new_it = ReaderHistory::remove_change_nts(chit);

if (new_it == changesEnd() || !matches_change(&dummy_change, *new_it)) // Change was successfully removed.
{
InstanceCollection::iterator vit;
if (find_key(dummy_change.instanceHandle, vit))
{
if ((*chit)->sequenceNumber == change->sequenceNumber &&
(*chit)->writerGUID == change->writerGUID)
{
assert(it == chit);
it = vit->second->cache_changes.erase(chit);
found = true;
auto in_it = std::find(vit->second->cache_changes.begin(), vit->second->cache_changes.end(), change);

if (change->isRead)
if (vit->second->cache_changes.end() != in_it)
{
assert(it == in_it);
it = vit->second->cache_changes.erase(in_it);
if (dummy_change.isRead)
{
--counters_.samples_read;
}
break;
}
else
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Change not found on this key, something is wrong");
}
}
}
if (!found)
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Change not found on this key, something is wrong");
}

const_iterator chit = find_change_nts(change);
if (chit == changesEnd())
{
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history");
return false;
m_isHistoryFull = false;
counters_.samples_unread = mp_reader->get_unread_count();
return true;
}

m_isHistoryFull = false;
ReaderHistory::remove_change_nts(chit);

counters_.samples_unread = mp_reader->get_unread_count();
return true;
return false;
}

bool DataReaderHistory::set_next_deadline(
Expand Down Expand Up @@ -667,33 +673,47 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts(
{
if (removal != changesEnd())
{
CacheChange_t* p_sample = *removal;
CacheChange_t* change_ptr = *removal;
CacheChange_t dummy_change;
bool is_fully_assembled = (*removal)->is_fully_assembled();
dummy_change.instanceHandle = (*removal)->instanceHandle;
dummy_change.isRead = (*removal)->isRead;
dummy_change.sequenceNumber = (*removal)->sequenceNumber;
dummy_change.writerGUID = (*removal)->writerGUID;

// call the base class
auto ret_val = ReaderHistory::remove_change_nts(removal, release);

if (!has_keys_ || p_sample->is_fully_assembled())
if (ret_val == changesEnd() || !matches_change(&dummy_change, *ret_val)) // Change was successfully removed.
{
// clean any references to this CacheChange in the key state collection
auto it = instances_.find(p_sample->instanceHandle);

// if keyed and in history must be in the map
// There is a case when the sample could not be in the keyed map. The first received fragment of a
// fragmented sample is stored in the history, and when it is completed it is stored in the keyed map.
// But it can occur it is rejected when the sample is completed and removed without being stored in the
// keyed map.
if (it != instances_.end())
if (!has_keys_ || is_fully_assembled)
{
it->second->cache_changes.remove(p_sample);
if (p_sample->isRead)
// clean any references to this CacheChange in the key state collection
auto it = instances_.find(dummy_change.instanceHandle);

// if keyed and in history must be in the map
// There is a case when the sample could not be in the keyed map. The first received fragment of a
// fragmented sample is stored in the history, and when it is completed it is stored in the keyed map.
// But it can occur it is rejected when the sample is completed and removed without being stored in the
// keyed map.
if (it != instances_.end())
{
--counters_.samples_read;
it->second->cache_changes.remove(change_ptr);
if (dummy_change.isRead)
{
--counters_.samples_read;
}
}
}

counters_.samples_unread = mp_reader->get_unread_count();
return ret_val;
}

return remove_iterator_constness(removal);
}

// call the base class
auto ret_val = ReaderHistory::remove_change_nts(removal, release);
counters_.samples_unread = mp_reader->get_unread_count();
return ret_val;
return changesEnd();
}

ReaderHistory::iterator DataReaderHistory::remove_change_nts(
Expand Down
32 changes: 27 additions & 5 deletions src/cpp/rtps/history/History.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ History::iterator History::remove_change_nts(
{
if (nullptr == mp_mutex)
{
return changesEnd();
return remove_iterator_constness(removal);
}

if (removal == changesEnd())
Expand Down Expand Up @@ -124,16 +124,30 @@ bool History::remove_change(
#endif // if HAVE_STRICT_REALTIME

const_iterator it = find_change_nts(ch);
const_iterator end_it = changesEnd();

if (it == end_it)
if (it == changesEnd())
{
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove a change not in history");
return false;
}

// remove using the virtual method
return end_it != remove_change_nts(it, max_blocking_time);
// Dummy change just used to compare original change with change returned from remove_change_nts function
CacheChange_t dummy_change;
dummy_change.writerGUID = (*it)->writerGUID;
dummy_change.sequenceNumber = (*it)->sequenceNumber;

// Remove using the virtual method
History::iterator history_it = remove_change_nts(it, max_blocking_time);

// If remove_change_nts returns a valid iterator (not end()) and this is the same iterator means that it
// could not remove it so this function should fail
if (history_it != changesEnd() && matches_change(&dummy_change, *history_it))
{
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Failed to remove a change from history");
return false;
}

return true;
}

bool History::remove_all_changes()
Expand Down Expand Up @@ -246,6 +260,14 @@ bool History::get_earliest_change(
return true;
}

History::iterator History::remove_iterator_constness(
const_iterator c_it)
{
History::iterator it = changesBegin();
std::advance(it, std::distance<const_iterator>(m_changes.cbegin(), c_it));
return it;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ History::iterator ReaderHistory::remove_change_nts(
const_iterator removal,
bool release)
{
if ( mp_reader == nullptr || mp_mutex == nullptr)
if (mp_reader == nullptr || mp_mutex == nullptr)
{
EPROSIMA_LOG_ERROR(RTPS_WRITER_HISTORY,
"You need to create a Writer with this History before removing any changes");
return changesEnd();
return remove_iterator_constness(removal);
}

if ( removal == changesEnd())
if (removal == changesEnd())
{
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove without a proper CacheChange_t referenced");
return changesEnd();
Expand Down
8 changes: 5 additions & 3 deletions src/cpp/rtps/history/WriterHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ History::iterator WriterHistory::remove_change_nts(
{
EPROSIMA_LOG_ERROR(RTPS_WRITER_HISTORY,
"You need to create a Writer with this History before removing any changes");
return changesEnd();
return remove_iterator_constness(removal);
}

if ( removal == changesEnd())
if (removal == changesEnd())
{
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY, "Trying to remove without a proper CacheChange_t referenced");
return changesEnd();
Expand All @@ -212,7 +212,9 @@ History::iterator WriterHistory::remove_change_nts(
return ret_val;
}

return changesEnd();
EPROSIMA_LOG_INFO(RTPS_WRITER_HISTORY,
"Failed to inform the writer that a change is going to be removed by the history");
return remove_iterator_constness(removal);
}

bool WriterHistory::remove_change_g(
Expand Down
3 changes: 3 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
auto transport = std::make_shared<UDPv4TransportDescriptor>();

auto data = default_data300kb_data_generator();
auto reader_data = data;
Expand All @@ -1066,6 +1067,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb)
.history_depth(static_cast<int32_t>(data.size()))
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.disable_builtin_transport().add_user_transport_to_pparams(transport)
.mem_policy(mem_policy_).init();

ASSERT_TRUE(writer.isInitialized());
Expand All @@ -1080,6 +1082,7 @@ TEST_P(PubSubHistory, ReliableTransientLocalKeepLast1Data300Kb)
.history_depth(1)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.disable_builtin_transport().add_user_transport_to_pparams(transport)
.mem_policy(mem_policy_).init();

ASSERT_TRUE(reader.isInitialized());
Expand Down
21 changes: 21 additions & 0 deletions test/mock/rtps/ReaderHistory/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ class ReaderHistory
return m_changes.cend();
}

iterator changesEnd()
{
return m_changes.end();
}

virtual iterator remove_change_nts(
const_iterator removal,
bool release = true)
Expand Down Expand Up @@ -177,6 +182,22 @@ class ReaderHistory
static_cast<void>(ownership_strength);
}

bool matches_change(
const CacheChange_t* inner_change,
CacheChange_t* outer_change)
{
return inner_change->sequenceNumber == outer_change->sequenceNumber &&
inner_change->writerGUID == outer_change->writerGUID;
}

iterator remove_iterator_constness(
const_iterator c_it)
{
iterator it = m_changes.begin();
std::advance(it, std::distance<const_iterator>(m_changes.cbegin(), c_it));
return it;
}

HistoryAttributes m_att;

protected:
Expand Down

0 comments on commit b913df0

Please sign in to comment.