Skip to content

Latest commit

 

History

History
198 lines (158 loc) · 4.24 KB

0.手写线程类Mediapipe的Jobdispather-单线程多任务.md

File metadata and controls

198 lines (158 loc) · 4.24 KB
#ifndef HEADER
#define HEADER

#include <pthread.h>

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

namespace mp {
// Callable that takes no arguments and returns an FSP_STATUS; accepted by
// JobDispatcher::Run().
using StatusDispatchFunction = std::function<FSP_STATUS()>;
// Callable that takes no arguments and returns nothing; accepted by
// JobDispatcher::RunWithoutWaiting().
using VoidDispatchFunction = std::function<void()>;

// Serial job queue backed by one dedicated pthread: queued jobs are executed
// one at a time, in the order they were queued.
class JobDispatcher {
public:
  // Creates the worker thread.
  JobDispatcher();

  // Queues an empty job to stop the worker and joins it. When the destructor
  // runs on the worker thread itself (the SelfDestruct() path) the thread is
  // detached instead of joined.
  ~JobDispatcher();

  // Not copyable: the object owns a running thread and its queue.
  JobDispatcher(const JobDispatcher &) = delete;

  JobDispatcher &operator=(const JobDispatcher &) = delete;

  // Runs `func` on the worker thread and blocks until it has finished, then
  // returns its status. When called from the worker thread, `func` is invoked
  // directly instead of being queued.
  FSP_STATUS Run(StatusDispatchFunction func);

  // Removes every job that is still waiting in the queue without running it.
  void ClearJobs();

  // Blocks until the worker has finished a job and found the queue empty.
  // Returns immediately if no job has been queued yet.
  void WaitUntilDone();
  // Queues `func` for execution on the worker thread and returns immediately.
  void RunWithoutWaiting(VoidDispatchFunction func);

  // Returns true when the calling thread is the worker thread.
  bool IsCurrentThread();

  // Queues an empty job that makes the worker leave its loop and then delete
  // this object from the worker thread.
  void SelfDestruct();

private:
  // pthread entry point; `instance` is the JobDispatcher that owns the thread.
  static void *ThreadBody(void *instance);

  // Worker loop: fetches and runs jobs until an empty job is received.
  void ThreadBody();

  using Job = std::function<void(void)>;

  // Blocks until a job is available, then removes and returns it.
  Job GetJob();
  // Marks the dispatcher complete and wakes WaitUntilDone() callers when the
  // queue is empty.
  void CheckComplete();

  // Appends `job` to the queue and wakes the worker.
  void PutJob(Job job);

  // Guards jobs_ and is_complete_.
  std::mutex mutex_;
  // Used to wait for a job's completion.
  bool is_complete_ = true;
  // Signalled when a job submitted through Run() has finished.
  std::condition_variable job_done_cv_;
  // Signalled when is_complete_ becomes true.
  std::condition_variable jobs_complete_cv_;
  // Handle of the worker thread.
  pthread_t thread_id_;

  // Pending jobs in FIFO order; an empty Job tells the worker to stop.
  std::deque<Job> jobs_;
  // Signalled whenever a job is appended to jobs_.
  std::condition_variable has_jobs_cv_;

  // When true, the worker deletes this object after leaving its loop.
  bool self_destruct_ = false;
};
} // namespace mp

#endif // HEADER

#include "JobDispatcher.h"
#include <port/logger.h>


using namespace mp;

// Names the calling thread `name`, truncated to what fits in a 16-byte buffer
// including the terminating nul.
// NOTE(review): assumes FSP_CHECK_LOG logs its message when the first argument
// is true (i.e. when pthread_setname_np failed) — confirm against the macro.
static void SetThreadName(const char *name) {
  // Linux requires names (with nul) fit in 16 chars. The buffer starts out
  // zeroed and at most 15 characters are copied, so it is always terminated.
  char thread_name[16] = {0};
  strncpy(thread_name, name, sizeof(thread_name) - 1);
  int res = pthread_setname_np(pthread_self(), thread_name);
  FSP_CHECK_LOG(res != 0, "Can't set pthread names: name: \"%s\", error:%d",
                name, res);
}

// Starts the worker thread. The thread enters the static ThreadBody()
// trampoline with `this` as its argument, and the pthread_create() result is
// checked against 0 with FSP_CHECK_EQ.
mp::JobDispatcher::JobDispatcher() {
  FSP_CHECK_EQ(pthread_create(&thread_id_, nullptr, ThreadBody, this), 0);
}

