performance improvement; bug fix
cnbatch committed Oct 13, 2024
1 parent d4ca8a8 commit f84276b
Showing 10 changed files with 319 additions and 215 deletions.
170 changes: 149 additions & 21 deletions src/3rd_party/thread_pool.hpp
@@ -14,6 +14,7 @@
#include <memory> // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr
#include <mutex> // std::mutex, std::scoped_lock, std::unique_lock
#include <list>
#include <set>
#include <thread> // std::thread
#include <type_traits> // std::common_type_t, std::decay_t, std::invoke_result_t, std::is_void_v
#include <utility> // std::forward, std::move, std::swap
@@ -26,9 +27,60 @@ namespace ttp
using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;

using task_callback = std::function<void(std::unique_ptr<uint8_t[]>)>;

using task_void_callback = std::function<void()>;
using calculate_func = size_t(*)(size_t, concurrency_t);
using task_queue = std::list<std::tuple<task_callback, std::unique_ptr<uint8_t[]>>>;

[[nodiscard]]
static size_t always_zero(size_t input_value, concurrency_t thread_count) noexcept
{
// Single-thread pool: every task goes to thread 0.
return 0;
}

[[nodiscard]]
static size_t calculate_odd(size_t input_value, concurrency_t thread_count) noexcept
{
// Spread input_value across the odd-numbered threads (1, 3, 5, ...).
// thread_count / 2 odd slots exist; the caller guarantees thread_count >= 2.
size_t odd_slots = thread_count / 2;
size_t thread_number = (input_value % odd_slots) * 2 + 1;
return thread_number;
}

[[nodiscard]]
static size_t assign_thread_odd(size_t input_value, concurrency_t thread_count) noexcept
{
// calc[thread_count == 1] selects always_zero for a single-thread pool (same trick in the helpers below).
static calculate_func calc[2] = { calculate_odd, always_zero };
return (calc[thread_count == 1])(input_value, thread_count);
}

[[nodiscard]]
static size_t calculate_even(size_t input_value, concurrency_t thread_count) noexcept
{
// Spread input_value across the even-numbered threads (0, 2, 4, ...).
size_t even_slots = (thread_count + 1) / 2;
size_t thread_number = (input_value % even_slots) * 2;
return thread_number;
}

[[nodiscard]]
static size_t assign_thread_even(size_t input_value, concurrency_t thread_count) noexcept
{
static calculate_func calc[2] = { calculate_even, always_zero };
return (calc[thread_count == 1])(input_value, thread_count);
}

[[nodiscard]]
static size_t calculate_assign(size_t input_value, concurrency_t thread_count) noexcept
{
// Keep the parity of input_value: even values go to even-numbered threads, odd values to odd-numbered ones.
static calculate_func calc[2] = { calculate_even, calculate_odd };
return (calc[input_value & 1])(input_value, thread_count);
}

[[nodiscard]]
static size_t assign_thread(size_t input_value, concurrency_t thread_count) noexcept
{
static calculate_func calc[2] = { calculate_assign, always_zero };
return (calc[thread_count == 1])(input_value, thread_count);
}
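
Taken together, these helpers are what the pool uses to pick a worker queue: assign_thread_odd routes a key to an odd-numbered thread, assign_thread_even to an even-numbered thread, and assign_thread keeps the parity of the key itself, so even and odd keys never share a worker; a single-thread pool collapses everything to thread 0. A minimal standalone sketch of the mapping (assuming the header is reachable as "thread_pool.hpp"; the 8-thread count is only illustrative):

#include <cstdio>
#include "thread_pool.hpp"

int main()
{
    const ttp::concurrency_t workers = 8;
    for (size_t key = 0; key < 8; ++key)
    {
        // Same key, three routing policies: odd-only, even-only, parity-preserving.
        std::printf("key %zu -> odd %zu, even %zu, parity %zu\n",
            key,
            ttp::assign_thread_odd(key, workers),
            ttp::assign_thread_even(key, workers),
            ttp::assign_thread(key, workers));
    }
    return 0;
}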

/**
* @brief A fast, lightweight, and easy-to-use C++17 thread pool class. This is a lighter version of the main thread pool class.
*/
@@ -348,7 +400,9 @@ namespace ttp
*/
task_group_pool(const concurrency_t thread_count_ = 0) :
thread_count(determine_thread_count(thread_count_)),
threads(std::make_unique<std::thread[]>(thread_count))
threads(std::make_unique<std::thread[]>(thread_count)),
local_network_tasks_total_of_threads(std::make_unique<std::atomic<size_t>[]>(thread_count)),
peer_network_tasks_total_of_threads(std::make_unique<std::atomic<size_t>[]>(thread_count))
{
task_queue_of_threads = std::make_unique<task_queue[]>(thread_count);
tasks_total_of_threads = std::make_unique<std::atomic<size_t>[]>(thread_count);
@@ -384,7 +438,7 @@
[[nodiscard]]
size_t get_task_count(size_t number) const
{
size_t thread_number = number % thread_count;
size_t thread_number = assign_thread(number, thread_count);
return tasks_total_of_threads[thread_number].load();
}

@@ -397,25 +451,60 @@
return total;
}

[[nodiscard]]
size_t get_local_network_task_count_all() const
{
size_t total = 0;
for (size_t i = 0; i < thread_count; ++i)
total += local_network_tasks_total_of_threads[i].load();
return total;
}

[[nodiscard]]
size_t get_peer_network_task_count_all() const
{
size_t total = 0;
for (size_t i = 0; i < thread_count; ++i)
total += peer_network_tasks_total_of_threads[i].load();
return total;
}

[[nodiscard]]
size_t get_local_network_task_count(size_t number) const
{
size_t thread_number = assign_thread_odd(number, thread_count);
return local_network_tasks_total_of_threads[thread_number].load();
}

[[nodiscard]]
size_t get_peer_network_task_count(size_t number) const
{
size_t thread_number = assign_thread_even(number, thread_count);
return peer_network_tasks_total_of_threads[thread_number].load();
}

bool thread_id_exists(std::thread::id tid)
{
return thread_ids.find(tid) != thread_ids.end();
}

/**
* @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
* @brief Push a function with no parameters, and no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
*
* @tparam F The type of the function.
* @tparam A The types of the arguments.
* @param task The function to push.
* @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments.
* @param task_function The function to push.
*/
//template <typename F, typename... A>
//void push_task(F&& task, A&&... args)
//{
// std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...);
// {
// const std::scoped_lock tasks_lock(tasks_mutex);
// tasks.push(task_function);
// }
// ++tasks_total;
// task_available_cv.notify_one();
//}
void push_task(size_t number, task_void_callback void_task_function)
{
std::unique_ptr<uint8_t[]> data = nullptr;
size_t thread_number = assign_thread(number, thread_count);
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
auto task_function = [void_task_function](std::unique_ptr<uint8_t[]> data) { void_task_function(); };
task_queue_of_threads[thread_number].push_back({ task_function, std::move(data) });
++tasks_total_of_threads[thread_number];
}
task_available_cv[thread_number].notify_one();
}
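
As a quick illustration of the overload above, a usage sketch (assuming the header is reachable as "thread_pool.hpp"; wait_for_tasks() is the synchronisation call the @brief comments in this class refer to):

#include <atomic>
#include <cstdio>
#include "thread_pool.hpp"

int main()
{
    ttp::task_group_pool pool{ 4 };
    std::atomic<int> counter{ 0 };

    // The first argument is only a routing key; assign_thread() turns it into a worker index.
    for (size_t key = 0; key < 100; ++key)
        pool.push_task(key, [&counter]() { ++counter; });

    pool.wait_for_tasks(); // without this, main() could return before the tasks run
    std::printf("ran %d tasks\n", counter.load());
    return 0;
}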

/**
* @brief Push a function with no parameters, and no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen.
@@ -425,7 +514,7 @@ namespace ttp
*/
void push_task(size_t number, task_callback task_function, std::unique_ptr<uint8_t[]> data)
{
size_t thread_number = number % thread_count;
size_t thread_number = assign_thread(number, thread_count);
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
task_queue_of_threads[thread_number].push_back({ task_function, std::move(data) });
@@ -434,9 +523,43 @@
task_available_cv[thread_number].notify_one();
}

void push_task_local(size_t number, task_callback task_function, std::unique_ptr<uint8_t[]> data)
{
size_t thread_number = assign_thread_odd(number, thread_count);
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
auto task_func = [task_function, this, thread_number](std::unique_ptr<uint8_t[]> data)
{
task_function(std::move(data));
local_network_tasks_total_of_threads[thread_number]--;
};
task_queue_of_threads[thread_number].push_back({ task_func, std::move(data) });
tasks_total_of_threads[thread_number]++;
local_network_tasks_total_of_threads[thread_number]++;
}
task_available_cv[thread_number].notify_one();
}

void push_task_peer(size_t number, task_callback task_function, std::unique_ptr<uint8_t[]> data)
{
size_t thread_number = assign_thread_even(number, thread_count);
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
auto task_func = [task_function, this, thread_number](std::unique_ptr<uint8_t[]> data)
{
task_function(std::move(data));
peer_network_tasks_total_of_threads[thread_number]--;
};
task_queue_of_threads[thread_number].push_back({ task_func, std::move(data) });
tasks_total_of_threads[thread_number]++;
peer_network_tasks_total_of_threads[thread_number]++;
}
task_available_cv[thread_number].notify_one();
}
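
The two variants above split network work by direction: push_task_local lands on odd-numbered workers and push_task_peer on even-numbered workers, and each maintains its own outstanding-task counter for the get_*_network_task_count getters. A sketch of how a caller might use them (the buffer size and routing key 42 are made up; the header path is assumed):

#include <cstdint>
#include <cstdio>
#include <memory>
#include "thread_pool.hpp"

int main()
{
    ttp::task_group_pool pool{ 4 };

    auto incoming = std::make_unique<uint8_t[]>(1500);
    // Local-network work: routed to an odd-numbered worker, counted separately.
    pool.push_task_local(42, [](std::unique_ptr<uint8_t[]> data) { data.reset(); /* handle packet */ }, std::move(incoming));

    auto outgoing = std::make_unique<uint8_t[]>(1500);
    // Peer-network work: routed to an even-numbered worker.
    pool.push_task_peer(42, [](std::unique_ptr<uint8_t[]> data) { data.reset(); /* handle packet */ }, std::move(outgoing));

    // May already be zero if the workers finished first; shown only to exercise the getters.
    std::printf("pending local: %zu, pending peer: %zu\n",
        pool.get_local_network_task_count_all(), pool.get_peer_network_task_count_all());

    pool.wait_for_tasks();
    return 0;
}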

void push_task(size_t number, std::shared_future<task_callback> task_function_run_later, std::unique_ptr<uint8_t[]> data)
{
size_t thread_number = number % thread_count;
size_t thread_number = assign_thread(number, thread_count);
{
std::scoped_lock tasks_lock(tasks_mutex_of_threads[thread_number]);
auto task_func = [task_function_run_later](std::unique_ptr<uint8_t[]> data)
@@ -559,6 +682,7 @@ namespace ttp
for (concurrency_t i = 0; i < thread_count; ++i)
{
threads[i] = std::thread(&task_group_pool::worker, this, i);
thread_ids.insert(threads[i].get_id());
}
}
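
Since the thread-creation loop above now records every worker's id, thread_id_exists() can tell pool threads from outside threads. A small sketch (header path assumed):

#include <cstdio>
#include <thread>
#include "thread_pool.hpp"

int main()
{
    ttp::task_group_pool pool{ 2 };

    // Inside a task, the current thread is one of the pool's workers.
    pool.push_task(0, [&pool]() {
        std::printf("task on pool thread? %s\n",
            pool.thread_id_exists(std::this_thread::get_id()) ? "yes" : "no");
    });

    pool.wait_for_tasks();

    // The main thread was never registered, so this prints "no".
    std::printf("main on pool thread? %s\n",
        pool.thread_id_exists(std::this_thread::get_id()) ? "yes" : "no");
    return 0;
}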

@@ -672,6 +796,10 @@ namespace ttp
* @brief An atomic variable indicating that wait_for_tasks() is active and expects to be notified whenever a task is done.
*/
std::atomic<bool> waiting = false;

std::unique_ptr<std::atomic<size_t>[]> local_network_tasks_total_of_threads;
std::unique_ptr<std::atomic<size_t>[]> peer_network_tasks_total_of_threads;
std::set<std::thread::id> thread_ids;
};


16 changes: 7 additions & 9 deletions src/main.cpp
@@ -16,15 +16,15 @@
int main(int argc, char *argv[])
{
#ifdef __cpp_lib_format
std::cout << std::format("{} version 20241003\n", app_name);
std::cout << std::format("{} version 20241013\n", app_name);
if (argc <= 1)
{
std::cout << std::format("Usage: {} config1.conf\n", app_name);
std::cout << std::format(" {} config1.conf config2.conf...\n", (int)app_name.length(), app_name.data());
return 0;
}
#else
std::cout << app_name << " version 20241003\n";
std::cout << app_name << " version 20241013\n";
if (argc <= 1)
{
std::cout << "Usage: " << app_name << " config1.conf\n";
@@ -33,18 +33,16 @@ int main(int argc, char *argv[])
}
#endif

constexpr size_t task_count_limit = 8192u;
uint16_t thread_group_count = 1;
int io_thread_count = 1;
if (std::thread::hardware_concurrency() > 3)
{
auto thread_counts = std::thread::hardware_concurrency();
thread_group_count = (uint16_t)(thread_counts / 2);
thread_group_count = (uint16_t)thread_counts;
io_thread_count = (int)std::log2(thread_counts);
}

ttp::task_group_pool task_groups_local{ thread_group_count };
ttp::task_group_pool task_groups_peer{ thread_group_count };
ttp::task_group_pool task_groups{ thread_group_count };

asio::io_context ioc{ io_thread_count };
asio::io_context network_io{ io_thread_count };
@@ -105,13 +103,13 @@
switch (settings.mode)
{
case running_mode::client:
clients.emplace_back(client_mode(ioc, network_io, task_groups_local, task_groups_peer, task_count_limit, settings));
clients.emplace_back(client_mode(ioc, network_io, /*task_groups_local, task_groups_peer,*/ task_groups, /*task_count_limit,*/ settings));
break;
case running_mode::relay:
relays.emplace_back(relay_mode(ioc, network_io, task_groups_local, task_groups_peer, task_count_limit, settings));
relays.emplace_back(relay_mode(ioc, network_io, /*task_groups_local, task_groups_peer,*/ task_groups, /*task_count_limit,*/ settings));
break;
case running_mode::server:
servers.emplace_back(server_mode(ioc, network_io, task_groups_local, task_groups_peer, task_count_limit, settings));
servers.emplace_back(server_mode(ioc, network_io, /*task_groups_local, task_groups_peer,*/ task_groups, /*task_count_limit,*/ settings));
break;
default:
break;