Skip to content

Commit

Permalink
Setting infraestructure for naming threads (#3821)
Browse files Browse the repository at this point in the history
* Refs #19375. Added internal header for threading utilities.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added empty implementation for set_name_to_current_thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on Log thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on shm watchdog thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name for filewatch threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on ResourceEvent threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on udp reception thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on shm reception thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on data-sharing reception thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name for TCP threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name on FlowController thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added name for SHM dump threads.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Added initialization callback to LogTopic.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. SecurityManager refactor to receive plugin factory by dependency injection.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Allow easy overload of creation of builtin plugins.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. RTPSParticipantImpl configures the logging thread.

Signed-off-by: Miguel Company <[email protected]>

* Refs #19375. Uncrustify.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
  • Loading branch information
MiguelCompany authored and EduPonz committed Nov 20, 2023
1 parent 0095373 commit 1e3dbb3
Show file tree
Hide file tree
Showing 34 changed files with 438 additions and 171 deletions.
9 changes: 7 additions & 2 deletions include/fastdds/rtps/resources/ResourceEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <thread>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

namespace eprosima {
Expand All @@ -49,8 +50,12 @@ class ResourceEvent

/*!
* @brief Method to initialize the internal thread.
*
* @param[in] configure_cb Function to be called in the context of the started thread
* before calling the internal service routine.
*/
void init_thread();
void init_thread(
std::function<void()> configure_cb = {});

void stop_thread();

Expand Down
3 changes: 3 additions & 0 deletions src/cpp/fastdds/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fastdds/dds/log/StdoutErrConsumer.hpp>
#include <fastdds/dds/log/Colors.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/threading.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -258,6 +259,8 @@ struct LogResources

void run()
{
set_name_to_current_thread("dds.log");

std::unique_lock<std::mutex> guard(cv_mutex_);

while (logging_)
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <rtps/DataSharing/DataSharingListener.hpp>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <utils/threading.hpp>

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -49,6 +50,8 @@ DataSharingListener::~DataSharingListener()

void DataSharingListener::run()
{
set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF);

std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
Expand Down
8 changes: 7 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@

#include <rtps/builtin/discovery/database/backup/SharedBackupFunctions.hpp>

#include <utils/threading.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {
Expand Down Expand Up @@ -160,7 +162,11 @@ bool PDPServer::init(
getRTPSParticipant()->enableReader(edp->publications_reader_.first);

// Initialize server dedicated thread.
resource_event_thread_.init_thread();
uint32_t id_for_thread = static_cast<uint32_t>(getRTPSParticipant()->getRTPSParticipantAttributes().participantID);
resource_event_thread_.init_thread([id_for_thread]()
{
set_name_to_current_thread("dds.ds_ev.%u", id_for_thread);
});

/*
Given the fact that a participant is either a client or a server the
Expand Down
26 changes: 14 additions & 12 deletions src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ void FlowControllerFactory::init(
pure_sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerPureSyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, 0))));
// SyncFlowController -> used by rest of besteffort writers.
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerSyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
// AsyncFlowController
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));

#ifdef FASTDDS_STATISTICS
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_statistics_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
#endif // ifndef FASTDDS_STATISTICS
}

Expand All @@ -67,31 +67,32 @@ void FlowControllerFactory::register_flow_controller (
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, &flow_controller_descr))));
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
default:
assert(false);
Expand All @@ -106,31 +107,32 @@ void FlowControllerFactory::register_flow_controller (
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, &flow_controller_descr))));
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
default:
assert(false);
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class FlowControllerFactory
//! Stores the created flow controllers.
std::map<std::string, std::unique_ptr<FlowController>> flow_controllers_;

//! Counter used for thread identification
uint32_t async_controller_index_ = 0;

};

} // namespace rtps
Expand Down
30 changes: 23 additions & 7 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
#ifndef _RTPS_FLOWCONTROL_FLOWCONTROLLERIMPL_HPP_
#define _RTPS_FLOWCONTROL_FLOWCONTROLLERIMPL_HPP_

#include "FlowController.hpp"
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <atomic>
#include <cassert>
#include <chrono>
#include <map>
#include <thread>
#include <unordered_map>

#include "FlowController.hpp"
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastrtps/utils/TimedConditionVariable.hpp>
#include <fastrtps/utils/TimedMutex.hpp>

#include <rtps/participant/RTPSParticipantImpl.h>
#include <utils/threading.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {
Expand Down Expand Up @@ -926,11 +929,19 @@ class FlowControllerImpl : public FlowController

FlowControllerImpl(
fastrtps::rtps::RTPSParticipantImpl* participant,
const FlowControllerDescriptor* descriptor
const FlowControllerDescriptor* descriptor,
uint32_t async_index
)
: participant_(participant)
, async_mode(participant, descriptor)
, participant_id_(0)
, async_index_(async_index)
{
if (nullptr != participant)
{
participant_id_ = static_cast<uint32_t>(participant->getRTPSParticipantAttributes().participantID);
}

uint32_t limitation = get_max_payload();

if (std::numeric_limits<uint32_t>::max() != limitation)
Expand Down Expand Up @@ -1328,6 +1339,8 @@ class FlowControllerImpl : public FlowController
*/
void run()
{
set_name_to_current_thread("dds.asyn.%u.%u", participant_id_, async_index_);

while (async_mode.running)
{
// There are writers interested in removing a sample.
Expand Down Expand Up @@ -1470,6 +1483,9 @@ class FlowControllerImpl : public FlowController

// async_mode must be destroyed before sched.
publish_mode async_mode;

uint32_t participant_id_ = 0;
uint32_t async_index_ = 0;
};

} // namespace rtps
Expand Down
22 changes: 20 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/persistence/PersistenceService.h>
#include <statistics/rtps/GuidUtils.hpp>
#include <utils/threading.hpp>

#if HAVE_SECURITY
#include <security/logging/LogTopic.h>
#endif // HAVE_SECURITY

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -137,7 +142,7 @@ RTPSParticipantImpl::RTPSParticipantImpl(
, internal_metatraffic_locators_(false)
, internal_default_locators_(false)
#if HAVE_SECURITY
, m_security_manager(this)
, m_security_manager(this, *this)
#endif // if HAVE_SECURITY
, mp_participantListener(plisten)
, mp_userParticipant(par)
Expand Down Expand Up @@ -239,7 +244,11 @@ RTPSParticipantImpl::RTPSParticipantImpl(
}

mp_userParticipant->mp_impl = this;
mp_event_thr.init_thread();
uint32_t id_for_thread = static_cast<uint32_t>(m_att.participantID);
mp_event_thr.init_thread([id_for_thread]()
{
set_name_to_current_thread("dds.ev.%u", id_for_thread);
});

if (!networkFactoryHasRegisteredTransports())
{
Expand Down Expand Up @@ -2196,6 +2205,15 @@ bool RTPSParticipantImpl::is_security_enabled_for_reader(
return false;
}

security::Logging* RTPSParticipantImpl::create_builtin_logging_plugin()
{
return new security::LogTopic([this]()
{
uint32_t participant_id = static_cast<uint32_t>(m_att.participantID);
set_name_to_current_thread("dds.slog.%u", participant_id);
});
}

#endif // if HAVE_SECURITY

PDP* RTPSParticipantImpl::pdp()
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/security/accesscontrol/ParticipantSecurityAttributes.h>
#include <rtps/security/SecurityManager.h>
#include <rtps/security/SecurityPluginFactory.h>
#endif // if HAVE_SECURITY

namespace eprosima {
Expand Down Expand Up @@ -106,6 +107,9 @@ class WLP;
*/
class RTPSParticipantImpl
: public fastdds::statistics::StatisticsParticipantImpl
#if HAVE_SECURITY
, private security::SecurityPluginFactory
#endif // if HAVE_SECURITY
{
/*
Receiver Control block is a struct we use to encapsulate the resources that take part in message reception.
Expand Down Expand Up @@ -398,6 +402,8 @@ class RTPSParticipantImpl
bool is_security_enabled_for_reader(
const ReaderAttributes& reader_attributes);

security::Logging* create_builtin_logging_plugin() override;

#endif // if HAVE_SECURITY

PDPSimple* pdpsimple();
Expand Down
12 changes: 10 additions & 2 deletions src/cpp/rtps/resources/ResourceEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,23 @@ void ResourceEvent::do_timer_actions()
}
}

void ResourceEvent::init_thread()
void ResourceEvent::init_thread(
std::function<void()> configure_cb)
{
std::lock_guard<TimedMutex> lock(mutex_);

allow_vector_manipulation_ = false;
stop_.store(false);
resize_collections();

thread_ = std::thread(&ResourceEvent::event_service, this);
thread_ = std::thread([this, configure_cb]()
{
if (configure_cb)
{
configure_cb();
}
event_service();
});
}

} /* namespace rtps */
Expand Down
Loading

0 comments on commit 1e3dbb3

Please sign in to comment.