Skip to content

Commit

Permalink
Merge pull request #629 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Fix flushing and data op
  • Loading branch information
lukemartinlogan authored Oct 21, 2023
2 parents efce6e3 + 4d62147 commit 6d4f5aa
Show file tree
Hide file tree
Showing 18 changed files with 208 additions and 88 deletions.
18 changes: 18 additions & 0 deletions hrun/include/hrun/api/template/hrun_task_cc.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
extern "C" {
void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(
new TYPE_UNWRAP(TRAIT_CLASS)());
exec->Init(task->id_, state_name);
return exec;
}
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(
new TYPE_UNWRAP(TRAIT_CLASS)());
exec->Init(task->id_, state_name);
RunContext rctx(0);
exec->Run(hrun::TaskMethod::kConstruct, task, rctx);
return exec;
}
const char* get_task_lib_name(void) { return TASK_NAME; }
bool is_hrun_task_ = true;
}
8 changes: 8 additions & 0 deletions hrun/include/hrun/network/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ class BinaryInputArchive {
ss_.str(std::string((char*)param_xfer.data_, param_xfer.data_size_));
}

/** String constructor */
BinaryInputArchive(const std::string &params) : ar_(ss_) {
xfer_.resize(1);
xfer_[0].data_ = (void*)params.data();
xfer_[0].data_size_ = params.size();
ss_.str(params);
}

/** Deserialize using call */
template<typename T, typename ...Args>
BinaryInputArchive& operator()(T &var, Args &&...args) {
Expand Down
37 changes: 23 additions & 14 deletions hrun/include/hrun/task_registry/task_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,35 @@ class TaskLibClient {
};

extern "C" {
/** The two methods provided by all tasks */
/** Allocate a state (no construction) */
typedef TaskState* (*alloc_state_t)(Task *task, const char *state_name);
/** Allocate + construct a state */
typedef TaskState* (*create_state_t)(Task *task, const char *state_name);
/** Get the name of a task */
typedef const char* (*get_task_lib_name_t)(void);
} // extern c

/** Used internally by task source file */
#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME) \
extern "C" { \
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) { \
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>( \
new TYPE_UNWRAP(TRAIT_CLASS)()); \
exec->Init(task->id_, state_name); \
RunContext rctx(0); \
exec->Run(hrun::TaskMethod::kConstruct, task, rctx); \
return exec; \
} \
const char* get_task_lib_name(void) { return TASK_NAME; } \
bool is_hrun_task_ = true; \
}
#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME)\
extern "C" {\
void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(\
new TYPE_UNWRAP(TRAIT_CLASS)());\
exec->Init(task->id_, state_name);\
return exec;\
}\
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(\
new TYPE_UNWRAP(TRAIT_CLASS)());\
exec->Init(task->id_, state_name);\
RunContext rctx(0);\
exec->Run(hrun::TaskMethod::kConstruct, task, rctx);\
return exec;\
}\
const char* get_task_lib_name(void) { return TASK_NAME; }\
bool is_hrun_task_ = true;\
}

} // namespace hrun

