Skip to content

Commit

Permalink
fix: fix memory leak on libstdc++
Browse files Browse the repository at this point in the history
Replaced the std::multimap in queues.cpp with a sorted vector, and established clearer ownership of the request data via std::unique_ptr.
Note: completed requests are still kept alive for one minute before deletion.
  • Loading branch information
Mishura4 committed Mar 29, 2024
1 parent 44bfb17 commit e1acefe
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 96 deletions.
28 changes: 20 additions & 8 deletions include/dpp/queues.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ class DPP_EXPORT in_thread {
std::map<std::string, bucket_t> buckets;

/**
* @brief Queue of requests to be made.
* @brief Queue of requests to be made. Sorted by http_request::endpoint.
*/
std::map<std::string, std::vector<http_request*>> requests_in;
std::vector<std::unique_ptr<http_request>> requests_in;

/**
* @brief Inbound queue thread loop.
Expand Down Expand Up @@ -465,7 +465,7 @@ class DPP_EXPORT in_thread {
* @param req http_request to post. The pointer will be freed when it has
* been executed.
*/
void post_request(http_request* req);
void post_request(std::unique_ptr<http_request> req);
};

/**
Expand Down Expand Up @@ -516,10 +516,15 @@ class DPP_EXPORT request_queue {
*/
std::condition_variable out_ready;

/**
 * @brief A completed request paired with its response.
 *
 * Owns both objects, so popping a node off the completion queue
 * transfers ownership in one move.
 */
struct request_node {
	/** @brief Completion data produced by running the request. */
	std::unique_ptr<http_request_completion_t> response;
	/** @brief The request that produced the response. */
	std::unique_ptr<http_request> request;
};

/**
* @brief Completed requests queue
*/
std::queue<std::pair<http_request_completion_t*, http_request*>> responses_out;
std::queue<request_node> responses_out;

/**
* @brief A vector of inbound request threads forming a pool.
Expand All @@ -532,10 +537,18 @@ class DPP_EXPORT request_queue {
*/
std::vector<in_thread*> requests_in;

/**
 * @brief A completed request/response pair scheduled for deletion.
 *
 * Kept in a vector sorted by time_to_delete so expired entries can be
 * erased from the front in one range operation.
 */
struct queued_deleting_request {
	/** @brief Unix timestamp after which this entry may be destroyed. */
	time_t time_to_delete;
	/** @brief The completed request and its response, owned by this entry. */
	request_node request;

	/** @brief Order entries by time_to_delete (keeps the container sorted). */
	bool operator<(const queued_deleting_request& other) const noexcept;
	/** @brief Compare this entry's time_to_delete against a raw timestamp, for binary searches such as std::lower_bound. */
	bool operator<(time_t time) const noexcept;
};

/**
* @brief Completed requests to delete
*/
std::multimap<time_t, std::pair<http_request_completion_t*, http_request*>> responses_to_delete;
std::vector<queued_deleting_request> responses_to_delete;

/**
* @brief Set to true if the threads should terminate
Expand Down Expand Up @@ -597,14 +610,13 @@ class DPP_EXPORT request_queue {
~request_queue();

/**
* @brief Put a http_request into the request queue. You should ALWAYS "new" an object
* to pass to here -- don't submit an object that's on the stack!
* @brief Put a http_request into the request queue.
* @note Will use a simple hash function to determine which of the 'in queues' to place
* this request onto.
* @param req request to add
* @return reference to self
*/
request_queue& post_request(http_request *req);
request_queue& post_request(std::unique_ptr<http_request> req);

