Skip to content

Commit

Permalink
Correctly call on_liveliness_changed when there are multiple reader…
Browse files Browse the repository at this point in the history
…s on the same topic (#4822) (#4883)

* Correctly call `on_liveliness_changed` when there are multiple readers on the same topic (#4822)

* Refs #21065: Add BB test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21065: Add fix rework after sync with Miguel

Signed-off-by: Mario Dominguez <[email protected]>

* REfs #21065: Correct windows NIT

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21065: Apply NIT

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
(cherry picked from commit df90943)

# Conflicts:
#	include/fastdds/rtps/writer/LivelinessManager.h
#	src/cpp/rtps/writer/LivelinessManager.cpp
#	test/unittest/rtps/writer/LivelinessManagerTests.cpp

* Solve conflicts

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
Co-authored-by: Mario Domínguez López <[email protected]>
Co-authored-by: Mario Dominguez <[email protected]>
  • Loading branch information
3 people authored Jun 10, 2024
1 parent 0fcad67 commit 88dece6
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 40 deletions.
12 changes: 7 additions & 5 deletions include/fastdds/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,17 @@ class LivelinessManager

/**
* @brief Removes a writer
* @param guid GUID of the writer
* @param kind Liveliness kind
* @param lease_duration Liveliness lease duration
* @param [in] guid GUID of the writer
* @param [in] kind Liveliness kind
* @param [in] lease_duration Liveliness lease duration
* @param [out] writer_liveliness_status The liveliness status of the writer
* @return True if the writer was successfully removed
*/
bool remove_writer(
GUID_t guid,
LivelinessQosPolicyKind kind,
Duration_t lease_duration);
fastdds::dds::LivelinessQosPolicyKind kind,
Duration_t lease_duration,
LivelinessData::WriterStatus& writer_liveliness_status);

/**
* @brief Asserts liveliness of a writer in the set
Expand Down
8 changes: 6 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ bool WLP::remove_local_writer(

EPROSIMA_LOG_INFO(RTPS_LIVELINESS, W->getGuid().entityId << " from Liveliness Protocol");

LivelinessData::WriterStatus writer_status;

if (W->get_liveliness_kind() == AUTOMATIC_LIVELINESS_QOS)
{
auto it = std::find(
Expand Down Expand Up @@ -764,7 +766,8 @@ bool WLP::remove_local_writer(
if (!pub_liveliness_manager_->remove_writer(
W->getGuid(),
W->get_liveliness_kind(),
W->get_liveliness_lease_duration()))
W->get_liveliness_lease_duration(),
writer_status))
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Could not remove writer " << W->getGuid() << " from liveliness manager");
Expand Down Expand Up @@ -807,7 +810,8 @@ bool WLP::remove_local_writer(
if (!pub_liveliness_manager_->remove_writer(
W->getGuid(),
W->get_liveliness_kind(),
W->get_liveliness_lease_duration()))
W->get_liveliness_lease_duration(),
writer_status))
{
EPROSIMA_LOG_ERROR(RTPS_LIVELINESS,
"Could not remove writer " << W->getGuid() << " from liveliness manager");
Expand Down
14 changes: 13 additions & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,22 @@ bool StatefulReader::matched_writer_remove(
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}

}
else
{
Expand Down
13 changes: 12 additions & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,21 @@ bool StatelessReader::matched_writer_remove(
auto wlp = mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
LivelinessData::WriterStatus writer_liveliness_status;
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
liveliness_lease_duration_,
writer_liveliness_status);

if (writer_liveliness_status == LivelinessData::WriterStatus::ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, -1, 0);
}
else if (writer_liveliness_status == LivelinessData::WriterStatus::NOT_ALIVE)
{
wlp->update_liveliness_changed_status(writer_guid, this, 0, -1);
}
}
else
{
Expand Down
22 changes: 5 additions & 17 deletions src/cpp/rtps/writer/LivelinessManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,21 @@ bool LivelinessManager::add_writer(

bool LivelinessManager::remove_writer(
GUID_t guid,
LivelinessQosPolicyKind kind,
Duration_t lease_duration)
fastdds::dds::LivelinessQosPolicyKind kind,
Duration_t lease_duration,
LivelinessData::WriterStatus& writer_status)
{
bool removed = false;
LivelinessData::WriterStatus status;

{
// collection guard
std::lock_guard<shared_mutex> _(col_mutex_);
// writers_ elements guard
std::lock_guard<std::mutex> __(mutex_);

removed = writers_.remove_if([guid, kind, lease_duration, &status](LivelinessData& writer)
removed = writers_.remove_if([guid, kind, lease_duration, &writer_status](LivelinessData& writer)
{
status = writer.status;
writer_status = writer.status;
return writer.guid == guid &&
writer.kind == kind &&
writer.lease_duration == lease_duration &&
Expand All @@ -118,18 +118,6 @@ bool LivelinessManager::remove_writer(
return false;
}

if (callback_ != nullptr)
{
if (status == LivelinessData::WriterStatus::ALIVE)
{
callback_(guid, kind, lease_duration, -1, 0);
}
else if (status == LivelinessData::WriterStatus::NOT_ALIVE)
{
callback_(guid, kind, lease_duration, 0, -1);
}
}

std::unique_lock<std::mutex> lock(mutex_);

if (timer_owner_ != nullptr)
Expand Down
41 changes: 41 additions & 0 deletions test/blackbox/common/BlackboxTestsLivelinessQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,47 @@ TEST(LivelinessTests, Reader_Successfully_Asserts_Liveliness_on_a_Disconnected_W
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(1, std::chrono::seconds(4)), 1u);
}

// Regression test of Refs #21065, github issue #4610
TEST(LivelinessTests, correct_liveliness_state_one_writer_multiple_readers)
{
uint8_t num_readers = 2;

// Create one writer participant
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

// Create a reader participant containing 2 readers
PubSubParticipant<HelloWorldPubSubType> reader(0, num_readers, 0, num_readers);

reader.init_participant();
// Define the reader's lease duration in 1.6 secs
reader.sub_liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 600000000));
// Both readers on the same topic
reader.sub_topic_name(TEST_TOPIC_NAME);

for (size_t i = 0; i < num_readers; i++)
{
// Create Subscribers and readers, one for each writer
reader.init_subscriber(static_cast<unsigned int>(i));
}

// Create writers
writer.lease_duration(c_TimeInfinite, 1)
.liveliness_lease_duration(eprosima::fastrtps::Time_t(1, 0))
.liveliness_kind(eprosima::fastdds::dds::AUTOMATIC_LIVELINESS_QOS)
.liveliness_announcement_period(eprosima::fastrtps::Time_t(0, 500000000))
.init();

// Wait for discovery to occur. Liveliness should be recovered twice,
// one per matched reader.
reader.sub_wait_liveliness_recovered(2);

// Destroy the writer
writer.destroy();

// After 1.6 secs, we should receive a on_liveliness_changed(status lost) on the two readers
ASSERT_EQ(reader.sub_wait_liveliness_lost_for(2, std::chrono::seconds(4)), 2u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
40 changes: 26 additions & 14 deletions test/unittest/rtps/writer/LivelinessManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,28 @@ TEST_F(LivelinessManagerTests, WriterCannotBeRemovedTwice)
GuidPrefix_t guidP;
guidP.value[0] = 1;
GUID_t guid(guidP, 0);

EXPECT_EQ(liveliness_manager.add_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(1)), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), false);
LivelinessData::WriterStatus writer_status;

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1),
writer_status), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(1),
writer_status), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1), writer_status), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, Duration_t(
1), writer_status), false);

EXPECT_EQ(liveliness_manager.add_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1)), true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1),
writer_status),
true);
EXPECT_EQ(liveliness_manager.remove_writer(guid, fastdds::dds::MANUAL_BY_TOPIC_LIVELINESS_QOS, Duration_t(1),
writer_status),
false);
}

//! Tests that the assert_liveliness() method that takes liveliness kind as argument sets the alive state and time
Expand Down Expand Up @@ -479,12 +489,14 @@ TEST_F(LivelinessManagerTests, TimerOwnerRemoved)

GuidPrefix_t guidP;
guidP.value[0] = 1;
LivelinessData::WriterStatus writer_status;

liveliness_manager.add_writer(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.add_writer(GUID_t(guidP, 2), AUTOMATIC_LIVELINESS_QOS, Duration_t(1));

liveliness_manager.assert_liveliness(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.remove_writer(GUID_t(guidP, 1), AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.assert_liveliness(GUID_t(guidP, 1), fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(0.5));
liveliness_manager.remove_writer(GUID_t(guidP, 1), fastdds::dds::AUTOMATIC_LIVELINESS_QOS, Duration_t(
0.5), writer_status);

wait_liveliness_lost(1u);
EXPECT_EQ(writer_losing_liveliness, GUID_t(guidP, 2));
Expand Down

0 comments on commit 88dece6

Please sign in to comment.