Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Changes for Multihtreaded Events Executor #2466

Closed
wants to merge 10 commits into from
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ class Clock
bool
ros_time_is_active();

/**
* Cancels an ongoing or future sleep operation of one thread.
*
* This function is intended for multi threaded signaling. It can
* be used by one thread, to wakeup another thread, using any
* of the sleep_ or wait_ methods.
*/
RCLCPP_PUBLIC
void
cancel_sleep_or_wait();

/// Return the rcl_clock_t clock handle
RCLCPP_PUBLIC
rcl_clock_t *
Expand Down
111 changes: 41 additions & 70 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,42 +242,30 @@ class Executor
* spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this
* function to be non-blocking.
*/
template<typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node,
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Convenience function which takes Node and forwards NodeBaseInterface.
template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
void
RCLCPP_PUBLIC
virtual void
spin_node_once(
std::shared_ptr<NodeT> node,
std::chrono::duration<RepT, T> timeout = std::chrono::duration<RepT, T>(-1))
{
return spin_node_once_nanoseconds(
node->get_node_base_interface(),
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
const std::shared_ptr<rclcpp::Node> & node,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
Comment on lines -245 to +256
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you absolutely need to make these non-template? I'm afraid existing code may not compile against this if certain types of durations are passed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really need to, but all other functions in the executor have the same signature and compile just fine. Any std::chono::duration type should auto convert to std::chrono::nanoseconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably the others taking only nanoseconds is a probably a bug, see: https://stackoverflow.com/a/22362867

It seems like in C++20 we could have a function that takes std::chrono::duration<auto>, but we don't have that right now.

I'll remove this change in my fix up.


/// Add a node, complete all immediately available work, and remove the node.
/**
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_some(std::shared_ptr<rclcpp::Node> node);

/// Collect work once and execute all available work, optionally within a max duration.
Expand Down Expand Up @@ -307,14 +295,14 @@ class Executor
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_all(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds max_duration);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration);

/// Collect and execute work repeatedly within a duration or until no more work is available.
Expand Down Expand Up @@ -366,52 +354,11 @@ class Executor
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
return spin_until_future_complete_impl(std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout), [&future] (std::chrono::nanoseconds wait_time) {
return future.wait_for(wait_time);
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
);
}

/// Cancel any running spin* function, causing it to return.
Expand All @@ -420,7 +367,7 @@ class Executor
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
virtual void
cancel();

/// Returns true if the executor is currently spinning.
Expand All @@ -433,6 +380,15 @@ class Executor
is_spinning();

protected:
/// Constructor that will not initialize any non-trivial members.
/**
* This constructor is intended to be used by any derived executor
* that explicitly does not want to use the default implementation provided
* by this class.
* This constructor is guaranteed to not modify the system state.
*/
explicit Executor(const std::shared_ptr<rclcpp::Context> & context);

/// Add a node to executor, execute the next available unit of work, and remove the node.
/**
* Implementation of spin_node_once using std::chrono::nanoseconds
Expand All @@ -447,6 +403,21 @@ class Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);

/// Spin (blocking) until the get_future_status returns ready, max_duration is reached,
/// or rclcpp is interrupted.
/**
* \param[in] max_duration Optional duration parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \param[in] wait_for_future A function waiting for a future to become ready.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
RCLCPP_PUBLIC
virtual FutureReturnCode spin_until_future_complete_impl(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual FutureReturnCode spin_until_future_complete_impl(
RCLCPP_PUBLIC
virtual FutureReturnCode
spin_until_future_complete_impl(

Also, needs at least a minimal docstring.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said above, I'd consider dropping this change, but if you keep it, consider the name spin_until_future_complete_nanoseconds() since that would match other functions like spin_node_once() -> spin_node_once_nanoseconds().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got two colliding styles here, there is also
spin_once -> spin_once_impl and spin_some -> spin_some_impl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added docs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got two colliding styles here, there is also
spin_once -> spin_once_impl and spin_some -> spin_some_impl

Yeah, I'll leave it as-is, but I think this is related to the templated functions that take duration vs just taking nanoseconds, I still believe the templated approach is better, and in that case the narrowing to the non-template protected methods are truly just to cast it down to nanoseconds, so that makes sense in those cases. Likely if I had noticed I would have suggesting the newer functions follow that pattern, but it's not that important now.

std::chrono::nanoseconds max_duration,
const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future);

/// Collect work and execute available work, optionally within a duration.
/**
* Implementation of spin_some and spin_all.
Expand Down
71 changes: 49 additions & 22 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Clock::Impl

rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
bool stop_sleeping_ = false;
bool shutdown_ = false;
std::condition_variable cv_;
std::mutex wait_mutex_;
std::mutex clock_mutex_;
};

Expand Down Expand Up @@ -79,8 +83,20 @@ Clock::now() const
return now;
}

void
Clock::cancel_sleep_or_wait()
{
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->stop_sleeping_ = true;
}
impl_->cv_.notify_one();
}

bool
Clock::sleep_until(Time until, Context::SharedPtr context)
Clock::sleep_until(
jmachowinski marked this conversation as resolved.
Show resolved Hide resolved
Time until,
Context::SharedPtr context)
{
if (!context || !context->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
Expand All @@ -91,12 +107,14 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
}
bool time_source_changed = false;

std::condition_variable cv;

// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
[&cv]() {
cv.notify_one();
[this]() {
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->shutdown_ = true;
}
impl_->cv_.notify_one();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
Expand All @@ -112,22 +130,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, chrono_until);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, chrono_until);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_SYSTEM_TIME) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_ROS_TIME) {
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
Expand All @@ -139,11 +159,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
threshold.min_forward.nanoseconds = 1;
auto clock_handler = create_jump_callback(
nullptr,
[&cv, &time_source_changed](const rcl_time_jump_t & jump) {
[this, &time_source_changed](const rcl_time_jump_t & jump) {
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
time_source_changed = true;
}
cv.notify_one();
impl_->cv_.notify_one();
},
threshold);

Expand All @@ -153,19 +174,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown or time source change
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else {
// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait(lock);
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait(lock);
}
impl_->stop_sleeping_ = false;
}
}

Expand Down
Loading