Skip to content

Commit

Permalink
fix: fix leak of in_thread, fix read-after-free, fix race condition U…
Browse files Browse the repository at this point in the history
…B after cluster shutdown (#1187)
  • Loading branch information
braindigitalis authored Jul 6, 2024
2 parents 47d420f + 40cbdb8 commit d2e62f9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
13 changes: 10 additions & 3 deletions include/dpp/queues.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <vector>
#include <functional>
#include <condition_variable>
#include <atomic>

namespace dpp {

Expand Down Expand Up @@ -407,7 +408,7 @@ class DPP_EXPORT in_thread {
/**
* @brief True if ending.
*/
bool terminating;
std::atomic<bool> terminating;

/**
* @brief Request queue that owns this in_thread.
Expand Down Expand Up @@ -465,6 +466,12 @@ class DPP_EXPORT in_thread {
*/
~in_thread();

/**
* @brief Terminates the thread
* This will end the thread that is owned by this object, but will not join it.
*/
void terminate();

/**
* @brief Post a http_request to this thread.
*
Expand Down Expand Up @@ -551,7 +558,7 @@ class DPP_EXPORT request_queue {
* 2) Requests for different endpoints go into different buckets, so that they may be requested in parallel
* A global ratelimit event pauses all threads in the pool. These are few and far between.
*/
std::vector<in_thread*> requests_in;
std::vector<std::unique_ptr<in_thread>> requests_in;

/**
* @brief A request queued for deletion in the queue.
Expand Down Expand Up @@ -590,7 +597,7 @@ class DPP_EXPORT request_queue {
/**
* @brief Set to true if the threads should terminate
*/
bool terminating;
std::atomic<bool> terminating;

/**
* @brief True if globally rate limited - makes the entire request thread wait
Expand Down
22 changes: 15 additions & 7 deletions src/dpp/queues.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ http_request_completion_t http_request::run(cluster* owner) {
request_queue::request_queue(class cluster* owner, uint32_t request_threads) : creator(owner), terminating(false), globally_ratelimited(false), globally_limited_for(0), in_thread_pool_size(request_threads)
{
for (uint32_t in_alloc = 0; in_alloc < in_thread_pool_size; ++in_alloc) {
requests_in.push_back(new in_thread(owner, this, in_alloc));
requests_in.push_back(std::make_unique<in_thread>(owner, this, in_alloc));
}
out_thread = new std::thread(&request_queue::out_loop, this);
}

request_queue& request_queue::add_request_threads(uint32_t request_threads)
{
for (uint32_t in_alloc_ex = 0; in_alloc_ex < request_threads; ++in_alloc_ex) {
requests_in.push_back(new in_thread(creator, this, in_alloc_ex + in_thread_pool_size));
requests_in.push_back(std::make_unique<in_thread>(creator, this, in_alloc_ex + in_thread_pool_size));
}
in_thread_pool_size += request_threads;
return *this;
Expand All @@ -227,16 +227,24 @@ in_thread::in_thread(class cluster* owner, class request_queue* req_q, uint32_t

in_thread::~in_thread()
{
terminating = true;
in_ready.notify_one();
terminate();
in_thr->join();
delete in_thr;
}

void in_thread::terminate()
{
terminating.store(true, std::memory_order_relaxed);
in_ready.notify_one();
}

request_queue::~request_queue()
{
terminating = true;
terminating.store(true, std::memory_order_relaxed);
out_ready.notify_one();
for (auto& in_thr : requests_in) {
in_thr->terminate(); // signal all of them here, otherwise they will all join 1 by 1 and it will take forever
}
out_thread->join();
delete out_thread;
}
Expand Down Expand Up @@ -284,7 +292,7 @@ struct compare_request {
void in_thread::in_loop(uint32_t index)
{
utility::set_thread_name(std::string("http_req/") + std::to_string(index));
while (!terminating) {
while (!terminating.load(std::memory_order_relaxed)) {
std::mutex mtx;
std::unique_lock<std::mutex> lock{ mtx };
in_ready.wait_for(lock, std::chrono::seconds(1));
Expand Down Expand Up @@ -397,7 +405,7 @@ bool request_queue::queued_deleting_request::operator<(time_t time) const noexce
void request_queue::out_loop()
{
utility::set_thread_name("req_callback");
while (!terminating) {
while (!terminating.load(std::memory_order_relaxed)) {

std::mutex mtx;
std::unique_lock lock{ mtx };
Expand Down

0 comments on commit d2e62f9

Please sign in to comment.