From 18b6cb581c08514227c3cdd254c030be4fa08d68 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:40:42 -0500 Subject: [PATCH 01/70] Comment out stage in --- tasks/data_stager/src/data_stager.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 789f06492..d379deaf9 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -47,9 +47,9 @@ class Server : public TaskLib { } void StageIn(StageInTask *task, RunContext &rctx) { - AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; - stager.StageIn(blob_mdm_, task, rctx); - task->SetModuleComplete(); +// AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; +// stager.StageIn(blob_mdm_, task, rctx); +// task->SetModuleComplete(); } void StageOut(StageOutTask *task, RunContext &rctx) { From 42bf6d79bb7baf58f8adb0bb90c69b9f03ea0e67 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:41:46 -0500 Subject: [PATCH 02/70] Don't register stager --- tasks/data_stager/src/data_stager.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index d379deaf9..f5dcba46e 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -31,10 +31,10 @@ class Server : public TaskLib { } void RegisterStager(RegisterStagerTask *task, RunContext &rctx) { - std::string url = task->url_->str(); - std::unique_ptr stager = StagerFactory::Get(url); - stager->RegisterStager(task, rctx); - url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); +// std::string url = task->url_->str(); +// std::unique_ptr stager = StagerFactory::Get(url); +// stager->RegisterStager(task, rctx); +// url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); task->SetModuleComplete(); } From 7b4c8123b93bada386d52923be0025f0d9f58555 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:42:58 -0500 Subject: [PATCH 03/70] Don't set is file --- test/unit/hermes/test_bucket.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 28460e12f..217d8ea61 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -464,10 +464,10 @@ TEST_CASE("TestHermesDataStager") { // Create a stageable bucket using hermes::data_stager::BinaryFileStager; hermes::Context ctx; - ctx.flags_.SetBits(HERMES_IS_FILE); + ctx.flags_.SetBits(0); hshm::charbuf url = BinaryFileStager::BuildFileUrl("/tmp/test.txt", page_size); - hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); + hermes::Bucket bkt(url.str(), file_size, 0); // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 9d7b5f91ec91635eafe80023cdfc755e0242ab5f Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:47:10 -0500 Subject: [PATCH 04/70] Print tag --- tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc b/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc index 4e167e0ee..c0fe118c5 100644 --- a/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc +++ b/tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc @@ -191,7 +191,6 @@ class Server : public TaskLib { /** Get or create a tag */ void GetOrCreateTag(GetOrCreateTagTask *task, RunContext &rctx) { TagId tag_id; - HILOG(kDebug, "Creating a tag on lane {}", rctx.lane_id_); // Check if the tag exists TAG_ID_MAP_T &tag_id_map = tag_id_map_[rctx.lane_id_]; @@ -201,6 +200,7 @@ class Server : public TaskLib { if (tag_name.size() > 0) { did_create = tag_id_map.find(tag_name) == tag_id_map.end(); } + HILOG(kDebug, "Creating a tag {} on lane {}", tag_name.str(), rctx.lane_id_); // Emplace bucket if it does not already exist if (did_create) { From 3ebf90a1ce8e8cc4c7a0f2d01c81df5448d45c9e Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:52:36 -0500 Subject: [PATCH 05/70] Make HERMES_IS_FILE again --- tasks/data_stager/src/data_stager.cc | 14 +++++++------- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 19 ++++++++++--------- test/unit/hermes/test_bucket.cc | 4 ++-- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index f5dcba46e..789f06492 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -31,10 +31,10 @@ class Server : public TaskLib { } void RegisterStager(RegisterStagerTask *task, RunContext &rctx) { -// std::string url = task->url_->str(); -// std::unique_ptr stager = StagerFactory::Get(url); -// stager->RegisterStager(task, rctx); -// url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); + std::string url = task->url_->str(); + std::unique_ptr stager = StagerFactory::Get(url); + stager->RegisterStager(task, rctx); + url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); task->SetModuleComplete(); } @@ -47,9 +47,9 @@ class Server : public TaskLib { } void StageIn(StageInTask *task, RunContext &rctx) { -// AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; -// stager.StageIn(blob_mdm_, task, rctx); -// task->SetModuleComplete(); + AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; + stager.StageIn(blob_mdm_, task, rctx); + task->SetModuleComplete(); } void StageOut(StageOutTask *task, RunContext &rctx) { diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 977db66f0..66ca76d82 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -178,15 +178,16 @@ class Server : public TaskLib { for (auto &it : blob_map) { BlobInfo &blob_info = it.second; // Update blob scores - float new_score = MakeScore(blob_info, now); - if (ShouldReorganize(blob_info, new_score, task->task_node_)) { - blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, - blob_info.tag_id_, - blob_info.blob_id_, - new_score, 0, false); - } - blob_info.access_freq_ = 0; - blob_info.score_ = new_score; + // TODO(llogan): Add back +// float new_score = MakeScore(blob_info, now); +// if (ShouldReorganize(blob_info, new_score, task->task_node_)) { +// blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, +// blob_info.tag_id_, +// blob_info.blob_id_, +// new_score, 0, false); +// } +// blob_info.access_freq_ = 0; +// blob_info.score_ = new_score; // Flush data if (blob_info.last_flush_ > 0 && diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 217d8ea61..28460e12f 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -464,10 +464,10 @@ TEST_CASE("TestHermesDataStager") { // Create a stageable bucket using hermes::data_stager::BinaryFileStager; hermes::Context ctx; - ctx.flags_.SetBits(0); + ctx.flags_.SetBits(HERMES_IS_FILE); hshm::charbuf url = BinaryFileStager::BuildFileUrl("/tmp/test.txt", page_size); - hermes::Bucket bkt(url.str(), file_size, 0); + hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 29859d6ed1e63209678d6a96e6e74c05eb2ff3a1 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 12:56:09 -0500 Subject: [PATCH 06/70] Update path --- test/unit/hermes/test_bucket.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 28460e12f..33903fa6b 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -445,6 +445,8 @@ TEST_CASE("TestHermesDataStager") { MPI_Comm_size(MPI_COMM_WORLD, &nprocs); // create dataset + std::string home_dir = getenv("HOME"); + std::string path = home_dir + "/test.txt"; size_t count_per_proc = 16; size_t off = rank * count_per_proc; size_t proc_count = off + count_per_proc; @@ -452,7 +454,7 @@ TEST_CASE("TestHermesDataStager") { size_t file_size = nprocs * page_size * 16; std::vector data(file_size, 0); if (rank == 0) { - FILE *file = fopen("/tmp/test.txt", "w"); + FILE *file = fopen(path.c_str(), "w"); fwrite(data.data(), sizeof(char), data.size(), file); fclose(file); } @@ -465,8 +467,9 @@ TEST_CASE("TestHermesDataStager") { using hermes::data_stager::BinaryFileStager; hermes::Context ctx; ctx.flags_.SetBits(HERMES_IS_FILE); + hshm::charbuf url = - BinaryFileStager::BuildFileUrl("/tmp/test.txt", page_size); + BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); // Put a few blobs in the bucket From 838a53bc2cb15ec51443b7f725e21e55a47c8296 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 13:03:29 -0500 Subject: [PATCH 07/70] Check stager existence --- tasks/data_stager/src/data_stager.cc | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 789f06492..e77797eb6 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -47,13 +47,25 @@ class Server : public TaskLib { } void StageIn(StageInTask *task, RunContext &rctx) { - AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; + std::unordered_map>::iterator it = + url_map_[rctx.lane_id_].find(task->bkt_id_); + if (it == url_map_[rctx.lane_id_].end()) { + HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); + task->SetModuleComplete(); + } + AbstractStager &stager = *it->second; stager.StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } void StageOut(StageOutTask *task, RunContext &rctx) { - AbstractStager &stager = *url_map_[rctx.lane_id_][task->bkt_id_]; + std::unordered_map>::iterator it = + url_map_[rctx.lane_id_].find(task->bkt_id_); + if (it == url_map_[rctx.lane_id_].end()) { + HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); + task->SetModuleComplete(); + } + AbstractStager &stager = *it->second; stager.StageOut(blob_mdm_, task, rctx); task->SetModuleComplete(); } From 3549ded243f74346243d4483e8ded2e79c87bb80 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 13:11:18 -0500 Subject: [PATCH 08/70] only register stagers --- tasks/data_stager/include/data_stager/data_stager_tasks.h | 6 ++++++ test/unit/hermes/test_bucket.cc | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 6689729ab..e9862b5c9 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -113,6 +113,12 @@ struct RegisterStagerTask : public Task, TaskFlags { HSHM_MAKE_AR(url_, alloc, url); } + /** Destructor */ + HSHM_ALWAYS_INLINE + ~RegisterStagerTask() { + HSHM_DESTROY_AR(url_) + } + /** Duplicate message */ void Dup(hipc::Allocator *alloc, RegisterStagerTask &other) { task_dup(other); diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 33903fa6b..0a1ce54dd 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -466,11 +466,11 @@ TEST_CASE("TestHermesDataStager") { // Create a stageable bucket using hermes::data_stager::BinaryFileStager; hermes::Context ctx; - ctx.flags_.SetBits(HERMES_IS_FILE); - + ctx.flags_.SetBits(0); hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); + HILOG(kInfo, "CREATED STAGERS!!!") // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 27ac08cda1787ff18cbe97a19ca988418ccb41bf Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:50:40 -0500 Subject: [PATCH 09/70] don't do require checks right now --- test/unit/hermes/test_bucket.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 0a1ce54dd..6fac3efb0 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -466,7 +466,7 @@ TEST_CASE("TestHermesDataStager") { // Create a stageable bucket using hermes::data_stager::BinaryFileStager; hermes::Context ctx; - ctx.flags_.SetBits(0); + ctx.flags_.SetBits(HERMES_IS_FILE); hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); @@ -482,11 +482,11 @@ TEST_CASE("TestHermesDataStager") { bkt.PartialPut(blob_name.str(), blob, 0, ctx); hermes::Blob blob2; bkt.Get(blob_name.str(), blob2, ctx); - REQUIRE(blob2.size() == page_size); + // REQUIRE(blob2.size() == page_size); hermes::Blob full_blob(page_size); memcpy(full_blob.data(), blob.data(), blob.size()); memcpy(full_blob.data() + blob.size(), data.data(), page_size / 2); - REQUIRE(full_blob == blob2); + // REQUIRE(full_blob == blob2); } for (size_t i = off; i < proc_count; ++i) { hshm::charbuf blob_name = hermes::adapter::BlobPlacement::CreateBlobName(i); From b3041867f25c2f95442978e31843e3cd5ea90239 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:51:51 -0500 Subject: [PATCH 10/70] Comment out stage in --- tasks/data_stager/src/data_stager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index e77797eb6..e97a80aae 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -54,7 +54,7 @@ class Server : public TaskLib { task->SetModuleComplete(); } AbstractStager &stager = *it->second; - stager.StageIn(blob_mdm_, task, rctx); + // stager.StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } From b4dc9ee0485dffa84d81239beee346035ffd7f92 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:52:48 -0500 Subject: [PATCH 11/70] Comment out stage in --- tasks/data_stager/src/data_stager.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index e97a80aae..78ccd563b 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -47,13 +47,13 @@ class Server : public TaskLib { } void StageIn(StageInTask *task, RunContext &rctx) { - std::unordered_map>::iterator it = - url_map_[rctx.lane_id_].find(task->bkt_id_); - if (it == url_map_[rctx.lane_id_].end()) { - HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); - task->SetModuleComplete(); - } - AbstractStager &stager = *it->second; +// std::unordered_map>::iterator it = +// url_map_[rctx.lane_id_].find(task->bkt_id_); +// if (it == url_map_[rctx.lane_id_].end()) { +// HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); +// task->SetModuleComplete(); +// } +// AbstractStager &stager = *it->second; // stager.StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } From a1bdfc26646dc14f1d4dbcf3b6fd83aa081f365a Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:54:46 -0500 Subject: [PATCH 12/70] Add back find --- tasks/data_stager/src/data_stager.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 78ccd563b..84c77cc9f 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -47,8 +47,8 @@ class Server : public TaskLib { } void StageIn(StageInTask *task, RunContext &rctx) { -// std::unordered_map>::iterator it = -// url_map_[rctx.lane_id_].find(task->bkt_id_); + std::unordered_map>::iterator it = + url_map_[rctx.lane_id_].find(task->bkt_id_); // if (it == url_map_[rctx.lane_id_].end()) { // HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); // task->SetModuleComplete(); From fd91f1726c11edef9c56612150b8e2013469c5ff Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:55:33 -0500 Subject: [PATCH 13/70] Add bck if cehck --- tasks/data_stager/src/data_stager.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 84c77cc9f..891f0a191 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -49,10 +49,10 @@ class Server : public TaskLib { void StageIn(StageInTask *task, RunContext &rctx) { std::unordered_map>::iterator it = url_map_[rctx.lane_id_].find(task->bkt_id_); -// if (it == url_map_[rctx.lane_id_].end()) { -// HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); -// task->SetModuleComplete(); -// } + if (it == url_map_[rctx.lane_id_].end()) { + HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); + task->SetModuleComplete(); + } // AbstractStager &stager = *it->second; // stager.StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); From f6dcd222203a049da9265b7a1e873431d15a396e Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:56:39 -0500 Subject: [PATCH 14/70] Return from erronous stager --- tasks/data_stager/src/data_stager.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 891f0a191..2eac17206 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -52,6 +52,7 @@ class Server : public TaskLib { if (it == url_map_[rctx.lane_id_].end()) { HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); task->SetModuleComplete(); + return; } // AbstractStager &stager = *it->second; // stager.StageIn(blob_mdm_, task, rctx); @@ -64,6 +65,7 @@ class Server : public TaskLib { if (it == url_map_[rctx.lane_id_].end()) { HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); task->SetModuleComplete(); + return; } AbstractStager &stager = *it->second; stager.StageOut(blob_mdm_, task, rctx); From e2fa6d89647b528fc956b8097438752038574e6f Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 14:57:41 -0500 Subject: [PATCH 15/70] Add back stager dereference --- tasks/data_stager/src/data_stager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 2eac17206..4899bcb81 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -54,7 +54,7 @@ class Server : public TaskLib { task->SetModuleComplete(); return; } -// AbstractStager &stager = *it->second; + AbstractStager &stager = *it->second; // stager.StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } From 8fd660e952889ed631a04b7d4213b3e7e13b6169 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 15:01:33 -0500 Subject: [PATCH 16/70] Change stager dereference to ptr --- tasks/data_stager/src/data_stager.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 4899bcb81..aa70e3f6a 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -54,8 +54,8 @@ class Server : public TaskLib { task->SetModuleComplete(); return; } - AbstractStager &stager = *it->second; - // stager.StageIn(blob_mdm_, task, rctx); + std::unique_ptr &stager = it->second; + stager->StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } @@ -67,8 +67,8 @@ class Server : public TaskLib { task->SetModuleComplete(); return; } - AbstractStager &stager = *it->second; - stager.StageOut(blob_mdm_, task, rctx); + std::unique_ptr &stager = it->second; + stager->StageOut(blob_mdm_, task, rctx); task->SetModuleComplete(); } public: From 7827bda14f3965b8d20e46624023be29506742fb Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 15:10:52 -0500 Subject: [PATCH 17/70] Add print for stager pointer --- tasks/data_stager/src/data_stager.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index aa70e3f6a..946c022c0 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -55,7 +55,8 @@ class Server : public TaskLib { return; } std::unique_ptr &stager = it->second; - stager->StageIn(blob_mdm_, task, rctx); + HILOG(kInfo, "POINTER FAILING HERE?: {}", (size_t)stager.get()); + // stager->StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } From f63ebeb625bcbd0d60db1c8baaafdb40fd40b01f Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 16:08:16 -0500 Subject: [PATCH 18/70] Sleep for data op --- test/unit/hermes/test_bucket.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 6fac3efb0..9b5ccc755 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -547,7 +547,15 @@ TEST_CASE("TestHermesDataOp") { // HRUN_ADMIN->FlushRoot(DomainId::GetGlobal()); // Verify derived operator happens hermes::Bucket bkt_min("data_bkt_min", 0, 0); - size_t size = bkt_min.GetSize(); + size_t size; + do { + size = bkt_min.GetSize(); + if (size != sizeof(float) * count_per_proc * nprocs) { + sleep(1); + } else { + break; + } + } while (true); hermes::Blob blob2; bkt_min.Get(std::to_string(0), blob2, ctx); From 10be1ea3029bbe633afd2e698691e8e18ef064ad Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 16:09:59 -0500 Subject: [PATCH 19/70] Sleep for data op --- test/unit/hermes/test_bucket.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 9b5ccc755..61df79db3 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -551,6 +551,7 @@ TEST_CASE("TestHermesDataOp") { do { size = bkt_min.GetSize(); if (size != sizeof(float) * count_per_proc * nprocs) { + HILOG(kInfo, "Waiting for derived data"); sleep(1); } else { break; From 91d3aa28d7ae897eef9f22f49bdb31c3fcc4f702 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 16:10:53 -0500 Subject: [PATCH 20/70] comment out stager for now --- include/hermes/config_manager.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 1cf44a040..3f0c173ac 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -52,8 +52,8 @@ class ConfigurationManager { bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", bkt_mdm_.id_, blob_mdm_.id_); - stager_mdm_.CreateRoot(DomainId::GetGlobal(), - "hermes_stager_mdm", blob_mdm_.id_); +// stager_mdm_.CreateRoot(DomainId::GetGlobal(), +// "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), bkt_mdm_.id_, stager_mdm_.id_, op_mdm_.id_); From 0472f5fe8afc0bfc0dd771353deeaf29dbaff871 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 22:44:15 -0500 Subject: [PATCH 21/70] Don't free buffers right now? --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 66ca76d82..49623894a 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -364,9 +364,9 @@ class Server : public TaskLib { for (BufferInfo &buf : blob_info.buffers_) { TargetInfo &target = *target_map_[buf.tid_]; std::vector buf_vec = {buf}; - target.AsyncFree(task->task_node_ + 1, - blob_info.score_, - std::move(buf_vec), true); +// target.AsyncFree(task->task_node_ + 1, +// blob_info.score_, +// std::move(buf_vec), true); } blob_info.buffers_.clear(); blob_info.max_blob_size_ = 0; From f86ef99841ef17f1ce5d2a9c39c6b7a9e93ac7d3 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 22:55:28 -0500 Subject: [PATCH 22/70] Add back stager --- include/hermes/config_manager.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 3f0c173ac..1cf44a040 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -52,8 +52,8 @@ class ConfigurationManager { bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", bkt_mdm_.id_, blob_mdm_.id_); -// stager_mdm_.CreateRoot(DomainId::GetGlobal(), -// "hermes_stager_mdm", blob_mdm_.id_); + stager_mdm_.CreateRoot(DomainId::GetGlobal(), + "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), bkt_mdm_.id_, stager_mdm_.id_, op_mdm_.id_); From a176011f1330fa332e09a8cccaa94fdce8c8c5e4 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 22:59:34 -0500 Subject: [PATCH 23/70] Add back StageIn --- tasks/data_stager/src/data_stager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 946c022c0..9c806e159 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -56,7 +56,7 @@ class Server : public TaskLib { } std::unique_ptr &stager = it->second; HILOG(kInfo, "POINTER FAILING HERE?: {}", (size_t)stager.get()); - // stager->StageIn(blob_mdm_, task, rctx); + stager->StageIn(blob_mdm_, task, rctx); task->SetModuleComplete(); } From 8d9985c9f03fd4c53bf05217b6ebe95b9b9079ff Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:02:06 -0500 Subject: [PATCH 24/70] Add more prints --- tasks/data_stager/src/data_stager.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 9c806e159..17b66c498 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -34,6 +34,7 @@ class Server : public TaskLib { std::string url = task->url_->str(); std::unique_ptr stager = StagerFactory::Get(url); stager->RegisterStager(task, rctx); + HILOG(kInfo, "REGISTERING STAGER: {}", (size_t)stager.get()); url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); task->SetModuleComplete(); } @@ -57,6 +58,7 @@ class Server : public TaskLib { std::unique_ptr &stager = it->second; HILOG(kInfo, "POINTER FAILING HERE?: {}", (size_t)stager.get()); stager->StageIn(blob_mdm_, task, rctx); + HILOG(kInfo, "STAGED IN?: {}", (size_t)stager.get()); task->SetModuleComplete(); } From 516a2c7213f778ea60967f74099050dd7425311d Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:05:14 -0500 Subject: [PATCH 25/70] Make staging info log level --- .../data_stager/include/data_stager/factory/binary_stager.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index 93b958578..763d6b208 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -58,7 +58,7 @@ class BinaryFileStager : public AbstractStager { void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) override { adapter::BlobPlacement plcmnt; plcmnt.DecodeBlobName(*task->blob_name_, page_size_); - HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}", + HILOG(kInfo, "Attempting to stage {} bytes from the backend file {} at offset {}", page_size_, url_, plcmnt.bucket_off_); LPointer blob = HRUN_CLIENT->AllocateBuffer(page_size_); ssize_t real_size = HERMES_POSIX_API->pread(fd_, @@ -71,8 +71,9 @@ class BinaryFileStager : public AbstractStager { return; } memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); - HILOG(kDebug, "Staged {} bytes from the backend file {}", + HILOG(kInfo, "Staged {} bytes from the backend file {}", real_size, url_); + HILOG(kInfo, "Submitting put blob {} to blob mdm", task->blob_name_->str()) hapi::Context ctx; LPointer put_task = blob_mdm.AsyncPutBlob(task->task_node_ + 1, From a8ad402c5e2b6d96817a5a200a1092aeca6216f2 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:06:47 -0500 Subject: [PATCH 26/70] AsyncComplete for MonitorTask --- tasks/bdev/include/bdev/bdev.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tasks/bdev/include/bdev/bdev.h b/tasks/bdev/include/bdev/bdev.h index be796cb83..d795f6127 100644 --- a/tasks/bdev/include/bdev/bdev.h +++ b/tasks/bdev/include/bdev/bdev.h @@ -54,10 +54,10 @@ class Client : public TaskLibClient { queue_info, dev_info); } void AsyncCreateComplete(ConstructTask *task) { - if (task->IsComplete()) { + if (task->IsModuleComplete()) { id_ = task->id_; queue_id_ = QueueId(id_); - monitor_task_ = AsyncMonitor(task->task_node_, 100).ptr_; + monitor_task_ = AsyncMonitor(task->task_node_ + 1, 100).ptr_; HRUN_CLIENT->DelTask(task); } } From 749a3c83249363495314f1d68fd39134d77bfb16 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:14:02 -0500 Subject: [PATCH 27/70] Include bkt_id --- tasks/data_stager/include/data_stager/factory/binary_stager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index 763d6b208..c8d54160f 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -73,7 +73,7 @@ class BinaryFileStager : public AbstractStager { memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); HILOG(kInfo, "Staged {} bytes from the backend file {}", real_size, url_); - HILOG(kInfo, "Submitting put blob {} to blob mdm", task->blob_name_->str()) + HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm", task->blob_name_->str(), task->bkt_id_) hapi::Context ctx; LPointer put_task = blob_mdm.AsyncPutBlob(task->task_node_ + 1, From bfe29c0a874a07126487428d4dd3a2345ddad6c3 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:14:34 -0500 Subject: [PATCH 28/70] Beginning put --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 49623894a..cd946bebd 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -219,6 +219,7 @@ class Server : public TaskLib { * Create a blob's metadata * */ void PutBlob(PutBlobTask *task, RunContext &rctx) { + HILOG(kInfo, "Beginning PUT"); // Get the blob info data structure hshm::charbuf blob_name = hshm::to_charbuf(*task->blob_name_); if (task->blob_id_.IsNull()) { From e63edfeb89197d2b6975a4771631330e3c8d6677 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:14:54 -0500 Subject: [PATCH 29/70] Beginning put --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index cd946bebd..47cbd980a 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -219,13 +219,13 @@ class Server : public TaskLib { * Create a blob's metadata * */ void PutBlob(PutBlobTask *task, RunContext &rctx) { - HILOG(kInfo, "Beginning PUT"); // Get the blob info data structure hshm::charbuf blob_name = hshm::to_charbuf(*task->blob_name_); if (task->blob_id_.IsNull()) { task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, blob_name, rctx, task->flags_); } + HILOG(kInfo, "Beginning PUT for {}", blob_name); BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; BlobInfo &blob_info = blob_map[task->blob_id_]; From b48333a2e385779123cd1c3b0836b870ea1e3fdb Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:15:41 -0500 Subject: [PATCH 30/70] Beginning put --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 47cbd980a..5ad3ec60f 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -225,7 +225,7 @@ class Server : public TaskLib { task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, blob_name, rctx, task->flags_); } - HILOG(kInfo, "Beginning PUT for {}", blob_name); + HILOG(kInfo, "Beginning PUT for {}", blob_name.str()); BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; BlobInfo &blob_info = blob_map[task->blob_id_]; From 5e1e5a5dd4f7a84865522da069059f1c1ed10c18 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:19:02 -0500 Subject: [PATCH 31/70] Force grouping --- .../include/data_stager/data_stager_tasks.h | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index e9862b5c9..baded22bf 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -149,7 +149,10 @@ struct RegisterStagerTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - return TASK_UNORDERED; + hrun::LocalSerialize srl(group); + srl << bkt_id_.unique_; + srl << bkt_id_.node_id_; + return 0; } }; @@ -263,7 +266,10 @@ struct StageInTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - return TASK_UNORDERED; + hrun::LocalSerialize srl(group); + srl << bkt_id_.unique_; + srl << bkt_id_.node_id_; + return 0; } }; @@ -318,7 +324,10 @@ struct StageOutTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - return TASK_UNORDERED; + hrun::LocalSerialize srl(group); + srl << bkt_id_.unique_; + srl << bkt_id_.node_id_; + return 0; } }; From 0346ff0f24d302491cdb4697d00a23512950702c Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:23:01 -0500 Subject: [PATCH 32/70] Print blob mdm id --- tasks/data_stager/include/data_stager/factory/binary_stager.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index c8d54160f..f66e3e0ec 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -73,7 +73,8 @@ class BinaryFileStager : public AbstractStager { memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); HILOG(kInfo, "Staged {} bytes from the backend file {}", real_size, url_); - HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm", task->blob_name_->str(), task->bkt_id_) + HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm ({})", + task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) hapi::Context ctx; LPointer put_task = blob_mdm.AsyncPutBlob(task->task_node_ + 1, From 9bc89cdc4a89f0ffd81eab27a05608b501bd6dcd Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:26:11 -0500 Subject: [PATCH 33/70] Updated node id --- tasks/data_stager/src/data_stager.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 17b66c498..794102b53 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -23,6 +23,7 @@ class Server : public TaskLib { void Construct(ConstructTask *task, RunContext &rctx) { url_map_.resize(HRUN_QM_RUNTIME->max_lanes_); blob_mdm_.Init(task->blob_mdm_); + HILOG(kInfo, "(node {}) BLOB MDM: {}", HRUN_CLIENT->node_id_, blob_mdm_.id_); task->SetModuleComplete(); } From da6e074a6a425859fb1708ae79a7c509a7c4ea4e Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Thu, 19 Oct 2023 23:28:23 -0500 Subject: [PATCH 34/70] HRUN CLIENT --- include/hermes/config_manager.h | 1 + tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 1cf44a040..064e83e35 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -50,6 +50,7 @@ class ConfigurationManager { mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); + HILOG(kInfo, "(node {}) BLOB MDM ID: {}", HRUN_CLIENT->node_id_, blob_mdm_.id_) op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h index f8a705304..9c04cefdb 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h @@ -41,7 +41,7 @@ class Client : public TaskLibClient { task_node, domain_id, state_name, id_, queue_info); } void AsyncCreateComplete(ConstructTask *task) { - if (task->IsComplete()) { + if (task->IsModuleComplete()) { id_ = task->id_; queue_id_ = QueueId(id_); HRUN_CLIENT->DelTask(task); From 36fe383bec442e3a204bff84a91b706358deba5c Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 01:08:38 -0500 Subject: [PATCH 35/70] Add an alloc_state function --- .../hrun/api/template/hrun_task_cc.template | 18 +++++++++ hrun/include/hrun/task_registry/task_lib.h | 37 ++++++++++++------- .../hrun/task_registry/task_registry.h | 17 ++++++++- 3 files changed, 56 insertions(+), 16 deletions(-) create mode 100644 hrun/include/hrun/api/template/hrun_task_cc.template diff --git a/hrun/include/hrun/api/template/hrun_task_cc.template b/hrun/include/hrun/api/template/hrun_task_cc.template new file mode 100644 index 000000000..77e024be4 --- /dev/null +++ b/hrun/include/hrun/api/template/hrun_task_cc.template @@ -0,0 +1,18 @@ +extern "C" { +void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) { + hrun::TaskState *exec = reinterpret_cast( + new TYPE_UNWRAP(TRAIT_CLASS)()); + exec->Init(task->id_, state_name); + return exec; +} +void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) { + hrun::TaskState *exec = reinterpret_cast( + new TYPE_UNWRAP(TRAIT_CLASS)()); + exec->Init(task->id_, state_name); + RunContext rctx(0); + exec->Run(hrun::TaskMethod::kConstruct, task, rctx); + return exec; +} +const char* get_task_lib_name(void) { return TASK_NAME; } +bool is_hrun_task_ = true; +} \ No newline at end of file diff --git a/hrun/include/hrun/task_registry/task_lib.h b/hrun/include/hrun/task_registry/task_lib.h index a6cce4743..663fe42b5 100644 --- a/hrun/include/hrun/task_registry/task_lib.h +++ b/hrun/include/hrun/task_registry/task_lib.h @@ -98,26 +98,35 @@ class TaskLibClient { }; extern "C" { -/** The two methods provided by all tasks */ +/** Allocate a state (no construction) */ +typedef TaskState* (*alloc_state_t)(Task *task, const char *state_name); +/** Allocate + construct a state */ typedef TaskState* (*create_state_t)(Task *task, const char *state_name); /** Get the name of a task */ typedef const char* (*get_task_lib_name_t)(void); } // extern c /** Used internally by task source file */ -#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME) \ - extern "C" { \ - void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) { \ - hrun::TaskState *exec = reinterpret_cast( \ - new TYPE_UNWRAP(TRAIT_CLASS)()); \ - exec->Init(task->id_, state_name); \ - RunContext rctx(0); \ - exec->Run(hrun::TaskMethod::kConstruct, task, rctx); \ - return exec; \ - } \ - const char* get_task_lib_name(void) { return TASK_NAME; } \ - bool is_hrun_task_ = true; \ - } +#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME)\ + extern "C" {\ + void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\ + hrun::TaskState *exec = reinterpret_cast(\ + new TYPE_UNWRAP(TRAIT_CLASS)());\ + exec->Init(task->id_, state_name);\ + return exec;\ + }\ + void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\ + hrun::TaskState *exec = reinterpret_cast(\ + new TYPE_UNWRAP(TRAIT_CLASS)());\ + exec->Init(task->id_, state_name);\ + RunContext rctx(0);\ + exec->Run(hrun::TaskMethod::kConstruct, task, rctx);\ + return exec;\ + }\ + const char* get_task_lib_name(void) { return TASK_NAME; }\ + bool is_hrun_task_ = true;\ + } + } // namespace hrun #endif // HRUN_INCLUDE_HRUN_TASK_TASK_H_ diff --git a/hrun/include/hrun/task_registry/task_registry.h b/hrun/include/hrun/task_registry/task_registry.h index 0324d7af9..40f75bd1e 100644 --- a/hrun/include/hrun/task_registry/task_registry.h +++ b/hrun/include/hrun/task_registry/task_registry.h @@ -29,6 +29,7 @@ namespace hrun { /** All information needed to create a trait */ struct TaskLibInfo { void *lib_; /**< The dlfcn library */ + alloc_state_t alloc_state_; /**< The create task function */ create_state_t create_state_; /**< The create task function */ get_task_lib_name_t get_task_lib_name; /**< The get task name function */ @@ -44,22 +45,27 @@ struct TaskLibInfo { /** Emplace constructor */ explicit TaskLibInfo(void *lib, - create_state_t create_task, + alloc_state_t alloc_state, + create_state_t create_state, get_task_lib_name_t get_task_name) - : lib_(lib), create_state_(create_task), get_task_lib_name(get_task_name) {} + : lib_(lib), alloc_state_(alloc_state), + create_state_(create_state), get_task_lib_name(get_task_name) {} /** Copy constructor */ TaskLibInfo(const TaskLibInfo &other) : lib_(other.lib_), + alloc_state_(other.alloc_state_), create_state_(other.create_state_), get_task_lib_name(other.get_task_lib_name) {} /** Move constructor */ TaskLibInfo(TaskLibInfo &&other) noexcept : lib_(other.lib_), + alloc_state_(other.alloc_state_), create_state_(other.create_state_), get_task_lib_name(other.get_task_lib_name) { other.lib_ = nullptr; + other.alloc_state_ = nullptr; other.create_state_ = nullptr; other.get_task_lib_name = nullptr; } @@ -157,6 +163,13 @@ class TaskRegistry { lib_path); return false; } + info.alloc_state_ = (alloc_state_t)dlsym( + info.lib_, "alloc_state"); + if (!info.alloc_state_) { + HELOG(kError, "The lib {} does not have alloc_state symbol", + lib_path); + return false; + } info.get_task_lib_name = (get_task_lib_name_t)dlsym( info.lib_, "get_task_lib_name"); if (!info.get_task_lib_name) { From e7a741be7eb2732ca0a7e0c963e7202a88cd487c Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 09:32:55 -0500 Subject: [PATCH 36/70] Begin making ConstructTasks more reliable over network --- hrun/include/hrun/network/serialize.h | 8 ++++++ .../hrun/task_registry/task_registry.h | 26 ++++++++++++------- .../include/hrun_admin/hrun_admin_tasks.h | 1 + .../hrun_admin/src/hrun_admin.cc | 14 +++++++--- .../remote_queue/src/remote_queue.cc | 26 ++++++++++++++----- .../include/data_stager/data_stager_tasks.h | 5 ++-- .../hermes_data_op/hermes_data_op_tasks.h | 5 ++-- 7 files changed, 62 insertions(+), 23 deletions(-) diff --git a/hrun/include/hrun/network/serialize.h b/hrun/include/hrun/network/serialize.h index 873d03a57..d775de7c8 100644 --- a/hrun/include/hrun/network/serialize.h +++ b/hrun/include/hrun/network/serialize.h @@ -220,6 +220,14 @@ class BinaryInputArchive { ss_.str(std::string((char*)param_xfer.data_, param_xfer.data_size_)); } + /** String constructor */ + BinaryInputArchive(const std::string ¶ms) : ar_(ss_) { + xfer_.resize(1); + xfer_[0].data_ = (void*)params.data(); + xfer_[0].data_size_ = params.size(); + ss_.str(params); + } + /** Deserialize using call */ template BinaryInputArchive& operator()(T &var, Args &&...args) { diff --git a/hrun/include/hrun/task_registry/task_registry.h b/hrun/include/hrun/task_registry/task_registry.h index 40f75bd1e..6885a7c78 100644 --- a/hrun/include/hrun/task_registry/task_registry.h +++ b/hrun/include/hrun/task_registry/task_registry.h @@ -214,15 +214,16 @@ class TaskRegistry { * Create a task state * state_id must not be NULL. * */ - bool CreateTaskState(const char *lib_name, - const char *state_name, - const TaskStateId &state_id, - Admin::CreateTaskStateTask *task) { + TaskState* CreateTaskState(const char *lib_name, + const char *state_name, + const TaskStateId &state_id, + Admin::CreateTaskStateTask *task, + bool alloc_only = false) { // Ensure state_id is not NULL if (state_id.IsNull()) { HILOG(kError, "The task state ID cannot be null"); task->SetModuleComplete(); - return false; + return nullptr; } // HILOG(kInfo, "(node {}) Creating an instance of {} with name {}", // HRUN_CLIENT->node_id_, lib_name, state_name) @@ -232,24 +233,29 @@ class TaskRegistry { if (it == libs_.end()) { HELOG(kError, "Could not find the task lib: {}", lib_name); task->SetModuleComplete(); - return false; + return nullptr; } // Ensure the task state does not already exist if (TaskStateExists(state_id)) { HELOG(kError, "The task state already exists: {}", state_name); task->SetModuleComplete(); - return true; + return nullptr; } // Create the state instance task->id_ = state_id; TaskLibInfo &info = it->second; - TaskState *task_state = info.create_state_(task, state_name); + TaskState *task_state; + if (!alloc_only) { + task_state = info.create_state_(task, state_name); + } else { + task_state = info.alloc_state_(task, state_name); + } if (!task_state) { HELOG(kError, "Could not create the task state: {}", state_name); task->SetModuleComplete(); - return false; + return nullptr; } // Add the state to the registry @@ -260,7 +266,7 @@ class TaskRegistry { task_states_.emplace(state_id, task_state); HILOG(kInfo, "(node {}) Created an instance of {} with name {} and ID {}", HRUN_CLIENT->node_id_, lib_name, state_name, state_id) - return true; + return task_state; } /** Get or create a task state's ID */ diff --git a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h index 602185310..5de0662f5 100644 --- a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h +++ b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h @@ -143,6 +143,7 @@ struct CreateTaskStateTask : public Task, TaskFlags { IN hipc::ShmArchive state_name_; IN hipc::ShmArchive> queue_info_; INOUT TaskStateId id_; + TEMP std::string *net_buf_ = nullptr; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit diff --git a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc index ca9653e7d..04740711e 100644 --- a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc +++ b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc @@ -81,13 +81,21 @@ class Server : public TaskLib { QueueId qid(task->id_); MultiQueue *queue = HRUN_QM_RUNTIME->CreateQueue( qid, task->queue_info_->vec()); - // Run the task state's submethod + // Allocate the task state task->method_ = Method::kConstruct; - bool ret = HRUN_TASK_REGISTRY->CreateTaskState( + TaskState *exec = HRUN_TASK_REGISTRY->CreateTaskState( lib_name.c_str(), state_name.c_str(), task->id_, - task); + task, task->net_buf_ != nullptr); + if (exec && task->net_buf_ != nullptr) { + // For networked tasks, need to re-deserialize using the proper + // deserialization method. + BinaryInputArchive net_buf(*task->net_buf_); + TaskPointer task_ptr = exec->LoadStart(Method::kConstruct, net_buf); + exec->Run(Method::kConstruct, task_ptr.ptr_, rctx); + HRUN_CLIENT->DelTask(exec, task_ptr.ptr_); + } queue->flags_.SetBits(QUEUE_READY); task->SetModuleComplete(); } diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 1de787101..c90d3cfb2 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -204,7 +204,7 @@ class Server : public TaskLib { // Process the message TaskState *exec; Task *orig_task; - RpcExec(req, state_id, method, xfer, orig_task, exec); + RpcExec(req, state_id, method, params, xfer, orig_task, exec); RpcComplete(req, method, orig_task, exec, state_id); } @@ -235,7 +235,7 @@ class Server : public TaskLib { } TaskState *exec; Task *orig_task; - RpcExec(req, state_id, method, xfer, orig_task, exec); + RpcExec(req, state_id, method, params, xfer, orig_task, exec); if (io_type == IoType::kRead) { HILOG(kDebug, "(node {}) Read blob integer: {}", HRUN_CLIENT->node_id_, (int)data[0]) HRUN_THALLIUM->IoCallServer(req, bulk, io_type, data.data(), data_size); @@ -249,6 +249,7 @@ class Server : public TaskLib { void RpcExec(const tl::request &req, const TaskStateId &state_id, u32 method, + std::string ¶ms, std::vector &xfer, Task *&orig_task, TaskState *&exec) { size_t data_size = xfer[0].data_size_; @@ -268,16 +269,29 @@ class Server : public TaskLib { } TaskPointer task_ptr = exec->LoadStart(method, ar); orig_task = task_ptr.ptr_; - hipc::Pointer &p = task_ptr.shm_; orig_task->domain_id_ = DomainId::GetNode(HRUN_CLIENT->node_id_); - // Execute task - MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id)); + // NOTE(llogan): Construction tasks will call deserialization + // improperly since they are routed to the Admin state instead + // of the state they are constructing. This is because their + // state does not yet exist. We fix this by passing the params + // buffer to the construction task. + if (orig_task->method_ == Method::kConstruct) { + ((CreateTaskStateTask*)orig_task)->net_buf_ = ¶ms; + } + + // Unset task flags + // NOTE(llogan): Remote tasks are executed to completion and + // return values sent back to the remote host. This is + // for things like long-running monitoring tasks. orig_task->UnsetFireAndForget(); orig_task->UnsetStarted(); orig_task->UnsetDataOwner(); orig_task->UnsetLongRunning(); - queue->Emplace(orig_task->prio_, orig_task->lane_hash_, p); + + // Execute task + MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id)); + queue->Emplace(orig_task->prio_, orig_task->lane_hash_, task_ptr.shm_); HILOG(kDebug, "(node {}) Executing task (task_node={}, task_state={}/{}, state_name={}, method={}, size={}, lane_hash={})", HRUN_CLIENT->node_id_, diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index baded22bf..c25219578 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -46,10 +46,11 @@ struct ConstructTask : public CreateTaskStateTask { blob_mdm_ = blob_mdm; } + /** (De)serialize message call */ template void SerializeStart(Ar &ar) { - task_serialize(ar); - ar(lib_name_, state_name_, id_, queue_info_, blob_mdm_); + CreateTaskStateTask::SerializeStart(ar); + ar(blob_mdm_); } HSHM_ALWAYS_INLINE diff --git a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h index 0421f8413..02b0f5872 100644 --- a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h +++ b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h @@ -93,10 +93,11 @@ struct ConstructTask : public CreateTaskStateTask { blob_mdm_ = blob_mdm_id; } + /** (De)serialize message call */ template void SerializeStart(Ar &ar) { - task_serialize(ar); - ar(lib_name_, state_name_, id_, queue_info_, bkt_mdm_, blob_mdm_); + CreateTaskStateTask::SerializeStart(ar); + ar(bkt_mdm_, blob_mdm_); } HSHM_ALWAYS_INLINE From f691ff5c96cbefce9e1705d8396ebacaac1e2a92 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 09:37:46 -0500 Subject: [PATCH 37/70] Add more prints --- hrun/tasks_required/hrun_admin/src/hrun_admin.cc | 1 + hrun/tasks_required/remote_queue/src/remote_queue.cc | 1 + 2 files changed, 2 insertions(+) diff --git a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc index 04740711e..aa013ca6c 100644 --- a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc +++ b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc @@ -91,6 +91,7 @@ class Server : public TaskLib { if (exec && task->net_buf_ != nullptr) { // For networked tasks, need to re-deserialize using the proper // deserialization method. + HILOG(kInfo, "Networked buffer mode???") BinaryInputArchive net_buf(*task->net_buf_); TaskPointer task_ptr = exec->LoadStart(Method::kConstruct, net_buf); exec->Run(Method::kConstruct, task_ptr.ptr_, rctx); diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index c90d3cfb2..f222dd7b7 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -277,6 +277,7 @@ class Server : public TaskLib { // state does not yet exist. We fix this by passing the params // buffer to the construction task. if (orig_task->method_ == Method::kConstruct) { + HILOG(kInfo, "Setting the net buf for the construction task"); ((CreateTaskStateTask*)orig_task)->net_buf_ = ¶ms; } From fb73196cbd64cf6ff06523bf9128bbb748a5d8d8 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 09:41:06 -0500 Subject: [PATCH 38/70] Print networked commands --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index f222dd7b7..3851fd22c 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -293,7 +293,7 @@ class Server : public TaskLib { // Execute task MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id)); queue->Emplace(orig_task->prio_, orig_task->lane_hash_, task_ptr.shm_); - HILOG(kDebug, + HILOG(kInfo, "(node {}) Executing task (task_node={}, task_state={}/{}, state_name={}, method={}, size={}, lane_hash={})", HRUN_CLIENT->node_id_, orig_task->task_node_, From 1e48993c8e7808ec6b057458f61275afcab7823d Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 09:45:39 -0500 Subject: [PATCH 39/70] CreateTaskState not Construct --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 3851fd22c..dc6b65b75 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -276,7 +276,7 @@ class Server : public TaskLib { // of the state they are constructing. This is because their // state does not yet exist. We fix this by passing the params // buffer to the construction task. - if (orig_task->method_ == Method::kConstruct) { + if (orig_task->method_ == Admin::Method::kCreateTaskState) { HILOG(kInfo, "Setting the net buf for the construction task"); ((CreateTaskStateTask*)orig_task)->net_buf_ = ¶ms; } From 7f644afcf18dc26c1d5b2e92dda26779133ad2cf Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 09:56:42 -0500 Subject: [PATCH 40/70] Add custom parameter buf to construction tasks --- .../hrun/task_registry/task_registry.h | 9 ++------ .../include/hrun_admin/hrun_admin_tasks.h | 6 +++-- .../hrun_admin/src/hrun_admin.cc | 13 ++--------- .../remote_queue/src/remote_queue.cc | 10 -------- .../include/data_stager/data_stager_tasks.h | 14 +++++++---- tasks/data_stager/src/data_stager.cc | 1 + .../hermes_data_op/hermes_data_op_tasks.h | 13 +++++++++++ tasks/hermes_data_op/src/hermes_data_op.cc | 1 + .../include/hermes_mdm/hermes_mdm_tasks.h | 23 ++++++++++--------- 9 files changed, 45 insertions(+), 45 deletions(-) diff --git a/hrun/include/hrun/task_registry/task_registry.h b/hrun/include/hrun/task_registry/task_registry.h index 6885a7c78..60bf7844b 100644 --- a/hrun/include/hrun/task_registry/task_registry.h +++ b/hrun/include/hrun/task_registry/task_registry.h @@ -217,8 +217,7 @@ class TaskRegistry { TaskState* CreateTaskState(const char *lib_name, const char *state_name, const TaskStateId &state_id, - Admin::CreateTaskStateTask *task, - bool alloc_only = false) { + Admin::CreateTaskStateTask *task) { // Ensure state_id is not NULL if (state_id.IsNull()) { HILOG(kError, "The task state ID cannot be null"); @@ -247,11 +246,7 @@ class TaskRegistry { task->id_ = state_id; TaskLibInfo &info = it->second; TaskState *task_state; - if (!alloc_only) { - task_state = info.create_state_(task, state_name); - } else { - task_state = info.alloc_state_(task, state_name); - } + task_state = info.create_state_(task, state_name); if (!task_state) { HELOG(kError, "Could not create the task state: {}", state_name); task->SetModuleComplete(); diff --git a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h index 5de0662f5..4dd5e24ab 100644 --- a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h +++ b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin_tasks.h @@ -143,7 +143,7 @@ struct CreateTaskStateTask : public Task, TaskFlags { IN hipc::ShmArchive state_name_; IN hipc::ShmArchive> queue_info_; INOUT TaskStateId id_; - TEMP std::string *net_buf_ = nullptr; + IN hipc::ShmArchive custom_; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit @@ -171,6 +171,7 @@ struct CreateTaskStateTask : public Task, TaskFlags { HSHM_MAKE_AR(state_name_, alloc, state_name); HSHM_MAKE_AR(lib_name_, alloc, lib_name); HSHM_MAKE_AR(queue_info_, alloc, queue_info); + HSHM_MAKE_AR(custom_, alloc, ""); id_ = id; } @@ -179,6 +180,7 @@ struct CreateTaskStateTask : public Task, TaskFlags { HSHM_DESTROY_AR(state_name_); HSHM_DESTROY_AR(lib_name_); HSHM_DESTROY_AR(queue_info_); + HSHM_DESTROY_AR(custom_); } /** Duplicate message */ @@ -204,7 +206,7 @@ struct CreateTaskStateTask : public Task, TaskFlags { template void SerializeStart(Ar &ar) { task_serialize(ar); - ar(lib_name_, state_name_, id_, queue_info_); + ar(lib_name_, state_name_, id_, queue_info_, custom_); } /** (De)serialize message return */ diff --git a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc index aa013ca6c..6d28bcccb 100644 --- a/hrun/tasks_required/hrun_admin/src/hrun_admin.cc +++ b/hrun/tasks_required/hrun_admin/src/hrun_admin.cc @@ -83,20 +83,11 @@ class Server : public TaskLib { qid, task->queue_info_->vec()); // Allocate the task state task->method_ = Method::kConstruct; - TaskState *exec = HRUN_TASK_REGISTRY->CreateTaskState( + HRUN_TASK_REGISTRY->CreateTaskState( lib_name.c_str(), state_name.c_str(), task->id_, - task, task->net_buf_ != nullptr); - if (exec && task->net_buf_ != nullptr) { - // For networked tasks, need to re-deserialize using the proper - // deserialization method. - HILOG(kInfo, "Networked buffer mode???") - BinaryInputArchive net_buf(*task->net_buf_); - TaskPointer task_ptr = exec->LoadStart(Method::kConstruct, net_buf); - exec->Run(Method::kConstruct, task_ptr.ptr_, rctx); - HRUN_CLIENT->DelTask(exec, task_ptr.ptr_); - } + task); queue->flags_.SetBits(QUEUE_READY); task->SetModuleComplete(); } diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index dc6b65b75..82fc9859a 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -271,16 +271,6 @@ class Server : public TaskLib { orig_task = task_ptr.ptr_; orig_task->domain_id_ = DomainId::GetNode(HRUN_CLIENT->node_id_); - // NOTE(llogan): Construction tasks will call deserialization - // improperly since they are routed to the Admin state instead - // of the state they are constructing. This is because their - // state does not yet exist. We fix this by passing the params - // buffer to the construction task. - if (orig_task->method_ == Admin::Method::kCreateTaskState) { - HILOG(kInfo, "Setting the net buf for the construction task"); - ((CreateTaskStateTask*)orig_task)->net_buf_ = ¶ms; - } - // Unset task flags // NOTE(llogan): Remote tasks are executed to completion and // return values sent back to the remote host. This is diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index c25219578..2c3c5fd22 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -44,12 +44,18 @@ struct ConstructTask : public CreateTaskStateTask { "data_stager", id, queue_info) { // Custom params blob_mdm_ = blob_mdm; + std::stringstream ss; + cereal::BinaryOutputArchive ar(ss); + ar(blob_mdm_); + std::string data = ss.str(); + *custom_ = data; } - /** (De)serialize message call */ - template - void SerializeStart(Ar &ar) { - CreateTaskStateTask::SerializeStart(ar); + /** Deserialize parameters */ + void Deserialize() { + std::string data = custom_->str(); + std::stringstream ss(data); + cereal::BinaryInputArchive ar(ss); ar(blob_mdm_); } diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 794102b53..322ddb130 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -21,6 +21,7 @@ class Server : public TaskLib { Server() = default; void Construct(ConstructTask *task, RunContext &rctx) { + task->Deserialize(); url_map_.resize(HRUN_QM_RUNTIME->max_lanes_); blob_mdm_.Init(task->blob_mdm_); HILOG(kInfo, "(node {}) BLOB MDM: {}", HRUN_CLIENT->node_id_, blob_mdm_.id_); diff --git a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h index 02b0f5872..6f0d97d02 100644 --- a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h +++ b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h @@ -91,6 +91,19 @@ struct ConstructTask : public CreateTaskStateTask { // Custom params bkt_mdm_ = bkt_mdm_id; blob_mdm_ = blob_mdm_id; + std::stringstream ss; + cereal::BinaryOutputArchive ar(ss); + ar(bkt_mdm_, blob_mdm_); + std::string data = ss.str(); + *custom_ = data; + } + + /** Deserialize parameters */ + void Deserialize() { + std::string data = custom_->str(); + std::stringstream ss(data); + cereal::BinaryInputArchive ar(ss); + ar(bkt_mdm_, blob_mdm_); } /** (De)serialize message call */ diff --git a/tasks/hermes_data_op/src/hermes_data_op.cc b/tasks/hermes_data_op/src/hermes_data_op.cc index 5c366210f..72d9f3db7 100644 --- a/tasks/hermes_data_op/src/hermes_data_op.cc +++ b/tasks/hermes_data_op/src/hermes_data_op.cc @@ -32,6 +32,7 @@ class Server : public TaskLib { Server() = default; void Construct(ConstructTask *task, RunContext &rctx) { + task->Deserialize(); bkt_mdm_.Init(task->bkt_mdm_); blob_mdm_.Init(task->blob_mdm_); client_.Init(id_); diff --git a/tasks/hermes_mdm/include/hermes_mdm/hermes_mdm_tasks.h b/tasks/hermes_mdm/include/hermes_mdm/hermes_mdm_tasks.h index 1a590cd07..93db250d1 100644 --- a/tasks/hermes_mdm/include/hermes_mdm/hermes_mdm_tasks.h +++ b/tasks/hermes_mdm/include/hermes_mdm/hermes_mdm_tasks.h @@ -42,6 +42,18 @@ struct ConstructTask : public CreateTaskStateTask { "hermes_mdm", id, queue_info) { // Custom params HSHM_MAKE_AR(server_config_path_, alloc, server_config_path); + std::stringstream ss; + cereal::BinaryOutputArchive ar(ss); + ar(server_config_path_); + std::string data = ss.str(); + *custom_ = data; + } + + void Deserialize() { + std::string data = custom_->str(); + std::stringstream ss(data); + cereal::BinaryInputArchive ar(ss); + ar(server_config_path_); } /** Destructor */ @@ -49,17 +61,6 @@ struct ConstructTask : public CreateTaskStateTask { ~ConstructTask() { HSHM_DESTROY_AR(server_config_path_); } - - /** (De)serialize message call */ - template - void SerializeStart(Ar &ar) { - CreateTaskStateTask::SerializeStart(ar); - ar(server_config_path_); - } - - /** (De)serialize message return */ - template - void SerializeEnd(u32 replica, Ar &ar) {} }; /** A task to destroy hermes_mdm */ From 8224e8e71a54575e223bce3383c08ae2900f310f Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:01:33 -0500 Subject: [PATCH 41/70] Reduce logging in remote queue --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 2 +- tasks/data_stager/src/data_stager.cc | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 82fc9859a..049783c6d 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -283,7 +283,7 @@ class Server : public TaskLib { // Execute task MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id)); queue->Emplace(orig_task->prio_, orig_task->lane_hash_, task_ptr.shm_); - HILOG(kInfo, + HILOG(kDebug, "(node {}) Executing task (task_node={}, task_state={}/{}, state_name={}, method={}, size={}, lane_hash={})", HRUN_CLIENT->node_id_, orig_task->task_node_, diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 322ddb130..efe923965 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -58,9 +58,7 @@ class Server : public TaskLib { return; } std::unique_ptr &stager = it->second; - HILOG(kInfo, "POINTER FAILING HERE?: {}", (size_t)stager.get()); stager->StageIn(blob_mdm_, task, rctx); - HILOG(kInfo, "STAGED IN?: {}", (size_t)stager.get()); task->SetModuleComplete(); } From 1273b446708db44f035fde7f52df7dc22cc9e201 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:04:17 -0500 Subject: [PATCH 42/70] More logging in put --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 5ad3ec60f..e75741627 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -267,7 +267,7 @@ class Server : public TaskLib { size_diff = needed_space - blob_info.max_blob_size_; } blob_info.blob_size_ += size_diff; - HILOG(kDebug, "The size diff is {} bytes", size_diff) + HILOG(kInfo, "The size diff is {} bytes", size_diff) // Use DPE std::vector schema_vec; @@ -303,7 +303,7 @@ class Server : public TaskLib { write_tasks.reserve(blob_info.buffers_.size()); size_t blob_off = 0, buf_off = 0; char *blob_buf = HRUN_CLIENT->GetPrivatePointer(task->data_); - HILOG(kDebug, "Number of buffers {}", blob_info.buffers_.size()); + HILOG(kInfo, "Number of buffers {}", blob_info.buffers_.size()); for (BufferInfo &buf : blob_info.buffers_) { size_t blob_left = blob_off; size_t blob_right = blob_off + buf.t_size_; @@ -314,7 +314,7 @@ class Server : public TaskLib { if (blob_off + buf_size > task->blob_off_ + task->data_size_) { buf_size = task->blob_off_ + task->data_size_ - blob_off; } - HILOG(kDebug, "Writing {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) + HILOG(kInfo, "Writing {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) TargetInfo &target = *target_map_[buf.tid_]; LPointer write_task = target.AsyncWrite(task->task_node_ + 1, @@ -357,6 +357,7 @@ class Server : public TaskLib { } // Free data + HILOG(kInfo, "Completing PUT for {}", blob_name.str()); task->SetModuleComplete(); } From 3debb45ade3c62c70139cbcb7bdac7ae5e555578 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:07:13 -0500 Subject: [PATCH 43/70] Make statements debug --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index e75741627..af2f7f6f6 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -225,7 +225,7 @@ class Server : public TaskLib { task->blob_id_ = GetOrCreateBlobId(task->tag_id_, task->lane_hash_, blob_name, rctx, task->flags_); } - HILOG(kInfo, "Beginning PUT for {}", blob_name.str()); + HILOG(kDebug, "Beginning PUT for {}", blob_name.str()); BLOB_MAP_T &blob_map = blob_map_[rctx.lane_id_]; BlobInfo &blob_info = blob_map[task->blob_id_]; @@ -267,7 +267,7 @@ class Server : public TaskLib { size_diff = needed_space - blob_info.max_blob_size_; } blob_info.blob_size_ += size_diff; - HILOG(kInfo, "The size diff is {} bytes", size_diff) + HILOG(kDebug, "The size diff is {} bytes", size_diff) // Use DPE std::vector schema_vec; @@ -303,7 +303,7 @@ class Server : public TaskLib { write_tasks.reserve(blob_info.buffers_.size()); size_t blob_off = 0, buf_off = 0; char *blob_buf = HRUN_CLIENT->GetPrivatePointer(task->data_); - HILOG(kInfo, "Number of buffers {}", blob_info.buffers_.size()); + HILOG(kDebug, "Number of buffers {}", blob_info.buffers_.size()); for (BufferInfo &buf : blob_info.buffers_) { size_t blob_left = blob_off; size_t blob_right = blob_off + buf.t_size_; @@ -314,7 +314,7 @@ class Server : public TaskLib { if (blob_off + buf_size > task->blob_off_ + task->data_size_) { buf_size = task->blob_off_ + task->data_size_ - blob_off; } - HILOG(kInfo, "Writing {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) + HILOG(kDebug, "Writing {} bytes at off {} from target {}", buf_size, tgt_off, buf.tid_) TargetInfo &target = *target_map_[buf.tid_]; LPointer write_task = target.AsyncWrite(task->task_node_ + 1, @@ -357,7 +357,7 @@ class Server : public TaskLib { } // Free data - HILOG(kInfo, "Completing PUT for {}", blob_name.str()); + HILOG(kDebug, "Completing PUT for {}", blob_name.str()); task->SetModuleComplete(); } From 1edd06a90547512c2af5b6ca1a8a37ee6b2239ff Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:11:56 -0500 Subject: [PATCH 44/70] Print node where stager registered --- tasks/data_stager/src/data_stager.cc | 2 +- test/unit/hermes/test_bucket.cc | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index efe923965..8d954c099 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -36,7 +36,7 @@ class Server : public TaskLib { std::string url = task->url_->str(); std::unique_ptr stager = StagerFactory::Get(url); stager->RegisterStager(task, rctx); - HILOG(kInfo, "REGISTERING STAGER: {}", (size_t)stager.get()); + HILOG(kInfo, "(node {}) REGISTERING STAGER: {}", HRUN_CLIENT->node_id_, (size_t)stager.get()); url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); task->SetModuleComplete(); } diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 61df79db3..f141825d6 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -470,7 +470,6 @@ TEST_CASE("TestHermesDataStager") { hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); - HILOG(kInfo, "CREATED STAGERS!!!") // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 3c12eb2541cefb14401ad55e21772bdca6b4a361 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:13:42 -0500 Subject: [PATCH 45/70] Only register stagers --- test/unit/hermes/test_bucket.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index f141825d6..4a2f51134 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -470,6 +470,7 @@ TEST_CASE("TestHermesDataStager") { hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); + return; // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 73450f38ac5866aaf7d42d04a07e96f9538f1b2a Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:19:42 -0500 Subject: [PATCH 46/70] sleep after to see if register stager worked --- test/unit/hermes/test_bucket.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 4a2f51134..71bd8fe34 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -470,7 +470,7 @@ TEST_CASE("TestHermesDataStager") { hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); - return; + sleep(5); // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From ae3155a7205af6ef381897b05a1ed097f2855575 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:22:34 -0500 Subject: [PATCH 47/70] Remove PutBlob from staging --- .../data_stager/factory/binary_stager.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index f66e3e0ec..e9b272c13 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -76,15 +76,15 @@ class BinaryFileStager : public AbstractStager { HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm ({})", task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) hapi::Context ctx; - LPointer put_task = - blob_mdm.AsyncPutBlob(task->task_node_ + 1, - task->bkt_id_, - hshm::to_charbuf(*task->blob_name_), - hermes::BlobId::GetNull(), - 0, real_size, blob.shm_, task->score_, 0, - ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); - put_task->Wait(task); - HRUN_CLIENT->DelTask(put_task); +// LPointer put_task = +// blob_mdm.AsyncPutBlob(task->task_node_ + 1, +// task->bkt_id_, +// hshm::to_charbuf(*task->blob_name_), +// hermes::BlobId::GetNull(), +// 0, real_size, blob.shm_, task->score_, 0, +// ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); +// put_task->Wait(task); +// HRUN_CLIENT->DelTask(put_task); } /** Stage data out to remote source */ From d5ccd27af817045f62a2d82c11c538cab68c2af0 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:25:54 -0500 Subject: [PATCH 48/70] Remove memcpy from Staging --- tasks/data_stager/include/data_stager/factory/binary_stager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index e9b272c13..d1d3607cc 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -70,7 +70,7 @@ class BinaryFileStager : public AbstractStager { page_size_, url_); return; } - memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); + // memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); HILOG(kInfo, "Staged {} bytes from the backend file {}", real_size, url_); HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm ({})", From efcda56c4d89630386ae44f4469973850348c4fc Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:28:21 -0500 Subject: [PATCH 49/70] Make prints debug --- .../data_stager/include/data_stager/factory/binary_stager.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index d1d3607cc..4078af03e 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -58,7 +58,7 @@ class BinaryFileStager : public AbstractStager { void StageIn(blob_mdm::Client &blob_mdm, StageInTask *task, RunContext &rctx) override { adapter::BlobPlacement plcmnt; plcmnt.DecodeBlobName(*task->blob_name_, page_size_); - HILOG(kInfo, "Attempting to stage {} bytes from the backend file {} at offset {}", + HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}", page_size_, url_, plcmnt.bucket_off_); LPointer blob = HRUN_CLIENT->AllocateBuffer(page_size_); ssize_t real_size = HERMES_POSIX_API->pread(fd_, @@ -71,9 +71,9 @@ class BinaryFileStager : public AbstractStager { return; } // memcpy(blob.ptr_ + plcmnt.blob_off_, blob.ptr_, real_size); - HILOG(kInfo, "Staged {} bytes from the backend file {}", + HILOG(kDebug, "Staged {} bytes from the backend file {}", real_size, url_); - HILOG(kInfo, "Submitting put blob {} ({}) to blob mdm ({})", + HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})", task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) hapi::Context ctx; // LPointer put_task = From 83b880abe0b4bbcc1aa5724bca9f0244e6b7ef91 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:30:22 -0500 Subject: [PATCH 50/70] don't use op mdm --- include/hermes/config_manager.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 064e83e35..30a1bd13f 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -50,9 +50,8 @@ class ConfigurationManager { mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); - HILOG(kInfo, "(node {}) BLOB MDM ID: {}", HRUN_CLIENT->node_id_, blob_mdm_.id_) - op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", - bkt_mdm_.id_, blob_mdm_.id_); +// op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", +// bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), From 630ee6ce458f6e817a1767752b79b0f09af6bde0 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:33:44 -0500 Subject: [PATCH 51/70] don't rerun long running remote tasks --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 049783c6d..6c37a4202 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -81,8 +81,9 @@ class Server : public TaskLib { if (!task->orig_task_->IsLongRunning()) { task->orig_task_->SetModuleComplete(); } else { - task->orig_task_->UnsetStarted(); - task->orig_task_->UnsetDisableRun(); + task->orig_task_->SetModuleComplete(); +// task->orig_task_->UnsetStarted(); +// task->orig_task_->UnsetDisableRun(); } task->SetModuleComplete(); } From f45f65882455bd79f8832c4ce669d2b589e6ef81 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:35:09 -0500 Subject: [PATCH 52/70] Be careful about SetModulecomplete --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 6c37a4202..25c4d2c65 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -80,12 +80,11 @@ class Server : public TaskLib { task->orig_task_->method_); if (!task->orig_task_->IsLongRunning()) { task->orig_task_->SetModuleComplete(); + task->SetModuleComplete(); } else { - task->orig_task_->SetModuleComplete(); -// task->orig_task_->UnsetStarted(); -// task->orig_task_->UnsetDisableRun(); + task->orig_task_->UnsetStarted(); + task->orig_task_->UnsetDisableRun(); } - task->SetModuleComplete(); } /** Push for small message */ From cff23b580013a0e8a78237ecc2dff231eaf20731 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:36:11 -0500 Subject: [PATCH 53/70] Don't set module complete for right this second --- include/hermes/config_manager.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 30a1bd13f..1cf44a040 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -50,8 +50,8 @@ class ConfigurationManager { mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); -// op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", -// bkt_mdm_.id_, blob_mdm_.id_); + op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", + bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), From d137d1afb87043d0b9df2ce070dc881d73cf09aa Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:39:54 -0500 Subject: [PATCH 54/70] Put back module complete --- hrun/tasks_required/remote_queue/src/remote_queue.cc | 2 +- include/hermes/config_manager.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index 25c4d2c65..049783c6d 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -80,11 +80,11 @@ class Server : public TaskLib { task->orig_task_->method_); if (!task->orig_task_->IsLongRunning()) { task->orig_task_->SetModuleComplete(); - task->SetModuleComplete(); } else { task->orig_task_->UnsetStarted(); task->orig_task_->UnsetDisableRun(); } + task->SetModuleComplete(); } /** Push for small message */ diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 1cf44a040..30a1bd13f 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -50,8 +50,8 @@ class ConfigurationManager { mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); - op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", - bkt_mdm_.id_, blob_mdm_.id_); +// op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", +// bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), From 0cfb3bd75b6c1ae0b560566361ad4c421945f45b Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:46:42 -0500 Subject: [PATCH 55/70] Staging in async blob back --- .../data_stager/factory/binary_stager.h | 18 +++++++++--------- test/unit/hermes/test_bucket.cc | 1 - 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tasks/data_stager/include/data_stager/factory/binary_stager.h b/tasks/data_stager/include/data_stager/factory/binary_stager.h index 4078af03e..cea23d787 100644 --- a/tasks/data_stager/include/data_stager/factory/binary_stager.h +++ b/tasks/data_stager/include/data_stager/factory/binary_stager.h @@ -76,15 +76,15 @@ class BinaryFileStager : public AbstractStager { HILOG(kDebug, "Submitting put blob {} ({}) to blob mdm ({})", task->blob_name_->str(), task->bkt_id_, blob_mdm.id_) hapi::Context ctx; -// LPointer put_task = -// blob_mdm.AsyncPutBlob(task->task_node_ + 1, -// task->bkt_id_, -// hshm::to_charbuf(*task->blob_name_), -// hermes::BlobId::GetNull(), -// 0, real_size, blob.shm_, task->score_, 0, -// ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); -// put_task->Wait(task); -// HRUN_CLIENT->DelTask(put_task); + LPointer put_task = + blob_mdm.AsyncPutBlob(task->task_node_ + 1, + task->bkt_id_, + hshm::to_charbuf(*task->blob_name_), + hermes::BlobId::GetNull(), + 0, real_size, blob.shm_, task->score_, 0, + ctx, TASK_DATA_OWNER | TASK_LOW_LATENCY); + put_task->Wait(task); + HRUN_CLIENT->DelTask(put_task); } /** Stage data out to remote source */ diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index 71bd8fe34..f141825d6 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -470,7 +470,6 @@ TEST_CASE("TestHermesDataStager") { hshm::charbuf url = BinaryFileStager::BuildFileUrl(path, page_size); hermes::Bucket bkt(url.str(), file_size, HERMES_IS_FILE); - sleep(5); // Put a few blobs in the bucket for (size_t i = off; i < proc_count; ++i) { From 32498a38363c8f55e1f468680e69f0126c8543d6 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:48:40 -0500 Subject: [PATCH 56/70] Add back op mdm --- include/hermes/config_manager.h | 4 ++-- tasks/hermes_data_op/src/hermes_data_op.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/hermes/config_manager.h b/include/hermes/config_manager.h index 30a1bd13f..1cf44a040 100644 --- a/include/hermes/config_manager.h +++ b/include/hermes/config_manager.h @@ -50,8 +50,8 @@ class ConfigurationManager { mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm"); blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm"); bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm"); -// op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", -// bkt_mdm_.id_, blob_mdm_.id_); + op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm", + bkt_mdm_.id_, blob_mdm_.id_); stager_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_stager_mdm", blob_mdm_.id_); blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(), diff --git a/tasks/hermes_data_op/src/hermes_data_op.cc b/tasks/hermes_data_op/src/hermes_data_op.cc index 72d9f3db7..5c8ba8262 100644 --- a/tasks/hermes_data_op/src/hermes_data_op.cc +++ b/tasks/hermes_data_op/src/hermes_data_op.cc @@ -39,7 +39,7 @@ class Server : public TaskLib { op_id_map_["min"] = 0; op_id_map_["max"] = 1; op_graphs_.resize(HRUN_QM_RUNTIME->max_lanes_); - run_task_ = client_.AsyncRunOp(task->task_node_ + 1); + // run_task_ = client_.AsyncRunOp(task->task_node_ + 1); task->SetModuleComplete(); } From 8521e464c0cafce1d6126198348d84eb8fbbf528 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:52:37 -0500 Subject: [PATCH 57/70] Comment out all of the constructor --- tasks/hermes_data_op/src/hermes_data_op.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tasks/hermes_data_op/src/hermes_data_op.cc b/tasks/hermes_data_op/src/hermes_data_op.cc index 5c8ba8262..1ce7c2aa0 100644 --- a/tasks/hermes_data_op/src/hermes_data_op.cc +++ b/tasks/hermes_data_op/src/hermes_data_op.cc @@ -32,13 +32,13 @@ class Server : public TaskLib { Server() = default; void Construct(ConstructTask *task, RunContext &rctx) { - task->Deserialize(); - bkt_mdm_.Init(task->bkt_mdm_); - blob_mdm_.Init(task->blob_mdm_); - client_.Init(id_); - op_id_map_["min"] = 0; - op_id_map_["max"] = 1; - op_graphs_.resize(HRUN_QM_RUNTIME->max_lanes_); +// task->Deserialize(); +// bkt_mdm_.Init(task->bkt_mdm_); +// blob_mdm_.Init(task->blob_mdm_); +// client_.Init(id_); +// op_id_map_["min"] = 0; +// op_id_map_["max"] = 1; +// op_graphs_.resize(HRUN_QM_RUNTIME->max_lanes_); // run_task_ = client_.AsyncRunOp(task->task_node_ + 1); task->SetModuleComplete(); } From 6271c44c1c4cd13a5b7db027d99a4713bdd57cca Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 10:55:56 -0500 Subject: [PATCH 58/70] remove serialize method from data op --- .../include/hermes_data_op/hermes_data_op_tasks.h | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h index 6f0d97d02..a76101f59 100644 --- a/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h +++ b/tasks/hermes_data_op/include/hermes_data_op/hermes_data_op_tasks.h @@ -106,13 +106,6 @@ struct ConstructTask : public CreateTaskStateTask { ar(bkt_mdm_, blob_mdm_); } - /** (De)serialize message call */ - template - void SerializeStart(Ar &ar) { - CreateTaskStateTask::SerializeStart(ar); - ar(bkt_mdm_, blob_mdm_); - } - HSHM_ALWAYS_INLINE ~ConstructTask() { // Custom params From 76cf4350d6ea09b73d1d64d33ba37c9aa6994fc0 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 19:43:57 -0500 Subject: [PATCH 59/70] Log staging --- tasks/data_stager/src/data_stager.cc | 2 ++ tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 ++ tasks/hermes_data_op/src/hermes_data_op.cc | 16 ++++++++-------- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 8d954c099..377670d46 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -57,8 +57,10 @@ class Server : public TaskLib { task->SetModuleComplete(); return; } + HILOG(kInfo, "Staging in bucket: {}", task->bkt_id_); std::unique_ptr &stager = it->second; stager->StageIn(blob_mdm_, task, rctx); + HILOG(kInfo, "Finished staging in bucket: {}", task->bkt_id_); task->SetModuleComplete(); } diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index af2f7f6f6..60c241b13 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -242,6 +242,8 @@ class Server : public TaskLib { blob_info.last_flush_ = 0; blob_info.UpdateWriteStats(); if (task->flags_.Any(HERMES_IS_FILE)) { + HILOG(kInfo, "Staging in using stager mdm {} on bucket {}", + stager_mdm_.id_, task->tag_id_); blob_info.mod_count_ = 1; blob_info.last_flush_ = 1; LPointer stage_task = diff --git a/tasks/hermes_data_op/src/hermes_data_op.cc b/tasks/hermes_data_op/src/hermes_data_op.cc index 1ce7c2aa0..72d9f3db7 100644 --- a/tasks/hermes_data_op/src/hermes_data_op.cc +++ b/tasks/hermes_data_op/src/hermes_data_op.cc @@ -32,14 +32,14 @@ class Server : public TaskLib { Server() = default; void Construct(ConstructTask *task, RunContext &rctx) { -// task->Deserialize(); -// bkt_mdm_.Init(task->bkt_mdm_); -// blob_mdm_.Init(task->blob_mdm_); -// client_.Init(id_); -// op_id_map_["min"] = 0; -// op_id_map_["max"] = 1; -// op_graphs_.resize(HRUN_QM_RUNTIME->max_lanes_); - // run_task_ = client_.AsyncRunOp(task->task_node_ + 1); + task->Deserialize(); + bkt_mdm_.Init(task->bkt_mdm_); + blob_mdm_.Init(task->blob_mdm_); + client_.Init(id_); + op_id_map_["min"] = 0; + op_id_map_["max"] = 1; + op_graphs_.resize(HRUN_QM_RUNTIME->max_lanes_); + run_task_ = client_.AsyncRunOp(task->task_node_ + 1); task->SetModuleComplete(); } From c411ca915cc02dac0709709a15aacf5cc5111689 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 19:48:53 -0500 Subject: [PATCH 60/70] reaccess blob print --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 60c241b13..bd54d9eab 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -255,6 +255,8 @@ class Server : public TaskLib { HRUN_CLIENT->DelTask(stage_task); } } else { + HILOG(kInfo, "Reaccessing a blob using stager mdm {} on bucket {}", + stager_mdm_.id_, task->tag_id_); // Modify existing blob blob_info.UpdateWriteStats(); } From 10ce44dd3166fcef29b602386cac5c32809a8938 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 20:01:29 -0500 Subject: [PATCH 61/70] Larger stack size for sanity --- hrun/include/hrun/task_registry/task.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hrun/include/hrun/task_registry/task.h b/hrun/include/hrun/task_registry/task.h index 6a6a3d451..05b332a76 100644 --- a/hrun/include/hrun/task_registry/task.h +++ b/hrun/include/hrun/task_registry/task.h @@ -251,7 +251,7 @@ struct WorkPending { struct RunContext { u32 lane_id_; /**< The lane id of the task */ bctx::transfer_t jmp_; /**< Current execution state of the task (runtime) */ - size_t stack_size_ = KILOBYTES(64); /**< The size of the stack for the task (runtime) */ + size_t stack_size_ = KILOBYTES(128); /**< The size of the stack for the task (runtime) */ void *stack_ptr_; /**< The pointer to the stack (runtime) */ TaskLib *exec_; WorkPending *flush_; From 763c27baac6cc0c4cc877c4150549b1623e6aca5 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 20:39:23 -0500 Subject: [PATCH 62/70] Comment out IS_FILE --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index bd54d9eab..45ce600e8 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -242,17 +242,17 @@ class Server : public TaskLib { blob_info.last_flush_ = 0; blob_info.UpdateWriteStats(); if (task->flags_.Any(HERMES_IS_FILE)) { - HILOG(kInfo, "Staging in using stager mdm {} on bucket {}", - stager_mdm_.id_, task->tag_id_); - blob_info.mod_count_ = 1; - blob_info.last_flush_ = 1; - LPointer stage_task = - stager_mdm_.AsyncStageIn(task->task_node_ + 1, - task->tag_id_, - blob_info.name_, - task->score_, 0); - stage_task->Wait(task); - HRUN_CLIENT->DelTask(stage_task); +// HILOG(kInfo, "Staging in using stager mdm {} on bucket {}", +// stager_mdm_.id_, task->tag_id_); +// blob_info.mod_count_ = 1; +// blob_info.last_flush_ = 1; +// LPointer stage_task = +// stager_mdm_.AsyncStageIn(task->task_node_ + 1, +// task->tag_id_, +// blob_info.name_, +// task->score_, 0); +// stage_task->Wait(task); +// HRUN_CLIENT->DelTask(stage_task); } } else { HILOG(kInfo, "Reaccessing a blob using stager mdm {} on bucket {}", @@ -339,9 +339,9 @@ class Server : public TaskLib { // Update information int update_mode = bucket_mdm::UpdateSizeMode::kAdd; - if (task->flags_.Any(HERMES_IS_FILE)) { - update_mode = bucket_mdm::UpdateSizeMode::kCap; - } +// if (task->flags_.Any(HERMES_IS_FILE)) { +// update_mode = bucket_mdm::UpdateSizeMode::kCap; +// } bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, task->tag_id_, task->blob_off_ + task->data_size_, From 90e4010004a982401cb82903ade5ee846e637fbb Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 20:43:42 -0500 Subject: [PATCH 63/70] Add back StageIn --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 22 ++++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 45ce600e8..f0c84dbf9 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -242,17 +242,17 @@ class Server : public TaskLib { blob_info.last_flush_ = 0; blob_info.UpdateWriteStats(); if (task->flags_.Any(HERMES_IS_FILE)) { -// HILOG(kInfo, "Staging in using stager mdm {} on bucket {}", -// stager_mdm_.id_, task->tag_id_); -// blob_info.mod_count_ = 1; -// blob_info.last_flush_ = 1; -// LPointer stage_task = -// stager_mdm_.AsyncStageIn(task->task_node_ + 1, -// task->tag_id_, -// blob_info.name_, -// task->score_, 0); -// stage_task->Wait(task); -// HRUN_CLIENT->DelTask(stage_task); + HILOG(kInfo, "Staging in using stager mdm {} on bucket {}", + stager_mdm_.id_, task->tag_id_); + blob_info.mod_count_ = 1; + blob_info.last_flush_ = 1; + LPointer stage_task = + stager_mdm_.AsyncStageIn(task->task_node_ + 1, + task->tag_id_, + blob_info.name_, + task->score_, 0); + stage_task->Wait(task); + HRUN_CLIENT->DelTask(stage_task); } } else { HILOG(kInfo, "Reaccessing a blob using stager mdm {} on bucket {}", From b2fcd2557c10c22eef132fa37db1b321d54635e1 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 20:47:38 -0500 Subject: [PATCH 64/70] Make StageIn unordered? --- .../include/data_stager/data_stager_tasks.h | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 2c3c5fd22..7dfa1da74 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -230,10 +230,10 @@ struct UnregisterStagerTask : public Task, TaskFlags { * A task to stage in data from a remote source * */ struct StageInTask : public Task, TaskFlags { - hermes::BucketId bkt_id_; - hipc::ShmArchive blob_name_; - float score_; - u32 node_id_; + IN hermes::BucketId bkt_id_; + IN hipc::ShmArchive blob_name_; + IN float score_; + IN u32 node_id_; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit @@ -273,10 +273,11 @@ struct StageInTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - hrun::LocalSerialize srl(group); - srl << bkt_id_.unique_; - srl << bkt_id_.node_id_; - return 0; +// hrun::LocalSerialize srl(group); +// srl << bkt_id_.unique_; +// srl << bkt_id_.node_id_; +// return 0; + return TASK_UNORDERED; } }; From 268d78548b48d99b65edc2c99d8257dede230a81 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 21:00:43 -0500 Subject: [PATCH 65/70] RegisterStager --- .../include/data_stager/data_stager_tasks.h | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/tasks/data_stager/include/data_stager/data_stager_tasks.h b/tasks/data_stager/include/data_stager/data_stager_tasks.h index 7dfa1da74..a9bcb44f9 100644 --- a/tasks/data_stager/include/data_stager/data_stager_tasks.h +++ b/tasks/data_stager/include/data_stager/data_stager_tasks.h @@ -156,10 +156,7 @@ struct RegisterStagerTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - hrun::LocalSerialize srl(group); - srl << bkt_id_.unique_; - srl << bkt_id_.node_id_; - return 0; + return TASK_UNORDERED; } }; @@ -167,7 +164,7 @@ struct RegisterStagerTask : public Task, TaskFlags { * Unregister a new stager * */ struct UnregisterStagerTask : public Task, TaskFlags { - hermes::BucketId bkt_id_; + IN hermes::BucketId bkt_id_; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit @@ -285,10 +282,10 @@ struct StageInTask : public Task, TaskFlags { * A task to stage data out of a hermes to a remote source * */ struct StageOutTask : public Task, TaskFlags { - hermes::BucketId bkt_id_; - hipc::ShmArchive blob_name_; - hipc::Pointer data_; - size_t data_size_; + IN hermes::BucketId bkt_id_; + IN hipc::ShmArchive blob_name_; + IN hipc::Pointer data_; + IN size_t data_size_; /** SHM default constructor */ HSHM_ALWAYS_INLINE explicit @@ -332,10 +329,7 @@ struct StageOutTask : public Task, TaskFlags { /** Create group */ HSHM_ALWAYS_INLINE u32 GetGroup(hshm::charbuf &group) { - hrun::LocalSerialize srl(group); - srl << bkt_id_.unique_; - srl << bkt_id_.node_id_; - return 0; + return TASK_UNORDERED; } }; From 7ee26b5b1d0403802969947355a16e5e2e186f74 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 21:40:14 -0500 Subject: [PATCH 66/70] Add back checks --- test/unit/hermes/test_bucket.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit/hermes/test_bucket.cc b/test/unit/hermes/test_bucket.cc index f141825d6..39731f052 100644 --- a/test/unit/hermes/test_bucket.cc +++ b/test/unit/hermes/test_bucket.cc @@ -481,11 +481,11 @@ TEST_CASE("TestHermesDataStager") { bkt.PartialPut(blob_name.str(), blob, 0, ctx); hermes::Blob blob2; bkt.Get(blob_name.str(), blob2, ctx); - // REQUIRE(blob2.size() == page_size); + REQUIRE(blob2.size() == page_size); hermes::Blob full_blob(page_size); memcpy(full_blob.data(), blob.data(), blob.size()); memcpy(full_blob.data() + blob.size(), data.data(), page_size / 2); - // REQUIRE(full_blob == blob2); + REQUIRE(full_blob == blob2); } for (size_t i = off; i < proc_count; ++i) { hshm::charbuf blob_name = hermes::adapter::BlobPlacement::CreateBlobName(i); From c74f147766d13b992aa5e05f2383ad05f4996f40 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 21:43:13 -0500 Subject: [PATCH 67/70] Add back size --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index f0c84dbf9..bd54d9eab 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -339,9 +339,9 @@ class Server : public TaskLib { // Update information int update_mode = bucket_mdm::UpdateSizeMode::kAdd; -// if (task->flags_.Any(HERMES_IS_FILE)) { -// update_mode = bucket_mdm::UpdateSizeMode::kCap; -// } + if (task->flags_.Any(HERMES_IS_FILE)) { + update_mode = bucket_mdm::UpdateSizeMode::kCap; + } bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1, task->tag_id_, task->blob_off_ + task->data_size_, From dfb21c2a13f3225d247c7243621c937bb8eca220 Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 21:59:38 -0500 Subject: [PATCH 68/70] Don't finish task if stager DNE for now --- tasks/data_stager/src/data_stager.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tasks/data_stager/src/data_stager.cc b/tasks/data_stager/src/data_stager.cc index 377670d46..55dc298b9 100644 --- a/tasks/data_stager/src/data_stager.cc +++ b/tasks/data_stager/src/data_stager.cc @@ -36,7 +36,6 @@ class Server : public TaskLib { std::string url = task->url_->str(); std::unique_ptr stager = StagerFactory::Get(url); stager->RegisterStager(task, rctx); - HILOG(kInfo, "(node {}) REGISTERING STAGER: {}", HRUN_CLIENT->node_id_, (size_t)stager.get()); url_map_[rctx.lane_id_].emplace(task->bkt_id_, std::move(stager)); task->SetModuleComplete(); } @@ -54,13 +53,12 @@ class Server : public TaskLib { url_map_[rctx.lane_id_].find(task->bkt_id_); if (it == url_map_[rctx.lane_id_].end()) { HELOG(kError, "Could not find stager for bucket: {}", task->bkt_id_); - task->SetModuleComplete(); + // TODO(llogan): Probably should add back... + // task->SetModuleComplete(); return; } - HILOG(kInfo, "Staging in bucket: {}", task->bkt_id_); std::unique_ptr &stager = it->second; stager->StageIn(blob_mdm_, task, rctx); - HILOG(kInfo, "Finished staging in bucket: {}", task->bkt_id_); task->SetModuleComplete(); } From 43324543a2a2af538fe0760b77ed7994a4f24acb Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 22:23:15 -0500 Subject: [PATCH 69/70] Make stack size 64kb again --- hrun/include/hrun/task_registry/task.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hrun/include/hrun/task_registry/task.h b/hrun/include/hrun/task_registry/task.h index 05b332a76..6a6a3d451 100644 --- a/hrun/include/hrun/task_registry/task.h +++ b/hrun/include/hrun/task_registry/task.h @@ -251,7 +251,7 @@ struct WorkPending { struct RunContext { u32 lane_id_; /**< The lane id of the task */ bctx::transfer_t jmp_; /**< Current execution state of the task (runtime) */ - size_t stack_size_ = KILOBYTES(128); /**< The size of the stack for the task (runtime) */ + size_t stack_size_ = KILOBYTES(64); /**< The size of the stack for the task (runtime) */ void *stack_ptr_; /**< The pointer to the stack (runtime) */ TaskLib *exec_; WorkPending *flush_; From 4d62147cad8e891cbd5808f6e3d23351341d962d Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Fri, 20 Oct 2023 22:24:31 -0500 Subject: [PATCH 70/70] Add back flushing task? --- tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index bd54d9eab..852a1fb44 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -119,7 +119,7 @@ class Server : public TaskLib { stager_mdm_.Init(task->stager_mdm_); op_mdm_.Init(task->op_mdm_); // TODO(llogan): Add back - // flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1); + flush_task_ = blob_mdm_.AsyncFlushData(task->task_node_ + 1); } task->SetModuleComplete(); }