Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implemented a timer manager that can handle any timer type #2573

Draft
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 356 additions & 0 deletions rclcpp/src/rclcpp/executors/detail/timer_manager.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
#pragma once
#include <chrono>
#include <functional>
#include <memory>
#include <vector>
#include <map>

#include <rcl/timer.h>
#include <rclcpp/timer.hpp>

#include <inttypes.h>

namespace rclcpp::executors
{

/**
* @brief A class for managing a queue of timers
*
* This class holds a queue of timers of one type (RCL_ROS_TIME, RCL_SYSTEM_TIME or RCL_STEADY_TIME).
* The queue itself manages an internal map of the timers, orders by the next time a timer will be
* ready. Each time a timer is ready, a callback will be called from the internal thread.
*/
class TimerQueue
{
struct TimerData
{
std::shared_ptr<const rcl_timer_t> rcl_ref;
std::function<void()> timer_ready_callback;
};

public:
TimerQueue(rcl_clock_type_t timer_type)
: timer_type(timer_type),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than storing this as an additional member, we could just use the get_clock_type() method of the clock

used_clock_for_timers(timer_type),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is not easy to understand.
I propose internal_clock

trigger_thread([this]() {
timer_thread();
})
{
};

~TimerQueue()
{
stop();

trigger_thread.join();
}

void stop()
{
running = false;
used_clock_for_timers.cancel_sleep_or_wait();
}

/**
* @brief Removes a new timer from the queue.
* This function is thread safe.
*
* Removes a timer, if it was added to this queue.
* Ignores timers that are not part of this queue
*
* @param timer the timer to remove.
*/

void remove_timer(const rclcpp::TimerBase::SharedPtr & timer)
{
rcl_clock_t * clock_type_of_timer;

std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle();

if (rcl_timer_clock(
const_cast<rcl_timer_t *>(handle.get()),
&clock_type_of_timer) != RCL_RET_OK)
{
assert(false);
}

if (clock_type_of_timer->type != timer_type) {
// timer is handled by another queue
return;
}

timer->clear_on_reset_callback();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should first check that the timer is managed by this queue (as the timer could be from another executor) before clearing the callback


std::scoped_lock l(mutex);

auto it = std::find_if(
all_timers.begin(), all_timers.end(),
[rcl_ref = timer->get_timer_handle()](const std::unique_ptr<TimerData> & d)
{
return d->rcl_ref == rcl_ref;
});

if (it != all_timers.end()) {
const TimerData * data_ptr = it->get();


auto it2 = std::find_if(
running_timers.begin(), running_timers.end(), [data_ptr](const auto & e) {
return e.second == data_ptr;
});

running_timers.erase(it2);
all_timers.erase(it);
}

used_clock_for_timers.cancel_sleep_or_wait();
Copy link
Collaborator

@alsora alsora Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to do this under the mutex lock (i.e. we should release it once we are done working on the timers)

}

/**
* @brief Adds a new timer to the queue.
* This function is thread safe.
*
* This function will ignore any timer, that has not a matching type
*
* @param timer the timer to add.
* @param timer_ready_callback callback that should be called when the timer is ready.
*/
void add_timer(
const rclcpp::TimerBase::SharedPtr & timer,
const std::function<void()> & timer_ready_callback)
{
rcl_clock_t * clock_type_of_timer;

std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle();

if (rcl_timer_clock(
const_cast<rcl_timer_t *>(handle.get()),
&clock_type_of_timer) != RCL_RET_OK)
{
assert(false);
}

if (clock_type_of_timer->type != timer_type) {
// timer is handled by another queue
return;
}

std::unique_ptr<TimerData> data = std::make_unique<TimerData>();
data->timer_ready_callback = std::move(timer_ready_callback);
data->rcl_ref = std::move(handle);

timer->set_on_reset_callback(
[data_ptr = data.get(), this](size_t) {
std::scoped_lock l(mutex);
if (!remove_if_dropped(data_ptr))
{
add_timer_to_running_map(data_ptr);
}
});

{
std::scoped_lock l(mutex);
add_timer_to_running_map(data.get());

all_timers.emplace_back(std::move(data) );
}

//wake up thread as new timer was added
used_clock_for_timers.cancel_sleep_or_wait();
}

private:
/**
* Checks if the timer is still referenced if not deletes it from the queue
*
* @param timer_data The timer to check
* @return true if removed / invalid
*/
bool remove_if_dropped(const TimerData * timer_data)
{
if (timer_data->rcl_ref.unique()) {
// timer was deleted
auto it = std::find_if(
all_timers.begin(), all_timers.end(), [timer_data](const std::unique_ptr<TimerData> & e) {
return timer_data == e.get();
}
);

if (it != all_timers.end()) {
all_timers.erase(it);
}
return true;
}
return false;
}

/**
* @brief adds the given timer_data to the map of running timers, if valid.
*
* Advances the rcl timer.
* Computes the next call time of the timer.
* readds the timer to the map of running timers
*/
void add_timer_to_running_map(const TimerData * timer_data)
{
rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(timer_data->rcl_ref.get()));
if (ret == RCL_RET_TIMER_CANCELED) {
return;
}

int64_t next_call_time;

ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time);

if (ret == RCL_RET_OK) {
running_timers.emplace(next_call_time, timer_data);
}

// wake up the timer thread so that it can pick up the timer
used_clock_for_timers.cancel_sleep_or_wait();
}

/**
* Returns the time when the next timer becomes ready
*/
std::chrono::nanoseconds get_next_timer_ready_time() const
{
if (running_timers.empty()) {
// can't use std::chrono::nanoseconds::max, as wait_for
// internally computes end time by using ::now() + timeout
// as a workaround, we use some absurd high timeout
return std::chrono::nanoseconds(used_clock_for_timers.now().nanoseconds()) + std::chrono::hours(10000);
}
return running_timers.begin()->first;
}

