From 5b25662f873d3be8f56893f56b69d336f2315c9b Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Sat, 30 Dec 2023 08:32:38 -0600 Subject: [PATCH 1/2] Better checks for minus local --- hrun/include/hrun/hrun_types.h | 7 ++++--- hrun/include/hrun/task_registry/task.h | 15 --------------- hrun/include/hrun/work_orchestrator/worker.h | 9 +-------- hrun/tasks_required/proc_queue/src/proc_queue.cc | 3 --- .../include/remote_queue/remote_queue.h | 3 --- .../remote_queue/src/remote_queue.cc | 1 - 6 files changed, 5 insertions(+), 33 deletions(-) diff --git a/hrun/include/hrun/hrun_types.h b/hrun/include/hrun/hrun_types.h index d167add46..29a7183e1 100644 --- a/hrun/include/hrun/hrun_types.h +++ b/hrun/include/hrun/hrun_types.h @@ -114,12 +114,13 @@ struct DomainId { /** Domain has the local node */ HSHM_ALWAYS_INLINE bool IsRemote(size_t num_hosts, u32 this_node) const { - if (num_hosts == 1) { + if (num_hosts == 1 && !flags_.Any(kNoLocal)) { return false; } else { - return (flags_.Any(kGlobal | kSet) || (flags_.Any(kNode) && id_ != this_node)); + return + (flags_.Any(kGlobal | kSet | kNoLocal) || + (flags_.Any(kNode) && id_ != this_node)); } - // return flags_.Any(kGlobal | kSet | kNode); } /** DomainId representing the local node */ diff --git a/hrun/include/hrun/task_registry/task.h b/hrun/include/hrun/task_registry/task.h index c8723a1eb..8c4540e71 100644 --- a/hrun/include/hrun/task_registry/task.h +++ b/hrun/include/hrun/task_registry/task.h @@ -402,21 +402,6 @@ struct Task : public hipc::ShmContainer { task_flags_.UnsetBits(TASK_LANE_ALL); } - /** This task is a root task */ - HSHM_ALWAYS_INLINE bool IsRoot() { - return task_flags_.Any(TASK_IS_ROOT); - } - - /** Set this task as a root task */ - HSHM_ALWAYS_INLINE void SetRoot() { - task_flags_.SetBits(TASK_IS_ROOT); - } - - /** Unset this task a sa root task */ - HSHM_ALWAYS_INLINE void UnsetRoot() { - task_flags_.UnsetBits(TASK_IS_ROOT); - } - /** Set period in nanoseconds */ HSHM_ALWAYS_INLINE void SetPeriodNs(double ns) { period_ns_ = ns; diff --git a/hrun/include/hrun/work_orchestrator/worker.h b/hrun/include/hrun/work_orchestrator/worker.h index 9070713ad..f0e0ac9a0 100644 --- a/hrun/include/hrun/work_orchestrator/worker.h +++ b/hrun/include/hrun/work_orchestrator/worker.h @@ -487,14 +487,7 @@ class Worker { task->SetStarted(); } } else { - try { - exec->Run(task->method_, task, rctx); - } catch (std::exception &e) { - HELOG(kError, "(node {}) Worker {} caught an exception: {}", HRUN_CLIENT->node_id_, id_, e.what()); - } catch (...) { - HELOG(kError, "(node {}) Worker {} caught an unknown exception", HRUN_CLIENT->node_id_, id_); - - } + exec->Run(task->method_, task, rctx); task->SetStarted(); } task->DidRun(work_entry.cur_time_); diff --git a/hrun/tasks_required/proc_queue/src/proc_queue.cc b/hrun/tasks_required/proc_queue/src/proc_queue.cc index d576e6dc8..993c85ac4 100644 --- a/hrun/tasks_required/proc_queue/src/proc_queue.cc +++ b/hrun/tasks_required/proc_queue/src/proc_queue.cc @@ -47,9 +47,6 @@ class Server : public TaskLib { ptr->UnsetFireAndForget(); task->is_fire_forget_ = true; } - if (ptr->task_node_.IsRoot() || task->task_node_.IsRoot()) { - ptr->SetRoot(); - } MultiQueue *real_queue = HRUN_CLIENT->GetQueue(QueueId(ptr->task_state_)); bool ret = real_queue->EmplaceFrac( ptr->prio_, ptr->lane_hash_, task->sub_run_.shm_); diff --git a/hrun/tasks_required/remote_queue/include/remote_queue/remote_queue.h b/hrun/tasks_required/remote_queue/include/remote_queue/remote_queue.h index 358643f95..9f26996a5 100644 --- a/hrun/tasks_required/remote_queue/include/remote_queue/remote_queue.h +++ b/hrun/tasks_required/remote_queue/include/remote_queue/remote_queue.h @@ -74,9 +74,6 @@ class Client : public TaskLibClient { LPointer push_task = HRUN_CLIENT->NewTask( orig_task->task_node_ + 1, DomainId::GetLocal(), id_, domain_ids, orig_task, exec, orig_task->method_, xfer); - if (orig_task->IsRoot()) { - push_task->SetRoot(); - } MultiQueue *queue = HRUN_CLIENT->GetQueue(queue_id_); queue->Emplace(push_task->prio_, 0, push_task.shm_); } diff --git a/hrun/tasks_required/remote_queue/src/remote_queue.cc b/hrun/tasks_required/remote_queue/src/remote_queue.cc index b9a440f6b..2034014c4 100644 --- a/hrun/tasks_required/remote_queue/src/remote_queue.cc +++ b/hrun/tasks_required/remote_queue/src/remote_queue.cc @@ -395,7 +395,6 @@ class Server : public TaskLib { orig_task->UnsetStarted(); orig_task->UnsetDataOwner(); orig_task->UnsetLongRunning(); - orig_task->UnsetRoot(); orig_task->task_flags_.SetBits(TASK_REMOTE_DEBUG_MARK); // Execute task From 491cf740365f126475a94bad0a5910b0c600562f Mon Sep 17 00:00:00 2001 From: lukemartinlogan Date: Sat, 30 Dec 2023 08:38:13 -0600 Subject: [PATCH 2/2] Add stopping --- .../hrun_admin/include/hrun_admin/hrun_admin.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin.h b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin.h index ad05c76db..b95faced4 100644 --- a/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin.h +++ b/hrun/tasks_required/hrun_admin/include/hrun_admin/hrun_admin.h @@ -178,9 +178,14 @@ class Client : public TaskLibClient { } HRUN_TASK_NODE_ADMIN_ROOT(StopRuntime); void StopRuntimeRoot() { + HILOG(kInfo, "Beginning to flush the runtime.\n" + "If you did async I/O, this may take some time.\n" + "All unflushed data will be written to the PFS.") FlushRoot(DomainId::GetGlobal()); + HILOG(kInfo, "Stopping the runtime") AsyncStopRuntimeRoot(DomainId::GetGlobalMinusLocal()); AsyncStopRuntimeRoot(DomainId::GetLocal()); + HILOG(kInfo, "All done!") } /** Set work orchestrator queue policy */