Skip to content

Commit

Permalink
SCHEDULE: Multithreading fixes (#463) (#470)
Browse files Browse the repository at this point in the history
* SCHEDULE: fix race in n_completed_tasks

* SCHEDULE: fix race in pipelined sched finalize

* SCHEDULE: fix race in sched task cleanup
(cherry picked from commit b07ebd9)
  • Loading branch information
valentin petrov authored Apr 15, 2022
1 parent e38647f commit c69c53b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 13 deletions.
15 changes: 9 additions & 6 deletions src/schedule/ucc_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "utils/ucc_compiler_def.h"
#include "components/base/ucc_base_iface.h"
#include "coll_score/ucc_coll_score.h"
#include "core/ucc_context.h"

ucc_status_t ucc_event_manager_init(ucc_event_manager_t *em)
{
Expand Down Expand Up @@ -43,6 +44,7 @@ ucc_status_t ucc_coll_task_init(ucc_coll_task_t *task,
task->n_deps = 0;
task->n_deps_satisfied = 0;
task->bargs.args.mask = 0;
task->schedule = NULL;
if (bargs) {
memcpy(&task->bargs, bargs, sizeof(*bargs));
}
Expand Down Expand Up @@ -101,11 +103,11 @@ ucc_schedule_completed_handler(ucc_coll_task_t *parent_task, //NOLINT
ucc_coll_task_t *task)
{
ucc_schedule_t *self = ucc_container_of(task, ucc_schedule_t, super);
uint32_t n_completed_tasks;

// TODO: do we need lock here?
// if tasks in schedule are independet and completes concurently
self->n_completed_tasks += 1;
if (self->n_completed_tasks == self->n_tasks) {
n_completed_tasks = ucc_atomic_fadd32(&self->n_completed_tasks, 1);

if (n_completed_tasks + 1 == self->n_tasks) {
self->super.status = UCC_OK;
ucc_task_complete(&self->super);
}
Expand All @@ -125,8 +127,9 @@ ucc_status_t ucc_schedule_init(ucc_schedule_t *schedule, ucc_base_coll_args_t *b

/* Attaches `task` to `schedule`.
   Subscribes ucc_schedule_completed_handler on the task's event manager
   (passing &schedule->super), stores the back-pointer in task->schedule and
   appends the task to schedule->tasks[], incrementing schedule->n_tasks.
   No capacity check against the tasks[] array is performed here.
   NOTE(review): two subscriptions appear below, one for UCC_EVENT_COMPLETED
   and one for UCC_EVENT_COMPLETED_SCHEDULE, both with the same handler. This
   looks like the pre-change and post-change lines of a diff rendered
   together - confirm that only the UCC_EVENT_COMPLETED_SCHEDULE subscription
   is meant to remain. */
void ucc_schedule_add_task(ucc_schedule_t *schedule, ucc_coll_task_t *task)
{
ucc_event_manager_subscribe(&task->em, UCC_EVENT_COMPLETED,
&schedule->super, ucc_schedule_completed_handler);
ucc_event_manager_subscribe(&task->em, UCC_EVENT_COMPLETED_SCHEDULE,
&schedule->super,
ucc_schedule_completed_handler);
/* Back-pointer from the task to the schedule it was added to. */
task->schedule = schedule;
schedule->tasks[schedule->n_tasks++] = task;
}
Expand Down
29 changes: 24 additions & 5 deletions src/schedule/ucc_schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ typedef enum {
UCC_EVENT_COMPLETED = 0,
UCC_EVENT_SCHEDULE_STARTED,
UCC_EVENT_TASK_STARTED,
UCC_EVENT_COMPLETED_SCHEDULE, /*< Event is used to notify SCHEDULE that
one of its tasks has completed */
UCC_EVENT_ERROR,
UCC_EVENT_LAST
} ucc_event_t;
Expand Down Expand Up @@ -86,8 +88,8 @@ typedef struct ucc_context ucc_context_t;

/* A schedule: a task (`super`) that groups other tasks and is completed once
   all of them have completed.
   NOTE(review): the two counters appear twice below, as `int` and as
   `uint32_t`. This looks like the pre-change and post-change lines of a diff
   rendered together - confirm that only the uint32_t pair is intended. */
typedef struct ucc_schedule {
/* Base task; ucc_schedule_completed_handler recovers the schedule from it
   with ucc_container_of and completes it via ucc_task_complete. */
ucc_coll_task_t super;
int n_completed_tasks;
int n_tasks;
/* Count of completed tasks; ucc_schedule_completed_handler increments it
   with ucc_atomic_fadd32. */
uint32_t n_completed_tasks;
/* Count of tasks in the schedule; incremented by ucc_schedule_add_task. */
uint32_t n_tasks;
ucc_context_t *ctx;
/* Tasks in the order ucc_schedule_add_task appended them. */
ucc_coll_task_t *tasks[UCC_SCHEDULE_MAX_TASKS];
} ucc_schedule_t;
Expand Down Expand Up @@ -123,11 +125,23 @@ ucc_status_t ucc_triggered_post(ucc_ee_h ee, ucc_ev_t *ev,

static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task)
{
ucc_status_t status = task->status;
ucc_coll_callback_t cb = task->cb;
int has_cb = task->flags & UCC_COLL_TASK_FLAG_CB;
ucc_status_t status = task->status;
ucc_coll_callback_t cb = task->cb;
int has_cb = task->flags & UCC_COLL_TASK_FLAG_CB;
int has_sched = task->schedule != NULL;

ucc_assert((status == UCC_OK) || (status < 0));

/* If task is part of a schedule then it can be
released during ucc_event_manager_notify(EVENT_COMPLETED_SCHEDULE) below.
Sequence: notify => schedule->n_completed_tasks++ =>
schedule->super.status = UCC_OK => user releases schedule from another
thread => schedule_finalize => schedule finalizes all the tasks.
After that the task ptr should not be accessed.
This is why notification of schedule is done separately at the end of
this function. Internal implementation must make sure that tasks
with schedules are not released during a callback (if set). */

if (ucc_likely(status == UCC_OK)) {
status = ucc_event_manager_notify(task, UCC_EVENT_COMPLETED);
} else {
Expand All @@ -148,6 +162,10 @@ static inline ucc_status_t ucc_task_complete(ucc_coll_task_t *task)
if (has_cb) {
cb.cb(cb.data, status);
}
if (has_sched && status == UCC_OK) {
status = ucc_event_manager_notify(task, UCC_EVENT_COMPLETED_SCHEDULE);
}

return status;
}

Expand All @@ -164,4 +182,5 @@ static inline void ucc_task_subscribe_dep(ucc_coll_task_t *target,
/* Core UCC context reached from `_task`: the argument is cast to
   ucc_coll_task_t * and team -> context -> ucc_context is followed.
   The argument is parenthesized before the cast so that an expression
   argument (e.g. `base + off`) is cast as a whole rather than having the
   cast bind to its first operand only. */
#define UCC_TASK_CORE_CTX(_task)                                               \
    (((ucc_coll_task_t *)(_task))->team->context->ucc_context)

/* thread_mode field of the core context reached from `_task`. */
#define UCC_TASK_THREAD_MODE(_task) (UCC_TASK_CORE_CTX(_task)->thread_mode)
#endif
20 changes: 18 additions & 2 deletions src/schedule/ucc_schedule_pipelined.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "ucc_schedule.h"
#include "ucc_schedule_pipelined.h"
#include "coll_score/ucc_coll_score.h"
#include "core/ucc_context.h"

static ucc_status_t ucc_frag_start_handler(ucc_coll_task_t *parent,
ucc_coll_task_t *task)
Expand Down Expand Up @@ -43,6 +44,10 @@ ucc_schedule_pipelined_completed_handler(ucc_coll_task_t *parent_task,
ucc_schedule_t *frag = ucc_derived_of(parent_task, ucc_schedule_t);
int i;

if (UCC_TASK_THREAD_MODE(task) == UCC_THREAD_MULTIPLE) {
ucc_recursive_spin_lock(&schedule->lock);
}

schedule->super.n_completed_tasks += 1;
schedule->n_frags_in_pipeline--;
ucc_trace_req(
Expand All @@ -52,6 +57,9 @@ ucc_schedule_pipelined_completed_handler(ucc_coll_task_t *parent_task,
ucc_assert(frag->super.status == UCC_OK);
if (schedule->super.n_completed_tasks == schedule->super.n_tasks) {
schedule->super.super.status = UCC_OK;
if (UCC_TASK_THREAD_MODE(task) == UCC_THREAD_MULTIPLE) {
ucc_recursive_spin_unlock(&schedule->lock);
}
ucc_task_complete(task);
return UCC_OK;
}
Expand All @@ -75,6 +83,9 @@ ucc_schedule_pipelined_completed_handler(ucc_coll_task_t *parent_task,
break;
}
}
if (UCC_TASK_THREAD_MODE(task) == UCC_THREAD_MULTIPLE) {
ucc_recursive_spin_unlock(&schedule->lock);
}
return UCC_OK;
}

Expand All @@ -89,6 +100,7 @@ ucc_status_t ucc_schedule_pipelined_finalize(ucc_coll_task_t *task)
for (i = 0; i < schedule_p->n_frags; i++) {
schedule_p->frags[i]->super.finalize(&frags[i]->super);
}
ucc_recursive_spinlock_destroy(&schedule_p->lock);
return UCC_OK;
}

Expand Down Expand Up @@ -143,6 +155,8 @@ ucc_status_t ucc_schedule_pipelined_init(
return status;
}

ucc_recursive_spinlock_init(&schedule->lock, 0);

schedule->super.n_tasks = n_frags_total;
schedule->n_frags = n_frags;
schedule->sequential = sequential;
Expand All @@ -158,6 +172,7 @@ ucc_status_t ucc_schedule_pipelined_init(
ucc_error("failed to initialize fragment for pipeline");
goto err;
}
frags[i]->super.schedule = &schedule->super;
frags[i]->super.status = UCC_OPERATION_INITIALIZED;
frags[i]->super.super.status = UCC_OPERATION_INITIALIZED;
}
Expand All @@ -176,8 +191,9 @@ ucc_status_t ucc_schedule_pipelined_init(
UCC_EVENT_SCHEDULE_STARTED,
&frags[i]->super, ucc_frag_start_handler);
ucc_event_manager_subscribe(
&frags[i]->super.em, UCC_EVENT_COMPLETED, &schedule->super.super,
ucc_schedule_pipelined_completed_handler);
&frags[i]->super.em, UCC_EVENT_COMPLETED_SCHEDULE,
&schedule->super.super,
ucc_schedule_pipelined_completed_handler);
}
return UCC_OK;
err:
Expand Down
1 change: 1 addition & 0 deletions src/schedule/ucc_schedule_pipelined.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct ucc_schedule_pipelined {
int sequential;
int next_frag_to_post;
ucc_schedule_frag_setup_fn_t frag_setup;
ucc_recursive_spinlock_t lock;
} ucc_schedule_pipelined_t;

/* Creates a pipelined schedule for the algorithm defined by "frag_init".
Expand Down

0 comments on commit c69c53b

Please sign in to comment.