From e69714f9304751b6866f79d300d77d17cb32a9ef Mon Sep 17 00:00:00 2001 From: Lorenz Hruby Date: Mon, 21 Oct 2024 18:11:02 +0000 Subject: [PATCH] Implement exception handling during destruction. --- .../concurrency/examples/spinner_example.cpp | 2 +- .../concurrency/message_queue_consumer.h | 12 +++++------ .../include/hephaestus/concurrency/spinner.h | 2 +- modules/concurrency/src/spinner.cpp | 15 +++++++++----- modules/concurrency/tests/spinner_tests.cpp | 20 ++++++++++++------- 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/modules/concurrency/examples/spinner_example.cpp b/modules/concurrency/examples/spinner_example.cpp index 6801e0ce..9263e1ef 100644 --- a/modules/concurrency/examples/spinner_example.cpp +++ b/modules/concurrency/examples/spinner_example.cpp @@ -15,7 +15,7 @@ static constexpr auto RATE_HZ = 10; class Worker { public: - Worker() : spinner_([this] { doWork(); }, RATE_HZ) { + Worker() : spinner_([this] { return doWork(); }, RATE_HZ) { } void start() { diff --git a/modules/concurrency/include/hephaestus/concurrency/message_queue_consumer.h b/modules/concurrency/include/hephaestus/concurrency/message_queue_consumer.h index 8e7b0c12..43b0b0ff 100644 --- a/modules/concurrency/include/hephaestus/concurrency/message_queue_consumer.h +++ b/modules/concurrency/include/hephaestus/concurrency/message_queue_consumer.h @@ -33,7 +33,7 @@ class MessageQueueConsumer { [[nodiscard]] auto queue() -> containers::BlockingQueue&; private: - void consume(); + [[nodiscard]] auto consume() -> Spinner::SpinResult; private: Callback callback_; @@ -44,7 +44,7 @@ class MessageQueueConsumer { template MessageQueueConsumer::MessageQueueConsumer(Callback&& callback, std::optional max_queue_size) - : callback_(std::move(callback)), message_queue_(max_queue_size), spinner_([this] { consume(); }) { + : callback_(std::move(callback)), message_queue_(max_queue_size), spinner_([this] { return consume(); }) { } template @@ -64,13 +64,13 @@ auto MessageQueueConsumer::queue() -> containers::BlockingQueue& { } template -void MessageQueueConsumer::consume() { +auto MessageQueueConsumer::consume() -> Spinner::SpinResult { auto message = message_queue_.waitAndPop(); - if (!message.has_value()) { - return; + if (message.has_value()) { + callback_(message.value()); } - callback_(message.value()); + return Spinner::SpinResult::Continue; } } // namespace heph::concurrency diff --git a/modules/concurrency/include/hephaestus/concurrency/spinner.h b/modules/concurrency/include/hephaestus/concurrency/spinner.h index 26711c7c..dc03c23c 100644 --- a/modules/concurrency/include/hephaestus/concurrency/spinner.h +++ b/modules/concurrency/include/hephaestus/concurrency/spinner.h @@ -42,7 +42,7 @@ class Spinner { std::atomic_bool is_started_ = false; std::atomic_bool stop_requested_ = false; - std::thread spinner_thread_; + std::future async_spinner_handle_; std::chrono::microseconds spin_period_; std::chrono::system_clock::time_point start_timestamp_; diff --git a/modules/concurrency/src/spinner.cpp b/modules/concurrency/src/spinner.cpp index 19708a46..bbd36dc4 100644 --- a/modules/concurrency/src/spinner.cpp +++ b/modules/concurrency/src/spinner.cpp @@ -39,18 +39,25 @@ Spinner::Spinner(Callback&& callback, double rate_hz /*= 0*/) } Spinner::~Spinner() { - if (is_started_.load() || spinner_thread_.joinable()) { + if (is_started_.load()) { LOG(FATAL) << "Spinner is still running. Call stop() before destroying the object."; std::terminate(); } + + if (async_spinner_handle_.valid()) { + try { + async_spinner_handle_.get(); + } catch (const std::exception& e) { + throw e; // Re-throw the exception to be handled by the caller. + } + } } void Spinner::start() { throwExceptionIf(is_started_.load(), "Spinner is already started."); std::promise promise; - auto future = promise.get_future(); - spinner_thread_ = std::async(std::launch::async, [this, &promise]() { + async_spinner_handle_ = std::async(std::launch::async, [this, promise = std::move(promise)]() mutable { try { spin(); promise.set_value(); @@ -95,8 +102,6 @@ auto Spinner::stop() -> std::future { } void Spinner::stopImpl() { - spinner_thread_.join(); - is_started_.store(false); } diff --git a/modules/concurrency/tests/spinner_tests.cpp b/modules/concurrency/tests/spinner_tests.cpp index 68c8b8cd..8d0935c2 100644 --- a/modules/concurrency/tests/spinner_tests.cpp +++ b/modules/concurrency/tests/spinner_tests.cpp @@ -15,21 +15,27 @@ namespace heph::concurrency::tests { struct TestFixture : public ::testing::Test { - static auto TrivialCallback() -> std::function { - return []() {}; + static auto TrivialCallback() -> std::function { + return []() { return Spinner::SpinResult::Continue; }; } - static auto NonThrowingCallback(size_t& callback_called_counter) -> std::function { - return [&callback_called_counter]() { ++callback_called_counter; }; + static auto NonThrowingCallback(size_t& callback_called_counter) -> std::function { + return [&callback_called_counter]() { + ++callback_called_counter; + return Spinner::SpinResult::Continue; + }; } - static auto ThrowingCallback() -> std::function { - return []() { throwException("This is a test exception."); }; + static auto ThrowingCallback() -> std::function { + return []() { + throwException("This is a test exception."); + return Spinner::SpinResult::Continue; + }; } }; TEST(SpinnerTest, StartStopTest) { - Spinner spinner{ []() {} }; + Spinner spinner{ TestFixture::TrivialCallback() }; EXPECT_THROW(spinner.stop(), heph::InvalidOperationException); spinner.start();