Skip to content

Commit

Permalink
Notify datasharing listener also when intraprocess (#3875)
Browse files Browse the repository at this point in the history
* Refs #19570: Regression Test

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #19570: Notify datasharing listener also when intraprocess

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #19570: Address reviewer changes

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Oct 4, 2023
1 parent b84825a commit 9e69e26
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ bool StatefulReader::matched_writer_add(
wp->lost_changes_update(last_seq + 1);
}
}
else if (!is_same_process)
else
{
// simulate a notification to force reading of transient changes
datasharing_listener_->notify(false);
Expand Down
67 changes: 67 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,73 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

//! Regression test for Issues #3822 Github #3875
//! This test needs to late join a reader in the same process.
//! Not setting this test as parametrized since it only makes sense in intraprocess.
//! Note: Without the fix, the test fails ~1/10 times, it is encouraged to launch
//! the test with --retest-until-fail 50
TEST(DDSDataReader, ConsistentReliabilityWhenIntraprocess)
{
//! Manually set intraprocess
LibrarySettingsAttributes library_settings;
library_settings.intraprocess_delivery = eprosima::fastrtps::INTRAPROCESS_FULL;
xmlparser::XMLProfileManager::library_settings(library_settings);

auto participant = DomainParticipantFactory::get_instance()->create_participant(
(uint32_t)GET_PID() % 230,
DomainParticipantFactory::get_instance()->get_default_participant_qos(), nullptr,
eprosima::fastdds::dds::StatusMask::none());

eprosima::fastdds::dds::TypeSupport t_type{ new HelloWorldPubSubType() };
ASSERT_TRUE(t_type.register_type( participant ) == ReturnCode_t::RETCODE_OK);

auto topic = participant->create_topic( TEST_TOPIC_NAME, t_type.get_type_name(),
participant->get_default_topic_qos());

// create publisher and writer
auto publisher = participant->create_publisher( participant->get_default_publisher_qos());

auto writer_qos = eprosima::fastdds::dds::DATAWRITER_QOS_DEFAULT;
writer_qos.durability().kind = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS;
writer_qos.reliability().kind = ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS;
auto writer = publisher->create_datawriter( topic, writer_qos );

auto data = HelloWorld{};
ASSERT_TRUE(writer->write( &data ));

std::this_thread::sleep_for(std::chrono::milliseconds(200));

// create a late joiner subscriber and reader
auto subscriber = participant->create_subscriber( participant->get_default_subscriber_qos());
auto reader_qos = eprosima::fastdds::dds::DATAREADER_QOS_DEFAULT;
reader_qos.durability().kind = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS;
reader_qos.reliability().kind = ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS;
auto reader = subscriber->create_datareader( topic, reader_qos );

eprosima::fastdds::dds::SubscriptionMatchedStatus status;
reader->get_subscription_matched_status(status);
ASSERT_GT(status.total_count, 0);

// wait for message
uint64_t unread_count = 0;
auto t0 = std::chrono::steady_clock::now();
while (unread_count <= 0 &&
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - t0)).count() < 2)
{
unread_count = reader->get_unread_count(true);
std::this_thread::sleep_for( std::chrono::milliseconds( 100 ));
}

participant->delete_contained_entities();
DomainParticipantFactory::get_instance()->delete_participant(participant);

ASSERT_TRUE(unread_count > 0);

//! Reset back to INTRAPROCESS_OFF
library_settings.intraprocess_delivery = eprosima::fastrtps::INTRAPROCESS_OFF;
xmlparser::XMLProfileManager::library_settings(library_settings);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 9e69e26

Please sign in to comment.