Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the safety mutex in Reception in Thread Pool #292

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
#include <ddsrouter_utils/thread_pool/task/Task.hpp>
#include <ddsrouter_utils/thread_pool/task/TaskId.hpp>
#include <ddsrouter_utils/thread_pool/thread/CustomThread.hpp>
#include <ddsrouter_utils/types/Atomicable.hpp>
#include <ddsrouter_utils/wait/DBQueueWaitHandler.hpp>

namespace eprosima {
namespace ddsrouter {
namespace utils {

// Declarations to make code less verbose
using SlotsMapType = SharedAtomicable<std::map<TaskId, Task>>;

/**
* This class represents a thread pool that can register tasks inside.
*
Expand Down Expand Up @@ -151,12 +155,9 @@ class SlotThreadPool
/**
* @brief Map of tasks indexed by their task Id.
*
* This object is protected by the \c slots_mutex_ mutex.
* This object is protected by itself (atomicable).
*/
std::map<TaskId, Task> slots_;

//! Protects access to \c slots_ .
std::mutex slots_mutex_;
SlotsMapType slots_;

//! Whether the object is currently enabled
std::atomic<bool> enabled_;
Expand Down
60 changes: 31 additions & 29 deletions ddsrouter_utils/src/cpp/thread_pool/pool/SlotThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,27 @@ void SlotThreadPool::disable() noexcept
void SlotThreadPool::emit(
const TaskId& task_id)
{
// Lock to access the slot map
std::lock_guard<std::mutex> lock(slots_mutex_);

auto it = slots_.find(task_id);

if (it == slots_.end())
{
throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " not registered.");
}
else
{
task_queue_.produce(it->first);
}
// TODO check if we want this check or we trust the user of this class
// {
// // Lock to access the slot map while searching the task exists
// std::shared_lock<SlotsMapType> lock(slots_);

// auto it = slots_.find(task_id);
// if (it == slots_.end())
// {
// throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " not registered.");
// }
// }

task_queue_.produce(task_id);
}

void SlotThreadPool::slot(
const TaskId& task_id,
Task&& task)
{
// Lock to access the slot map
std::lock_guard<std::mutex> lock(slots_mutex_);
// Lock to access the slot map to modify it
std::unique_lock<SlotsMapType> lock(slots_);

auto it = slots_.find(task_id);

Expand All @@ -126,22 +126,24 @@ void SlotThreadPool::thread_routine_()
logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " free, getting new callback.");
TaskId task_id = task_queue_.consume();

// Lock to access the slot map
slots_mutex_.lock();

auto it = slots_.find(task_id);
// Check the slot is correct
if (it == slots_.end())
{
utils::tsnh(STR_ENTRY << "Slot in Queue must be stored in slots register");
// Lock to access the slot map while executing task
// NOTE: this can end before calling task if the task object is copied, but if it is used with
// a reference, it cannot unlock before calling method
std::shared_lock<SlotsMapType> lock(slots_);

auto it = slots_.find(task_id);
// Check the slot is correct
if (it == slots_.end())
{
throw utils::ValueNotAllowedException(STR_ENTRY << "Slot " << task_id << " in queue not registered.");
}

Task& task = it->second;

logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " executing callback.");
task();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that the time taken for copying a function (in our case, only with functions whose args are not copied) is not so relevant. However, it might be problematic to have the discovery thread stuck for not being able to slot a new task.

}

Task& task = it->second;

slots_mutex_.unlock();

logDebug(DDSROUTER_THREAD_POOL, "Thread: " << std::this_thread::get_id() << " executing callback.");
task();
}
}
catch (const utils::DisabledException& e)
Expand Down