void call_ready_timer_callbacks()
{
auto readd_timer_to_running_map = [this](TimerMap::node_type && e)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be static or a member function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may not be static, as this would only be copied the first time the lambda is initialized.

{
const auto & timer_data = e.mapped();
if(remove_if_dropped(timer_data))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this check here? It's already done at the beginning of each while (!running_timers.empty()) { loop iteration

{
return;
}

int64_t next_call_time;

auto ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time);

if (ret == RCL_RET_OK) {
e.key() = std::chrono::nanoseconds(next_call_time);
running_timers.insert(std::move(e));
}
};

while (!running_timers.empty()) {

if(remove_if_dropped(running_timers.begin()->second))
{
continue;
}

int64_t time_until_call;

const rcl_timer_t * rcl_timer_ref = running_timers.begin()->second->rcl_ref.get();
auto ret = rcl_timer_get_time_until_next_call(rcl_timer_ref, &time_until_call);
if (ret == RCL_RET_TIMER_CANCELED) {
running_timers.erase(running_timers.begin());
continue;
}

if (time_until_call <= 0) {
// advance next call time;
rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(rcl_timer_ref));
if (ret == RCL_RET_TIMER_CANCELED) {
running_timers.erase(running_timers.begin());
continue;
}

// timer is ready, execute callback
running_timers.begin()->second->timer_ready_callback();
Copy link
Collaborator

@alsora alsora Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple of comments on this:

  1. we should check somewhere that this is callable (e.g. when adding a timer)
  2. the current timers manager supports two modes of operation: in one mode, when a timer is ready it will push an event and the timer will then be executed by the events-executor thread (after it picks up that event); in the other mode the timers manager will directly execute the timer's user callback (without pushing any event). This second mode can give much more precision in the timer's execution and I see value in preserving it.

Maybe, to support the second mode, we should just set a timer_ready_callback that executes the timer...

readd_timer_to_running_map(running_timers.extract(running_timers.begin()));
continue;
}
break;
}
}

void timer_thread()
{
while (running && rclcpp::ok()) {
std::chrono::nanoseconds next_wakeup_time;
{
std::scoped_lock l(mutex);
call_ready_timer_callbacks();

next_wakeup_time = get_next_timer_ready_time();
}
try {
// RCUTILS_LOG_ERROR_NAMED("rclcpp", "TimerQueue::timer_thread before sleep, next wakeup time %+" PRId64 , next_wakeup_time.count());
used_clock_for_timers.sleep_until(rclcpp::Time(next_wakeup_time.count(), timer_type));
} catch (const std::runtime_error &) {
//there is a race on shutdown, were the context may become invalid, while we call sleep_until
running = false;
}
}
thread_terminated = true;
}

rcl_clock_type_t timer_type;

Context::SharedPtr clock_sleep_context;

rclcpp::Clock used_clock_for_timers;

std::mutex mutex;

std::atomic_bool running = true;
std::atomic_bool thread_terminated = false;

std::vector<std::unique_ptr<TimerData>> all_timers;

using TimerMap = std::multimap<std::chrono::nanoseconds, const TimerData *>;
TimerMap running_timers;

std::thread trigger_thread;

std::condition_variable thread_conditional;
};

class TimerManager
{
std::array<TimerQueue, 3> timer_queues;

public:
TimerManager()
: timer_queues{RCL_ROS_TIME, RCL_SYSTEM_TIME, RCL_STEADY_TIME}
Copy link
Collaborator

@alsora alsora Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queues should be created when someone tries to add the first timer of that type, and (maybe?) destroyed when empty.

if an application only uses timers of a single type, it shouldn't create 2 extra threads per executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I don't see any harm in having two idle threads in this case. They would block on the conditional and consume no CPU whatsoever.

Starting threads on runtime, and additional checks inside the executor code I see more as a problem though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if there will be no CPU consumption, threads also have a memory impact.
Moreover, even if the impact is likely very small, most systems will only have 1 type of timers (a lot of users use the default RCL_ROS_TIME clock), it seems strange that if you have 10 executors with 1 timer each you will create 40 threads instead of 20.

I agree that stopping and restarting them is probably not great, but at least we should wait to create a thread only if we need it.
This is an issue also with the current implementation, that always creates 1 thread even if you have no timers... But it's worst here because we always create 3 threads.

Copy link
Contributor Author

@jmachowinski jmachowinski Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still don't see the issue, memory impact is at most 100 KB per thread. So event for a really big system like ours (106 executors), we are talking 30 MB of memory vs 10MB.

Copy link
Collaborator

@alsora alsora Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In embedded applications, knowing that there's 2-3 MB wasted on threads that are never used is a problem, especially if it's something that can be easily optimized.

I agree that starting and stopping the threads is likely not the correct solution (at least it shouldn't be the default behavior), but I don't see any downside in delaying the creation of the timer queues until they are needed.

The remove_timer and add_timer functions below should just look up the timer clock type and create a queue if they don't have one already, this also makes the code simpler because we don't have to "try" to add the timer to all queue, but we already know in which queue it should be added and we can check a return code for success/failures.

{

}

void remove_timer(const rclcpp::TimerBase::SharedPtr & timer)
{
for (TimerQueue & q : timer_queues) {
q.remove_timer(timer);
}
}

void add_timer(
const rclcpp::TimerBase::SharedPtr & timer,
const std::function<void()> & timer_ready_callback)
{
for (TimerQueue & q : timer_queues) {
q.add_timer(timer, timer_ready_callback);
}
}

void stop()
{
for (TimerQueue & q : timer_queues) {
q.stop();
}
}
};
}