Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wjwwood] feat(MultiThreadedExecutor): Added ability to handle exceptions from threads #2505

Open
wants to merge 3 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ class Executor
virtual void
spin() = 0;

/// Version of spin() that takes an exception handler to be called when a callback throws.
/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* \param[in] exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;

/// Add a callback group to an executor.
/**
* An executor can have zero or more callback groups which provide work during `spin` functions.
Expand Down Expand Up @@ -462,14 +475,27 @@ class Executor

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
* \param[in] any_exec Structure that can hold any executable type (timer, subscription,
* service, client).
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable & any_exec);

/// Find the next available executable and do the work associated with it.
/**
* \param[in] any_exec Structure that can hold any executable type (timer, subscription,
* service, client).
* \param[in] exception_handler will be called for every exception in the processing threads
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler);

/// Run subscription executable.
/**
* Do necessary setup and tear-down as well as executing the subscription.
Expand Down
16 changes: 15 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,28 @@ class MultiThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* \param[in] exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

RCLCPP_PUBLIC
size_t
get_number_of_threads();

protected:
RCLCPP_PUBLIC
void
run(size_t this_thread_number);
run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler);

private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
Expand Down
12 changes: 12 additions & 0 deletions rclcpp/include/rclcpp/executors/single_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class SingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/// Single-threaded implementation of spin.
/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* \param[in] exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

private:
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* \param[in] exception_handler will be called for every exception in the processing threads
*
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/RCLCPP_PUBLIC
virtual void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Static executor implementation of spin some
/**
* This non-blocking function will execute entities that
Expand Down Expand Up @@ -125,6 +135,11 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
void
spin_once_impl(std::chrono::nanoseconds timeout) override;

void
spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler);

std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
collect_and_wait(std::chrono::nanoseconds timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
void
spin() override;

/**
* \sa rclcpp::Executor:spin() for more details
* \throws std::runtime_error when spin() called while already spinning
* \param exception_handler will be called for every exception in the processing threads
*
* The exception_handler can be called from multiple threads at the same time.
* The exception_handler shall rethrow the exception it if wants to terminate the program.
*/
RCLCPP_PUBLIC
void
spin(const std::function<void(const std::exception & e)> & exception_handler) override;

/// Events executor implementation of spin some
/**
* This non-blocking function will execute the timers and events
Expand Down
14 changes: 12 additions & 2 deletions rclcpp/include/rclcpp/experimental/timers_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ class TimersManager
/**
* @brief Starts a thread that takes care of executing the timers stored in this object.
* Function will throw an error if the timers thread was already running.
*
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
* and any occurring exception will be passed to the given handler
*/
RCLCPP_PUBLIC
void start();
void start(
const std::function<void(const std::exception & e)> & exception_handler = std::function<void(
const std::exception & e)>());

/**
* @brief Stops the timers thread.
Expand Down Expand Up @@ -511,6 +516,11 @@ class TimersManager
*/
void run_timers();

/**
* @brief calls run_timers with a try catch block.
*/
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);

/**
* @brief Get the amount of time before the next timer triggers.
* This function is not thread safe, acquire a mutex before calling it.
Expand All @@ -528,7 +538,7 @@ class TimersManager
* while keeping the heap correctly sorted.
* This function is not thread safe, acquire the timers_mutex_ before calling it.
*/
void execute_ready_timers_unsafe();
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);

// Callback to be called when timer is ready
std::function<void(const rclcpp::TimerBase *,
Expand Down
63 changes: 63 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,69 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Function>
void
execute_guarded(
const Function & function,
const std::function<void(const std::exception & e)> & exception_handler)
{
try {
function();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is @jmachowinski trying to explain with #2505 (comment), i do not think this is gonna useful stack trace either.

} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning: " << e.what());

exception_handler(e);
}
}

void
Executor::execute_any_executable(
AnyExecutable & any_exec,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (!spinning.load()) {
return;
}

assert(
(void("cannot execute an AnyExecutable without a valid callback group"),
any_exec.callback_group));

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_guarded([&any_exec]() {
execute_timer(any_exec.timer, any_exec.data);
}, exception_handler);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_guarded(
[&any_exec]() {
execute_subscription(any_exec.subscription);
}, exception_handler);
}
if (any_exec.service) {
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
}
if (any_exec.client) {
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
}
if (any_exec.waitable) {
execute_guarded([&any_exec]() {
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}, exception_handler);
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Taker, typename Handler>
static
void
Expand Down
16 changes: 12 additions & 4 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}

void
MultiThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}

void
MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -61,12 +67,12 @@ MultiThreadedExecutor::spin()
{
std::lock_guard wait_lock{wait_mutex_};
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
threads.emplace_back(func);
}
}

run(thread_id);
run(thread_id, exception_handler);
for (auto & thread : threads) {
thread.join();
}
Expand All @@ -79,7 +85,9 @@ MultiThreadedExecutor::get_number_of_threads()
}

void
MultiThreadedExecutor::run(size_t this_thread_number)
MultiThreadedExecutor::run(
size_t this_thread_number,
const std::function<void(const std::exception & e)> & exception_handler)
{
(void)this_thread_number;
while (rclcpp::ok(this->context_) && spinning.load()) {
Expand All @@ -97,7 +105,7 @@ MultiThreadedExecutor::run(size_t this_thread_number)
std::this_thread::yield();
}

execute_any_executable(any_exec);
execute_any_executable(any_exec, exception_handler);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
Expand Down
9 changes: 8 additions & 1 deletion rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {}

void
SingleThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}

void
SingleThreadedExecutor::spin(
const std::function<void(const std::exception &)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -39,7 +46,7 @@ SingleThreadedExecutor::spin()
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable);
execute_any_executable(any_executable, exception_handler);
}
}
}
32 changes: 30 additions & 2 deletions rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {}

void
StaticSingleThreadedExecutor::spin()
{
spin([](const std::exception & e) {throw e;});
}


void
StaticSingleThreadedExecutor::spin(
const std::function<void(const std::exception & e)> & exception_handler)
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
Expand All @@ -39,7 +47,7 @@ StaticSingleThreadedExecutor::spin()
// except we need to keep the wait result to reproduce the StaticSingleThreadedExecutor
// behavior.
while (rclcpp::ok(this->context_) && spinning.load()) {
this->spin_once_impl(std::chrono::nanoseconds(-1));
this->spin_once_impl(std::chrono::nanoseconds(-1), exception_handler);
}
}

Expand Down Expand Up @@ -97,12 +105,32 @@ StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_durati

void
StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
{
spin_once_impl(timeout, std::function<void(const std::exception & e)>());
}

void
StaticSingleThreadedExecutor::spin_once_impl(
std::chrono::nanoseconds timeout,
const std::function<void(const std::exception & e)> & exception_handler)
{
if (rclcpp::ok(context_) && spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
auto wait_result = this->collect_and_wait(timeout);
if (wait_result.has_value()) {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
if(exception_handler) {
try {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"Exception while spinning : " << e.what());

exception_handler(e);
}
} else {
this->execute_ready_executables(current_collection_, wait_result.value(), true);
}
}
}
}
Expand Down
Loading