From 36061ede15d81f534dd30d16646b0650c2a165aa Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 26 Mar 2024 15:17:01 +0100 Subject: [PATCH] feat(clock): Added function to interrupt sleep This is useful in case a second thread needs to wake up another thread, that is sleeping using a clock. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 5 + rclcpp/src/rclcpp/clock.cpp | 66 +++++++---- rclcpp/test/rclcpp/CMakeLists.txt | 4 + rclcpp/test/rclcpp/test_clock.cpp | 180 ++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 22 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_clock.cpp diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 702b224d53..b714bf59b8 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,6 +193,11 @@ class Clock bool ros_time_is_active(); + RCLCPP_PUBLIC + void + cancel_sleep_or_wait(); + + /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index f46649a77d..addf3628c3 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -49,6 +49,9 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; + bool stop_sleeping_ = false; + std::condition_variable cv_; + std::mutex wait_mutex_; std::mutex clock_mutex_; }; @@ -79,8 +82,20 @@ Clock::now() const return now; } +void +Clock::cancel_sleep_or_wait() +{ + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->stop_sleeping_ = true; + } + impl_->cv_.notify_one(); +} + bool -Clock::sleep_until(Time until, Context::SharedPtr context) +Clock::sleep_until( + Time until, + Context::SharedPtr context) { if (!context || !context->is_valid()) { throw std::runtime_error("context cannot be slept with because it's invalid"); @@ -91,12 +106,10 @@ Clock::sleep_until(Time until, Context::SharedPtr context) } bool time_source_changed = false; - std::condition_variable cv; - // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( - [&cv]() { - cv.notify_one(); + [this]() { + cancel_sleep_or_wait(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -112,22 +125,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context) const std::chrono::steady_clock::time_point chrono_until = chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, chrono_until); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + impl_->cv_.wait_until(lock, chrono_until); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_SYSTEM_TIME) { auto system_time = std::chrono::system_clock::time_point( // Cast because system clock resolution is too big for nanoseconds on some systems std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_ROS_TIME) { // Install jump handler for any amount of time change, for two purposes: // - if ROS time is active, check if time reached on each new clock sample @@ -139,11 +154,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context) threshold.min_forward.nanoseconds = 1; auto clock_handler = create_jump_callback( nullptr, - [&cv, &time_source_changed](const rcl_time_jump_t & jump) { + [this, &time_source_changed](const rcl_time_jump_t & jump) { if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(impl_->wait_mutex_); time_source_changed = true; } - cv.notify_one(); + impl_->cv_.notify_one(); }, threshold); @@ -153,19 +169,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context) std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown or time source change - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown, stop of sleep or time source change + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else { // RCL_ROS_TIME with ros_time_is_active. // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait(lock); + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait(lock); } + impl_->stop_sleeping_ = false; } } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 25ca4082e1..468e35efb7 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp) if(TARGET test_client) target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_clock test_clock.cpp) +if(TARGET test_clock) + target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) +endif() ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp) if(TARGET test_copy_all_parameter_values) target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME}) diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp new file mode 100644 index 0000000000..08dcbe632a --- /dev/null +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -0,0 +1,180 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rcl/error_handling.h" +#include "rcl/time.h" +#include "rclcpp/clock.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" + +#include "../utils/rclcpp_gtest_macros.hpp" + +using namespace std::chrono_literals; +class TestClockWakeup : public ::testing::Test +{ +public: + void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + + void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // make sure the thread is already sleeping before we send the cancel + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("my_node"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +TEST_F(TestClockWakeup, wakeup_sleep_steay_time) { + auto clock = std::make_shared(RCL_STEADY_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_system_time) { + auto clock = std::make_shared(RCL_SYSTEM_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) { + auto clock = std::make_shared(RCL_ROS_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::milliseconds(10)); + thread_finished = true; + }); + + // make sure, that the sim time clock does not wakeup, as no clock is provided + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + EXPECT_FALSE(thread_finished); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +}