Skip to content

Latest commit

 

History

History
187 lines (155 loc) · 3.91 KB

0.手写实现线程池.md

File metadata and controls

187 lines (155 loc) · 3.91 KB
#include <condition_variable>  // 条件变量,用于线程同步
#include <functional>          // 函数对象
#include <iostream>            // 标准输入输出流
#include <mutex>               // 互斥锁
#include <queue>               // 队列
#include <thread>              // 线程
#include <vector>              // 向量容器

using namespace std;

// 线程池类
class ThreadPool {
 public:
  // 构造函数,初始化线程池
  ThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; i++) {
      AddThread();  // 添加线程
    }
  }

  // 析构函数,销毁线程池
  ~ThreadPool() {
    {
      unique_lock<mutex> lock(mutex_);
      stop_ = true;  // 标志位,停止所有线程
    }

    cv_.notify_all();  // 唤醒所有线程

    for (auto &thread : pool_) {
      if (thread.joinable()) {
        thread.join();  // 等待线程结束
      }
    }
  }

  // 添加线程
  void AddThread() {
    // 添加线程函数
    pool_.emplace_back([this]() {
      while (true) {
        function<void()> task;  // 任务函数

        {
          unique_lock<mutex> lock(mutex_);
          // 等待任务队列非空或停止标志位为真
          cv_.wait(lock, [this]() { return !tasks_.empty() || stop_; });

          if (stop_ && tasks_.empty()) {
            return;  // 停止线程
          }

          task = move(tasks_.front());  // 取出任务
          tasks_.pop();                 // 删除任务
        }

        task();  // 执行任务
      }
    });
  }

  // 提交任务
  template <typename F, typename... Args>
  void Commit(F &&f, Args &&...args) {
    auto task = bind(forward<F>(f), forward<Args>(args)...);

    {
      unique_lock<mutex> lock(mutex_);
      tasks_.emplace(task);  // 添加任务到队列
    }

    cv_.notify_one();  // 唤醒一个线程执行任务
  }

 private:
  vector<thread> pool_;            // 线程池
  queue<function<void()>> tasks_;  // 任务队列
  mutex mutex_;                    // 互斥锁
  condition_variable cv_;          // 条件变量
  bool stop_ = false;              // 停止标志位
};

// 打印函数
void Print(int num) {
  cout << "Thread ID: " << this_thread::get_id() << " -> " << num << endl;
}

// 主函数
int main() {
  ThreadPool pool(4);  // 创建线程池,容量为4

  // 提交8个任务
  for (int i = 0; i < 8; i++) {
    pool.Commit(Print, i);
  }

  return 0;
}

// 执行结果
Thread ID: Thread ID: 51772 -> 3
Thread ID: 51772 -> 4
Thread ID: 51772 -> 5
Thread ID: 51772 -> 6
Thread ID: 51772 -> 7
Thread ID: 51436 -> 2
43884 -> 0
Thread ID: 45792 -> 1

#include <condition_variable>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;

class ThreadPool {
public:
	ThreadPool(int size) {
		for (int i = 0; i < size; i++) {
			AddThread();
		}
	}

	~ThreadPool() {
		{
			unique_lock<mutex> lck(mtx_);
			is_stop_ = true;
		}

		cv_.notify_all();

		for (auto& tid : pool_)
			if (tid.joinable()) tid.join();
	}

	void AddThread() {
		pool_.emplace_back([this]() {
			while (true) {
				function<void()> func;
				{
					unique_lock<mutex> lck(mtx_);
					cv_.wait(lck, [this]() { return !task_.empty() || is_stop_; });

					if (is_stop_ && task_.empty()) return;

					func = move(task_.front());
					task_.pop_front();
				}
				func();
			}
			});
	}

	template <typename F, typename... Args>
	void PutTask(F&& f, Args &&... args) {
		auto func = bind(forward<F>(f), forward<Args>(args)...);
		{
			unique_lock<mutex> lck(mtx_);
			task_.emplace_back(func);
		}
		cv_.notify_one();
	}

private:
	deque<function<void()>> task_;
	vector<thread> pool_;
	int size_{ 0 };
	bool is_stop_{ false };
	condition_variable cv_;
	mutex mtx_;
};

void Print(int num) { cout << "Thread ID: " << this_thread::get_id() << " -> " << num << endl; }

int main() {
	ThreadPool tp(4);
	for (int i = 0; i < 10; i++) {
		tp.PutTask(Print, i);
	}
	return 0;
}