Skip to content

Commit

Permalink
SHM transport: ignore non-existing segment on pop (#3992)
Browse files Browse the repository at this point in the history
* Refs #19500. Changed log category.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19500. Regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19500. Ignore non-existing segment on pop.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19869. Update test as suggested by reviewer.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19869. Linters.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
  • Loading branch information
MiguelCompany authored Nov 13, 2023
1 parent ded6cd6 commit 34c1b78
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SharedMemChannelResource : public ChannelResource
}
catch (const std::exception& e)
{
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, e.what());
EPROSIMA_LOG_WARNING(RTPS_TRANSPORT_SHM, e.what());
}
}

Expand Down Expand Up @@ -150,7 +150,7 @@ class SharedMemChannelResource : public ChannelResource
}
else if (alive())
{
EPROSIMA_LOG_WARNING(RTPS_MSG_IN, "Received Message, but no receiver attached");
EPROSIMA_LOG_WARNING(RTPS_TRANSPORT_SHM, "Received Message, but no receiver attached");
}

// Forces message release before waiting for the next
Expand Down Expand Up @@ -184,8 +184,9 @@ class SharedMemChannelResource : public ChannelResource
catch (const std::exception& error)
{
(void)error;
EPROSIMA_LOG_WARNING(RTPS_MSG_OUT, "Error receiving data: " << error.what() << " - " << message_receiver()
<< " (" << this << ")");
EPROSIMA_LOG_WARNING(RTPS_TRANSPORT_SHM,
"Error receiving data: " << error.what() << " - " << message_receiver()
<< " (" << this << ")");
return nullptr;
}
}
Expand Down
14 changes: 13 additions & 1 deletion src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@ class SharedMemManager :
global_port_->pop(*global_listener_, was_cell_freed);

auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id);
if (!segment)
{
// Descriptor points to non-existing segment: discard
continue;
}
auto buffer_node =
static_cast<BufferNode*>(segment->get_address_from_offset(buffer_descriptor.
buffer_node_offset));
Expand Down Expand Up @@ -1305,7 +1310,14 @@ class SharedMemManager :
else // Is a new segment
{
auto segment_name = global_segment_.domain_name() + "_" + id.to_string();
segment = std::make_shared<SharedMemSegment>(boost::interprocess::open_only, segment_name);
try
{
segment = std::make_shared<SharedMemSegment>(boost::interprocess::open_only, segment_name);
}
catch (std::exception&)
{
return segment;
}
auto segment_wrapper = std::make_shared<SegmentWrapper>(shared_from_this(), segment, id, segment_name);

ids_segments_[id.get()] = segment_wrapper;
Expand Down
82 changes: 82 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportSHM.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

#include "./mock/BlackboxMockConsumer.h"

#include <gtest/gtest.h>

#include <fastdds/dds/log/Log.hpp>

#include <rtps/transport/shared_mem/test_SharedMemTransportDescriptor.h>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>

Expand Down Expand Up @@ -70,6 +74,84 @@ TEST(SHM, TransportPubSub)
reader.wait_participant_undiscovery();
}

// Regression test for redmine #19500
TEST(SHM, IgnoreNonExistentSegment)
{
using namespace eprosima::fastdds::dds;

// Set up log
BlackboxMockConsumer* helper_consumer = new BlackboxMockConsumer();
Log::ClearConsumers(); // Remove default consumers
Log::RegisterConsumer(std::unique_ptr<LogConsumer>(helper_consumer)); // Registering a consumer transfer ownership
// Filter specific message
Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Warning);
Log::SetCategoryFilter(std::regex("RTPS_TRANSPORT_SHM"));
Log::SetErrorStringFilter(std::regex("Error receiving data.*"));

PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

writer
.asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();
ASSERT_TRUE(writer.isInitialized());

reader
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();

ASSERT_TRUE(reader.isInitialized());

reader.wait_discovery();

// Create and quickly destroy several participants in several threads
std::vector<std::thread> threads;
for (size_t i = 0; i < 10; i++)
{
threads.push_back(std::thread([]()
{
constexpr size_t num_parts = 10;
for (size_t i = 0; i < num_parts; ++i)
{
PubSubWriter<Data1mbPubSubType> late_writer(TEST_TOPIC_NAME);
late_writer
.asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(std::make_shared<SharedMemTransportDescriptor>())
.init();
ASSERT_TRUE(late_writer.isInitialized());
}
}));
}

// Destroy the writer participant.
writer.destroy();

// Check that reader receives the unmatched.
reader.wait_participant_undiscovery();

for (auto& thread : threads)
{
thread.join();
}
// Check logs
Log::Flush();
EXPECT_EQ(helper_consumer->ConsumedEntries().size(), 0);

// Clean-up
Log::Reset(); // This calls to ClearConsumers, which deletes the registered consumer
}

TEST(SHM, Test300KFragmentation)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand Down

0 comments on commit 34c1b78

Please sign in to comment.