diff --git a/CMakeLists.txt b/CMakeLists.txt index d79d4547e..be26cdd1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,6 +100,17 @@ if (TARGET MADworld) message(STATUS "MADNESS_FOUND=1") endif(TARGET MADworld) + +########################## +#### CUDA +########################## +check_language(CUDA) +if(CMAKE_CUDA_COMPILER) + enable_language(CUDA) +endif(CMAKE_CUDA_COMPILER) +set(TTG_HAVE_CUDA ${CMAKE_CUDA_COMPILER} CACHE BOOL "True if TTG supports compiling .cu files") + + ########################## #### Examples ########################## diff --git a/examples/madness/mrattg.cc b/examples/madness/mrattg.cc index 47bce1ce9..2b2d6c113 100644 --- a/examples/madness/mrattg.cc +++ b/examples/madness/mrattg.cc @@ -124,6 +124,46 @@ auto make_project(functorT& f, return ttg::make_tt(F, edges(fuse(refine, ctl)), edges(refine, result), name, {"control"}, {"refine", "result"}); } + +/// Returns an std::unique_ptr to the object +template +auto make_project_device(functorT& f, + const T thresh, /// should be scalar value not complex + ctlEdge& ctl, rnodeEdge& result, const std::string& name = "project") { + auto F = [f, thresh](const Key& key, std::tuple, rnodeOut>& out) { + FunctionReconstructedNode node(key); // Our eventual result + auto& coeffs = node.coeffs; // Need to clean up OO design + bool is_leaf; + + if (key.level() < initial_level(f)) { + for (auto child : children(key)) ttg::sendk<0>(child, out); + coeffs = T(1e7); // set to obviously bad value to detect incorrect use + is_leaf = false; + } else if (is_negligible(f, Domain::template bounding_box(key), + truncate_tol(key, thresh))) { + coeffs = T(0.0); + is_leaf = true; + } else { + auto node_view = ttg::make_view(node, ttg::ViewScope::Out); // no need to move node onto the device + auto is_leaf_view = ttg::make_view(is_leaf, ttg::ViewScope::Out); + co_await ttg::device::wait_views{}; + fcoeffs(f, key, thresh, + node_view.get_device_ptr<0>(), + is_leaf_view.get_device_ptr<0>()); // cannot deduce K + co_await ttg::device::wait_kernel{}; + if (!is_leaf) { + for (auto child : children(key)) ttg::sendk<0>(child, out); // should be broadcast ? + } + } + node.is_leaf = is_leaf; + ttg::send<1>(key, node, out); // always produce a result + }; + ctlEdge refine("refine"); + return ttg::make_tt(F, edges(fuse(refine, ctl)), edges(refine, result), name, {"control"}, {"refine", "result"}); +} + + + namespace detail { template struct tree_types {}; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 47dc543e6..73d8a152a 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -2,16 +2,18 @@ include(AddTTGExecutable) # TT unit test: core TTG ops set(ut_src - fibonacci.cc - device_coro.cc - ranges.cc - tt.cc - unit_main.cpp) + #fibonacci.cc + #ranges.cc + #tt.cc + unit_main.cpp + ) set(ut_libs Catch2::Catch2) -if (TARGET std::coroutine) - list(APPEND ut_src fibonacci-coro.cc) +#if (TARGET std::coroutine) + #list(APPEND ut_src fibonacci-coro.cc) + list(APPEND ut_src device_coro.cc) + list(APPEND ut_src cuda_kernel.cu) list(APPEND ut_libs std::coroutine) -endif() +#endif() add_ttg_executable(core-unittests-ttg "${ut_src}" LINK_LIBRARIES "${ut_libs}") diff --git a/tests/unit/device_coro.cc b/tests/unit/device_coro.cc index 1e35c4586..11ed77d91 100644 --- a/tests/unit/device_coro.cc +++ b/tests/unit/device_coro.cc @@ -3,12 +3,154 @@ #include "ttg.h" #include "ttg/view.h" +#include "cuda_kernel.h" + +struct value_t { + ttg::buffer db; // TODO: rename + int quark; + + template + void ttg_serialize(Archive& ar) { + ar& quark; + ar& db; // input: + } +}; + +/* devicebuf is non-POD so provide serialization + * information for members not a devicebuf */ +namespace madness::archive { + template + struct ArchiveSerializeImpl { + static inline void serialize(const Archive& ar, value_t& obj) { ar& obj.quark & obj.db; }; + }; +} // namespace madness::archive + + TEST_CASE("Device", "coro") { + + SECTION("devicebuf") { + + ttg::Edge edge; + auto fn = [&](const int& key, value_t&& val) -> ttg::device_task { + ttg::print("device_task key ", key); + /* wait for the view to be available on the device */ + co_await ttg::to_device(val.db); + /* once we're back here the data has been transferred */ + CHECK(val.db.current_device_ptr() != nullptr); + + /* NO KERNEL */ + + /* here we suspend to wait for a kernel to complete */ + co_await ttg::wait_kernel(); + + /* we're back, the kernel executed and we can send */ + if (key < 1) { + /* TODO: should we move the view in here if we want to get the device side data */ + ttg::send<0>(key+1, std::move(val)); + } + }; + + //ptr.get_view(device_id); + + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + ttg::ttg_fence(ttg::default_execution_context()); + } + + SECTION("scratch") { + + ttg::Edge edge; + auto fn = [&](const int& key, value_t&& val) -> ttg::device_task { + double scratch = 0.0; + ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::SyncOut); + + /* wait for the view to be available on the device */ + co_await ttg::to_device(ds, val.db); + /* once we're back here the data has been transferred */ + CHECK(ds.device_ptr() != nullptr); + + /* call a kernel */ + increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); + + /* here we suspend to wait for a kernel to complete */ + co_await ttg::wait_kernel(); + + /* buffer is increment once per task, so it should be the same as key */ + CHECK(static_cast(scratch) == key); + + /* we're back, the kernel executed and we can send */ + if (key < 10) { + /* TODO: should we move the view in here if we want to get the device side data */ + ttg::send<0>(key+1, std::move(val)); + } + }; + + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + ttg::ttg_fence(ttg::default_execution_context()); + } + + SECTION("ptr") { + + ttg::Edge edge; + ttg::Ptr ptr; + auto fn = [&](const int& key, value_t&& val) -> ttg::device_task { + double scratch = 1.0; + ttg::devicescratch ds = ttg::make_scratch(&scratch, ttg::scope::SyncOut); + + /* wait for the view to be available on the device */ + co_await ttg::to_device(ds, val.db); + /* once we're back here the data has been transferred */ + CHECK(ds.device_ptr() != nullptr); + + /* KERNEL */ + increment_buffer(val.db.current_device_ptr(), val.db.size(), ds.device_ptr(), ds.size()); + + /* here we suspend to wait for a kernel and the out-transfer to complete */ + co_await ttg::wait_kernel_out(val.db); + + /* buffer is increment once per task, so it should be the same as key */ + CHECK(static_cast(scratch) == key); + CHECK(static_cast(*val.db.host_ptr()) == key); + + /* we're back, the kernel executed and we can send */ + if (key < 10 || scratch < 0.0) { + ttg::send<0>(key+1, std::move(val)); + } else { + /* exfiltrate the value */ + /* TODO: what consistency do we expect from get_ptr? */ + ptr = ttg::get_ptr(val); + } + }; + + //ptr.get_view(device_id); + + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, value_t{}); + ttg::ttg_fence(ttg::default_execution_context()); + CHECK(ptr.is_valid()); + + /* feed the ptr back into a graph */ + if (ttg::default_execution_context().rank() == 0) tt->invoke(11, ptr); + ttg::ttg_fence(ttg::default_execution_context()); + + ptr.reset(); + } + + +#if 0 + SECTION("device_task") { ttg::Edge edge; auto fn = [&](const int& key, double&& val) -> ttg::device_task { - ttg::View view = ttg::make_view(val, ttg::ViewScope::SyncInOut); + ttg::View view = ttg::make_view(val, ttg::ViewScope::SyncInOut); ttg::print("device_task key ", key, ", value ", val); /* wait for the view to be available on the device */ co_yield view; @@ -27,7 +169,7 @@ TEST_CASE("Device", "coro") { /* we're back, the kernel executed and we can send */ if (key < 10) { - ttg::send<0>(key+1, val); + ttg::send<0>(key+1, std::move(val)); } }; auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), @@ -37,7 +179,154 @@ TEST_CASE("Device", "coro") { ttg::ttg_fence(ttg::default_execution_context()); } + SECTION("get_ptr") { + ttg::Edge edge; + ttg::ptr ptr; + auto fn = [&](const int& key, double&& val) -> ttg::device_task { + ttg::View view = ttg::make_view(val, ttg::ViewScope::SyncInOut); + ttg::print("device_task key ", key, ", value ", val); + /* wait for the view to be available on the device */ + co_yield view; + // co_yield std::tie(view1, view2); + // TTG_WAIT_VIEW(view); + /* once we're back here the data has been transferred */ + //CHECK(view.get_rw_device_ptr<0>() != nullptr); + CHECK(view.get_device_ptr<0>() != nullptr); + CHECK(view.get_device_size<0>() == sizeof(val)); + CHECK(&view.get_host_object() == &val); + + ttg::print("device_task key ", key, ", device pointer ", view.get_device_ptr<0>()); + + /* here we suspend to wait for a kernel to complete */ + co_yield ttg::device_op_wait_kernel{}; + // TTG_WAIT_KERNEL(); + + /* we're back, the kernel executed and we can send */ + if (key < 10) { + /* TODO: should we move the view in here if we want to get the device side data */ + ttg::send<0>(key+1, std::move(val)); + } else { + /* exfiltrate the value */ + ptr = ttg::get_ptr(val); + } + }; + + //ptr.get_view(device_id); + + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, 0.0); + ttg::ttg_fence(ttg::default_execution_context()); + #if 0 + /* feed the host-side value back into the graph */ + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, pview.get_ptr()); + ttg::ttg_fence(ttg::default_execution_context()); + + /* feed the device-side value back into the graph */ + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, pview); + ttg::ttg_fence(ttg::default_execution_context()); +#endif // 0 + } + + + + + SECTION("device_task") { + ttg::Edge edge; + auto fn = [&](const int& key, double&& val) -> ttg::device_task { + // will schedule the view for transfer to and from the device + ttg::View view = ttg::make_view(val, ttg::ViewScope::SyncInOut); + ttg::print("device_task key ", key, ", value ", val); + /* wait for the view to be available on the device */ + co_await ttg::device_task_wait_views{}; + + /* once we're back here the data has been transferred */ + CHECK(view.get_device_ptr<0>() != nullptr); + CHECK(view.get_device_size<0>() == sizeof(val)); + CHECK(&view.get_host_object() == &val); + + ttg::print("device_task key ", key, ", device pointer ", view.get_device_ptr<0>()); + + while (val < 10.0) { + + view.set_scope(ttg::ViewScope::SyncOut); + + /* */ + + /* here we suspend to wait for a kernel to complete */ + co_await ttg::device_task_wait_kernel{}; + + // TTG_WAIT_KERNEL(); + } + + /* we're back, the kernel executed and we can send */ + if (key < 10) { + ttg::send<0>(key+1, val); + } + }; + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, 0.0); + ttg::ttg_fence(ttg::default_execution_context()); + } + + struct A { + double norm; + std::vector d; + }; + + SECTION("device_task") { + ttg::Edge edge; + auto fn = [&](const int& key, ttg::ptr&& a) -> ttg::device_task { + // will schedule the view for transfer to and from the device + View norm_view = a.to_host(&A::norm); + + View norm_view{a}; + co_await ttg::device::wait_transfer{}; + + + + + if (val) + val += 1.0; + ptr.sync_to_device(); + /* wait for the view to be available on the device */ + co_await ttg::device_task_wait_views{}; + + /* once we're back here the data has been transferred */ + CHECK(view.get_device_ptr<0>() != nullptr); + CHECK(view.get_device_size<0>() == sizeof(val)); + CHECK(&view.get_host_object() == &val); + + ttg::print("device_task key ", key, ", device pointer ", view.get_device_ptr<0>()); + + while (val < 10.0) { + + view.set_scope(ttg::ViewScope::SyncOut); + + /* */ + + /* here we suspend to wait for a kernel to complete */ + co_await ttg::device_task_wait_kernel{}; + + // TTG_WAIT_KERNEL(); + } + + /* we're back, the kernel executed and we can send */ + if (key < 10) { + ttg::send<0>(key+1, val); + } + }; + auto tt = ttg::make_tt(fn, ttg::edges(edge), ttg::edges(edge), + "device_task", {"edge_in"}, {"edge_out"}); + make_graph_executable(tt); + if (ttg::default_execution_context().rank() == 0) tt->invoke(0, 0.0); + ttg::ttg_fence(ttg::default_execution_context()); + } + struct A { int a[10]; double b[10]; @@ -48,7 +337,7 @@ TEST_CASE("Device", "coro") { ttg::Edge edge; auto fn = [](const int& key, A&& val) -> ttg::device_task { auto view = ttg::make_view(val, {val.a, 10, ttg::ViewScope::SyncIn}, - {val.b, 10, ttg::ViewScope::SyncIn|ttg::ViewScope::SyncOut}); + {val.b, 10, ttg::ViewScope::SyncIn}); /* wait for the view to be available on the device */ co_yield view; // co_yield std::tie(view1, view2); diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 05231ef6f..d30b68da4 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -44,12 +44,16 @@ configure_file( ) set(ttg-impl-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/broadcast.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/buffer.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/devicescope.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/devicescratch.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/edge.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/execution.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/func.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/fwd.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/impl_selector.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/tt.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/ptr.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/reduce.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/run.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/runtimes.h @@ -203,8 +207,13 @@ endif(TARGET MADworld) ######################## if (TARGET PaRSEC::parsec) set(ttg-parsec-headers + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/buffer.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/devicescratch.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/fwd.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/import.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ptr.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/task.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/thread_local.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ttg.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/parsec/ttg_data_copy.h ) diff --git a/ttg/ttg.h b/ttg/ttg.h index e0fa9a702..09ccfaf90 100644 --- a/ttg/ttg.h +++ b/ttg/ttg.h @@ -27,6 +27,11 @@ #include "ttg/edge.h" +#include "ttg/ptr.h" +#include "ttg/buffer.h" +#include "ttg/devicescratch.h" +#include "ttg/devicescope.h" + #if defined(TTG_USE_PARSEC) #include "ttg/parsec/ttg.h" #elif defined(TTG_USE_MADNESS) diff --git a/ttg/ttg/buffer.h b/ttg/ttg/buffer.h new file mode 100644 index 000000000..1868b7e0c --- /dev/null +++ b/ttg/ttg/buffer.h @@ -0,0 +1,31 @@ +#ifndef TTG_BUFFER_H +#define TTG_BUFFER_H + +#include +#include "ttg/impl_selector.h" + + +namespace ttg { + +template +using buffer = TTG_IMPL_NS::buffer; + +namespace detail { + template + struct is_buffer : std::false_type + { }; + + template + struct is_buffer> : std::true_type + { }; + + template + constexpr bool is_buffer_v = is_buffer::value; + + static_assert(is_buffer_v>); + static_assert(is_buffer_v>); +} // namespace detail + +} // namespace ttg + +#endif // TTG_buffer_H \ No newline at end of file diff --git a/ttg/ttg/devicescope.h b/ttg/ttg/devicescope.h new file mode 100644 index 000000000..a8427fb56 --- /dev/null +++ b/ttg/ttg/devicescope.h @@ -0,0 +1,15 @@ +#ifndef TTG_DEVICESCOPE_H +#define TTG_DEVICESCOPE_H + +namespace ttg { + enum class scope { + Allocate = 0x0, //< memory allocated as scratch, but not moved in or out + SyncIn = 0x2, //< data will be allocated on and transferred to device + //< if latest version resides on the device (no previous sync-out) the data will + //< not be transferred again + SyncOut = 0x4, //< value will be transferred from device to host after kernel completes + SyncInOut = 0x8, //< data will be moved in and synchronized back out after the kernel completes + }; +} // namespace ttg + +#endif // TTG_DEVICESCOPE_H \ No newline at end of file diff --git a/ttg/ttg/devicescratch.h b/ttg/ttg/devicescratch.h new file mode 100644 index 000000000..1478d82cf --- /dev/null +++ b/ttg/ttg/devicescratch.h @@ -0,0 +1,19 @@ +#ifndef TTG_DEVICESCRATCH_H +#define TTG_DEVICESCRATCH_H + +#include "ttg/devicescope.h" +#include "ttg/impl_selector.h" + +namespace ttg { + +template +using devicescratch = TTG_IMPL_NS::devicescratch; + +template +auto make_scratch(T* val, ttg::scope scope, std::size_t count = 1) { + return devicescratch(val, scope, 1); +} + +} // namespace ttg + +#endif // TTG_DEVICESCRATCH_H \ No newline at end of file diff --git a/ttg/ttg/fwd.h b/ttg/ttg/fwd.h index df32505d0..f9b8d1c0f 100644 --- a/ttg/ttg/fwd.h +++ b/ttg/ttg/fwd.h @@ -47,6 +47,7 @@ namespace ttg { template void initialize(int argc, char **argv, int num_threads = -1, RestOfArgs &&...); void finalize(); + [[noreturn]] void abort(); World default_execution_context(); void execute(ttg::World world); diff --git a/ttg/ttg/parsec/buffer.h b/ttg/ttg/parsec/buffer.h new file mode 100644 index 000000000..6d1a0c16f --- /dev/null +++ b/ttg/ttg/parsec/buffer.h @@ -0,0 +1,388 @@ +#ifndef TTG_PARSEC_BUFFER_H +#define TTG_PARSEC_BUFFER_H + +// TODO: replace with short vector +#define TTG_PARSEC_MAX_NUM_DEVICES 4 + +#include +#include +#include +#include +#include "ttg/parsec/ttg_data_copy.h" + +namespace ttg_parsec { + + +namespace detail { + // fwd decl + template + parsec_data_t* get_parsec_data(const ttg_parsec::buffer& db); +} // namespace detail + +/** + * A buffer that is mirrored between host memory + * and different devices. The runtime is free to + * move data between device and host memory based + * on where the tasks are executing. + * + * Note that a buffer is movable and should not + * be shared between two objects (e.g., through a pointer) + * in order for TTG to properly facilitate ownership + * tracking of the containing object. + */ +template +struct buffer { + + using element_type = std::decay_t; + + static_assert(std::is_trivially_copyable_v, + "Only trivially copyable types are supported for devices."); + static_assert(std::is_default_constructible_v, + "Only default constructible types are supported for devices."); + +private: + using delete_fn_t = std::add_pointer_t; + + using parsec_data_ptr = std::unique_ptr; + using host_data_ptr = std::unique_ptr; + parsec_data_ptr m_data; + host_data_ptr m_host_data; + std::size_t m_count = 0; + detail::ttg_data_copy_t *m_ttg_copy = nullptr; + + static void delete_owned(element_type *ptr) { + delete[] ptr; + } + + static void delete_non_owned(element_type *ptr) { + // nothing to be done, we don't own the memory + } + + static void delete_parsec_data(parsec_data_t *data) { + std::cout << "delete parsec_data " << data << std::endl; + parsec_data_destroy(data); + } + + static void delete_null_parsec_data(parsec_data_t *) { + // nothing to be done, only used for nullptr + } + + void create_host_copy() { + /* create a new copy for the host object */ + parsec_data_copy_t* copy; + copy = parsec_data_copy_new(m_data.get(), 0, parsec_datatype_int8_t, PARSEC_DATA_FLAG_PARSEC_MANAGED); + copy->device_private = m_host_data.get(); + copy->coherency_state = PARSEC_DATA_COHERENCY_SHARED; + copy->version = 1; // this version is valid + m_data->nb_elts = sizeof(element_type)*m_count; + m_data->owner_device = 0; + /* register the new data with the host copy */ + if (nullptr != m_ttg_copy) { + m_ttg_copy->add_device_data(m_data.get()); + } + } + + void reset() { + if (m_data) { + if (nullptr != m_ttg_copy) { + m_ttg_copy->remove_device_data(m_data.get()); + } + m_data.reset(); + m_count = 0; + } + } + + friend parsec_data_t* detail::get_parsec_data(const ttg_parsec::buffer&); + +public: + + /* The device ID of the CPU. */ + static constexpr int cpu_device = 0; + + buffer() : buffer(1) + { } + + buffer(std::size_t count) + : m_data(parsec_data_new(), &delete_parsec_data) + , m_host_data(new element_type[count](), &delete_owned) + , m_count(count) + , m_ttg_copy(detail::ttg_data_copy_container()) + { + create_host_copy(); + } + + /* Constructing a buffer using application-managed memory. + * The memory pointed to by ptr must be accessible during + * the life-time of the buffer. */ + buffer(element_type* ptr, std::size_t count = 1) + : m_data(parsec_data_new(), &parsec_data_destroy) + , m_host_data(ptr, &delete_non_owned) + , m_count(count) + , m_ttg_copy(detail::ttg_data_copy_container()) + { + create_host_copy(); + } + + ~buffer() { + unpin(); // make sure the copies are not pinned + /* remove the tracked copy */ + if (nullptr != m_ttg_copy && m_data) { + m_ttg_copy->remove_device_data(m_data.get()); + } + } + + /* allow moving device buffers */ + buffer(buffer&& db) + : m_data(std::move(db.m_data)) + , m_host_data(std::move(db.m_host_data)) + , m_count(db.m_count) + { + db.m_count = 0; + /* don't update the ttg_copy, we keep the connection */ + } + + /* copy the host data but leave the devices untouched */ + buffer(const buffer& db) + : m_data(db.m_count ? parsec_data_new() : nullptr, + db.m_count ? &parsec_data_destroy : &delete_null_parsec_data) + , m_host_data(db.m_count ? new element_type[db.m_count] : nullptr, + db.m_count ? &delete_owned : delete_non_owned) + , m_count(db.m_count) + , m_ttg_copy(detail::ttg_data_copy_container()) + { + /* copy host data */ + std::copy(db.m_host_data.get(), + db.m_host_data.get() + m_count, + m_host_data.get()); + /* create the host copy with the allocated memory */ + create_host_copy(); + } + + /* allow moving device buffers */ + buffer& operator=(buffer&& db) { + m_data = std::move(db.m_data); + m_host_data = std::move(db.m_host_data); + m_count = db.m_count; + db.m_count = 0; + /* don't update the ttg_copy, we keep the connection */ + } + + /* copy the host buffer content but leave the devices untouched */ + buffer& operator=(const buffer& db) { + if (db.m_count == 0) { + m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data); + m_host_data = host_data_ptr(nullptr, &delete_non_owned); + } else { + m_data = parsec_data_ptr(parsec_data_new(), &parsec_data_destroy); + m_host_data = host_data_ptr(new element_type[db.m_count], &delete_owned); + /* copy host data */ + std::copy(db.m_host_data.get(), + db.m_host_data.get() + db.m_count, + m_host_data.get()); + /* create the host copy with the allocated memory */ + create_host_copy(); + } + m_count = db.m_count; + } + + /* set the current device, useful when a device + * buffer was modified outside of a TTG */ + void set_current_device(int device_id) { + assert(is_valid()); + /* make sure it's a valid device */ + assert(parsec_nb_devices > device_id); + /* make sure it's a valid copy */ + assert(m_data->device_copies[device_id] != nullptr); + m_data->owner_device = device_id; + } + + /* get the current device ID, i.e., the last updated + * device buffer. */ + int get_current_device() const { + assert(is_valid()); + return m_data->owner_device; + } + + /* get the current device pointer */ + element_type* current_device_ptr() { + assert(is_valid()); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + } + + /* get the current device pointer */ + const element_type* current_device_ptr() const { + assert(is_valid()); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + } + + /* get the device pointer at the given device + * \sa cpu_device + */ + element_type* device_ptr_on(int device_id) { + assert(is_valid()); + return static_cast(parsec_data_get_ptr(m_data.get(), device_id)); + } + + /* get the device pointer at the given device + * \sa cpu_device + */ + const element_type* device_ptr_on(int device_id) const { + assert(is_valid()); + return static_cast(parsec_data_get_ptr(m_data.get(), device_id)); + } + + element_type* host_ptr() { + return device_ptr_on(cpu_device); + } + + const element_type* host_ptr() const { + return device_ptr_on(cpu_device); + } + + bool is_valid_on(int device_id) const { + assert(is_valid()); + return (parsec_data_get_ptr(m_data.get(), device_id) != nullptr); + } + + void allocate_on(int device_id) { + /* TODO: need exposed PaRSEC memory allocator */ + } + + /* TODO: can we do this automatically? + * Pin the memory on all devices we currently track. + * Pinned memory won't be released by PaRSEC and can be used + * at any time. + */ + void pin() { + for (int i = 1; i < parsec_nb_devices; ++i) { + pin_on(i); + } + } + + /* Unpin the memory on all devices we currently track. */ + void unpin() { + if (!is_valid()) return; + for (int i = 1; i < parsec_nb_devices; ++i) { + unpin_on(i); + } + } + + /* Pin the memory on a given device */ + void pin_on(int device_id) { + /* TODO: how can we pin memory on a device? */ + } + + /* Pin the memory on a given device */ + void unpin_on(int device_id) { + /* TODO: how can we unpin memory on a device? */ + } + + bool is_valid() const { + return !!m_data; + } + + operator bool() const { + return is_valid(); + } + + std::size_t size() const { + return m_count; + } + + /* Reallocate the buffer with count elements */ + void reset(std::size_t count) { + /* TODO: can we resize if count is smaller than m_count? */ + /* drop the current data and reallocate */ + reset(); + if (count == 0) { + m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data); + m_host_data = host_data_ptr(nullptr, &delete_non_owned); + } else { + m_data = parsec_data_ptr(parsec_data_new(), &parsec_data_destroy); + m_host_data = host_data_ptr(new element_type[count], &delete_owned); + /* create the host copy with the allocated memory */ + create_host_copy(); + } + m_count = count; + /* don't touch the ttg_copy, we still belong to the same container */ + } + + /* Reset the buffer to use the ptr to count elements */ + void reset(T* ptr, std::size_t count = 1) { + /* TODO: can we resize if count is smaller than m_count? */ + /* drop the current data and reallocate */ + reset(); + if (nullptr == ptr) { + m_data = parsec_data_ptr(nullptr, &delete_null_parsec_data); + m_host_data = host_data_ptr(nullptr, &delete_non_owned); + m_count = 0; + } else { + m_data = parsec_data_ptr(parsec_data_new(), &parsec_data_destroy); + m_host_data = host_data_ptr(ptr, &delete_non_owned); + /* create the host copy with the allocated memory */ + create_host_copy(); + m_count = count; + } + /* don't touch the ttg_copy, we still belong to the same container */ + } + + /* serialization support */ + +#ifdef TTG_SERIALIZATION_SUPPORTS_CEREAL + template + std::enable_if_t || + std::is_base_of_v> + serialize(Archive& ar) { + if constexpr (ttg::detail::is_output_archive_v) + std::size_t s = size(); + ar(s); + else { + std::size_t s; + ar(s); + reset(s); + } + ar(value); + } +#endif // TTG_SERIALIZATION_SUPPORTS_CEREAL + +#ifdef TTG_SERIALIZATION_SUPPORTS_MADNESS + template + std::enable_if_t || + std::is_base_of_v> + serialize(Archive& ar) { + if constexpr (ttg::detail::is_output_archive_v) + ar& size(); + else { + std::size_t s; + ar & s; + /* initialize internal pointers and then reset */ + + reset(s); + } + } +#endif // TTG_SERIALIZATION_SUPPORTS_MADNESS + + +}; + +template +struct is_buffer : std::false_type +{ }; + +template +struct is_buffer> : std::true_type +{ }; + +template +constexpr static const bool is_buffer_v = is_buffer::value; + +namespace detail { + template + parsec_data_t* get_parsec_data(const ttg_parsec::buffer& db) { + return const_cast(db.m_data.get()); + } +} // namespace detail + +} // namespace ttg_parsec + +#endif // TTG_PARSEC_BUFFER_H \ No newline at end of file diff --git a/ttg/ttg/parsec/devicescratch.h b/ttg/ttg/parsec/devicescratch.h new file mode 100644 index 000000000..1c0487cdf --- /dev/null +++ b/ttg/ttg/parsec/devicescratch.h @@ -0,0 +1,139 @@ +#ifndef TTG_PARSEC_DEVICESCRATCH_H +#define TTG_PARSEC_DEVICESCRATCH_H + +// TODO: replace with short vector +#define TTG_PARSEC_MAX_NUM_DEVICES 4 + +#include +#include +#include +#include +#include + +namespace ttg_parsec { + +namespace detail { + // fwd decl + template + parsec_data_t* get_parsec_data(const ttg_parsec::devicescratch&); +} // namespace detail + +/** + * Scratch-space for task-local variables. + * TTG will allocate memory on the device + * and transfer data in and out based on the scope. + */ +template +struct devicescratch { + + using element_type = std::decay_t; + + static_assert(std::is_trivially_copyable_v, + "Only trivially copyable types are supported for devices."); + static_assert(std::is_default_constructible_v, + "Only default constructible types are supported for devices."); + +private: + + parsec_data_t* m_data = nullptr; + parsec_data_copy_t m_data_copy; + ttg::scope m_scope; + + void create_host_copy(element_type *ptr, std::size_t count) { + /* TODO: is the construction call necessary? */ + /* TODO: handle the scope */ + PARSEC_OBJ_CONSTRUCT(&m_data_copy, parsec_data_copy_t); + m_data_copy.device_index = 0; + //m_data_copy.original = &m_data; + //m_data_copy.older = NULL; + m_data_copy.flags = PARSEC_DATA_FLAG_PARSEC_MANAGED; + m_data_copy.dtt = parsec_datatype_int8_t; + m_data_copy.version = 1; + m_data_copy.device_private = ptr; + m_data_copy.coherency_state = PARSEC_DATA_COHERENCY_SHARED; + + m_data->nb_elts = count * sizeof(element_type); + m_data->owner_device = 0; + parsec_data_copy_attach(m_data, &m_data_copy, 0); + } + + friend parsec_data_t* detail::get_parsec_data(const ttg_parsec::devicescratch&); + +public: + + /* Constructing a devicescratch using application-managed memory. + * The memory pointed to by ptr must be accessible during + * the life-time of the devicescratch. */ + devicescratch(element_type* ptr, ttg::scope scope = ttg::scope::SyncIn, std::size_t count = 1) + : m_data(parsec_data_new()) + , m_scope(scope) { + create_host_copy(ptr, count); + } + + /* don't allow moving */ + devicescratch(devicescratch&&) = delete; + + /* don't allow copying */ + devicescratch(const devicescratch& db) = delete; + + /* don't allow moving */ + devicescratch& operator=(devicescratch&&) = delete; + + /* don't allow copying */ + devicescratch& operator=(const devicescratch& db) = delete; + + ~devicescratch() { + PARSEC_OBJ_DESTRUCT(&m_data_copy); + parsec_data_destroy(m_data); + m_data = nullptr; + } + + /* get the current device pointer */ + element_type* device_ptr() { + assert(is_valid()); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + } + + /* get the current device pointer */ + const element_type* device_ptr() const { + assert(is_valid()); + return static_cast(m_data->device_copies[m_data->owner_device]->device_private); + } + + bool is_valid() const { + // TODO: how to get the current device + // return (m_data->owner_device == parsec_current_device); + return true; + } + + ttg::scope scope() const { + return m_scope; + } + + std::size_t size() const { + return (m_data->nb_elts / sizeof(element_type)); + } + +}; + +template +struct is_devicescratch : std::false_type +{ }; + +template +struct is_devicescratch> : std::true_type +{ }; + +template +constexpr static const bool is_devicescratch_v = is_devicescratch::value; + +namespace detail { + template + parsec_data_t* get_parsec_data(const ttg_parsec::devicescratch& scratch) { + return const_cast(scratch.m_data); + } +} // namespace detail + +} // namespace ttg_parsec + +#endif // TTG_PARSEC_DEVICESCRATCH_H \ No newline at end of file diff --git a/ttg/ttg/parsec/fwd.h b/ttg/ttg/parsec/fwd.h index ece8c42fb..06c4dadf4 100644 --- a/ttg/ttg/parsec/fwd.h +++ b/ttg/ttg/parsec/fwd.h @@ -13,6 +13,20 @@ namespace ttg_parsec { template > class TT; + template + struct ptr; + + template + struct buffer; + template + struct devicescratch; + + template + inline bool register_device_memory(std::tuple &views); + + template + inline void mark_device_out(std::tuple &b); + /// \internal the OG name template using Op [[deprecated("use TT instead")]] = TT>; @@ -58,6 +72,17 @@ namespace ttg_parsec { void free(int did, void *ptr); ttg::ExecutionSpace space(int did); } + +#if 0 + template + inline std::pair>...>> get_ptr(Args&&... args); +#endif + template + inline ptr> get_ptr(T&& obj); + + template + inline ptr make_ptr(Args&&... args); + } // namespace ttg_parsec diff --git a/ttg/ttg/parsec/ptr.h b/ttg/ttg/parsec/ptr.h new file mode 100644 index 000000000..6499e050b --- /dev/null +++ b/ttg/ttg/parsec/ptr.h @@ -0,0 +1,282 @@ +#ifndef TTG_PARSEC_PTR_H +#define TTG_PARSEC_PTR_H + +#include +#include + +#include "ttg/parsec/ttg_data_copy.h" +#include "ttg/parsec/thread_local.h" +#include "ttg/parsec/task.h" + +namespace ttg_parsec { + + // fwd decl + template + struct ptr; + + namespace detail { + /* fwd decl */ + template + inline ttg_data_copy_t *create_new_datacopy(Value &&value); + + struct ptr { + using copy_type = detail::ttg_data_copy_t; + + private: + static inline std::unordered_map m_ptr_map; + static inline std::mutex m_ptr_map_mtx; + + copy_type *m_copy = nullptr; + + void drop_copy() { + std::cout << "ptr drop_copy " << m_copy << " ref " << m_copy->num_ref() << std::endl; + if (nullptr != m_copy && 1 == m_copy->drop_ref()) { + delete m_copy; + } + m_copy = nullptr; + } + + void register_self() { + /* insert ourselves from the list of ptr */ + std::lock_guard {m_ptr_map_mtx}; + m_ptr_map.insert(std::pair{this, true}); + } + + void deregister_self() { + /* remove ourselves from the list of ptr */ + std::lock_guard _{m_ptr_map_mtx}; + if (m_ptr_map.contains(this)) { + m_ptr_map.erase(this); + } + } + + public: + ptr(copy_type *copy) + : m_copy(copy) + { + register_self(); + m_copy->add_ref(); + std::cout << "ptr copy_obj ref " << m_copy->num_ref() << std::endl; + } + + copy_type* get_copy() const { + return m_copy; + } + + ptr(const ptr& p) + : m_copy(p.m_copy) + { + register_self(); + m_copy->add_ref(); + std::cout << "ptr cpy " << m_copy << " ref " << m_copy->num_ref() << std::endl; + } + + ptr(ptr&& p) + : m_copy(p.m_copy) + { + register_self(); + p.m_copy = nullptr; + std::cout << "ptr mov " << m_copy << " ref " << m_copy->num_ref() << std::endl; + } + + ~ptr() { + deregister_self(); + drop_copy(); + } + + ptr& operator=(const ptr& p) + { + drop_copy(); + m_copy = p.m_copy; + m_copy->add_ref(); + std::cout << "ptr cpy " << m_copy << " ref " << m_copy->num_ref() << std::endl; + return *this; + } + + ptr& operator=(ptr&& p) { + drop_copy(); + m_copy = p.m_copy; + p.m_copy = nullptr; + std::cout << "ptr mov " << m_copy << " ref " << m_copy->num_ref() << std::endl; + return *this; + } + + bool is_valid() const { + return (nullptr != m_copy); + } + + void reset() { + drop_copy(); + } + + /* drop all currently registered ptr + * \note this function is not thread-safe + * and should only be called at the + * end of the execution, e.g., during finalize. + */ + static void drop_all_ptr() { + for(auto it : m_ptr_map) { + it.first->drop_copy(); + } + } + }; + + + template + ttg_parsec::detail::ttg_data_copy_t* get_copy(ttg_parsec::ptr& p); + } // namespace detail + + template + ptr ttg_parsec::make_ptr(Args&&... args); + + template + ptr> ttg_parsec::get_ptr(T&& obj); + + template + struct ptr { + + using value_type = std::decay_t; + + private: + using copy_type = detail::ttg_data_value_copy_t; + + std::unique_ptr m_ptr; + + /* only PaRSEC backend functions are allowed to touch our private parts */ + template + friend ptr make_ptr(Args&&... args); + template + friend ptr> get_ptr(S&& obj); + template + friend detail::ttg_data_copy_t* detail::get_copy(ptr& p); + friend ttg::detail::value_copy_handler; + + /* only accessible by get_ptr and make_ptr */ + ptr(detail::ptr::copy_type *copy) + : m_ptr(new detail::ptr(copy)) + { } + + copy_type* get_copy() const { + return static_cast(m_ptr->get_copy()); + } + + public: + + ptr() = default; + + ptr(const ptr& p) + : ptr(p.get_copy()) + { } + + ptr(ptr&& p) = default; + + ~ptr() = default; + + ptr& operator=(const ptr& p) { + m_ptr.reset(new detail::ptr(p.get_copy())); + return *this; + } + + ptr& operator=(ptr&& p) = default; + + value_type& operator*() const { + return **static_cast(m_ptr->get_copy()); + } + + value_type& operator->() const { + return **static_cast(m_ptr->get_copy()); + } + + bool is_valid() const { + return m_ptr && m_ptr->is_valid(); + } + + void reset() { + m_ptr.reset(); + } + }; + +#if 0 + namespace detail { + template + inline auto get_ptr(Arg&& obj) { + + for (int i = 0; i < detail::parsec_ttg_caller->data_count; ++i) { + detail::ttg_data_copy_t *copy = detail::parsec_ttg_caller->copies[i]; + if (nullptr != copy) { + if (copy->get_ptr() == &obj) { + bool is_ready = true; + /* TODO: how can we force-sync host and device? Current data could be on either. */ +#if 0 + /* check all tracked device data for validity */ + for (auto it : copy) { + parsec_data_t *data = *it; + for (int i = 0; i < parsec_nb_devices; ++i) { + if (nullptr != data->device_copies[i]) { + + } else { + is_ready = false; + } + } + } +#endif // 0 + return std::make_pair(is_ready, std::tuple{ttg_parsec::ptr>(copy)}); + } + } + } + + throw std::runtime_error("ttg::get_ptr called on an unknown object!"); + } + } + + template + inline std::pair>...>> get_ptr(Args&&... args) { + if (nullptr == detail::parsec_ttg_caller) { + throw std::runtime_error("ttg::get_ptr called outside of a task!"); + } + + bool ready = true; + auto fn = [&](auto&& arg){ + auto pair = get_ptr(std::forward(arg)); + ready &= pair.first; + return std::move(pair.second); + }; + std::tuple>...> tpl = {(fn(std::forward(args)))...}; + return {ready, std::move(tpl)}; + } +#endif // 0 + + template + inline ptr> get_ptr(T&& obj) { + using ptr_type = ptr>; + if (nullptr != detail::parsec_ttg_caller) { + for (int i = 0; i < detail::parsec_ttg_caller->data_count; ++i) { + detail::ttg_data_copy_t *copy = detail::parsec_ttg_caller->copies[i]; + if (nullptr != copy) { + if (copy->get_ptr() == &obj) { + return ptr_type(copy); + } + } + } + } + /* object not tracked, make a new ptr that is now tracked */ + detail::ttg_data_copy_t *copy = detail::create_new_datacopy(obj); + return ptr_type(copy); + } + + template + inline ptr make_ptr(Args&&... args) { + detail::ttg_data_copy_t *copy = detail::create_new_datacopy(T(std::forward(args)...)); + return ptr(copy); + } + + namespace detail { + template + detail::ttg_data_copy_t* get_copy(ttg_parsec::ptr& p) { + return p.get_copy(); + } + } // namespace detail + +} // namespace ttg_parsec + +#endif // TTG_PARSEC_PTR_H \ No newline at end of file diff --git a/ttg/ttg/parsec/task.h b/ttg/ttg/parsec/task.h new file mode 100644 index 000000000..c8465b81c --- /dev/null +++ b/ttg/ttg/parsec/task.h @@ -0,0 +1,207 @@ +#ifndef TTG_PARSEC_TASK_H +#define TTG_PARSEC_TASK_H + +#include "ttg/parsec/ttg_data_copy.h" + +#include +#include + +namespace ttg_parsec { + + namespace detail { + + struct device_ptr_t { + parsec_gpu_task_t* gpu_task = nullptr; + parsec_flow_t* flows = nullptr; + }; + + template + struct device_state_t + { + static constexpr bool support_device = false; + static constexpr size_t num_flows = 0; + device_state_t(parsec_task_t *parsec_task) + { } + static constexpr device_ptr_t* dev_ptr() { + return nullptr; + } + }; + + template<> + struct device_state_t { + static constexpr bool support_device = false; + static constexpr size_t num_flows = MAX_PARAM_COUNT; + parsec_flow_t m_flows[num_flows]; + device_ptr_t m_dev_ptr = {nullptr, &m_flows[0]}; // gpu_task will be allocated in each task + device_ptr_t* dev_ptr() { + return &m_dev_ptr; + } + }; + + typedef parsec_hook_return_t (*parsec_static_op_t)(void *); // static_op will be cast to this type + + struct parsec_ttg_task_base_t { + parsec_task_t parsec_task; + int32_t in_data_count = 0; //< number of satisfied inputs + int32_t data_count = 0; //< number of data elements in the copies array + ttg_data_copy_t **copies; //< pointer to the fixed copies array of the derived task + parsec_hash_table_item_t tt_ht_item = {}; + parsec_static_op_t function_template_class_ptr[ttg::runtime_traits::num_execution_spaces] = + {nullptr}; + + typedef struct { + std::size_t goal; + std::size_t size; + } size_goal_t; + + typedef void (release_task_fn)(parsec_ttg_task_base_t*); + /* Poor-mans virtual function + * We cannot use virtual inheritance or private visibility because we + * need offsetof for the mempool and scheduling. + */ + release_task_fn* release_task_cb = nullptr; + device_ptr_t* dev_ptr; + bool remove_from_hash = true; + bool is_dummy = false; + bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy + + + /* + virtual void release_task() = 0; + */ + //public: + void release_task() { + release_task_cb(this); + } + + protected: + /** + * Protected constructors: this class should not be instantiated directly + * but always be use through parsec_ttg_task_t. + */ + + parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, + int data_count, ttg_data_copy_t **copies, device_ptr_t *dev_ptr, + bool defer_writer = TTG_PARSEC_DEFER_WRITER) + : data_count(data_count) + , copies(copies) + , dev_ptr(dev_ptr) + , defer_writer(defer_writer) { + PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); + parsec_task.mempool_owner = mempool; + parsec_task.task_class = task_class; + parsec_task.priority = 0; + } + + parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, + parsec_taskpool_t *taskpool, int32_t priority, + int data_count, ttg_data_copy_t **copies, device_ptr_t *dev_ptr, + release_task_fn *release_fn, + bool defer_writer = TTG_PARSEC_DEFER_WRITER) + : data_count(data_count) + , copies(copies) + , release_task_cb(release_fn) + , dev_ptr(dev_ptr) + , defer_writer(defer_writer) { + PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); + parsec_task.mempool_owner = mempool; + parsec_task.task_class = task_class; + parsec_task.status = PARSEC_TASK_STATUS_HOOK; + parsec_task.taskpool = taskpool; + parsec_task.priority = priority; + parsec_task.chore_mask = 1<<0; + } + + public: + void set_dummy(bool d) { is_dummy = d; } + bool dummy() { return is_dummy; } + }; + + template > + struct parsec_ttg_task_t : public parsec_ttg_task_base_t { + using key_type = typename TT::key_type; + static constexpr size_t num_streams = TT::numins; + TT* tt; + key_type key; + size_goal_t stream[num_streams] = {}; +#ifdef TTG_HAS_COROUTINE + void* suspended_task_address = nullptr; // if not null the function is suspended +#endif + ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task + // +1 for the copy needed during send/bcast + device_state_t dev_state; + + parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) + : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies, dev_state.dev_ptr()) { + tt_ht_item.key = pkey(); + + // We store the hash of the key and the address where it can be found in locals considered as a scratchpad + *(uintptr_t*)&(parsec_task.locals[0]) = 0; //there is no key + *(uintptr_t*)&(parsec_task.locals[2]) = 0; //there is no key + } + + parsec_ttg_task_t(const key_type& key, parsec_thread_mempool_t *mempool, + parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, + TT *tt_ptr, int32_t priority) + : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, + num_streams, copies, dev_state.dev_ptr(), + &release_task, tt_ptr->m_defer_writer) + , tt(tt_ptr), key(key) { + tt_ht_item.key = pkey(); + + // We store the hash of the key and the address where it can be found in locals considered as a scratchpad + uint64_t hv = ttg::hash>{}(key); + *(uintptr_t*)&(parsec_task.locals[0]) = hv; + *(uintptr_t*)&(parsec_task.locals[2]) = reinterpret_cast(&this->key); + } + + static void release_task(parsec_ttg_task_base_t* task_base) { + parsec_ttg_task_t *task = static_cast(task_base); + TT *tt = task->tt; + tt->release_task(task); + } + + parsec_key_t pkey() { return reinterpret_cast(&key); } + }; + + template + struct parsec_ttg_task_t : public parsec_ttg_task_base_t { + static constexpr size_t num_streams = TT::numins; + TT* tt; + size_goal_t stream[num_streams] = {}; +#ifdef TTG_HAS_COROUTINE + void* suspended_task_address = nullptr; // if not null the function is suspended +#endif + ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task + // +1 for the copy needed during send/bcast + device_state_t dev_state; + + parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) + : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies, dev_state.dev_ptr()) { + tt_ht_item.key = pkey(); + } + + parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, + parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority) + : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, + num_streams, copies, dev_state.dev_ptr(), + &release_task, tt_ptr->m_defer_writer) + , tt(tt_ptr) { + tt_ht_item.key = pkey(); + } + + static void release_task(parsec_ttg_task_base_t* task_base) { + parsec_ttg_task_t *task = static_cast(task_base); + TT *tt = task->tt; + tt->release_task(task); + } + + parsec_key_t pkey() { return 0; } + }; + + + } // namespace detail + +} // namespace ttg_parsec + +#endif // TTG_PARSEC_TASK_H \ No newline at end of file diff --git a/ttg/ttg/parsec/thread_local.h b/ttg/ttg/parsec/thread_local.h new file mode 100644 index 000000000..54b98885e --- /dev/null +++ b/ttg/ttg/parsec/thread_local.h @@ -0,0 +1,22 @@ +#ifndef TTG_PARSEC_THREAD_LOCAL_H +#define TTG_PARSEC_THREAD_LOCAL_H + +namespace ttg_parsec { + +namespace detail { + + // fwd decls + struct parsec_ttg_task_base_t; + struct ttg_data_copy_t; + + inline thread_local parsec_ttg_task_base_t *parsec_ttg_caller = nullptr; + + inline ttg_data_copy_t*& ttg_data_copy_container() { + static thread_local ttg_data_copy_t *ptr = nullptr; + return ptr; + } + +} // namespace detail +} // namespace ttg_parsec + +#endif // TTG_PARSEC_THREAD_LOCAL_H \ No newline at end of file diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 01e88d8a2..77991551c 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -7,6 +7,11 @@ #define TTG_USE_PARSEC 1 #endif // !defined(TTG_IMPL_NAME) +/* Whether to defer a potential writer if there are readers. + * This may avoid extra copies in exchange for concurrency. + * This may cause deadlocks, so use with caution. */ +#define TTG_PARSEC_DEFER_WRITER false + #include "ttg/impl_selector.h" /* include ttg header to make symbols available in case this header is included directly */ @@ -31,8 +36,15 @@ #include "ttg/serialization/data_descriptor.h" +#include "ttg/view.h" + #include "ttg/parsec/fwd.h" +#include "ttg/parsec/buffer.h" +#include "ttg/parsec/devicescratch.h" +#include "ttg/parsec/thread_local.h" +#include "ttg/parsec/devicefunc.h" + #include #include #include @@ -80,6 +92,9 @@ #include #include "ttg/parsec/ttg_data_copy.h" +#include "ttg/parsec/thread_local.h" +#include "ttg/parsec/ptr.h" +#include "ttg/parsec/task.h" #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES @@ -87,11 +102,6 @@ #include #endif -/* Whether to defer a potential writer if there are readers. - * This may avoid extra copies in exchange for concurrency. - * This may cause deadlocks, so use with caution. */ -#define TTG_PARSEC_DEFER_WRITER false - /* PaRSEC function declarations */ extern "C" { void parsec_taskpool_termination_detected(parsec_taskpool_t *tp); @@ -100,42 +110,6 @@ int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks); #include "ttg/view.h" -namespace test::meta { - - template - using void_t = void; - namespace detail { - template class TT, class... Args> - struct detector { - using value_t = std::false_type; - using type = Default; - }; - - template class TT, class... Args> - struct detector>, TT, Args...> { - using value_t = std::true_type; - using type = TT; - }; - - } - - template