From ad57535d6fc5b44c180c54e89c38b8afac666ec4 Mon Sep 17 00:00:00 2001 From: Florian Tschopp Date: Tue, 26 Mar 2024 13:19:25 +0100 Subject: [PATCH] Add spinner and concurrency module (#35) # Description Added a spinner that can be used for running applications using the ipc module. ## Type of change - New feature (non-breaking change which adds functionality) ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [x] If it is a core feature, I have added thorough tests. - [x] If this is a new component I have added examples. - [x] I updated the README and related documentation. --------- Co-authored-by: Filippo Brizzi --- modules/concurrency/CMakeLists.txt | 30 +++++++++ modules/concurrency/README.md | 21 +++++++ modules/concurrency/examples/CMakeLists.txt | 10 +++ .../concurrency/examples/spinner_example.cpp | 52 ++++++++++++++++ .../include/hephaestus/concurrency/spinner.h | 38 ++++++++++++ modules/concurrency/src/spinner.cpp | 62 +++++++++++++++++++ modules/concurrency/tests/CMakeLists.txt | 10 +++ modules/concurrency/tests/spinner_tests.cpp | 59 ++++++++++++++++++ modules/utils/src/version_impl.h | 6 +- 9 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 modules/concurrency/CMakeLists.txt create mode 100644 modules/concurrency/README.md create mode 100644 modules/concurrency/examples/CMakeLists.txt create mode 100644 modules/concurrency/examples/spinner_example.cpp create mode 100644 modules/concurrency/include/hephaestus/concurrency/spinner.h create mode 100644 modules/concurrency/src/spinner.cpp create mode 100644 modules/concurrency/tests/CMakeLists.txt create mode 100644 modules/concurrency/tests/spinner_tests.cpp diff --git a/modules/concurrency/CMakeLists.txt b/modules/concurrency/CMakeLists.txt new file mode 100644 index 00000000..7f2bedc6 --- /dev/null +++ b/modules/concurrency/CMakeLists.txt @@ -0,0 +1,30 @@ +# ================================================================================================= +# Copyright (C) 2023-2024 HEPHAESTUS Contributors +# ================================================================================================= + +declare_module( + NAME concurrency + DEPENDS_ON_MODULES base + DEPENDS_ON_EXTERNAL_PROJECTS absl fmt +) + +find_package(absl REQUIRED) +find_package(fmt REQUIRED) + +# library sources +set(SOURCES src/spinner.cpp README.md include/hephaestus/concurrency/spinner.h) + +# library target +define_module_library( + NAME concurrency + PUBLIC_LINK_LIBS absl::log hephaestus::base fmt::fmt + PRIVATE_LINK_LIBS "" + SOURCES ${SOURCES} + PUBLIC_INCLUDE_PATHS $ $ + PRIVATE_INCLUDE_PATHS "" + SYSTEM_PRIVATE_INCLUDE_PATHS "" +) + +# Subprojects +add_subdirectory(tests) +add_subdirectory(examples) diff --git a/modules/concurrency/README.md b/modules/concurrency/README.md new file mode 100644 index 00000000..2702402c --- /dev/null +++ b/modules/concurrency/README.md @@ -0,0 +1,21 @@ +# Concurrency Module + +The Concurrency module provides a set of classes and functions to help with multithreaded programming. It includes utilities for managing threads, synchronizing data access, and handling asynchronous tasks. + +## Spinner Component + +One of the key components of the Concurrency module is the `Spinner` class. The `Spinner` class provides a simple way to run a task in a separate thread and stop it when necessary. + +### Usage + +To use the `Spinner` class, you need to create a subclass and implement the `spinOnce` method. This method will be called repeatedly in a separate thread when the spinner is started. +Check the [example](examples/spinner_example.cpp) for more details. + +### Stopping the Spinner + +The `Spinner` class provides a `stop` method to stop the spinner. You can also add a stop callback that will be called when the spinner is stopped. +Check the [test](tests/spinner_tests.cpp) for more details. + +### Thread Safety + +The `Spinner` class is thread-safe. You can safely call `start`, `stop`, and `addStopCallback` from multiple threads without additional synchronization. diff --git a/modules/concurrency/examples/CMakeLists.txt b/modules/concurrency/examples/CMakeLists.txt new file mode 100644 index 00000000..925cdaa9 --- /dev/null +++ b/modules/concurrency/examples/CMakeLists.txt @@ -0,0 +1,10 @@ +#================================================================================================= +# Copyright (C) 2023-2024 HEPHAESTUS Contributors +#================================================================================================= + +define_module_example( + NAME spinner_example + SOURCES spinner_example.cpp + PUBLIC_INCLUDE_PATHS + $ + PUBLIC_LINK_LIBS "") diff --git a/modules/concurrency/examples/spinner_example.cpp b/modules/concurrency/examples/spinner_example.cpp new file mode 100644 index 00000000..a32204c6 --- /dev/null +++ b/modules/concurrency/examples/spinner_example.cpp @@ -0,0 +1,52 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#include +#include + +#include + +#include "hephaestus/concurrency/spinner.h" + +class TestSpinner : public heph::concurrency::Spinner { +public: + std::atomic counter = 0; + +protected: + void spinOnce() override { + fmt::println("Spinning once. Counter: {}", counter.load()); + ++counter; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + } +}; + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +std::atomic_flag stop_called = ATOMIC_FLAG_INIT; +void handleSigint(int signal) { + if (signal == SIGINT) { + fmt::println("Stop called."); + stop_called.test_and_set(); + stop_called.notify_all(); + } +} + +auto main() -> int { + try { + TestSpinner spinner; + std::ignore = std::signal(SIGINT, handleSigint); + + spinner.start(); + + // Wait until stop_called is set + stop_called.wait(false); + + auto future = spinner.stop(); + future.get(); + } catch (const std::exception& ex) { + fmt::println("{}", ex.what()); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} diff --git a/modules/concurrency/include/hephaestus/concurrency/spinner.h b/modules/concurrency/include/hephaestus/concurrency/spinner.h new file mode 100644 index 00000000..0aa91d02 --- /dev/null +++ b/modules/concurrency/include/hephaestus/concurrency/spinner.h @@ -0,0 +1,38 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#pragma once + +#include +#include +#include +#include + +namespace heph::concurrency { + +class Spinner { +public: + Spinner(); + virtual ~Spinner(); + Spinner(const Spinner&) = delete; + auto operator=(const Spinner&) -> Spinner& = delete; + Spinner(Spinner&&) = delete; + auto operator=(Spinner&&) -> Spinner& = delete; + + void start(); + auto stop() -> std::future; + virtual void spinOnce() = 0; // Pure virtual function + void addStopCallback(std::function callback); + +private: + void spin(); + void stopImpl(); + +private: + std::atomic_bool is_started_ = false; + std::atomic_bool stop_requested_ = false; + std::function stop_callback_; + std::thread spinner_thread_; +}; +} // namespace heph::concurrency diff --git a/modules/concurrency/src/spinner.cpp b/modules/concurrency/src/spinner.cpp new file mode 100644 index 00000000..400aef4e --- /dev/null +++ b/modules/concurrency/src/spinner.cpp @@ -0,0 +1,62 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= + +#include "hephaestus/concurrency/spinner.h" + +#include + +#include +#include + +#include "hephaestus/base/exception.h" + +namespace heph::concurrency { +Spinner::Spinner() : is_started_(false), stop_requested_(false) { +} + +Spinner::~Spinner() { + if (is_started_.load() || spinner_thread_.joinable()) { + LOG(FATAL) << "Spinner is still running. Call stop() before destroying the object."; + std::terminate(); + } +} + +void Spinner::start() { + throwExceptionIf(is_started_.load(), fmt::format("Spinner is already started.")); + + // NOTE: Replace with std::stop_token and std::jthread when clang supports it. + spinner_thread_ = std::thread([this]() { spin(); }); + + is_started_.store(true); +} + +void Spinner::spin() { + while (!stop_requested_.load()) { + spinOnce(); + } +} + +auto Spinner::stop() -> std::future { + throwExceptionIf(!is_started_.load(), + fmt::format("Spinner not yet started, cannot stop.")); + stop_requested_.store(true); + + return std::async(std::launch::async, [this]() { stopImpl(); }); +} + +void Spinner::stopImpl() { + spinner_thread_.join(); + + if (stop_callback_) { + stop_callback_(); + } + + is_started_.store(false); +} + +void Spinner::addStopCallback(std::function callback) { + stop_callback_ = std::move(callback); +} + +} // namespace heph::concurrency diff --git a/modules/concurrency/tests/CMakeLists.txt b/modules/concurrency/tests/CMakeLists.txt new file mode 100644 index 00000000..938d3e04 --- /dev/null +++ b/modules/concurrency/tests/CMakeLists.txt @@ -0,0 +1,10 @@ +#================================================================================================= +# Copyright (C) 2023-2024 HEPHAESTUS Contributors +#================================================================================================= + +define_module_test( + NAME spinner_tests + SOURCES spinner_tests.cpp + PUBLIC_INCLUDE_PATHS + $ + PUBLIC_LINK_LIBS "") diff --git a/modules/concurrency/tests/spinner_tests.cpp b/modules/concurrency/tests/spinner_tests.cpp new file mode 100644 index 00000000..4f955d4f --- /dev/null +++ b/modules/concurrency/tests/spinner_tests.cpp @@ -0,0 +1,59 @@ +//================================================================================================= +// Copyright (C) 2023-2024 HEPHAESTUS Contributors +//================================================================================================= +#include "gtest/gtest.h" +#include "hephaestus/base/exception.h" +#include "hephaestus/concurrency/spinner.h" + +class TestSpinner : public heph::concurrency::Spinner { +public: + std::atomic counter = 0; + +protected: + void spinOnce() override { + ++counter; + // simulate work + std::this_thread::sleep_for(std::chrono::milliseconds(10)); // NOLINT + } +}; + +TEST(SpinnerTest, StartStopTest) { + TestSpinner spinner; + + EXPECT_THROW(spinner.stop(), heph::InvalidOperationException); + spinner.start(); + + EXPECT_THROW(spinner.start(), heph::InvalidOperationException); + auto future = spinner.stop(); + future.get(); + + EXPECT_THROW(spinner.stop(), heph::InvalidOperationException); +} + +TEST(SpinnerTest, SpinTest) { + TestSpinner spinner; + + spinner.start(); + + // Wait for a while to let the spinner do some work. + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto future = spinner.stop(); + future.get(); + + // The counter should have been incremented. + EXPECT_GT(spinner.counter.load(), 0); +} + +TEST(SpinnerTest, StopCallbackTest) { + TestSpinner spinner; + std::atomic callback_called(false); + + spinner.addStopCallback([&]() { callback_called.store(true); }); + + spinner.start(); + auto future = spinner.stop(); + future.get(); + + // The callback should have been called. + EXPECT_TRUE(callback_called.load()); +} diff --git a/modules/utils/src/version_impl.h b/modules/utils/src/version_impl.h index 81d4c53a..6c2304ab 100644 --- a/modules/utils/src/version_impl.h +++ b/modules/utils/src/version_impl.h @@ -17,8 +17,8 @@ static constexpr std::uint8_t VERSION_MAJOR = 0; static constexpr std::uint8_t VERSION_MINOR = 0; static constexpr std::uint16_t VERSION_PATCH = 1; -static constexpr std::string_view REPO_BRANCH = "zenohc_fix_carg_version"; -static constexpr std::string_view BUILD_PROFILE = "Release"; -static constexpr std::string_view REPO_HASH = "5f5c307"; +static constexpr std::string_view REPO_BRANCH = "feature/spinner"; +static constexpr std::string_view BUILD_PROFILE = "RelWithDebInfo"; +static constexpr std::string_view REPO_HASH = "6b4f737"; } // namespace heph::utils