diff --git a/include/dpp/queues.h b/include/dpp/queues.h index 1ae28b5400..1ccb0fd47c 100644 --- a/include/dpp/queues.h +++ b/include/dpp/queues.h @@ -30,6 +30,7 @@ #include #include #include +#include namespace dpp { @@ -407,7 +408,7 @@ class DPP_EXPORT in_thread { /** * @brief True if ending. */ - bool terminating; + std::atomic terminating; /** * @brief Request queue that owns this in_thread. @@ -465,6 +466,12 @@ class DPP_EXPORT in_thread { */ ~in_thread(); + /** + * @brief Terminates the thread + * This will end the thread that is owned by this object, but will not join it. + */ + void terminate(); + /** * @brief Post a http_request to this thread. * @@ -551,7 +558,7 @@ class DPP_EXPORT request_queue { * 2) Requests for different endpoints go into different buckets, so that they may be requested in parallel * A global ratelimit event pauses all threads in the pool. These are few and far between. */ - std::vector requests_in; + std::vector> requests_in; /** * @brief A request queued for deletion in the queue. @@ -590,7 +597,7 @@ class DPP_EXPORT request_queue { /** * @brief Set to true if the threads should terminate */ - bool terminating; + std::atomic terminating; /** * @brief True if globally rate limited - makes the entire request thread wait diff --git a/src/dpp/queues.cpp b/src/dpp/queues.cpp index b6d691210a..c18405f098 100644 --- a/src/dpp/queues.cpp +++ b/src/dpp/queues.cpp @@ -201,7 +201,7 @@ http_request_completion_t http_request::run(cluster* owner) { request_queue::request_queue(class cluster* owner, uint32_t request_threads) : creator(owner), terminating(false), globally_ratelimited(false), globally_limited_for(0), in_thread_pool_size(request_threads) { for (uint32_t in_alloc = 0; in_alloc < in_thread_pool_size; ++in_alloc) { - requests_in.push_back(new in_thread(owner, this, in_alloc)); + requests_in.push_back(std::make_unique(owner, this, in_alloc)); } out_thread = new std::thread(&request_queue::out_loop, this); } @@ -209,7 +209,7 @@ request_queue::request_queue(class cluster* owner, uint32_t request_threads) : c request_queue& request_queue::add_request_threads(uint32_t request_threads) { for (uint32_t in_alloc_ex = 0; in_alloc_ex < request_threads; ++in_alloc_ex) { - requests_in.push_back(new in_thread(creator, this, in_alloc_ex + in_thread_pool_size)); + requests_in.push_back(std::make_unique(creator, this, in_alloc_ex + in_thread_pool_size)); } in_thread_pool_size += request_threads; return *this; @@ -227,16 +227,24 @@ in_thread::in_thread(class cluster* owner, class request_queue* req_q, uint32_t in_thread::~in_thread() { - terminating = true; - in_ready.notify_one(); + terminate(); in_thr->join(); delete in_thr; } +void in_thread::terminate() +{ + terminating.store(true, std::memory_order_relaxed); + in_ready.notify_one(); +} + request_queue::~request_queue() { - terminating = true; + terminating.store(true, std::memory_order_relaxed); out_ready.notify_one(); + for (auto& in_thr : requests_in) { + in_thr->terminate(); // signal all of them here, otherwise they will all join 1 by 1 and it will take forever + } out_thread->join(); delete out_thread; } @@ -284,7 +292,7 @@ struct compare_request { void in_thread::in_loop(uint32_t index) { utility::set_thread_name(std::string("http_req/") + std::to_string(index)); - while (!terminating) { + while (!terminating.load(std::memory_order_relaxed)) { std::mutex mtx; std::unique_lock lock{ mtx }; in_ready.wait_for(lock, std::chrono::seconds(1)); @@ -397,7 +405,7 @@ bool request_queue::queued_deleting_request::operator<(time_t time) const noexce void request_queue::out_loop() { utility::set_thread_name("req_callback"); - while (!terminating) { + while (!terminating.load(std::memory_order_relaxed)) { std::mutex mtx; std::unique_lock lock{ mtx };