#include <condition_variable> // 条件变量,用于线程同步
#include <functional> // 函数对象
#include <iostream> // 标准输入输出流
#include <mutex> // 互斥锁
#include <queue> // 队列
#include <thread> // 线程
#include <vector> // 向量容器
using namespace std;
// 线程池类
class ThreadPool {
public:
// 构造函数,初始化线程池
ThreadPool(int num_threads) {
for (int i = 0; i < num_threads; i++) {
AddThread(); // 添加线程
}
}
// 析构函数,销毁线程池
~ThreadPool() {
{
unique_lock<mutex> lock(mutex_);
stop_ = true; // 标志位,停止所有线程
}
cv_.notify_all(); // 唤醒所有线程
for (auto &thread : pool_) {
if (thread.joinable()) {
thread.join(); // 等待线程结束
}
}
}
// 添加线程
void AddThread() {
// 添加线程函数
pool_.emplace_back([this]() {
while (true) {
function<void()> task; // 任务函数
{
unique_lock<mutex> lock(mutex_);
// 等待任务队列非空或停止标志位为真
cv_.wait(lock, [this]() { return !tasks_.empty() || stop_; });
if (stop_ && tasks_.empty()) {
return; // 停止线程
}
task = move(tasks_.front()); // 取出任务
tasks_.pop(); // 删除任务
}
task(); // 执行任务
}
});
}
// 提交任务
template <typename F, typename... Args>
void Commit(F &&f, Args &&...args) {
auto task = bind(forward<F>(f), forward<Args>(args)...);
{
unique_lock<mutex> lock(mutex_);
tasks_.emplace(task); // 添加任务到队列
}
cv_.notify_one(); // 唤醒一个线程执行任务
}
private:
vector<thread> pool_; // 线程池
queue<function<void()>> tasks_; // 任务队列
mutex mutex_; // 互斥锁
condition_variable cv_; // 条件变量
bool stop_ = false; // 停止标志位
};
// 打印函数
void Print(int num) {
cout << "Thread ID: " << this_thread::get_id() << " -> " << num << endl;
}
// 主函数
int main() {
ThreadPool pool(4); // 创建线程池,容量为4
// 提交8个任务
for (int i = 0; i < 8; i++) {
pool.Commit(Print, i);
}
return 0;
}
// 执行结果
Thread ID: Thread ID: 51772 -> 3
Thread ID: 51772 -> 4
Thread ID: 51772 -> 5
Thread ID: 51772 -> 6
Thread ID: 51772 -> 7
Thread ID: 51436 -> 2
43884 -> 0
Thread ID: 45792 -> 1
#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
class ThreadPool {
public:
ThreadPool(int size) {
for (int i = 0; i < size; i++) {
AddThread();
}
}
~ThreadPool() {
{
unique_lock<mutex> lck(mtx_);
is_stop_ = true;
}
cv_.notify_all();
for (auto& tid : pool_)
if (tid.joinable()) tid.join();
}
void AddThread() {
pool_.emplace_back([this]() {
while (true) {
function<void()> func;
{
unique_lock<mutex> lck(mtx_);
cv_.wait(lck, [this]() { return !task_.empty() || is_stop_; });
if (is_stop_ && task_.empty()) return;
func = move(task_.front());
task_.pop_front();
}
func();
}
});
}
template <typename F, typename... Args>
void PutTask(F&& f, Args &&... args) {
auto func = bind(forward<F>(f), forward<Args>(args)...);
{
unique_lock<mutex> lck(mtx_);
task_.emplace_back(func);
}
cv_.notify_one();
}
private:
deque<function<void()>> task_;
vector<thread> pool_;
int size_{ 0 };
bool is_stop_{ false };
condition_variable cv_;
mutex mtx_;
};
void Print(int num) { cout << "Thread ID: " << this_thread::get_id() << " -> " << num << endl; }
int main() {
ThreadPool tp(4);
for (int i = 0; i < 10; i++) {
tp.PutTask(Print, i);
}
return 0;
}