Skip to content

Commit

Permalink
Implement exception handling during destruction.
Browse files Browse the repository at this point in the history
  • Loading branch information
lhruby committed Oct 21, 2024
1 parent e77df26 commit e69714f
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 20 deletions.
2 changes: 1 addition & 1 deletion modules/concurrency/examples/spinner_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ static constexpr auto RATE_HZ = 10;

class Worker {
public:
Worker() : spinner_([this] { doWork(); }, RATE_HZ) {
Worker() : spinner_([this] { return doWork(); }, RATE_HZ) {
}

void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MessageQueueConsumer {
[[nodiscard]] auto queue() -> containers::BlockingQueue<T>&;

private:
void consume();
[[nodiscard]] auto consume() -> Spinner::SpinResult;

private:
Callback callback_;
Expand All @@ -44,7 +44,7 @@ class MessageQueueConsumer {

template <typename T>
MessageQueueConsumer<T>::MessageQueueConsumer(Callback&& callback, std::optional<std::size_t> max_queue_size)
: callback_(std::move(callback)), message_queue_(max_queue_size), spinner_([this] { consume(); }) {
: callback_(std::move(callback)), message_queue_(max_queue_size), spinner_([this] { return consume(); }) {
}

template <typename T>
Expand All @@ -64,13 +64,13 @@ auto MessageQueueConsumer<T>::queue() -> containers::BlockingQueue<T>& {
}

template <typename T>
void MessageQueueConsumer<T>::consume() {
auto MessageQueueConsumer<T>::consume() -> Spinner::SpinResult {
auto message = message_queue_.waitAndPop();
if (!message.has_value()) {
return;
if (message.has_value()) {
callback_(message.value());
}

callback_(message.value());
return Spinner::SpinResult::Continue;
}

} // namespace heph::concurrency
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Spinner {

std::atomic_bool is_started_ = false;
std::atomic_bool stop_requested_ = false;
std::thread spinner_thread_;
std::future<void> async_spinner_handle_;

std::chrono::microseconds spin_period_;
std::chrono::system_clock::time_point start_timestamp_;
Expand Down
15 changes: 10 additions & 5 deletions modules/concurrency/src/spinner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,25 @@ Spinner::Spinner(Callback&& callback, double rate_hz /*= 0*/)
}

Spinner::~Spinner() {
if (is_started_.load() || spinner_thread_.joinable()) {
if (is_started_.load()) {
LOG(FATAL) << "Spinner is still running. Call stop() before destroying the object.";
std::terminate();
}

if (async_spinner_handle_.valid()) {
try {
async_spinner_handle_.get();
} catch (const std::exception& e) {
throw e; // Re-throw the exception to be handled by the caller.
}
}
}

void Spinner::start() {
throwExceptionIf<InvalidOperationException>(is_started_.load(), "Spinner is already started.");

std::promise<void> promise;
auto future = promise.get_future();
spinner_thread_ = std::async(std::launch::async, [this, &promise]() {
async_spinner_handle_ = std::async(std::launch::async, [this, promise = std::move(promise)]() mutable {
try {
spin();
promise.set_value();
Expand Down Expand Up @@ -95,8 +102,6 @@ auto Spinner::stop() -> std::future<void> {
}

void Spinner::stopImpl() {
spinner_thread_.join();

is_started_.store(false);
}

Expand Down
20 changes: 13 additions & 7 deletions modules/concurrency/tests/spinner_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@
namespace heph::concurrency::tests {

struct TestFixture : public ::testing::Test {
static auto TrivialCallback() -> std::function<void()> {
return []() {};
static auto TrivialCallback() -> std::function<Spinner::SpinResult()> {
return []() { return Spinner::SpinResult::Continue; };
}

static auto NonThrowingCallback(size_t& callback_called_counter) -> std::function<void()> {
return [&callback_called_counter]() { ++callback_called_counter; };
static auto NonThrowingCallback(size_t& callback_called_counter) -> std::function<Spinner::SpinResult()> {
return [&callback_called_counter]() {
++callback_called_counter;
return Spinner::SpinResult::Continue;
};
}

static auto ThrowingCallback() -> std::function<void()> {
return []() { throwException<InvalidOperationException>("This is a test exception."); };
static auto ThrowingCallback() -> std::function<Spinner::SpinResult()> {
return []() {
throwException<InvalidOperationException>("This is a test exception.");
return Spinner::SpinResult::Continue;
};
}
};

TEST(SpinnerTest, StartStopTest) {
Spinner spinner{ []() {} };
Spinner spinner{ TestFixture::TrivialCallback() };

EXPECT_THROW(spinner.stop(), heph::InvalidOperationException);
spinner.start();
Expand Down

0 comments on commit e69714f

Please sign in to comment.