Skip to content

Commit

Permalink
chore: Made Executor fully overloadable
Browse files Browse the repository at this point in the history
This commit makes every public funciton virtual, and adds virtual impl
function for the existing template functions.

The goal of this commit is to be able to fully control the everything from
a derived class.

Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski committed Apr 14, 2024
1 parent 03e5f9a commit 34e2a80
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 79 deletions.
102 changes: 31 additions & 71 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,42 +242,30 @@ class Executor
* spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this
* function to be non-blocking.
*/
template<typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node,
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Convenience function which takes Node and forwards NodeBaseInterface.
template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
std::shared_ptr<NodeT> node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node->get_node_base_interface(),
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const std::shared_ptr<rclcpp::Node> & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Add a node, complete all immediately available work, and remove the node.
/**
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_some(std::shared_ptr<rclcpp::Node> node);

/// Collect work once and execute all available work, optionally within a max duration.
Expand Down Expand Up @@ -307,14 +295,14 @@ class Executor
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_all(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds max_duration);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration);

/// Collect and execute work repeatedly within a duration or until no more work is available.
Expand Down Expand Up @@ -366,52 +354,11 @@ class Executor
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
return spin_until_future_complete_impl(std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout), [&future] () {
return future.wait_for(std::chrono::seconds(0));
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
);
}

/// Cancel any running spin* function, causing it to return.
Expand All @@ -420,7 +367,7 @@ class Executor
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
virtual void
cancel();

/// Returns true if the executor is currently spinning.
Expand All @@ -429,10 +376,19 @@ class Executor
* \return True if the executor is currently spinning.
*/
RCLCPP_PUBLIC
bool
virtual bool
is_spinning();

protected:
// constructor that will not setup any internals.
/**
* This constructor is intended to be used by any derived executor,
* that explicitly does not want to use the default implementation provided
* by this class. This constructor is guaranteed, to not modify the system
* state.
* */
explicit Executor(const std::shared_ptr<rclcpp::Context> & context);

/// Add a node to executor, execute the next available unit of work, and remove the node.
/**
* Implementation of spin_node_once using std::chrono::nanoseconds
Expand All @@ -447,6 +403,10 @@ class Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);

virtual FutureReturnCode spin_until_future_complete_impl(
std::chrono::nanoseconds max_duration,
const std::function<std::future_status ()> & get_future_status);

/// Collect work and execute available work, optionally within a duration.
/**
* Implementation of spin_some and spin_all.
Expand Down
82 changes: 82 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru

class rclcpp::ExecutorImplementation {};

Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
: spinning(false),
entities_need_rebuild_(true),
collector_(nullptr),
wait_set_({}, {}, {}, {}, {}, {}, context)
{
}

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
Expand Down Expand Up @@ -229,6 +237,28 @@ Executor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
this->remove_node(node_ptr->get_node_base_interface(), notify);
}

void
Executor::spin_node_once(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node,
std::chrono::nanoseconds timeout)
{
return spin_node_once_nanoseconds(
node,
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}

void
Executor::spin_node_once(
const std::shared_ptr<rclcpp::Node> & node,
std::chrono::nanoseconds timeout)
{
return spin_node_once_nanoseconds(
node->get_node_base_interface(),
timeout
);
}

void
Executor::spin_node_once_nanoseconds(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
Expand All @@ -240,6 +270,58 @@ Executor::spin_node_once_nanoseconds(
this->remove_node(node, false);
}

rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl(
std::chrono::nanoseconds timeout,
const std::function<std::future_status ()> & get_future_status)
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = get_future_status();
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = get_future_status();
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

void
Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
{
Expand Down
8 changes: 0 additions & 8 deletions rclcpp/test/rclcpp/test_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@

using namespace std::chrono_literals;

enum class ClockType
{
RCL_STEADY_TIME,
RCL_SYSTEM_TIME,
RCL_ROS_TIME,
};


class TestClockWakeup : public ::testing::TestWithParam<rcl_clock_type_e>
{
public:
Expand Down

0 comments on commit 34e2a80

Please sign in to comment.