Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Spinner manager #198

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions modules/concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ find_package(absl REQUIRED)
find_package(fmt REQUIRED)

# library sources
set(SOURCES src/message_queue_consumer.cpp src/spinner.cpp README.md
include/hephaestus/concurrency/message_queue_consumer.h include/hephaestus/concurrency/spinner.h
set(SOURCES
src/message_queue_consumer.cpp
src/spinner.cpp
src/spinner_manager.cpp
README.md
include/hephaestus/concurrency/message_queue_consumer.h
include/hephaestus/concurrency/spinner.h
include/hephaestus/concurrency/spinner_manager.h
)

# library target
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <chrono>
#include <functional>
#include <future>
#include <thread>

namespace heph::concurrency {

Expand Down Expand Up @@ -52,6 +51,7 @@ class Spinner {

std::atomic_bool stop_requested_ = false;
std::future<void> async_spinner_handle_;
std::atomic_flag spinner_completed_ = ATOMIC_FLAG_INIT;

std::chrono::microseconds spin_period_;
std::chrono::system_clock::time_point start_timestamp_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#pragma once

#include <future>
#include <vector>

#include "hephaestus/concurrency/spinner.h"

namespace heph::concurrency {

/// @brief SpinnersManager allows to orchestrate the execution of multiple spinners.
/// The main feature is `waitAny` which allows to unblock as soon as one of the spinners is done.
/// This allows to catch errors in spinner execution as soon as possible and stop the others.
/// NOTE: right now we do not have any concept of error for the spinners: we cannot know if a spinner
/// terminated with an error or not. If an exception is thrown inside the runner, it will be re-throwed when
/// we call runner.stop().get(). We leave it to the user to handle it outside of the runner manager.
/// NOTE: this logic is quite generic and can be extended to any type of object that has `wait()` and `stop()`
/// methods. To be faitful to the principle of implement only what you need now, we limit the scope to
/// spinners and consider to expand the scope when an use case arises.
class SpinnersManager {
public:
explicit SpinnersManager(std::vector<Spinner*> spinners);

void startAll();
void waitAll();
void waitAny();
Comment on lines +28 to +29
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void waitAll();
void waitAny();
/// @brief blocks until all spinners are finished. If a single spinner throws an exception, it will not be propagated as the other spinners are still blocking.
void waitAll();
/// @brief wait until any spinner terminates or throws an exception, which allows for immediate exception handling.
void waitAny();

The naming is clear, but given this is somewhat complex async behavior, I would add these two lines. I'm not sure waitAll is useful tbh, as it only is useful if the spinner terminates on its own AND does never throw, which seems a non-existing case. While it is elegant and adds completeness, I feel like removing it.

void stopAll();

private:
std::vector<Spinner*> spinners_;

std::vector<std::future<void>> wait_futures_;
};

} // namespace heph::concurrency
43 changes: 29 additions & 14 deletions modules/concurrency/src/spinner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,42 @@ Spinner::~Spinner() {
void Spinner::start() {
throwExceptionIf<InvalidOperationException>(async_spinner_handle_.valid(), "Spinner is already started.");

stop_requested_.store(false);
spinner_completed_.clear();
async_spinner_handle_ = std::async(std::launch::async, [this]() mutable { spin(); });
}

void Spinner::spin() {
// TODO: set thread name

start_timestamp_ = std::chrono::system_clock::now();
try {
start_timestamp_ = std::chrono::system_clock::now();

while (!stop_requested_.load()) {
const auto spin_result = stoppable_callback_();
while (!stop_requested_.load()) {
const auto spin_result = stoppable_callback_();

++spin_count_;
++spin_count_;

if (spin_result == SpinResult::STOP) {
break;
}
if (spin_result == SpinResult::STOP) {
break;
}

if (spin_period_.count() == 0) {
continue;
}
if (spin_period_.count() == 0) {
continue;
}

const auto target_timestamp = start_timestamp_ + spin_count_ * spin_period_;
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait_until(lock, target_timestamp);
const auto target_timestamp = start_timestamp_ + spin_count_ * spin_period_;
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait_until(lock, target_timestamp);
}
} catch (std::exception& e) {
spinner_completed_.test_and_set();
spinner_completed_.notify_all();
throw; // TODO(@filippo) consider if we want to handle this error in a different way.
}

spinner_completed_.test_and_set();
spinner_completed_.notify_all();
}

auto Spinner::stop() -> std::future<void> {
Expand All @@ -91,7 +102,11 @@ auto Spinner::stop() -> std::future<void> {
}

void Spinner::wait() {
async_spinner_handle_.wait();
if (!async_spinner_handle_.valid()) {
return;
}

spinner_completed_.wait(false);
}

auto Spinner::spinCount() const -> uint64_t {
Expand Down
56 changes: 56 additions & 0 deletions modules/concurrency/src/spinner_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include "hephaestus/concurrency/spinner_manager.h"

#include <algorithm>
#include <atomic>
#include <future>
#include <utility>
#include <vector>

#include <fmt/core.h>

#include "hephaestus/concurrency/spinner.h"

namespace heph::concurrency {
SpinnersManager::SpinnersManager(std::vector<Spinner*> spinners) : spinners_{ std::move(spinners) } {
}

void SpinnersManager::startAll() {
std::ranges::for_each(spinners_, [](auto* runner) { runner->start(); });
}

void SpinnersManager::waitAll() {
std::ranges::for_each(spinners_, [](auto* runner) { runner->wait(); });
}

void SpinnersManager::waitAny() {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
wait_futures_.reserve(spinners_.size());
for (auto* runner : spinners_) {
auto future = std::async(std::launch::async, [runner, &flag]() {
runner->wait();
flag.test_and_set();
flag.notify_all();
});
wait_futures_.push_back(std::move(future));
}

flag.wait(false);
}

void SpinnersManager::stopAll() {
std::vector<std::future<void>> futures;
futures.reserve(spinners_.size());

std::ranges::for_each(spinners_,
[&futures](auto* runner) { futures.push_back(std::move(runner->stop())); });

// If we called `waitAny`, we need to wait for the remaining futures.
std::ranges::for_each(wait_futures_, [](auto& future) { future.get(); });

std::ranges::for_each(futures, [](auto& f) { f.get(); });
}
} // namespace heph::concurrency
7 changes: 7 additions & 0 deletions modules/concurrency/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ define_module_test(
PUBLIC_INCLUDE_PATHS
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
PUBLIC_LINK_LIBS "")

define_module_test(
NAME spinner_manager_tests
SOURCES spinner_manager_tests.cpp
PUBLIC_INCLUDE_PATHS
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>
PUBLIC_LINK_LIBS "")
97 changes: 97 additions & 0 deletions modules/concurrency/tests/spinner_manager_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//=================================================================================================
// Copyright (C) 2023-2024 HEPHAESTUS Contributors
//=================================================================================================

#include <atomic>
#include <stdexcept>

#include <fmt/core.h>
#include <gtest/gtest.h>

#include "hephaestus/concurrency/spinner.h"
#include "hephaestus/concurrency/spinner_manager.h"

namespace heph::concurrency::tests {

TEST(SpinnersManager, Empty) {
SpinnersManager runner_manager{ {} };
runner_manager.startAll();
runner_manager.waitAll();
runner_manager.stopAll();
}

TEST(SpinnersManager, OneSpinnerSuccessful) {
std::atomic_bool flag = false;
Spinner spinner{ Spinner::StoppableCallback{ [&flag]() -> Spinner::SpinResult {
flag = true;
return Spinner::SpinResult::STOP;
} } };

SpinnersManager runner_manager{ { &spinner } };
runner_manager.startAll();
runner_manager.waitAll();
runner_manager.stopAll();

EXPECT_TRUE(flag);
}

TEST(SpinnersManager, OneSpinnerError) {
Spinner spinner{ []() { throw std::runtime_error("fail"); } };

SpinnersManager runner_manager{ { &spinner } };
runner_manager.startAll();
runner_manager.waitAll();
EXPECT_THROW(runner_manager.stopAll(), std::runtime_error);
}

TEST(SpinnersManager, MultipleSpinnersSuccessful) {
std::atomic_bool flag1 = false;
Spinner spinner1{ Spinner::StoppableCallback{ [&flag1]() -> Spinner::SpinResult {
flag1 = true;
return Spinner::SpinResult::STOP;
} } };

std::atomic_bool flag2 = false;
Spinner spinner2{ Spinner::StoppableCallback{ [&flag2]() -> Spinner::SpinResult {
flag2 = true;
return Spinner::SpinResult::STOP;
} } };

SpinnersManager runner_manager{ { &spinner1, &spinner2 } };
runner_manager.startAll();
runner_manager.waitAll();
runner_manager.stopAll();

EXPECT_TRUE(flag1);
EXPECT_TRUE(flag2);
}

TEST(SpinnersManager, MultipleSpinnersWaitAny) {
Spinner spinner1{ []() {} }; // Run indefinitely until stopped

std::atomic_bool flag = false;
Spinner spinner2{ Spinner::StoppableCallback{ [&flag]() -> Spinner::SpinResult {
flag = true;
return Spinner::SpinResult::STOP;
} } };

SpinnersManager runner_manager{ { &spinner1, &spinner2 } };
runner_manager.startAll();
runner_manager.waitAny();
runner_manager.stopAll();

EXPECT_TRUE(flag);
}

TEST(SpinnersManager, MultipleSpinnersOneError) {
Spinner spinner1{ []() {} }; // Run indefinitely until stopped

Spinner spinner2{ []() { throw std::runtime_error("fail"); } };

SpinnersManager runner_manager{ { &spinner1, &spinner2 } };
runner_manager.startAll();
runner_manager.waitAny();
EXPECT_THROW(runner_manager.stopAll(), std::runtime_error);
}

} // namespace heph::concurrency::tests
16 changes: 16 additions & 0 deletions modules/concurrency/tests/spinner_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,20 @@ TEST(SpinnerTest, ExceptionHandling) {
EXPECT_THROW(spinner.stop().get(), heph::InvalidOperationException);
}

TEST(SpinnerTest, SpinStartAfterStop) {
size_t callback_called_counter = 0;
Spinner spinner(TestFixture::stoppingCallback(callback_called_counter));

spinner.start();
spinner.wait();
spinner.stop().get();
EXPECT_EQ(callback_called_counter, 10);

callback_called_counter = 0;
spinner.start();
spinner.wait();
spinner.stop().get();
EXPECT_EQ(callback_called_counter, 10);
}

} // namespace heph::concurrency::tests
3 changes: 3 additions & 0 deletions modules/utils/include/hephaestus/utils/concepts.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ concept Waitable = requires(T value) {
{ value.wait() };
};

template <typename T>
concept StoppableAndWaitable = requires { Stoppable<T>&& Waitable<T>; };

} // namespace heph
3 changes: 0 additions & 3 deletions modules/utils/include/hephaestus/utils/signal_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@

namespace heph::utils {

template <typename T>
concept StoppableAndWaitable = requires { Stoppable<T>&& Waitable<T>; };

/// \brief Use this class to block until a signal is received.
/// > NOTE: can be extended to call a generic callback when a signal is received.
/// Usage:
Expand Down
Loading