Skip to content

Commit

Permalink
Add spinner and concurrency module (#35)
Browse files Browse the repository at this point in the history
# Description
Added a spinner that can be used for running applications using the ipc
module.

## Type of change

- New feature (non-breaking change which adds functionality)

## Checklist before requesting a review
- [x] I have performed a self-review of my code.
- [x] If it is a core feature, I have added thorough tests.
- [x] If this is a new component I have added examples.
- [x] I updated the README and related documentation.

---------

Co-authored-by: Filippo Brizzi <[email protected]>
  • Loading branch information
floriantschopp and filippobrizzi authored Mar 26, 2024
1 parent 85be4b5 commit ad57535
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 3 deletions.
30 changes: 30 additions & 0 deletions modules/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# =================================================================================================
# Copyright (C) 2023-2024 HEPHAESTUS Contributors
# =================================================================================================

declare_module(
NAME concurrency
DEPENDS_ON_MODULES base
DEPENDS_ON_EXTERNAL_PROJECTS absl fmt
)

find_package(absl REQUIRED)
find_package(fmt REQUIRED)

# library sources
set(SOURCES src/spinner.cpp README.md include/hephaestus/concurrency/spinner.h)

# library target
define_module_library(
NAME concurrency
PUBLIC_LINK_LIBS absl::log hephaestus::base fmt::fmt
PRIVATE_LINK_LIBS ""
SOURCES ${SOURCES}
PUBLIC_INCLUDE_PATHS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:include>
PRIVATE_INCLUDE_PATHS ""
SYSTEM_PRIVATE_INCLUDE_PATHS ""
)

# Subprojects
add_subdirectory(tests)
add_subdirectory(examples)
21 changes: 21 additions & 0 deletions modules/concurrency/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Concurrency Module

The Concurrency module provides a set of classes and functions to help with multithreaded programming. It includes utilities for managing threads, synchronizing data access, and handling asynchronous tasks.

## Spinner Component

One of the key components of the Concurrency module is the `Spinner` class. The `Spinner` class provides a simple way to run a task in a separate thread and stop it when necessary.

### Usage

To use the `Spinner` class, you need to create a subclass and implement the `spinOnce` method. This method will be called repeatedly in a separate thread when the spinner is started.
Check the [example](examples/spinner_example.cpp) for more details.

### Stopping the Spinner

The `Spinner` class provides a `stop` method to stop the spinner. You can also add a stop callback that will be called when the spinner is stopped.
Check the [test](tests/spinner_tests.cpp) for more details.

### Thread Safety

The `Spinner` class is thread-safe. You can safely call `start`, `stop`, and `addStopCallback` from multiple threads without additional synchronization.
10 changes: 10 additions & 0 deletions modules/concurrency/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#=================================================================================================
# Copyright (C) 2023-2024 HEPHAESTUS Contributors
#=================================================================================================

define_module_example(
NAME spinner_example
SOURCES spinner_example.cpp
PUBLIC_INCLUDE_PATHS
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
PUBLIC_LINK_LIBS "")
52 changes: 52 additions & 0 deletions modules/concurrency/examples/spinner_example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include <atomic>
#include <csignal>

#include <fmt/core.h>

#include "hephaestus/concurrency/spinner.h"

class TestSpinner : public heph::concurrency::Spinner {
public:
std::atomic<int> counter = 0;

protected:
void spinOnce() override {
fmt::println("Spinning once. Counter: {}", counter.load());
++counter;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
}
};

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
std::atomic_flag stop_called = ATOMIC_FLAG_INIT;
void handleSigint(int signal) {
if (signal == SIGINT) {
fmt::println("Stop called.");
stop_called.test_and_set();
stop_called.notify_all();
}
}

auto main() -> int {
try {
TestSpinner spinner;
std::ignore = std::signal(SIGINT, handleSigint);

spinner.start();

// Wait until stop_called is set
stop_called.wait(false);

auto future = spinner.stop();
future.get();
} catch (const std::exception& ex) {
fmt::println("{}", ex.what());
return EXIT_FAILURE;
}

return EXIT_SUCCESS;
}
38 changes: 38 additions & 0 deletions modules/concurrency/include/hephaestus/concurrency/spinner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#pragma once

#include <atomic>
#include <functional>
#include <future>
#include <thread>

namespace heph::concurrency {

class Spinner {
public:
Spinner();
virtual ~Spinner();
Spinner(const Spinner&) = delete;
auto operator=(const Spinner&) -> Spinner& = delete;
Spinner(Spinner&&) = delete;
auto operator=(Spinner&&) -> Spinner& = delete;

void start();
auto stop() -> std::future<void>;
virtual void spinOnce() = 0; // Pure virtual function
void addStopCallback(std::function<void()> callback);

private:
void spin();
void stopImpl();

private:
std::atomic_bool is_started_ = false;
std::atomic_bool stop_requested_ = false;
std::function<void()> stop_callback_;
std::thread spinner_thread_;
};
} // namespace heph::concurrency
62 changes: 62 additions & 0 deletions modules/concurrency/src/spinner.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include "hephaestus/concurrency/spinner.h"

#include <future>

#include <absl/log/log.h>
#include <fmt/format.h>

#include "hephaestus/base/exception.h"

namespace heph::concurrency {
Spinner::Spinner() : is_started_(false), stop_requested_(false) {
}

Spinner::~Spinner() {
if (is_started_.load() || spinner_thread_.joinable()) {
LOG(FATAL) << "Spinner is still running. Call stop() before destroying the object.";
std::terminate();
}
}

void Spinner::start() {
throwExceptionIf<InvalidOperationException>(is_started_.load(), fmt::format("Spinner is already started."));

// NOTE: Replace with std::stop_token and std::jthread when clang supports it.
spinner_thread_ = std::thread([this]() { spin(); });

is_started_.store(true);
}

void Spinner::spin() {
while (!stop_requested_.load()) {
spinOnce();
}
}

auto Spinner::stop() -> std::future<void> {
throwExceptionIf<InvalidOperationException>(!is_started_.load(),
fmt::format("Spinner not yet started, cannot stop."));
stop_requested_.store(true);

return std::async(std::launch::async, [this]() { stopImpl(); });
}

void Spinner::stopImpl() {
spinner_thread_.join();

if (stop_callback_) {
stop_callback_();
}

is_started_.store(false);
}

void Spinner::addStopCallback(std::function<void()> callback) {
stop_callback_ = std::move(callback);
}

} // namespace heph::concurrency
10 changes: 10 additions & 0 deletions modules/concurrency/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#=================================================================================================
# Copyright (C) 2023-2024 HEPHAESTUS Contributors
#=================================================================================================

define_module_test(
NAME spinner_tests
SOURCES spinner_tests.cpp
PUBLIC_INCLUDE_PATHS
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
PUBLIC_LINK_LIBS "")
59 changes: 59 additions & 0 deletions modules/concurrency/tests/spinner_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================
#include "gtest/gtest.h"
#include "hephaestus/base/exception.h"
#include "hephaestus/concurrency/spinner.h"

class TestSpinner : public heph::concurrency::Spinner {
public:
std::atomic<int> counter = 0;

protected:
void spinOnce() override {
++counter;
// simulate work
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // NOLINT
}
};

TEST(SpinnerTest, StartStopTest) {
TestSpinner spinner;

EXPECT_THROW(spinner.stop(), heph::InvalidOperationException);
spinner.start();

EXPECT_THROW(spinner.start(), heph::InvalidOperationException);
auto future = spinner.stop();
future.get();

EXPECT_THROW(spinner.stop(), heph::InvalidOperationException);
}

TEST(SpinnerTest, SpinTest) {
TestSpinner spinner;

spinner.start();

// Wait for a while to let the spinner do some work.
std::this_thread::sleep_for(std::chrono::seconds(1));
auto future = spinner.stop();
future.get();

// The counter should have been incremented.
EXPECT_GT(spinner.counter.load(), 0);
}

TEST(SpinnerTest, StopCallbackTest) {
TestSpinner spinner;
std::atomic<bool> callback_called(false);

spinner.addStopCallback([&]() { callback_called.store(true); });

spinner.start();
auto future = spinner.stop();
future.get();

// The callback should have been called.
EXPECT_TRUE(callback_called.load());
}
6 changes: 3 additions & 3 deletions modules/utils/src/version_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ static constexpr std::uint8_t VERSION_MAJOR = 0;
static constexpr std::uint8_t VERSION_MINOR = 0;
static constexpr std::uint16_t VERSION_PATCH = 1;

static constexpr std::string_view REPO_BRANCH = "zenohc_fix_carg_version";
static constexpr std::string_view BUILD_PROFILE = "Release";
static constexpr std::string_view REPO_HASH = "5f5c307";
static constexpr std::string_view REPO_BRANCH = "feature/spinner";
static constexpr std::string_view BUILD_PROFILE = "RelWithDebInfo";
static constexpr std::string_view REPO_HASH = "6b4f737";

} // namespace heph::utils

0 comments on commit ad57535

Please sign in to comment.