Skip to content

Commit

Permalink
Merge pull request #255 from shintaro-iwasaki/refactoring
Browse files Browse the repository at this point in the history
minor code refactoring
  • Loading branch information
shintaro-iwasaki authored Oct 7, 2020
2 parents 64b8ee9 + 1048e7f commit a9937e2
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 115 deletions.
7 changes: 0 additions & 7 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -851,13 +851,6 @@ int main() {

AM_CONDITIONAL([ABT_USE_INT128_ATOMIC], [test "x$ABT_CONFIG_HAVE_ATOMIC_INT128" != "x"])

# set compiler-specific flags
if test x"$CC" = x"pgcc" ; then
# Suppress PGI warning
# #546-D: transfer of control bypasses initialization
PAC_APPEND_FLAG([--diag_suppress=546], [CFLAGS])
fi

dnl ----------------------------------------------------------------------------

AM_INIT_AUTOMAKE([-Wall -Wno-portability-recursive -Werror foreign 1.12.3 subdir-objects])
Expand Down
2 changes: 2 additions & 0 deletions src/include/abtd_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include "abt_config.h"

#ifndef ABT_CONFIG_USE_FCONTEXT
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE
#endif
#include <ucontext.h>
#endif

Expand Down
4 changes: 2 additions & 2 deletions src/include/abtd_ythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

void ABTD_ythread_func_wrapper(void *p_arg);
#if ABT_CONFIG_THREAD_TYPE == ABT_THREAD_TYPE_DYNAMIC_PROMOTION
void ABTD_ythread_terminate_no_arg();
void ABTD_ythread_terminate_no_arg(void);
#endif

static inline void ABTD_ythread_context_create(ABTD_ythread_context *p_link,
Expand Down Expand Up @@ -114,7 +114,7 @@ static inline void
ABTD_ythread_context_dynamic_promote_ythread(void *p_stacktop)
{
union fp_conv {
void (*f)(void *);
void (*f)(void);
void *ptr;
} conv;
conv.f = ABTD_ythread_terminate_no_arg;
Expand Down
115 changes: 56 additions & 59 deletions src/info.c
Original file line number Diff line number Diff line change
Expand Up @@ -501,22 +501,7 @@ int ABT_info_trigger_print_all_thread_stacks(FILE *fp, double timeout,
/* Private APIs */
/*****************************************************************************/

struct info_print_unit_arg_t {
FILE *fp;
ABT_pool pool;
};

struct info_pool_set_t {
ABT_pool *pools;
size_t num;
size_t len;
};

ABTU_ret_err static inline int
info_initialize_pool_set(struct info_pool_set_t *p_set);
ABTU_ret_err static inline int info_add_pool_set(ABT_pool pool,
struct info_pool_set_t *p_set);
static inline void info_finalize_pool_set(struct info_pool_set_t *p_set);
ABTU_ret_err static int print_all_thread_stacks(FILE *fp);

#define PRINT_STACK_FLAG_UNSET 0
#define PRINT_STACK_FLAG_INITIALIZE 1
Expand Down Expand Up @@ -565,55 +550,17 @@ void ABTI_info_check_print_all_thread_stacks(void)
/* All the available ESs are (supposed to be) stopped. We *assume* that
* no ES is calling and will call Argobots functions except this
* function while printing stack information. */
int i, abt_errno;

FILE *fp = print_stack_fp;
if (force_print) {
fprintf(fp,
fprintf(print_stack_fp,
"ABT_info_trigger_print_all_thread_stacks: "
"timeout (only %d ESs stop)\n",
(int)ABTD_atomic_acquire_load_uint32(&print_stack_barrier));
}

struct info_pool_set_t pool_set;
abt_errno = info_initialize_pool_set(&pool_set);
if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS)
goto print_fail;
ABTI_xstream *p_xstream = gp_ABTI_global->p_xstream_head;
while (p_xstream) {
ABTI_sched *p_main_sched = p_xstream->p_main_sched;
fprintf(fp, "= xstream[%d] (%p) =\n", p_xstream->rank,
(void *)p_xstream);
fprintf(fp, "main_sched : %p\n", (void *)p_main_sched);
if (!p_main_sched)
continue;
for (i = 0; i < p_main_sched->num_pools; i++) {
ABT_pool pool = p_main_sched->pools[i];
ABTI_ASSERT(pool != ABT_POOL_NULL);
fprintf(fp, " pools[%d] : %p\n", i,
(void *)ABTI_pool_get_ptr(pool));
abt_errno = info_add_pool_set(pool, &pool_set);
if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
info_finalize_pool_set(&pool_set);
goto print_fail;
}
}
p_xstream = p_xstream->p_next;
}
for (i = 0; i < pool_set.num; i++) {
ABT_pool pool = pool_set.pools[i];
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
abt_errno = info_print_thread_stacks_in_pool(fp, p_pool);
if (abt_errno != ABT_SUCCESS)
fprintf(fp, " Failed to print (errno = %d).\n", abt_errno);
int abt_errno = print_all_thread_stacks(print_stack_fp);
if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
fprintf(print_stack_fp, "ABT_info_trigger_print_all_thread_stacks: "
"failed because of an internal error.\n");
}
info_finalize_pool_set(&pool_set);
goto print_exit;

print_fail:
fprintf(fp, "ABT_info_trigger_print_all_thread_stacks: "
"failed because of memory error.\n");
print_exit:
/* Release the lock that protects ES data. */
ABTI_spinlock_release(&gp_ABTI_global->xstream_list_lock);
if (print_cb_func)
Expand Down Expand Up @@ -707,6 +654,17 @@ void ABTI_info_print_config(FILE *fp)
/* Internal static functions */
/*****************************************************************************/

struct info_print_unit_arg_t {
FILE *fp;
ABT_pool pool;
};

struct info_pool_set_t {
ABT_pool *pools;
size_t num;
size_t len;
};

static void info_print_unit(void *arg, ABT_unit unit)
{
/* This function may not have any side effect on unit because it is passed
Expand Down Expand Up @@ -817,3 +775,42 @@ static void info_trigger_print_all_thread_stacks(
}
}
}

ABTU_ret_err static int print_all_thread_stacks(FILE *fp)
{
int i, abt_errno;
struct info_pool_set_t pool_set;

abt_errno = info_initialize_pool_set(&pool_set);
ABTI_CHECK_ERROR(abt_errno);
ABTI_xstream *p_xstream = gp_ABTI_global->p_xstream_head;
while (p_xstream) {
ABTI_sched *p_main_sched = p_xstream->p_main_sched;
fprintf(fp, "= xstream[%d] (%p) =\n", p_xstream->rank,
(void *)p_xstream);
fprintf(fp, "main_sched : %p\n", (void *)p_main_sched);
if (!p_main_sched)
continue;
for (i = 0; i < p_main_sched->num_pools; i++) {
ABT_pool pool = p_main_sched->pools[i];
ABTI_ASSERT(pool != ABT_POOL_NULL);
fprintf(fp, " pools[%d] : %p\n", i,
(void *)ABTI_pool_get_ptr(pool));
abt_errno = info_add_pool_set(pool, &pool_set);
if (ABTI_IS_ERROR_CHECK_ENABLED && abt_errno != ABT_SUCCESS) {
info_finalize_pool_set(&pool_set);
ABTI_HANDLE_ERROR(abt_errno);
}
}
p_xstream = p_xstream->p_next;
}
for (i = 0; i < pool_set.num; i++) {
ABT_pool pool = pool_set.pools[i];
ABTI_pool *p_pool = ABTI_pool_get_ptr(pool);
abt_errno = info_print_thread_stacks_in_pool(fp, p_pool);
if (abt_errno != ABT_SUCCESS)
fprintf(fp, " Failed to print (errno = %d).\n", abt_errno);
}
info_finalize_pool_set(&pool_set);
return ABT_SUCCESS;
}
2 changes: 1 addition & 1 deletion src/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void ABTI_log_debug(FILE *fh, const char *format, ...)
return;
ABTI_local *p_local = ABTI_local_get_local_uninlined();

char *prefix_fmt = NULL, *prefix = NULL;
const char *prefix_fmt = NULL, *prefix = NULL;
char *newfmt;
uint64_t tid;
int rank;
Expand Down
2 changes: 1 addition & 1 deletion src/pool/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ void ABTI_pool_print(ABTI_pool *p_pool, FILE *p_os, int indent)
if (p_pool == NULL) {
fprintf(p_os, "%*s== NULL POOL ==\n", indent, "");
} else {
char *access;
const char *access;

switch (p_pool->access) {
case ABT_POOL_ACCESS_PRIV:
Expand Down
2 changes: 1 addition & 1 deletion src/sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ void ABTI_sched_print(ABTI_sched *p_sched, FILE *p_os, int indent,
fprintf(p_os, "%*s== NULL SCHED ==\n", indent, "");
} else {
ABTI_sched_kind kind;
char *kind_str, *type, *used;
const char *kind_str, *type, *used;

kind = p_sched->kind;
if (kind == sched_get_kind(ABTI_sched_get_basic_def())) {
Expand Down
2 changes: 1 addition & 1 deletion src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ void ABTI_xstream_print(ABTI_xstream *p_xstream, FILE *p_os, int indent,
if (p_xstream == NULL) {
fprintf(p_os, "%*s== NULL ES ==\n", indent, "");
} else {
char *type, *state;
const char *type, *state;
switch (p_xstream->type) {
case ABTI_XSTREAM_TYPE_PRIMARY:
type = "PRIMARY";
Expand Down
109 changes: 67 additions & 42 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ void ABTI_thread_print(ABTI_thread *p_thread, FILE *p_os, int indent)
} else {
ABTI_xstream *p_xstream = p_thread->p_last_xstream;
int xstream_rank = p_xstream ? p_xstream->rank : 0;
char *type, *yieldable, *state;
const char *type, *yieldable, *state;

if (p_thread->type & ABTI_THREAD_TYPE_MAIN) {
type = "MAIN";
Expand Down Expand Up @@ -1855,31 +1855,77 @@ static void thread_key_destructor_migration(void *p_value)
ABTU_free(p_mig_data);
}

static void thread_join_busywait(ABTI_thread *p_thread)
{
while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTD_atomic_pause();
}
ABTI_tool_event_thread_join(NULL, p_thread, NULL);
}

static void thread_join_yield_ythread(ABTI_xstream **pp_local_xstream,
ABTI_ythread *p_self,
ABTI_ythread *p_ythread)
{
while (ABTD_atomic_acquire_load_int(&p_ythread->thread.state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTI_ythread_yield(pp_local_xstream, p_self,
ABT_SYNC_EVENT_TYPE_THREAD_JOIN, (void *)p_ythread);
}
ABTI_tool_event_thread_join(ABTI_xstream_get_local(*pp_local_xstream),
&p_ythread->thread, &p_self->thread);
}

static void thread_join_yield_task(ABTI_xstream **pp_local_xstream,
ABTI_ythread *p_self, ABTI_thread *p_task)
{
while (ABTD_atomic_acquire_load_int(&p_task->state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTI_ythread_yield(pp_local_xstream, p_self,
ABT_SYNC_EVENT_TYPE_TASK_JOIN, (void *)p_task);
}
ABTI_tool_event_thread_join(ABTI_xstream_get_local(*pp_local_xstream),
p_task, &p_self->thread);
}

static inline void thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
{
if (ABTD_atomic_acquire_load_int(&p_thread->state) ==
ABT_THREAD_STATE_TERMINATED) {
goto fn_exit;
ABTI_tool_event_thread_join(*pp_local, p_thread,
ABTI_local_get_xstream_or_null(*pp_local)
? ABTI_local_get_xstream(*pp_local)
->p_thread
: NULL);
return;
}
/* The main ULT cannot be joined. */
ABTI_ASSERT(!(p_thread->type & ABTI_THREAD_TYPE_MAIN));

ABTI_xstream *p_local_xstream = ABTI_local_get_xstream_or_null(*pp_local);
if (ABTI_IS_EXT_THREAD_ENABLED && !p_local_xstream)
goto busywait_based;
if (ABTI_IS_EXT_THREAD_ENABLED && !p_local_xstream) {
thread_join_busywait(p_thread);
return;
}

ABTI_thread *p_self_thread = p_local_xstream->p_thread;

ABTI_ythread *p_self = ABTI_thread_get_ythread_or_null(p_self_thread);
if (!p_self)
goto busywait_based;
if (!p_self) {
thread_join_busywait(p_thread);
return;
}

/* The target ULT should be different. */
ABTI_ASSERT(p_thread != p_self_thread);

ABTI_ythread *p_ythread = ABTI_thread_get_ythread_or_null(p_thread);
if (!p_ythread)
goto yield_based_task;
if (!p_ythread) {
thread_join_yield_task(&p_local_xstream, p_self, p_thread);
*pp_local = ABTI_xstream_get_local(p_local_xstream);
return;
}

ABT_pool_access access = p_self->thread.p_pool->access;

Expand Down Expand Up @@ -1951,16 +1997,21 @@ static inline void thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
/* FIXME: once we change the suspend/resume mechanism (i.e., asking the
* scheduler to wake up the blocked ULT), we will be able to handle all
* access modes. */
goto yield_based;
thread_join_yield_ythread(&p_local_xstream, p_self, p_ythread);
*pp_local = ABTI_xstream_get_local(p_local_xstream);
return;

} else {
/* Tell p_ythread that there has been a join request. */
/* If request already has ABTI_THREAD_REQ_JOIN, p_ythread is
* terminating. We can't block p_self in this case. */
uint32_t req = ABTD_atomic_fetch_or_uint32(&p_ythread->thread.request,
ABTI_THREAD_REQ_JOIN);
if (req & ABTI_THREAD_REQ_JOIN)
goto yield_based;
if (req & ABTI_THREAD_REQ_JOIN) {
thread_join_yield_ythread(&p_local_xstream, p_self, p_ythread);
*pp_local = ABTI_xstream_get_local(p_local_xstream);
return;
}

ABTI_ythread_set_blocked(p_self);
LOG_DEBUG("[U%" PRIu64 ":E%d] blocked to join U%" PRIu64 "\n",
Expand Down Expand Up @@ -1995,39 +2046,13 @@ static inline void thread_join(ABTI_local **pp_local, ABTI_thread *p_thread)
LOG_DEBUG("[U%" PRIu64 ":E%d] resume after join\n",
ABTI_thread_get_id(&p_self->thread),
p_self->thread.p_last_xstream->rank);
goto fn_exit;
}

yield_based:
while (ABTD_atomic_acquire_load_int(&p_ythread->thread.state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTI_ythread_yield(&p_local_xstream, p_self,
ABT_SYNC_EVENT_TYPE_THREAD_JOIN, (void *)p_ythread);
*pp_local = ABTI_xstream_get_local(p_local_xstream);
}
goto fn_exit;

yield_based_task:
while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTI_ythread_yield(&p_local_xstream, p_self,
ABT_SYNC_EVENT_TYPE_TASK_JOIN, (void *)p_thread);
ABTI_tool_event_thread_join(*pp_local, p_thread, &p_self->thread);
} else {
/* Use a yield-based method. */
thread_join_yield_ythread(&p_local_xstream, p_self, p_ythread);
*pp_local = ABTI_xstream_get_local(p_local_xstream);
return;
}
goto fn_exit;

busywait_based:
while (ABTD_atomic_acquire_load_int(&p_thread->state) !=
ABT_THREAD_STATE_TERMINATED) {
ABTD_atomic_pause();
}

fn_exit:
ABTI_tool_event_thread_join(*pp_local, p_thread,
ABTI_local_get_xstream_or_null(*pp_local)
? ABTI_local_get_xstream(*pp_local)
->p_thread
: NULL);
}

static void thread_root_func(void *arg)
Expand Down
2 changes: 1 addition & 1 deletion src/thread_attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void ABTI_thread_attr_print(ABTI_thread_attr *p_attr, FILE *p_os, int indent)
if (p_attr == NULL) {
fprintf(p_os, "%*sULT attr: [NULL ATTR]\n", indent, "");
} else {
char *stacktype;
const char *stacktype;
if (p_attr->thread_type & ABTI_THREAD_TYPE_MEM_MEMPOOL_DESC) {
stacktype = "MEMPOOL_DESC";
} else if (p_attr->thread_type & ABTI_THREAD_TYPE_MEM_MALLOC_DESC) {
Expand Down

0 comments on commit a9937e2

Please sign in to comment.