Skip to content

Commit

Permalink
feat(clock): Added function to interrupt sleep
Browse files Browse the repository at this point in the history
This is useful in case a second thread needs to wake up another
thread, that is sleeping using a clock.

Signed-off-by: Janosch Machowinski <[email protected]>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Mar 26, 2024
1 parent acc31a6 commit 36061ed
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 22 deletions.
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class Clock
bool
ros_time_is_active();

RCLCPP_PUBLIC
void
cancel_sleep_or_wait();


/// Return the rcl_clock_t clock handle
RCLCPP_PUBLIC
rcl_clock_t *
Expand Down
66 changes: 44 additions & 22 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class Clock::Impl

rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
bool stop_sleeping_ = false;
std::condition_variable cv_;
std::mutex wait_mutex_;
std::mutex clock_mutex_;
};

Expand Down Expand Up @@ -79,8 +82,20 @@ Clock::now() const
return now;
}

void
Clock::cancel_sleep_or_wait()
{
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->stop_sleeping_ = true;
}
impl_->cv_.notify_one();
}

bool
Clock::sleep_until(Time until, Context::SharedPtr context)
Clock::sleep_until(
Time until,
Context::SharedPtr context)
{
if (!context || !context->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
Expand All @@ -91,12 +106,10 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
}
bool time_source_changed = false;

std::condition_variable cv;

// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
[&cv]() {
cv.notify_one();
[this]() {
cancel_sleep_or_wait();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
Expand All @@ -112,22 +125,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, chrono_until);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
impl_->cv_.wait_until(lock, chrono_until);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_SYSTEM_TIME) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_ROS_TIME) {
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
Expand All @@ -139,11 +154,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
threshold.min_forward.nanoseconds = 1;
auto clock_handler = create_jump_callback(
nullptr,
[&cv, &time_source_changed](const rcl_time_jump_t & jump) {
[this, &time_source_changed](const rcl_time_jump_t & jump) {
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
time_source_changed = true;
}
cv.notify_one();
impl_->cv_.notify_one();
},
threshold);

Expand All @@ -153,19 +169,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown or time source change
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else {
// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait(lock);
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait(lock);
}
impl_->stop_sleeping_ = false;
}
}

Expand Down
4 changes: 4 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp)
if(TARGET test_client)
target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_clock test_clock.cpp)
if(TARGET test_clock)
target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp)
if(TARGET test_copy_all_parameter_values)
target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME})
Expand Down
180 changes: 180 additions & 0 deletions rclcpp/test/rclcpp/test_clock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright 2024 Cellumation GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include <chrono>

#include "rcl/error_handling.h"
#include "rcl/time.h"
#include "rclcpp/clock.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/time_source.hpp"

#include "../utils/rclcpp_gtest_macros.hpp"

using namespace std::chrono_literals;
class TestClockWakeup : public ::testing::Test
{
public:
void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock)
{
std::atomic_bool thread_finished = false;

std::thread wait_thread = std::thread(
[&clock, &thread_finished]()
{
// make sure the thread starts sleeping late
std::this_thread::sleep_for(std::chrono::milliseconds(100));
clock->sleep_until(clock->now() + std::chrono::seconds(3));
thread_finished = true;
});

// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock)
{
std::atomic_bool thread_finished = false;

std::thread wait_thread = std::thread(
[&clock, &thread_finished]()
{
clock->sleep_until(clock->now() + std::chrono::seconds(3));
thread_finished = true;
});

// make sure the thread is already sleeping before we send the cancel
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
node = std::make_shared<rclcpp::Node>("my_node");
}

void TearDown()
{
node.reset();
}

rclcpp::Node::SharedPtr node;
};

TEST_F(TestClockWakeup, wakeup_sleep_steay_time) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, wakeup_sleep_system_time) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) {
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) {
node->set_parameter({"use_sim_time", true});
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
rclcpp::TimeSource time_source(node);
time_source.attachClock(clock);

EXPECT_TRUE(clock->ros_time_is_active());

test_wakeup_after_sleep(clock);
test_wakeup_before_sleep(clock);
}

TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
node->set_parameter({"use_sim_time", true});
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
rclcpp::TimeSource time_source(node);
time_source.attachClock(clock);

EXPECT_TRUE(clock->ros_time_is_active());

std::atomic_bool thread_finished = false;

std::thread wait_thread = std::thread(
[&clock, &thread_finished]()
{
// make sure the thread starts sleeping late
clock->sleep_until(clock->now() + std::chrono::milliseconds(10));
thread_finished = true;
});

// make sure, that the sim time clock does not wakeup, as no clock is provided
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_FALSE(thread_finished);

// notify the clock, that the sleep shall be interrupted
clock->cancel_sleep_or_wait();

auto start_time = std::chrono::steady_clock::now();
auto cur_time = start_time;
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cur_time = std::chrono::steady_clock::now();
}

wait_thread.join();

EXPECT_TRUE(thread_finished);
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
}

0 comments on commit 36061ed

Please sign in to comment.