diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index b2d1023064..85434df9a0 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -121,6 +121,16 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/waitable.cpp ) +if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/posix_thread.cpp + ) +elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/windows_thread.cpp + ) +endif() + find_package(Python3 REQUIRED COMPONENTS Interpreter) # "watch" template for changes diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..8c48c5fe1e 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -18,10 +18,12 @@ #include #include #include +#include #include #include #include +#include "rcl_yaml_param_parser/types.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -85,6 +87,7 @@ class MultiThreadedExecutor : public rclcpp::Executor size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; + rcl_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9dc6dec57b..526592da16 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -22,6 +22,7 @@ #include #include +#include "rcl_yaml_param_parser/types.h" #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -65,8 +66,14 @@ class SingleThreadedExecutor : public rclcpp::Executor void spin() override; +protected: + RCLCPP_PUBLIC + void + run(); + private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor) + rcl_thread_attrs_t * thread_attributes_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/threads.hpp b/rclcpp/include/rclcpp/threads.hpp new file mode 100644 index 0000000000..ac628de3e1 --- /dev/null +++ b/rclcpp/include/rclcpp/threads.hpp @@ -0,0 +1,26 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS_HPP_ +#define RCLCPP__THREADS_HPP_ + +#if defined(__linux__) +#include "rclcpp/threads/posix/thread.hpp" +#elif defined(_WIN32) +#include "rclcpp/threads/win32/thread.hpp" +#else +#include "rclcpp/threads/std/thread.hpp" +#endif + +#endif // RCLCPP__THREADS_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp new file mode 100644 index 0000000000..60e9a1fbd7 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp @@ -0,0 +1,158 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ +#define RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = cpu_set_t; + CpuSet() = default; + explicit CpuSet(std::size_t cpu) + { + init_cpu_set(); + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + CpuSet(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } + } + CpuSet & operator=(const CpuSet & other) + { + if (other.cpu_set_) { + init_cpu_set(); + memcpy(cpu_set_.get(), other.cpu_set_.get(), alloc_size()); + } else { + clear(); + } + return *this; + } + CpuSet(CpuSet && other) + : CpuSet() + { + swap(other); + } + CpuSet & operator=(CpuSet && other) + { + CpuSet tmp; + other.swap(tmp); + tmp.swap(*this); + return *this; + } + void swap(CpuSet & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(num_proc_, other.num_proc_); + } + void set(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_SET_S(cpu, alloc_size(), cpu_set_.get()); + } + void unset(std::size_t cpu) + { + init_cpu_set(); + valid_cpu(cpu); + CPU_CLR_S(cpu, alloc_size(), cpu_set_.get()); + } + void clear() + { + if (cpu_set_) { + CPU_ZERO_S(alloc_size(), cpu_set_.get()); + } + } + bool is_set(std::size_t cpu) const + { + if (cpu_set_) { + valid_cpu(cpu); + return CPU_ISSET_S(cpu, alloc_size(), cpu_set_.get()); + } else { + return false; + } + } + + std::size_t max_processors() const + { + return num_proc_; + } + std::size_t alloc_size() const + { + return CPU_ALLOC_SIZE(num_proc_); + } + NativeCpuSetType * native_cpu_set() const + { + return cpu_set_.get(); + } + +private: + void init_cpu_set() + { + if (cpu_set_) { + return; + } + auto num_proc = sysconf(_SC_NPROCESSORS_ONLN); + if (num_proc == -1) { + return; + } + auto p = CPU_ALLOC(CPU_ALLOC_SIZE(num_proc)); + cpu_set_ = std::unique_ptr(p); + num_proc_ = num_proc; + } + void valid_cpu(std::size_t cpu) const + { + if (num_proc_ <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + } + struct CpuSetDeleter + { + void operator()(NativeCpuSetType * cpu_set) const + { + CPU_FREE(cpu_set); + } + }; + std::unique_ptr cpu_set_; + std::size_t num_proc_; +}; + +inline void swap(CpuSet & a, CpuSet & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread.hpp b/rclcpp/include/rclcpp/threads/posix/thread.hpp new file mode 100644 index 0000000000..2a249541f4 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread.hpp @@ -0,0 +1,205 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/threads/posix/thread_attribute.hpp" +#include "rclcpp/threads/posix/thread_func.hpp" +#include "rclcpp/threads/posix/thread_id.hpp" +#include "rclcpp/threads/posix/utilities.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = pthread_t; + using Attribute = detail::ThreadAttribute; + using Id = detail::ThreadId; + + // Assume pthread_t is an invalid handle if it's 0 + Thread() noexcept + : handle_{} {} + Thread(Thread && other) + : handle_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : Thread( + static_cast(nullptr), + make_thread_func(std::forward(f), std::forward(args)...)) + {} + template + Thread(Attribute const & attr, F && f, Args && ... args) + : Thread( + &attr, + make_thread_func_with_attr(attr, std::forward(f), std::forward(args)...)) + {} + Thread(Thread const &) = delete; + ~Thread() + { + if (handle_) { + std::terminate(); + } + } + + Thread & operator=(Thread && other) noexcept + { + if (handle_) { + std::terminate(); + } + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(handle_, other.handle_); + swap(name_, other.name_); + } + + void join() + { + void * p; + int r = pthread_join(handle_, &p); + detail::throw_if_error(r, "Error in pthread_join "); + handle_ = NativeHandleType{}; + } + + bool joinable() const noexcept + { + return 0 == pthread_equal(handle_, NativeHandleType{}); + } + + void detach() + { + int r = pthread_detach(handle_); + detail::throw_if_error(r, "Error in pthread_detach "); + handle_ = NativeHandleType{}; + } + + NativeHandleType native_handle() const + { + return handle_; + } + + Id get_id() const noexcept + { + return Id{handle_}; + } + + static unsigned int hardware_concurrency() noexcept + { + auto r = sysconf(_SC_NPROCESSORS_ONLN); + if (r == -1) { + return 0u; + } else { + return static_cast(r); + } + } + +private: + using ThreadFuncBase = detail::ThreadFuncBase; + template + static ThreadFuncBase::UniquePtr make_thread_func(F && f, Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return ThreadFuncBase::UniquePtr(func); + } + template + static ThreadFuncBase::UniquePtr make_thread_func_with_attr( + Attribute const & attr, + F && f, + Args && ... args) + { + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::ThreadFuncBase * func = new detail::ThreadFunc( + [attr, f = std::forward(f), args = std::tuple(std::forward(args)...)]() mutable + { + std::apply(f, args); + }); + return ThreadFuncBase::UniquePtr(func); + } + + Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func); + + static void apply_attr(Attribute const & attr); + + NativeHandleType handle_; + std::string name_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace detail +{ +void apply_attr_to_current_thread(ThreadAttribute const & attr); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute( + detail::ThreadAttribute const & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that has no effect on a thread"); + + detail::apply_attr_to_current_thread(attr); + std::invoke(std::forward(f), std::forward(args)...); +} + +} // namespace this_thread + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp new file mode 100644 index 0000000000..e2baa37696 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp @@ -0,0 +1,189 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +#ifdef __linux__ +#include "rclcpp/threads/posix/linux/cpu_set.hpp" +#endif + +namespace rclcpp +{ + +namespace detail +{ + +struct ThreadAttribute +{ + ThreadAttribute(); + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + using NativeAttributeType = pthread_attr_t; + + ThreadAttribute & set_affinity(CpuSet cs) + { + cpu_set_ = std::move(cs); + return *this; + } + const CpuSet & get_affinity() const + { + return cpu_set_; + } + + ThreadAttribute & set_sched_policy(rcl_thread_scheduling_policy_type_t sp) + { + sched_policy_ = rcl_scheduling_policy_to_sched_policy(sp); + return *this; + } + int get_sched_policy() const + { + return sched_policy_; + } + + ThreadAttribute & set_stack_size(std::size_t sz) + { + stack_size_ = sz; + return *this; + } + std::size_t get_stack_size() const + { + return stack_size_; + } + + ThreadAttribute & set_priority(int prio) + { + priority_ = prio; + return *this; + } + int get_priority() const + { + return priority_; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + detached_flag_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return detached_flag_; + } + + ThreadAttribute & set_name(std::string name) + { + name_ = std::move(name); + return *this; + } + const std::string & get_name() const + { + return name_; + } + + void + set_thread_attribute( + const rcl_thread_attr_t & attr) + { + CpuSet cpu_set(attr.core_affinity); + set_affinity(std::move(cpu_set)); + set_sched_policy(attr.scheduling_policy); + set_priority(attr.priority); + set_name(attr.name); + } + + void + swap( + ThreadAttribute & other) + { + using std::swap; + swap(cpu_set_, other.cpu_set_); + swap(sched_policy_, other.sched_policy_); + swap(stack_size_, other.stack_size_); + swap(priority_, other.priority_); + swap(detached_flag_, other.detached_flag_); + swap(name_, other.name_); + } + +private: + CpuSet cpu_set_; + int sched_policy_; + std::size_t stack_size_; + int priority_; + bool detached_flag_; + std::string name_; + + int rcl_scheduling_policy_to_sched_policy( + rcl_thread_scheduling_policy_type_t sched_policy) + { + switch (sched_policy) { +#ifdef SCHED_FIFO + case RCL_THREAD_SCHEDULING_POLICY_FIFO: + return SCHED_FIFO; +#endif +#ifdef SCHED_RR + case RCL_THREAD_SCHEDULING_POLICY_RR: + return SCHED_RR; +#endif +#ifdef SCHED_OTHER + case RCL_THREAD_SCHEDULING_POLICY_OTHER: + return SCHED_OTHER; +#endif +#ifdef SCHED_IDLE + case RCL_THREAD_SCHEDULING_POLICY_IDLE: + return SCHED_IDLE; +#endif +#ifdef SCHED_BATCH + case RCL_THREAD_SCHEDULING_POLICY_BATCH: + return SCHED_BATCH; +#endif +#ifdef SCHED_SPORADIC + case RCL_THREAD_SCHEDULING_POLICY_SPORADIC: + return SCHED_SPORADIC; +#endif + /* Todo: Necessity and setting method need to be considered + #ifdef SCHED_DEADLINE + case RCL_THREAD_SCHEDULING_POLICY_DEADLINE: + return SCHED_DEADLINE; + break; + #endif + */ + default: + throw std::runtime_error("Invalid scheduling policy"); + } + return -1; + } +}; + +inline void swap(ThreadAttribute & a, ThreadAttribute & b) +{ + a.swap(b); +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp new file mode 100644 index 0000000000..d4ae497e14 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp @@ -0,0 +1,54 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ + +#include +#include +#include + +namespace rclcpp::detail +{ + +struct ThreadFuncBase +{ + virtual ~ThreadFuncBase() = default; + virtual void run() = 0; + RCLCPP_UNIQUE_PTR_DEFINITIONS(ThreadFuncBase) +}; + +template +struct ThreadFunc : ThreadFuncBase +{ + template + explicit ThreadFunc(G && g) + : func_(std::forward(g)) + {} + +private: + void run() override + { + func_(); + } + + F func_; +}; + +template +ThreadFunc(F &&)->ThreadFunc>; + +} // namespace rclcpp::detail + +#endif // RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp new file mode 100644 index 0000000000..20fa3b8a00 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp @@ -0,0 +1,146 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ + +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wmismatched-tags" +#endif + +struct Thread; + +namespace detail +{ + +namespace thread_id_ns +{ + +struct ThreadId; + +inline ThreadId get_id() noexcept; +inline bool operator==(ThreadId id1, ThreadId id2); +inline bool operator!=(ThreadId id1, ThreadId id2); +inline bool operator<(ThreadId id1, ThreadId id2); +inline bool operator>(ThreadId id1, ThreadId id2); +inline bool operator<=(ThreadId id1, ThreadId id2); +inline bool operator>=(ThreadId id1, ThreadId id2); +template +inline std::basic_ostream & operator<<( + std::basic_ostream &, + ThreadId); + +struct ThreadId +{ + ThreadId() = default; + ThreadId(ThreadId const &) = default; + ThreadId(ThreadId &&) = default; + ThreadId & operator=(ThreadId const &) = default; + ThreadId & operator=(ThreadId &&) = default; + + friend bool operator==(ThreadId id1, ThreadId id2) + { + return pthread_equal(id1.h, id2.h); + } + friend bool operator<(ThreadId id1, ThreadId id2) + { + return id1.h < id2.h; + } + template + friend std::basic_ostream & operator<<( + std::basic_ostream & ost, + ThreadId id) + { + return ost << id.h; + } + +private: + friend class rclcpp::Thread; + friend ThreadId get_id() noexcept; + friend struct std::hash; + explicit ThreadId(pthread_t h) + : h(h) {} + pthread_t h; +}; + +ThreadId get_id() noexcept +{ + return ThreadId{pthread_self()}; +} + +bool operator!=(ThreadId id1, ThreadId id2) +{ + return !(id1 == id2); +} + +bool operator>(ThreadId id1, ThreadId id2) +{ + return id2 < id1; +} + +bool operator<=(ThreadId id1, ThreadId id2) +{ + return !(id1 > id2); +} + +bool operator>=(ThreadId id1, ThreadId id2) +{ + return !(id1 < id2); +} + +} // namespace thread_id_ns + +using thread_id_ns::ThreadId; +using thread_id_ns::operator==; +using thread_id_ns::operator!=; +using thread_id_ns::operator<; // NOLINT +using thread_id_ns::operator>; // NOLINT +using thread_id_ns::operator<=; +using thread_id_ns::operator>=; +using thread_id_ns::operator<<; + +} // namespace detail + +namespace this_thread +{ + +using detail::thread_id_ns::get_id; + +} // namespace this_thread + +} // namespace rclcpp + +namespace std +{ + +template<> +struct hash +{ + std::size_t operator()(rclcpp::detail::thread_id_ns::ThreadId id) + { + return id.h; + } +}; + +} // namespace std + +#endif // RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/utilities.hpp b/rclcpp/include/rclcpp/threads/posix/utilities.hpp new file mode 100644 index 0000000000..4b648db276 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/utilities.hpp @@ -0,0 +1,42 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__UTILITIES_HPP_ +#define RCLCPP__THREADS__POSIX__UTILITIES_HPP_ + +#include + +namespace rclcpp +{ + +namespace detail +{ + +namespace +{ + +inline void throw_if_error(int r, char const * msg) +{ + if (r != 0) { + throw std::system_error(r, std::system_category(), msg); + } +} + +} // namespace + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__UTILITIES_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp new file mode 100644 index 0000000000..3ce0b9e188 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread.hpp @@ -0,0 +1,145 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_HPP_ +#define RCLCPP__THREADS__STD__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/std/thread_attribute.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = std::thread::native_handle_type; + using Attribute = detail::ThreadAttribute; + using Id = std::thread::id; + + Thread() noexcept + : thread_{} + {} + Thread(Thread && other) + : thread_{} + { + swap(other); + } + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + { + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + if (attr.get_run_as_detached()) { + thread_.detach(); + } + } + Thread(Thread const &) = delete; + ~Thread() {} + + Thread & operator=(Thread && other) noexcept + { + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(thread_, other.thread_); + } + + void join() + { + thread_.join(); + thread_ = std::thread{}; + } + + bool joinable() const noexcept + { + return thread_.joinable(); + } + + void detach() + { + thread_.detach(); + thread_ = std::thread{}; + } + + NativeHandleType native_handle() + { + return thread_.native_handle(); + } + + Id get_id() const noexcept + { + return thread_.get_id(); + } + + static int hardware_concurrency() noexcept + { + return std::thread::hardware_concurrency(); + } + +private: + std::thread thread_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace this_thread +{ + +template +void run_with_thread_attribute(Thread::Attribute & attr, F && f, Args && ... args) +{ + static_assert( + !std::is_member_object_pointer_v>, + "F is a pointer to member, that is ineffective on thread"); + + if (attr.set_unavailable_items_) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + + std::invoke(f, std::forward(args)...); +} + +} // namespace this_thread + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp new file mode 100644 index 0000000000..ef0da283be --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread_attribute.hpp @@ -0,0 +1,165 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ + +#include +#include +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread; + +namespace detail +{ +struct ThreadAttribute; +} // namespace detail + +namespace this_thread +{ +template +void run_with_thread_attribute( + detail::ThreadAttribute & attr, F && f, Args && ... args); +} // namespace this_thread + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = std::size_t; + CpuSet() {} + explicit CpuSet(std::size_t) {} + CpuSet(const CpuSet &) {} + CpuSet & operator=(const CpuSet &) + { + return *this; + } + CpuSet(CpuSet &&) = delete; + CpuSet & operator=(CpuSet &&) = delete; + ~CpuSet() {} + void set(std::size_t) {} + void unset(std::size_t) {} + void clear() {} + bool is_set(std::size_t) + { + return false; + } + std::size_t get_max_processors() const + { + return 0; + } + NativeCpuSetType native_cpu_set() const + { + return 0; + } +}; + +struct ThreadAttribute +{ + using PriorityType = int; + + ThreadAttribute() + : set_unavailable_items_(false) {} + + ThreadAttribute(const ThreadAttribute &) = default; + ThreadAttribute(ThreadAttribute &&) = default; + ThreadAttribute & operator=(const ThreadAttribute &) = default; + ThreadAttribute & operator=(ThreadAttribute &&) = default; + + ThreadAttribute & set_affinity(CpuSet &) + { + set_unavailable_items_ = true; + return *this; + } + CpuSet get_affinity() + { + return CpuSet{}; + } + + ThreadAttribute & set_stack_size(std::size_t) + { + set_unavailable_items_ = true; + return *this; + } + std::size_t get_stack_size() const + { + return 0; + } + + ThreadAttribute & set_priority(int prio) + { + (void)prio; + set_unavailable_items_ = true; + return *this; + } + int get_priority() const + { + return 0; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + run_as_detached_ = detach; + return *this; + } + bool get_run_as_detached() const + { + return run_as_detached_; + } + + ThreadAttribute & set_name(std::string const &) + { + set_unavailable_items_ = true; + return *this; + } + const char * get_name() const + { + return ""; + } + + void + set_thread_attribute( + const rcl_thread_attr_t &) + { + set_unavailable_items_ = true; + } + + void swap( + ThreadAttribute & other) + { + std::swap(*this, other); + } + +private: + friend struct rclcpp::Thread; + template + friend void this_thread::run_with_thread_attribute( + ThreadAttribute & attr, F && f, Args && ... args); + + bool set_unavailable_items_; + bool run_as_detached_; +}; + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__STD__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/windows/thread.hpp b/rclcpp/include/rclcpp/threads/windows/thread.hpp new file mode 100644 index 0000000000..49e6134fcf --- /dev/null +++ b/rclcpp/include/rclcpp/threads/windows/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__WINDOWS__THREAD_HPP_ +#define RCLCPP__THREADS__WINDOWS__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCLCPP__THREADS__WINDOWS__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..e835ef4af9 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -23,6 +23,7 @@ #include "rclcpp/logging.hpp" #include "rclcpp/utilities.hpp" +#include "rclcpp/threads.hpp" using rclcpp::executors::MultiThreadedExecutor; @@ -33,13 +34,34 @@ MultiThreadedExecutor::MultiThreadedExecutor( std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout) + next_exec_timeout_(next_exec_timeout), + thread_attributes_(nullptr) { + bool has_number_of_threads_arg = number_of_threads > 0; + rcl_ret_t ret; + number_of_threads_ = number_of_threads > 0 ? number_of_threads : std::max(std::thread::hardware_concurrency(), 2U); - if (number_of_threads_ == 1) { + ret = rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_); + if (ret != RCL_RET_OK) { + ret = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_); + } + + if (has_number_of_threads_arg && thread_attributes_ && + thread_attributes_->num_attributes != number_of_threads) + { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "The number of threads argument passed to the MultiThreadedExecutor" + " is different from the number of thread attributes.\n" + "The executor runs using the thread attributes and ignores the former."); + } else if (number_of_threads_ == 1) { RCLCPP_WARN( rclcpp::get_logger("rclcpp"), "MultiThreadedExecutor is used with a single thread.\n" @@ -56,17 +78,35 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(func); + + if (thread_attributes_) { + rclcpp::detail::ThreadAttribute thread_attr; + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { + thread_attr.set_thread_attribute( + thread_attributes_->attributes[thread_id]); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); + } + } + thread_attr.set_thread_attribute( + thread_attributes_->attributes[thread_id]); + this_thread::run_with_thread_attribute( + thread_attr, &MultiThreadedExecutor::run, this, thread_id); + } else { + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } } + run(thread_id); } - run(thread_id); for (auto & thread : threads) { thread.join(); } @@ -75,7 +115,11 @@ MultiThreadedExecutor::spin() size_t MultiThreadedExecutor::get_number_of_threads() { - return number_of_threads_; + if (thread_attributes_) { + return thread_attributes_->num_attributes; + } else { + return number_of_threads_; + } } void diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index e7f311c147..6c688851e6 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -16,16 +16,49 @@ #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/any_executable.hpp" +#include "rclcpp/threads.hpp" using rclcpp::executors::SingleThreadedExecutor; SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options) -: rclcpp::Executor(options) {} +: rclcpp::Executor(options), + thread_attributes_(nullptr) +{ + rcl_ret_t ret; + + ret = rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_); + if (ret != RCL_RET_OK) { + ret = rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_); + } + if (thread_attributes_ && thread_attributes_->num_attributes != 1) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Specified thread attributes contains multiple configurations.\n" + "The executor runs only using first configuration."); + } +} SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() +{ + if (thread_attributes_) { + rclcpp::detail::ThreadAttribute thread_attr; + thread_attr.set_thread_attribute( + thread_attributes_->attributes[0]); + rclcpp::this_thread::run_with_thread_attribute(thread_attr, &SingleThreadedExecutor::run, this); + } else { + run(); + } +} + +void +SingleThreadedExecutor::run() { if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); diff --git a/rclcpp/src/rclcpp/threads/posix_thread.cpp b/rclcpp/src/rclcpp/threads/posix_thread.cpp new file mode 100644 index 0000000000..7599aa499f --- /dev/null +++ b/rclcpp/src/rclcpp/threads/posix_thread.cpp @@ -0,0 +1,171 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include "rclcpp/threads/posix/thread.hpp" +#include "rclcpp/threads/posix/utilities.hpp" + +static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr); +static void * thread_main(void * p); + +namespace rclcpp +{ + +Thread::Thread(Attribute const * attr, ThreadFuncBase::UniquePtr func) +: handle_(NativeHandleType{}), name_(attr ? attr->get_name() : std::string{}) +{ + Attribute::NativeAttributeType native_attr; + int r = pthread_attr_init(&native_attr); + detail::throw_if_error(r, "Error in pthread_attr_init "); + + if (attr != nullptr) { + set_pthread_attr(native_attr, *attr); + } + + NativeHandleType h; + r = pthread_create(&h, &native_attr, thread_main, func.get()); + detail::throw_if_error(r, "Error in pthread_create "); + + if (attr == nullptr || !attr->get_run_as_detached()) { + this->handle_ = h; + } + + pthread_attr_destroy(&native_attr); + + func.release(); +} + +void Thread::apply_attr(Attribute const & attr) +{ + int r; + int policy = attr.get_sched_policy(); +#if __linux__ + if (policy != SCHED_FIFO && policy != SCHED_RR && policy != SCHED_OTHER) { + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + detail::throw_if_error(r, "Error in pthread_setschedparam "); + } +#endif // #if __linux__ +} + +namespace detail +{ + +ThreadAttribute::ThreadAttribute() +{ + NativeAttributeType attr; + int r; + + r = pthread_attr_init(&attr); + throw_if_error(r, "Error in pthread_attr_init "); + + r = pthread_attr_getschedpolicy(&attr, &sched_policy_); + throw_if_error(r, "Error in pthread_attr_getschedpolicy "); + + r = pthread_attr_getstacksize(&attr, &stack_size_); + throw_if_error(r, "Error in pthread_attr_getstacksize "); + + sched_param param; + r = pthread_attr_getschedparam(&attr, ¶m); + throw_if_error(r, "Error in pthread_attr_getschedparam "); + priority_ = param.sched_priority; + + int flag; + r = pthread_attr_getdetachstate(&attr, &flag); + throw_if_error(r, "Error in pthread_attr_getdetachstate "); + detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); + + pthread_attr_destroy(&attr); +} + + +void apply_attr_to_current_thread(ThreadAttribute const & attr) +{ + int r; + +#if __linux__ + CpuSet cpu_set = attr.get_affinity(); + CpuSet::NativeCpuSetType * native_cpu_set = cpu_set.native_cpu_set(); + if (native_cpu_set) { + std::size_t alloc_size = cpu_set.alloc_size(); + r = pthread_setaffinity_np(pthread_self(), alloc_size, native_cpu_set); + throw_if_error(r, "Error in sched_setaffinity "); + } +#endif // #if __linux__ + + sched_param param; + param.sched_priority = attr.get_priority(); + int policy = attr.get_sched_policy(); + r = pthread_setschedparam(pthread_self(), policy, ¶m); + throw_if_error(r, "Error in sched_setscheduler"); +} + +} // namespace detail + +} // namespace rclcpp + +static void * thread_main(void * p) +{ + using rclcpp::detail::ThreadFuncBase; + ThreadFuncBase::UniquePtr func(reinterpret_cast(p)); + + try { + func->run(); + } catch (...) { + std::cerr << "failed to run thread" << std::endl; + std::terminate(); + } + + return nullptr; +} + +static void set_pthread_attr(pthread_attr_t & native_attr, rclcpp::Thread::Attribute const & attr) +{ + int r; + using rclcpp::detail::throw_if_error; + +#if defined(__linux__) + rclcpp::detail::CpuSet affinity = attr.get_affinity(); + size_t cpu_size = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_setaffinity_np(&native_attr, cpu_size, affinity.native_cpu_set()); + throw_if_error(r, "Error in pthread_attr_setaffinity_np "); +#endif + + std::size_t stack_size = attr.get_stack_size(); + r = pthread_attr_setstacksize(&native_attr, stack_size); + throw_if_error(r, "Error in pthread_attr_setstacksize "); + + int flag = attr.get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; + r = pthread_attr_setdetachstate(&native_attr, flag); + throw_if_error(r, "Error in pthread_attr_setdetachstate "); + + int sched_policy = attr.get_sched_policy(); + if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { + r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); + throw_if_error(r, "Error in pthread_attr_setinheritsched "); + + r = pthread_attr_setschedpolicy(&native_attr, sched_policy); + throw_if_error(r, "Error in pthread_attr_setschedpolicy "); + + sched_param param; + param.sched_priority = attr.get_priority(); + r = pthread_attr_setschedparam(&native_attr, ¶m); + throw_if_error(r, "Error in pthread_attr_setschedparam "); + } +} diff --git a/rclcpp/src/rclcpp/threads/windows_thread.cpp b/rclcpp/src/rclcpp/threads/windows_thread.cpp new file mode 100644 index 0000000000..2f6ca50ec7 --- /dev/null +++ b/rclcpp/src/rclcpp/threads/windows_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/threads/windows/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG.