Skip to content

Commit

Permalink
Fix msg header ctor and non-device tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph Schuchart <[email protected]>
  • Loading branch information
devreal committed Mar 9, 2023
1 parent 2a0e47f commit de259c2
Showing 1 changed file with 14 additions and 24 deletions.
38 changes: 14 additions & 24 deletions ttg/ttg/parsec/ttg.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ namespace ttg_parsec {
int num_keys = 0;
int sender;

- msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int nk, int sender)
+ msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
: fn_id(fid)
, taskpool_id(tid)
, op_id(oid)
, param_id(pid)
- , num_keys(num_keys)
+ , num_keys(nk)
, sender(sender)
{ }
};
Expand Down Expand Up @@ -237,7 +237,7 @@ namespace ttg_parsec {
int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
#endif

- static constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1024 * 1024;
+ static constexpr const int PARSEC_TTG_MAX_AM_SIZE = 4 * 1024;
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
: WorldImplBase(query_comm_size(), query_comm_rank())
, ctx(c)
Expand Down Expand Up @@ -1748,7 +1748,6 @@ namespace ttg_parsec {
uint64_t key_end_pos;
std::vector<keyT> keylist;
int num_keys = msg->tt_id.num_keys;
- std::cout << "set_arg_from_msg num_keys " << num_keys << std::endl;
keylist.reserve(num_keys);
auto rank = world.rank();
for (int k = 0; k < num_keys; ++k) {
Expand All @@ -1764,7 +1763,6 @@ namespace ttg_parsec {
if constexpr (!ttg::meta::is_void_v<valueT>) {
using decvalueT = std::decay_t<valueT>;
int32_t num_iovecs = msg->tt_id.num_iovecs;
- std::span<parsec_ce_mem_reg_handle_t> tag_span;
detail::ttg_data_copy_t *copy;
if constexpr (ttg::has_split_metadata<decvalueT>::value) {
ttg::SplitMetadataDescriptor<decvalueT> descr;
Expand Down Expand Up @@ -1797,8 +1795,6 @@ namespace ttg_parsec {
std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
pos += sizeof(cbtag);

- std::cout << "set_arg_from_msg pos after cbtag " << pos << std::endl;

/* create the value from the metadata */
auto activation = new detail::rma_delayed_activate(
std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
Expand Down Expand Up @@ -1834,6 +1830,7 @@ namespace ttg_parsec {
iov.num_bytes, &lreg, &lreg_size);
world.impl().increment_inflight_msg();
/* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
+ std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
&detail::get_complete_cb<ActivationT>, activation,
/*world.impl().parsec_ttg_rma_tag()*/
Expand All @@ -1847,10 +1844,8 @@ namespace ttg_parsec {
handle_iovecs_fn(copy->iovec_span());
copy->iovec_reset();
}
- std::cout << "set_arg_from_msg pos after iovecs " << pos << " key_offset " << msg->tt_id.key_offset << std::endl;

assert(num_iovecs == nv);
- if (size != (key_end_pos + sizeof(msg_header_t))) std::cout << "XXX pos + header + keys " << pos + sizeof(msg_header_t) + num_keys*sizeof(keyT) << " size " << size << std::endl;
assert(size == (key_end_pos + sizeof(msg_header_t)));
}
// case 2 and 3
Expand Down Expand Up @@ -2259,7 +2254,6 @@ namespace ttg_parsec {
std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);
using decvalueT = std::decay_t<Value>;
- msg->tt_id.sender = world_impl.rank();

if constexpr (!ttg::meta::is_void_v<decvalueT>) {

Expand Down Expand Up @@ -2300,8 +2294,9 @@ namespace ttg_parsec {
int32_t lreg_size_i = lreg_size;
std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
pos += sizeof(lreg_size_i);
- std::memcpy(msg->bytes + pos, lreg, lreg_size_i);
- pos += lreg_size_i;
+ std::memcpy(msg->bytes + pos, lreg, lreg_size);
+ pos += lreg_size;
+ std::cout << "set_arg_impl lreg " << lreg << std::endl;
/* TODO: can we avoid the extra indirection of going through std::function? */
std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
/* shared_ptr of value and registration captured by value so resetting
Expand Down Expand Up @@ -2339,21 +2334,16 @@ namespace ttg_parsec {
/* pack the key */
msg->tt_id.num_keys = 0;
msg->tt_id.key_offset = pos;
- std::cout << "set_arg_impl pos " << pos << " key_offset " << msg->tt_id.key_offset << " key size " << sizeof(key) << std::endl;
if constexpr (!ttg::meta::is_void_v<Key>) {
size_t tmppos = pack(key, msg->bytes, pos);
- std::cout << "set_arg_impl pos before " << pos << " after pack " << tmppos << std::endl;
pos = tmppos;
msg->tt_id.num_keys = 1;
}

parsec_taskpool_t *tp = world_impl.taskpool();
tp->tdm.module->outgoing_message_start(tp, owner, NULL);
tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
- std::cout << "set_arg_impl num_keys " << msg->tt_id.num_keys
- << ", total size " << sizeof(msg_header_t) + pos
- << " pos " << pos << " key_offset " << msg->tt_id.key_offset
- << " sizeof header" << sizeof(msg_header_t) << std::endl;
+ std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
sizeof(msg_header_t) + pos);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
Expand Down Expand Up @@ -2455,8 +2445,10 @@ namespace ttg_parsec {
std::shared_ptr<void>{lreg, [](void *ptr) {
parsec_ce_mem_reg_handle_t memreg =
(parsec_ce_mem_reg_handle_t)ptr;
+ std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
parsec_ce.mem_unregister(&memreg);
}}));
+ std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
}
};

Expand Down Expand Up @@ -2493,7 +2485,6 @@ namespace ttg_parsec {
msg->tt_id.num_iovecs = num_iovs;

std::size_t save_pos = pos;
- std::cout << "broadcast_arg save_pos " << save_pos <<std::endl;

parsec_taskpool_t *tp = world_impl.taskpool();
for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {
Expand Down Expand Up @@ -2524,15 +2515,15 @@ namespace ttg_parsec {
pos += sizeof(lreg_size);
std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
pos += lreg_size;
- /* create a function that will be invoked upon RMA completion at the target */
- std::shared_ptr<void> lreg_ptr_v = lreg_ptr;
+ std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
/* mark another reader on the copy */
copy = detail::register_data_copy<valueT>(copy, nullptr, true);
+ /* create a function that will be invoked upon RMA completion at the target */
std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
/* shared_ptr of value and registration captured by value so resetting
* them here will eventually release the memory/registration */
detail::release_data_copy(copy);
- lreg_ptr_v.reset();
+ lreg_ptr.reset();
});
std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
Expand All @@ -2541,7 +2532,6 @@ namespace ttg_parsec {

/* mark the beginning of the keys */
msg->tt_id.key_offset = pos;
- std::cout << "broadcast_arg key_offset " << msg->tt_id.key_offset <<std::endl;

/* pack all keys for this owner */
int num_keys = 0;
Expand All @@ -2554,7 +2544,7 @@ namespace ttg_parsec {

tp->tdm.module->outgoing_message_start(tp, owner, NULL);
tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
- std::cout << "broadcast_arg total size " << sizeof(msg_header_t) + pos << " pos " << pos <<std::endl;
+ std::cout << "broadcast_arg send_am owner " << owner << std::endl;
parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
sizeof(msg_header_t) + pos);
}
Expand Down

0 comments on commit de259c2

Please sign in to comment.