Skip to content

Commit

Permalink
Update BS_thread_pool_light.hpp to version 3.5.0 (#1341)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumekln authored Jul 12, 2023
1 parent 8fd1d9d commit 81e9e8e
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions third_party/BS_thread_pool_light.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
/**
* @file BS_thread_pool_light.hpp
* @author Barak Shoshany ([email protected]) (http://baraksh.com)
* @version 3.3.0
* @date 2022-08-03
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
* @version 3.5.0
* @date 2023-05-25
* @copyright Copyright (c) 2023 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
*
* @brief BS::thread_pool_light: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains a light version of the main library, for use when advanced features are not needed.
*/

#define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03) [light]"
#define BS_THREAD_POOL_LIGHT_VERSION "v3.5.0 (2023-05-25)"

#include <atomic> // std::atomic
#include <condition_variable> // std::condition_variable
#include <exception> // std::current_exception
#include <functional> // std::bind, std::function, std::invoke
Expand Down Expand Up @@ -135,12 +134,10 @@ class [[nodiscard]] thread_pool_light
template <typename F, typename... A>
void push_task(F&& task, A&&... args)
{
std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
{
const std::scoped_lock tasks_lock(tasks_mutex);
tasks.push(task_function);
tasks.push(std::bind(std::forward<F>(task), std::forward<A>(args)...)); // cppcheck-suppress ignoredReturnValue
}
++tasks_total;
task_available_cv.notify_one();
}

Expand All @@ -157,10 +154,9 @@ class [[nodiscard]] thread_pool_light
template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
[[nodiscard]] std::future<R> submit(F&& task, A&&... args)
{
std::function<R()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
push_task(
[task_function, task_promise]
[task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...), task_promise]
{
try
{
Expand Down Expand Up @@ -189,13 +185,13 @@ class [[nodiscard]] thread_pool_light
}

/**
* @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future.
* @brief Wait for tasks to be completed, both those that are currently running in the threads and those that are still waiting in the queue. Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future.
*/
void wait_for_tasks()
{
std::unique_lock tasks_lock(tasks_mutex);
waiting = true;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); });
tasks_done_cv.wait(tasks_lock, [this] { return !tasks_running && tasks.empty(); });
waiting = false;
}

Expand All @@ -209,7 +205,10 @@ class [[nodiscard]] thread_pool_light
*/
void create_threads()
{
running = true;
{
const std::scoped_lock tasks_lock(tasks_mutex);
workers_running = true;
}
for (concurrency_t i = 0; i < thread_count; ++i)
{
threads[i] = std::thread(&thread_pool_light::worker, this);
Expand All @@ -221,7 +220,10 @@ class [[nodiscard]] thread_pool_light
*/
void destroy_threads()
{
running = false;
{
const std::scoped_lock tasks_lock(tasks_mutex);
workers_running = false;
}
task_available_cv.notify_all();
for (concurrency_t i = 0; i < thread_count; ++i)
{
Expand All @@ -235,7 +237,7 @@ class [[nodiscard]] thread_pool_light
* @param thread_count_ The parameter passed to the constructor. If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread.
* @return The number of threads to use for constructing the pool.
*/
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_)
[[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_) const
{
if (thread_count_ > 0)
return thread_count_;
Expand All @@ -253,22 +255,22 @@ class [[nodiscard]] thread_pool_light
*/
void worker()
{
while (running)
std::function<void()> task;
while (true)
{
std::function<void()> task;
std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !running; });
if (running)
{
task = std::move(tasks.front());
tasks.pop();
tasks_lock.unlock();
task();
tasks_lock.lock();
--tasks_total;
if (waiting)
task_done_cv.notify_one();
}
std::unique_lock tasks_lock(tasks_mutex);
task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !workers_running; });
if (!workers_running)
break;
task = std::move(tasks.front());
tasks.pop();
++tasks_running;
tasks_lock.unlock();
task();
tasks_lock.lock();
--tasks_running;
if (waiting && !tasks_running && tasks.empty())
tasks_done_cv.notify_all();
}
}

Expand All @@ -277,29 +279,24 @@ class [[nodiscard]] thread_pool_light
// ============

/**
* @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working.
*/
std::atomic<bool> running = false;

/**
* @brief A condition variable used to notify worker() that a new task has become available.
* @brief A condition variable to notify worker() that a new task has become available.
*/
std::condition_variable task_available_cv = {};

/**
* @brief A condition variable used to notify wait_for_tasks() that a tasks is done.
* @brief A condition variable to notify wait_for_tasks() that the tasks are done.
*/
std::condition_variable task_done_cv = {};
std::condition_variable tasks_done_cv = {};

/**
* @brief A queue of tasks to be executed by the threads.
*/
std::queue<std::function<void()>> tasks = {};

/**
* @brief An atomic variable to keep track of the total number of unfinished tasks - either still in the queue, or running in a thread.
* @brief A counter for the total number of currently running tasks.
*/
std::atomic<size_t> tasks_total = 0;
size_t tasks_running = 0;

/**
* @brief A mutex to synchronize access to the task queue by different threads.
Expand All @@ -317,9 +314,14 @@ class [[nodiscard]] thread_pool_light
std::unique_ptr<std::thread[]> threads = nullptr;

/**
* @brief An atomic variable indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
* @brief A flag indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
*/
bool waiting = false;

/**
* @brief A flag indicating to the workers to keep running. When set to false, the workers terminate permanently.
*/
std::atomic<bool> waiting = false;
bool workers_running = false;
};

} // namespace BS

0 comments on commit 81e9e8e

Please sign in to comment.