/**
* @brief Returns true if the bot is currently globally rate limited
Expand Down
6 changes: 3 additions & 3 deletions src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ json error_response(const std::string& message, http_request_completion_t& rv)

void cluster::post_rest(const std::string &endpoint, const std::string &major_parameters, const std::string &parameters, http_method method, const std::string &postdata, json_encode_t callback, const std::string &filename, const std::string &filecontent, const std::string &filemimetype, const std::string &protocol) {
/* NOTE: This is not a memory leak! The request_queue will free the http_request once it reaches the end of its lifecycle */
rest->post_request(new http_request(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) {
rest->post_request(std::make_unique<http_request>(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) {
json j;
if (rv.error == h_success && !rv.body.empty()) {
try {
Expand Down Expand Up @@ -351,7 +351,7 @@ void cluster::post_rest_multipart(const std::string &endpoint, const std::string
}

/* NOTE: This is not a memory leak! The request_queue will free the http_request once it reaches the end of its lifecycle */
rest->post_request(new http_request(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) {
rest->post_request(std::make_unique<http_request>(endpoint + (!major_parameters.empty() ? "/" : "") + major_parameters, parameters, [endpoint, callback](http_request_completion_t rv) {
json j;
if (rv.error == h_success && !rv.body.empty()) {
try {
Expand All @@ -370,7 +370,7 @@ void cluster::post_rest_multipart(const std::string &endpoint, const std::string

/**
 * @brief Execute a raw HTTP(S) request against an arbitrary URL.
 *
 * The span previously contained both the pre-commit (raw `new`) and
 * post-commit (`make_unique`) call lines from a diff; only the
 * unique_ptr form is kept.
 *
 * @param url Full URL to request
 * @param method HTTP method to use
 * @param callback Completion callback invoked with the response
 * @param postdata Request body
 * @param mimetype MIME type of the body
 * @param headers Additional request headers
 * @param protocol HTTP protocol version string
 */
void cluster::request(const std::string &url, http_method method, http_completion_event callback, const std::string &postdata, const std::string &mimetype, const std::multimap<std::string, std::string> &headers, const std::string &protocol) {
	/* NOTE: This is not a memory leak! Ownership of the http_request transfers to the request_queue, which frees it at the end of its lifecycle */
	raw_rest->post_request(std::make_unique<http_request>(url, callback, method, postdata, mimetype, headers, protocol));
}

gateway::gateway() : shards(0), session_start_total(0), session_start_remaining(0), session_start_reset_after(0), session_start_max_concurrency(0) {
Expand Down
195 changes: 110 additions & 85 deletions src/dpp/queues.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,103 +236,116 @@ request_queue::~request_queue()
out_ready.notify_one();
out_thread->join();
delete out_thread;
for (auto& ri : requests_in) {
delete ri;
}
}

namespace
{
/**
 * @brief Transparent comparator ordering queued requests by http_request::endpoint.
 *
 * The heterogeneous overloads let binary searches (std::lower_bound,
 * std::equal_range) compare an owned request against a bare endpoint
 * string without constructing an http_request.
 * Fix: removed the stray `;` after each member-function body (extra-semi).
 */
struct compare_request {
	/** @brief Order two owned requests by endpoint. */
	bool operator()(const std::unique_ptr<http_request>& lhs, const std::unique_ptr<http_request>& rhs) const noexcept {
		return std::less{}(lhs->endpoint, rhs->endpoint);
	}

	/** @brief Order an owned request against an endpoint key. */
	bool operator()(const std::unique_ptr<http_request>& lhs, std::string_view rhs) const noexcept {
		return std::less{}(lhs->endpoint, rhs);
	}

	/** @brief Order an endpoint key against an owned request. */
	bool operator()(std::string_view lhs, const std::unique_ptr<http_request>& rhs) const noexcept {
		return std::less{}(lhs, rhs->endpoint);
	}
};
}

void in_thread::in_loop(uint32_t index)
{
utility::set_thread_name(std::string("http_req/") + std::to_string(index));
while (!terminating) {
std::mutex mtx;
std::unique_lock<std::mutex> lock{ mtx };
std::unique_lock<std::mutex> lock{ mtx };
in_ready.wait_for(lock, std::chrono::seconds(1));
/* New request to be sent! */

if (!requests->globally_ratelimited) {

std::map<std::string, std::vector<http_request*>> requests_in_copy;
std::vector<http_request*> requests_view;
{
/* Make a safe copy within a mutex */
/* Gather all the requests first within a mutex */
std::shared_lock lock(in_mutex);
if (requests_in.empty()) {
/* Nothing to copy, wait again */
continue;
}
requests_in_copy = requests_in;
requests_view.reserve(requests_in.size());
std::transform(requests_in.begin(), requests_in.end(), std::back_inserter(requests_view), [](const std::unique_ptr<http_request> &r) {
return r.get();
});
}

for (auto & bucket : requests_in_copy) {
for (auto req : bucket.second) {

http_request_completion_t rv;
auto currbucket = buckets.find(bucket.first);

if (currbucket != buckets.end()) {
/* There's a bucket for this request. Check its status. If the bucket says to wait,
* skip all requests in this bucket till its ok.
*/
if (currbucket->second.remaining < 1) {
uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after);
if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) {
/* Time has passed, we can process this bucket again. send its request. */
rv = req->run(creator);
} else {
if (!req->waiting) {
req->waiting = true;
}
/* Time not up yet, wait more */
break;
}
for (auto& request_view : requests_view) {
const std::string &key = request_view->endpoint;
http_request_completion_t rv;
auto currbucket = buckets.find(key);

if (currbucket != buckets.end()) {
/* There's a bucket for this request. Check its status. If the bucket says to wait,
* skip all requests in this bucket till its ok.
*/
if (currbucket->second.remaining < 1) {
uint64_t wait = (currbucket->second.retry_after ? currbucket->second.retry_after : currbucket->second.reset_after);
if ((uint64_t)time(nullptr) > currbucket->second.timestamp + wait) {
/* Time has passed, we can process this bucket again. send its request. */
rv = request_view->run(creator);
} else {
/* There's limit remaining, we can just run the request */
rv = req->run(creator);
if (!request_view->waiting) {
request_view->waiting = true;
}
/* Time not up yet, wait more */
break;
}
} else {
/* No bucket for this endpoint yet. Just send it, and make one from its reply */
rv = req->run(creator);
}

bucket_t newbucket;
newbucket.limit = rv.ratelimit_limit;
newbucket.remaining = rv.ratelimit_remaining;
newbucket.reset_after = rv.ratelimit_reset_after;
newbucket.retry_after = rv.ratelimit_retry_after;
newbucket.timestamp = time(nullptr);
requests->globally_ratelimited = rv.ratelimit_global;
if (requests->globally_ratelimited) {
requests->globally_limited_for = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after);
/* There's limit remaining, we can just run the request */
rv = request_view->run(creator);
}
buckets[req->endpoint] = newbucket;

/* Make a new entry in the completion list and notify */
http_request_completion_t* hrc = new http_request_completion_t();
*hrc = rv;
{
std::unique_lock lock(requests->out_mutex);
requests->responses_out.push(std::make_pair(hrc, req));
}
requests->out_ready.notify_one();
} else {
/* No bucket for this endpoint yet. Just send it, and make one from its reply */
rv = request_view->run(creator);
}
}

{
std::unique_lock lock(in_mutex);
bool again = false;
do {
again = false;
for (auto & bucket : requests_in) {
for (auto req = bucket.second.begin(); req != bucket.second.end(); ++req) {
if ((*req)->is_completed()) {
requests_in[bucket.first].erase(req);
again = true;
goto out; /* Only clean way out of a nested loop */
}
bucket_t newbucket;
newbucket.limit = rv.ratelimit_limit;
newbucket.remaining = rv.ratelimit_remaining;
newbucket.reset_after = rv.ratelimit_reset_after;
newbucket.retry_after = rv.ratelimit_retry_after;
newbucket.timestamp = time(nullptr);
requests->globally_ratelimited = rv.ratelimit_global;
if (requests->globally_ratelimited) {
requests->globally_limited_for = (newbucket.retry_after ? newbucket.retry_after : newbucket.reset_after);
}
buckets[request_view->endpoint] = newbucket;

/* Remove the request from the incoming requests to transfer it to completed requests */
std::unique_ptr<http_request> request;
{
/* Find the owned pointer in requests_in */
std::scoped_lock lock1{in_mutex};

auto [begin, end] = std::equal_range(requests_in.begin(), requests_in.end(), key, compare_request{});
for (auto it = begin; it != end; ++it) {
if (it->get() == request_view) {
/* Grab and remove */
request = std::move(*it);
requests_in.erase(it);
break;
}
}
out:;
} while (again);
}
/* Make a new entry in the completion list and notify */
auto hrc = std::make_unique<http_request_completion_t>();
*hrc = rv;
{
std::scoped_lock lock1(requests->out_mutex);
requests->responses_out.push({std::move(hrc), std::move(request)});
}
requests->out_ready.notify_one();
}

} else {
Expand All @@ -346,47 +359,59 @@ void in_thread::in_loop(uint32_t index)
}
}

/* Order queued deletions by scheduled deletion time, so responses_to_delete stays a sorted vector */
bool request_queue::queued_deleting_request::operator<(const queued_deleting_request& other) const noexcept {
	return time_to_delete < other.time_to_delete;
}

/* Heterogeneous overload: compare an entry directly against a raw timestamp, used by std::lower_bound to find expired entries */
bool request_queue::queued_deleting_request::operator<(time_t time) const noexcept {
	return time_to_delete < time;
}


void request_queue::out_loop()
{
utility::set_thread_name("req_callback");
while (!terminating) {

std::mutex mtx;
std::unique_lock<std::mutex> lock{ mtx };
std::unique_lock lock{ mtx };
out_ready.wait_for(lock, std::chrono::seconds(1));
time_t now = time(nullptr);

/* A request has been completed! */
std::pair<http_request_completion_t*, http_request*> queue_head = {};
request_node queue_head = {};
{
std::unique_lock lock(out_mutex);
std::scoped_lock lock1(out_mutex);
if (responses_out.size()) {
queue_head = responses_out.front();
queue_head = std::move(responses_out.front());
responses_out.pop();
}
}

if (queue_head.first && queue_head.second) {
queue_head.second->complete(*queue_head.first);
if (queue_head.request && queue_head.response) {
queue_head.request->complete(*queue_head.response);
/* Queue deletions for 60 seconds from now */
responses_to_delete.insert(std::make_pair(now + 60, queue_head));
auto when = now + 60;
auto where = std::lower_bound(responses_to_delete.begin(), responses_to_delete.end(), when);
responses_to_delete.emplace(where, when, std::move(queue_head));
}

/* Check for deletable items every second regardless of select status */
while (responses_to_delete.size() && now >= responses_to_delete.begin()->first) {
delete responses_to_delete.begin()->second.first;
delete responses_to_delete.begin()->second.second;
responses_to_delete.erase(responses_to_delete.begin());
auto end = std::lower_bound(responses_to_delete.begin(), responses_to_delete.end(), now);
if (end != responses_to_delete.begin()) {
responses_to_delete.erase(responses_to_delete.begin(), end);
}
}
}

/* Post a http_request into the queue */
void in_thread::post_request(http_request* req)
void in_thread::post_request(std::unique_ptr<http_request> req)
{
{
std::unique_lock lock(in_mutex);
requests_in[req->endpoint].push_back(req);
std::scoped_lock lock(in_mutex);

auto where = std::lower_bound(requests_in.begin(), requests_in.end(), req->endpoint, compare_request{});
requests_in.emplace(where, std::move(req));
}
in_ready.notify_one();
}
Expand All @@ -410,9 +435,9 @@ inline uint32_t hash(const char *s)
}

/**
 * @brief Post a http_request into a request queue.
 *
 * The span previously contained both the raw-pointer and unique_ptr
 * signatures from a diff; only the unique_ptr version is kept.
 *
 * @param req request to queue; ownership transfers to the chosen in-thread
 * @return reference to self, for chaining
 */
request_queue& request_queue::post_request(std::unique_ptr<http_request> req)
{
	/* Hash the endpoint so requests to the same endpoint always land on the
	 * same in-thread (the index expression is sequenced before the argument
	 * is passed, so reading req->endpoint here is safe). */
	requests_in[hash(req->endpoint.c_str()) % in_thread_pool_size]->post_request(std::move(req));
	return *this;
}

Expand Down

0 comments on commit e1acefe

Please sign in to comment.