Skip to content

Commit

Permalink
feat(thread): add Task
Browse files Browse the repository at this point in the history
  • Loading branch information
RiscadoA committed Apr 8, 2024
1 parent 5b8d61b commit 3235ca0
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Access to unscaled delta time for utilities like the debug camera (#1020, **@RiscadoA**).
- RenderTarget plugin (#1059, **@tomas7770**).
- RenderPicker plugin (#1060, **@tomas7770**).
- Task class, for use in asynchronous code (#1111, **@RiscadoA**).

### Changed

Expand Down
2 changes: 2 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ message("# Building core tests: " ${BUILD_CORE_TESTS})
# Set core source files
set(CUBOS_CORE_SOURCE
"src/log.cpp"

"src/thread/pool.cpp"
"src/thread/task.cpp"

"src/memory/stream.cpp"
"src/memory/standard_stream.cpp"
Expand Down
159 changes: 159 additions & 0 deletions core/include/cubos/core/thread/task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/// @file
/// @brief Class @ref cubos::core::thread::Task.
/// @ingroup core-thread

#pragma once

#include <condition_variable>
#include <mutex>

#include <cubos/core/log.hpp>

namespace cubos::core::thread
{
/// @brief Provides a mechanism to access the results of asynchronous operations.
/// @tparam T Result type.
/// @ingroup core-thread
template <typename T>
class Task final
{
public:
~Task()
{
this->discard();
}

/// @brief Constructs.
Task()
{
mData = new Data();
}

/// @brief Copy constructs.
/// @param other Task.
Task(const Task& other)
: mData{other.mData}
{
std::unique_lock lock{mData->mMutex};
mData->mRefCount += 1;
}

/// @brief Move constructs.
/// @param other Task.
Task(Task&& other) noexcept
: mData{other.mData}
{
other.mData = nullptr;
}

/// @brief Copy assigns.
/// @param other Task.
Task& operator=(const Task& other)
{
this->discard();
mData = other.mData;
std::unique_lock lock{mData->mMutex};
mData->mRefCount += 1;
}

/// @brief Move assigns.
/// @param other Task.
Task& operator=(Task&& other) noexcept
{
this->discard();
mData = other.mData;
other.mData = nullptr;
}

/// @brief Finishes the task, setting its result and notifying a waiting thread.
/// @param value Task result.
void finish(T value)
{
CUBOS_ASSERT(mData != nullptr, "Task has been discarded");

// Set the result and notify one waiting thread.
std::unique_lock lock(mData->mMutex);
CUBOS_ASSERT(!mData->mDone, "Task has already been finished");

new (&mData->mValue) T(std::move(value));
mData->mDone = true;
mData->mCondition.notify_all();
}

/// @brief Discards any result eventually received. The task is left in an invalid state.
void discard()
{
if (mData != nullptr)
{
std::unique_lock lock{mData->mMutex};
mData->mRefCount -= 1;
if (mData->mRefCount == 0)
{
lock.unlock();
delete mData;
}
mData = nullptr;
}
}

/// @brief Returns whether the task has finished.
/// @return Whether the task has finished.
bool isDone() const
{
CUBOS_ASSERT(mData != nullptr, "Task has been discarded");
std::unique_lock lock(mData->mMutex);
return mData->mDone;
}

/// @brief Blocks until the task finishes and then returns its result.
/// @return Task result.
T result()
{
CUBOS_ASSERT(mData != nullptr, "Task has been discarded");

// Wait until a result is obtained. When it is, consume it.
{
std::unique_lock lock(mData->mMutex);
mData->mCondition.wait(lock, [this]() { return mData->mDone; });
CUBOS_ASSERT(!mData->mConsumed, "Task result has already been consumed");
mData->mConsumed = true;
}

// Move the result and reset the task.
T result = std::move(mData->mValue);
this->discard();
return result;
}

private:
/// @brief Tracks the eventual result of the task, and is used to deliver it.
struct Data
{
int mRefCount{1};
bool mDone{false};
bool mConsumed{false};
union {
T mValue;
};

std::mutex mMutex;
std::condition_variable mCondition;

// NOLINTBEGIN(modernize-use-equals-default)
Data()
{
}
// NOLINTEND(modernize-use-equals-default)

~Data()
{
if (mDone)
{
mValue.~T();
}
}
};

Data* mData;
};
} // namespace cubos::core::thread
1 change: 1 addition & 0 deletions core/src/thread/task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include <cubos/core/thread/task.hpp>
2 changes: 2 additions & 0 deletions core/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ add_executable(

geom/box.cpp
geom/capsule.cpp

thread/task.cpp
)

target_link_libraries(cubos-core-tests cubos-core doctest::doctest)
Expand Down
62 changes: 62 additions & 0 deletions core/tests/thread/task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include <thread>

#include <doctest/doctest.h>

#include <cubos/core/thread/task.hpp>

#include "../utils.hpp"

using cubos::core::thread::Task;

TEST_CASE("thread::Task")
{
SUBCASE("task which produces an int")
{
Task<int> task{};

REQUIRE_FALSE(task.isDone());

auto thread = std::thread{[&task]() {
std::this_thread::sleep_for(std::chrono::milliseconds{100});
task.finish(42);
}};

SUBCASE("join before")
{
thread.join();
REQUIRE(task.isDone());
REQUIRE(task.result() == 42);
}

SUBCASE("join after")
{
task.isDone(); // May return either true or false.
REQUIRE(task.result() == 42);
thread.join();
}
}

SUBCASE("check if task is destroyed properly")
{
Task<DetectDestructor> task{};
bool destroyed = false;

// A copy of the task is finished.
{
Task<DetectDestructor> task2{task};
REQUIRE_FALSE(task.isDone());
REQUIRE_FALSE(task2.isDone());
task2.finish({&destroyed});
REQUIRE_FALSE(destroyed);
REQUIRE(task2.isDone());
}
REQUIRE(task.isDone());

// Value isn't destroyed yet.
REQUIRE_FALSE(destroyed);

// Value is destroyed only after being accessed.
task.result();
REQUIRE(destroyed);
}
}

0 comments on commit 3235ca0

Please sign in to comment.