#endif // HRUN_INCLUDE_HRUN_TASK_TASK_H_
38 changes: 26 additions & 12 deletions hrun/include/hrun/task_registry/task_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace hrun {
/** All information needed to create a trait */
struct TaskLibInfo {
void *lib_; /**< The dlfcn library */
alloc_state_t alloc_state_; /**< The create task function */
create_state_t create_state_; /**< The create task function */
get_task_lib_name_t get_task_lib_name; /**< The get task name function */

Expand All @@ -44,22 +45,27 @@ struct TaskLibInfo {

/** Emplace constructor */
explicit TaskLibInfo(void *lib,
create_state_t create_task,
alloc_state_t alloc_state,
create_state_t create_state,
get_task_lib_name_t get_task_name)
: lib_(lib), create_state_(create_task), get_task_lib_name(get_task_name) {}
: lib_(lib), alloc_state_(alloc_state),
create_state_(create_state), get_task_lib_name(get_task_name) {}

/** Copy constructor */
TaskLibInfo(const TaskLibInfo &other)
: lib_(other.lib_),
alloc_state_(other.alloc_state_),
create_state_(other.create_state_),
get_task_lib_name(other.get_task_lib_name) {}

/** Move constructor */
TaskLibInfo(TaskLibInfo &&other) noexcept
: lib_(other.lib_),
alloc_state_(other.alloc_state_),
create_state_(other.create_state_),
get_task_lib_name(other.get_task_lib_name) {
other.lib_ = nullptr;
other.alloc_state_ = nullptr;
other.create_state_ = nullptr;
other.get_task_lib_name = nullptr;
}
Expand Down Expand Up @@ -157,6 +163,13 @@ class TaskRegistry {
lib_path);
return false;
}
info.alloc_state_ = (alloc_state_t)dlsym(
info.lib_, "alloc_state");
if (!info.alloc_state_) {
HELOG(kError, "The lib {} does not have alloc_state symbol",
lib_path);
return false;
}
info.get_task_lib_name = (get_task_lib_name_t)dlsym(
info.lib_, "get_task_lib_name");
if (!info.get_task_lib_name) {
Expand Down Expand Up @@ -201,15 +214,15 @@ class TaskRegistry {
* Create a task state
* state_id must not be NULL.
* */
bool CreateTaskState(const char *lib_name,
const char *state_name,
const TaskStateId &state_id,
Admin::CreateTaskStateTask *task) {
TaskState* CreateTaskState(const char *lib_name,
const char *state_name,
const TaskStateId &state_id,
Admin::CreateTaskStateTask *task) {
// Ensure state_id is not NULL
if (state_id.IsNull()) {
HILOG(kError, "The task state ID cannot be null");
task->SetModuleComplete();
return false;
return nullptr;
}
// HILOG(kInfo, "(node {}) Creating an instance of {} with name {}",
// HRUN_CLIENT->node_id_, lib_name, state_name)
Expand All @@ -219,24 +232,25 @@ class TaskRegistry {
if (it == libs_.end()) {
HELOG(kError, "Could not find the task lib: {}", lib_name);
task->SetModuleComplete();
return false;
return nullptr;
}

// Ensure the task state does not already exist
if (TaskStateExists(state_id)) {
HELOG(kError, "The task state already exists: {}", state_name);
task->SetModuleComplete();
return true;
return nullptr;
}

// Create the state instance
task->id_ = state_id;
TaskLibInfo &info = it->second;
TaskState *task_state = info.create_state_(task, state_name);
TaskState *task_state;
task_state = info.create_state_(task, state_name);
if (!task_state) {
HELOG(kError, "Could not create the task state: {}", state_name);
task->SetModuleComplete();
return false;
return nullptr;
}

// Add the state to the registry
Expand All @@ -247,7 +261,7 @@ class TaskRegistry {
task_states_.emplace(state_id, task_state);
HILOG(kInfo, "(node {}) Created an instance of {} with name {} and ID {}",
HRUN_CLIENT->node_id_, lib_name, state_name, state_id)
return true;
return task_state;
}

/** Get or create a task state's ID */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
IN hipc::ShmArchive<hipc::string> state_name_;
IN hipc::ShmArchive<hipc::vector<PriorityInfo>> queue_info_;
INOUT TaskStateId id_;
IN hipc::ShmArchive<hipc::string> custom_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down Expand Up @@ -170,6 +171,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_MAKE_AR(state_name_, alloc, state_name);
HSHM_MAKE_AR(lib_name_, alloc, lib_name);
HSHM_MAKE_AR(queue_info_, alloc, queue_info);
HSHM_MAKE_AR(custom_, alloc, "");
id_ = id;
}

Expand All @@ -178,6 +180,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_DESTROY_AR(state_name_);
HSHM_DESTROY_AR(lib_name_);
HSHM_DESTROY_AR(queue_info_);
HSHM_DESTROY_AR(custom_);
}

/** Duplicate message */
Expand All @@ -203,7 +206,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
template<typename Ar>
void SerializeStart(Ar &ar) {
task_serialize<Ar>(ar);
ar(lib_name_, state_name_, id_, queue_info_);
ar(lib_name_, state_name_, id_, queue_info_, custom_);
}

/** (De)serialize message return */
Expand Down
4 changes: 2 additions & 2 deletions hrun/tasks_required/hrun_admin/src/hrun_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ class Server : public TaskLib {
QueueId qid(task->id_);
MultiQueue *queue = HRUN_QM_RUNTIME->CreateQueue(
qid, task->queue_info_->vec());
// Run the task state's submethod
// Allocate the task state
task->method_ = Method::kConstruct;
bool ret = HRUN_TASK_REGISTRY->CreateTaskState(
HRUN_TASK_REGISTRY->CreateTaskState(
lib_name.c_str(),
state_name.c_str(),
task->id_,
Expand Down
17 changes: 11 additions & 6 deletions hrun/tasks_required/remote_queue/src/remote_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class Server : public TaskLib {
// Process the message
TaskState *exec;
Task *orig_task;
RpcExec(req, state_id, method, xfer, orig_task, exec);
RpcExec(req, state_id, method, params, xfer, orig_task, exec);
RpcComplete(req, method, orig_task, exec, state_id);
}

Expand Down Expand Up @@ -235,7 +235,7 @@ class Server : public TaskLib {
}
TaskState *exec;
Task *orig_task;
RpcExec(req, state_id, method, xfer, orig_task, exec);
RpcExec(req, state_id, method, params, xfer, orig_task, exec);
if (io_type == IoType::kRead) {
HILOG(kDebug, "(node {}) Read blob integer: {}", HRUN_CLIENT->node_id_, (int)data[0])
HRUN_THALLIUM->IoCallServer(req, bulk, io_type, data.data(), data_size);
Expand All @@ -249,6 +249,7 @@ class Server : public TaskLib {
void RpcExec(const tl::request &req,
const TaskStateId &state_id,
u32 method,
std::string &params,
std::vector<DataTransfer> &xfer,
Task *&orig_task, TaskState *&exec) {
size_t data_size = xfer[0].data_size_;
Expand All @@ -268,16 +269,20 @@ class Server : public TaskLib {
}
TaskPointer task_ptr = exec->LoadStart(method, ar);
orig_task = task_ptr.ptr_;
hipc::Pointer &p = task_ptr.shm_;
orig_task->domain_id_ = DomainId::GetNode(HRUN_CLIENT->node_id_);

// Execute task
MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id));
// Unset task flags
// NOTE(llogan): Remote tasks are executed to completion and
// return values sent back to the remote host. This is
// for things like long-running monitoring tasks.
orig_task->UnsetFireAndForget();
orig_task->UnsetStarted();
orig_task->UnsetDataOwner();
orig_task->UnsetLongRunning();
queue->Emplace(orig_task->prio_, orig_task->lane_hash_, p);

// Execute task
MultiQueue *queue = HRUN_CLIENT->GetQueue(QueueId(state_id));
queue->Emplace(orig_task->prio_, orig_task->lane_hash_, task_ptr.shm_);
HILOG(kDebug,
"(node {}) Executing task (task_node={}, task_state={}/{}, state_name={}, method={}, size={}, lane_hash={})",
HRUN_CLIENT->node_id_,
Expand Down
4 changes: 2 additions & 2 deletions tasks/bdev/include/bdev/bdev.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ class Client : public TaskLibClient {
queue_info, dev_info);
}
void AsyncCreateComplete(ConstructTask *task) {
if (task->IsComplete()) {
if (task->IsModuleComplete()) {
id_ = task->id_;
queue_id_ = QueueId(id_);
monitor_task_ = AsyncMonitor(task->task_node_, 100).ptr_;
monitor_task_ = AsyncMonitor(task->task_node_ + 1, 100).ptr_;
HRUN_CLIENT->DelTask(task);
}
}
Expand Down
43 changes: 30 additions & 13 deletions tasks/data_stager/include/data_stager/data_stager_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,19 @@ struct ConstructTask : public CreateTaskStateTask {
"data_stager", id, queue_info) {
// Custom params
blob_mdm_ = blob_mdm;
std::stringstream ss;
cereal::BinaryOutputArchive ar(ss);
ar(blob_mdm_);
std::string data = ss.str();
*custom_ = data;
}

template<typename Ar>
void SerializeStart(Ar &ar) {
task_serialize<Ar>(ar);
ar(lib_name_, state_name_, id_, queue_info_, blob_mdm_);
/** Deserialize parameters */
void Deserialize() {
std::string data = custom_->str();
std::stringstream ss(data);
cereal::BinaryInputArchive ar(ss);
ar(blob_mdm_);
}

HSHM_ALWAYS_INLINE
Expand Down Expand Up @@ -113,6 +120,12 @@ struct RegisterStagerTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_MAKE_AR(url_, alloc, url);
}

/** Destructor */
HSHM_ALWAYS_INLINE
~RegisterStagerTask() {
HSHM_DESTROY_AR(url_)
}

/** Duplicate message */
void Dup(hipc::Allocator *alloc, RegisterStagerTask &other) {
task_dup(other);
Expand Down Expand Up @@ -151,7 +164,7 @@ struct RegisterStagerTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
* Unregister a new stager
* */
struct UnregisterStagerTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
hermes::BucketId bkt_id_;
IN hermes::BucketId bkt_id_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down Expand Up @@ -214,10 +227,10 @@ struct UnregisterStagerTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
* A task to stage in data from a remote source
* */
struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
hermes::BucketId bkt_id_;
hipc::ShmArchive<hipc::charbuf> blob_name_;
float score_;
u32 node_id_;
IN hermes::BucketId bkt_id_;
IN hipc::ShmArchive<hipc::charbuf> blob_name_;
IN float score_;
IN u32 node_id_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down Expand Up @@ -257,6 +270,10 @@ struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
/** Create group */
HSHM_ALWAYS_INLINE
u32 GetGroup(hshm::charbuf &group) {
// hrun::LocalSerialize srl(group);
// srl << bkt_id_.unique_;
// srl << bkt_id_.node_id_;
// return 0;
return TASK_UNORDERED;
}
};
Expand All @@ -265,10 +282,10 @@ struct StageInTask : public Task, TaskFlags<TF_LOCAL> {
* A task to stage data out of a hermes to a remote source
* */
struct StageOutTask : public Task, TaskFlags<TF_LOCAL> {
hermes::BucketId bkt_id_;
hipc::ShmArchive<hipc::charbuf> blob_name_;
hipc::Pointer data_;
size_t data_size_;
IN hermes::BucketId bkt_id_;
IN hipc::ShmArchive<hipc::charbuf> blob_name_;
IN hipc::Pointer data_;
IN size_t data_size_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down
Loading

0 comments on commit 6d4f5aa

Please sign in to comment.