// Shuts the worker thread down.
//  - Called from any other thread: queues an empty job as the stop signal and
//    joins the worker, so jobs queued earlier still run first.
//  - Called on the worker thread (reached through `delete this` in
//    ThreadBody()): requires self_destruct_ to be set and detaches the thread,
//    since a thread cannot join itself.
mp::JobDispatcher::~JobDispatcher() {
  if (!IsCurrentThread()) {
    // Give an invalid job to signal termination.
    PutJob(Job());
    FSP_CHECK_EQ(pthread_join(thread_id_, nullptr), 0);
    return;
  }
  FSP_CHECK(self_destruct_);
  FSP_CHECK_EQ(pthread_detach(thread_id_), 0);
}

void mp::JobDispatcher::SelfDestruct() {
  self_destruct_ = true;
  // Give an invalid job to signal termination.
  PutJob({});
}

// Blocks until the queue holds at least one job, then removes the job at the
// front and returns it. The returned job may be empty; ThreadBody() treats an
// empty job as the signal to stop.
mp::JobDispatcher::Job mp::JobDispatcher::GetJob() {
  std::unique_lock<std::mutex> guard(mutex_);
  // The predicate form re-checks the queue after every wake-up.
  has_jobs_cv_.wait(guard, [this] { return !jobs_.empty(); });
  Job next = std::move(jobs_.front());
  jobs_.pop_front();
  return next;
}

// Appends `job` to the back of the queue while holding mutex_, clears
// is_complete_ so WaitUntilDone() blocks again, and wakes the threads waiting
// in GetJob().
void mp::JobDispatcher::PutJob(Job job) {
  std::lock_guard<std::mutex> guard(mutex_);
  is_complete_ = false;
  jobs_.emplace_back(std::move(job));
  has_jobs_cv_.notify_all();
}

void *mp::JobDispatcher::ThreadBody(void *instance) {
  JobDispatcher *thread = static_cast<JobDispatcher *>(instance);
  thread->ThreadBody();
  return nullptr;
}

void mp::JobDispatcher::ThreadBody() {
  SetThreadName("JobDispatcher");
  while (true) {
    Job job = GetJob();
    if (!job) {
      CheckComplete();
      FSP_LOGW("No more jobs.");
      break;
    }
    job();
    CheckComplete();
  }
  if (self_destruct_) {
    delete this;
  }
}

// Executes `func` on the worker thread and returns its status.
//  - On the worker thread: `func` is invoked directly (queueing it and then
//    waiting would block the only thread able to run it).
//  - On any other thread: `func` is wrapped in a job, queued, and the caller
//    blocks on job_done_cv_ until the wrapper reports completion.
// `func` must be non-empty (checked with FSP_CHECK).
FSP_STATUS mp::JobDispatcher::Run(StatusDispatchFunction func) {
  FSP_CHECK(func);
  if (IsCurrentThread()) {
    return func();
  }

  bool finished = false;
  FSP_STATUS result;
  // The locals are captured by reference; that is safe because this function
  // does not return until the wrapper has set `finished` under mutex_.
  Job wrapper = [this, func, &finished, &result]() {
    result = func();
    std::unique_lock<std::mutex> guard(mutex_);
    finished = true;
    job_done_cv_.notify_all();
  };
  PutJob(std::move(wrapper));

  std::unique_lock<std::mutex> guard(mutex_);
  job_done_cv_.wait(guard, [&finished] { return finished; });
  return result;
}

// Queues `func` for execution on the worker thread and returns without
// waiting for it to run. `func` must be non-empty (checked with FSP_CHECK),
// because an empty job would be taken as the stop signal by the worker loop.
void mp::JobDispatcher::RunWithoutWaiting(VoidDispatchFunction func) {
  FSP_CHECK(func);
  Job queued(std::move(func));
  PutJob(std::move(queued));
}

// Returns true when the calling thread is the worker thread created by the
// constructor.
bool mp::JobDispatcher::IsCurrentThread() {
  const pthread_t caller = pthread_self();
  // pthread_equal() returns a non-zero int for equal thread IDs.
  return pthread_equal(thread_id_, caller) != 0;
}

void JobDispatcher::WaitUntilDone() {
  std::unique_lock<std::mutex> lock(mutex_);
  while (!is_complete_) {
    jobs_complete_cv_.wait(lock);
  }
}

// Called by the worker loop after each job. If no jobs remain queued, sets
// is_complete_ and wakes every thread blocked in WaitUntilDone(); otherwise
// does nothing.
void JobDispatcher::CheckComplete() {
  std::unique_lock<std::mutex> guard(mutex_);
  if (!jobs_.empty()) {
    return;
  }
  is_complete_ = true;
  jobs_complete_cv_.notify_all();
}

void JobDispatcher::ClearJobs() {
  std::deque<Job> q_jobs;
  {
    std::lock_guard<std::mutex> _lock(mutex_);
    FSP_LOGW("clear jobs %d", jobs_.size());
    q_jobs.swap(jobs_);
  }
  auto func = [](std::deque<Job> jobs) { jobs.clear(); };
  auto func_bind = std::bind(func, std::move(q_jobs));
  PutJob(func_bind);
}