Skip to content

Commit

Permalink
recent changes on merging
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghh04 committed Sep 15, 2023
1 parent fec6537 commit 128d239
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 44 deletions.
7 changes: 4 additions & 3 deletions benchmarks/write_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ int main(int argc, char **argv) {
tt.start_clock("H5Fcreate");
hid_t file_id = H5Fcreate(f, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
tt.stop_clock("H5Fcreate");
if (async_close)
if (async_close) {
if (debug_level() > 1 && rank == 0) printf("set close calls to be async\n");
H5Fcache_async_close_set(file_id);
}
hid_t *dset_id = new hid_t[nvars];
hid_t *filespace = new hid_t[nvars];
for (int it = 0; it < niter; it++) {
Expand Down Expand Up @@ -255,13 +257,12 @@ int main(int argc, char **argv) {
if (rank == 0 and debug_level() > 1)
printf("start async jobs execution\n");
#endif
H5Fcache_async_op_start(file_id);
tt.start_clock("barrier");
if (barrier)
MPI_Barrier(MPI_COMM_WORLD);
tt.stop_clock("barrier");
tt.start_clock("close");

H5Fcache_async_op_start(file_id);
for (int i = 0; i < nvars; i++) {
tt.start_clock("H5Dclose");
H5Dclose(dset_id[i]);
Expand Down
1 change: 1 addition & 0 deletions spack/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Hdf5volcache(CMakePackage):
maintainers = ['zhenghh04']

version('develop', branch='develop')
version('1.2', tag='v1.2')
version('1.1', tag='v1.1')
version('1.0', tag='v1.0')

Expand Down
2 changes: 2 additions & 0 deletions src/H5LS.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ herr_t readLSConf(char *fname, cache_storage_t *LS) {
fprintf(stderr,
" [CACHE VOL] **ERROR: cache configure file %s does not exist.\n",
fname);
MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
exit(100);
}
FILE *file = fopen(fname, "r");
Expand Down
1 change: 1 addition & 0 deletions src/H5LS.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ typedef struct _task_data_t {
hsize_t offset; // offset in memory mapped file on SSD
hsize_t size;
void **buf;
struct _task_data_t *previous;
struct _task_data_t *next;
} task_data_t;
#else
Expand Down
131 changes: 90 additions & 41 deletions src/H5VLcache_ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@
#define H5_REQUEST_NULL NULL
#endif

#ifndef INF
#define INF UINT64_MAX
#endif

#ifndef STDERR
#ifdef __APPLE__
Expand Down Expand Up @@ -408,8 +410,9 @@ static void *write_data_to_local_storage(void *dset, hid_t mem_type_id,
hid_t mem_space_id,
hid_t file_space_id, hid_t plist_id,
const void *buf, void **req);
// currently because read and write are not unified, I have to use two different
// functions of write_data_to_cache
// Currently because read and write are not unified for local storage, I have to use two different
// functions of write_data_to_cache. For global storage, the two functions will be
// the same.
static void *write_data_to_local_storage2(void *dset, hid_t mem_type_id,
hid_t mem_space_id,
hid_t file_space_id, hid_t plist_id,
Expand All @@ -433,9 +436,6 @@ static void *write_data_to_global_storage(void *dset, hid_t mem_type_id,
hid_t mem_space_id,
hid_t file_space_id, hid_t plist_id,
const void *buf, void **req);
// currently because read and write are not unified, I have to use two different
// functions of write_data_to_cache

static herr_t read_data_from_global_storage(void *dset, hid_t mem_type_id,
hid_t mem_space_id,
hid_t file_space_id, hid_t plist_id,
Expand Down Expand Up @@ -662,7 +662,7 @@ typedef struct group_args_t {
hid_t dxpl_id;
} group_args_t;

/* Get the cache_storage_t object for current VOL layer based on info object */
/* Get the cache_storage_t object for current VOL layer from the info object */
static cache_storage_t *get_cache_storage_obj(H5VL_cache_ext_info_t *info) {
H5LS_stack_t *p = H5LS_stack;
while (strcmp(p->fconfig, info->fconfig) && (p != NULL)) {
Expand Down Expand Up @@ -693,19 +693,30 @@ static void LOG(int rank, const char *str) {
#endif
}

/*
 * Enable (t = 1) or disable (t = 0) asynchronous close calls by setting
 * the global CLOSE_ASYNC flag. Close calls are synchronous by default.
 * Always returns 0.
 */
//===============================================================================
herr_t set_close_async(hbool_t t) {
  CLOSE_ASYNC = t;
  return 0;
}

/* Return the current value of the global CLOSE_ASYNC flag set by
 * set_close_async().
 * NOTE(review): other code in this file calls get_async_close(); confirm
 * whether that is a separate function or a misspelling of this one. */
hbool_t get_close_async() {
  return CLOSE_ASYNC;
}

/* waiting for an async close to finish */
static herr_t async_close_task_wait(object_close_task_t *task) {
H5VL_cache_ext_t *o = (H5VL_cache_ext_t *)task->obj;
H5VL_request_status_t status;
H5VL_class_value_t under_value;
H5VLget_value(o->under_vol_id, &under_value);
if (under_value != H5VL_ASYNC_VALUE) {
if (under_value != H5VL_ASYNC_VALUE || get_close_async()) {
if (RANK == io_node())
printf(" [CACHE VOL] **WARNING: Do not have Async VOL underneath it. "
"Close is not async.\n");
printf(" [CACHE VOL] **WARNING: Do not have Async VOL underneath it "
"or close is not async.\n");
return 0;
}
#ifndef NDEBUG
Expand Down Expand Up @@ -738,27 +749,40 @@ static herr_t async_close_task_wait(object_close_task_t *task) {
return 0;
}

/* Kick off every queued asynchronous close request.
 *
 * Walks the global close-task list starting at async_close_task_current and
 * calls H5async_start() on each pending request. The final node is a
 * placeholder (next == NULL) and is intentionally not started. Does nothing
 * unless async close mode is enabled.
 * NOTE(review): this calls get_async_close() while this file defines
 * get_close_async() — confirm which symbol is intended.
 * Returns the result of the last H5async_start() call, or SUCCEED when
 * there was nothing to start. */
herr_t async_close_start() {
  herr_t ret = SUCCEED;
  if (get_async_close() != 1)
    return ret;
  object_close_task_t *task = (object_close_task_t *)async_close_task_current;
  if (task == NULL)
    return ret;
  for (; task->next != NULL; task = task->next) {
    if (task->req != NULL)
      ret = H5async_start(task->req);
  }
  return ret;
}

/* Block until every queued asynchronous close task has completed.
 *
 * Advances the global async_close_task_current cursor past each finished
 * task; the trailing placeholder node (next == NULL) terminates the walk.
 * No-op unless async close mode is enabled. Always returns SUCCEED.
 * NOTE(review): unlike async_close_start(), async_close_task_current is
 * dereferenced without a NULL check here — confirm it is always
 * initialized when async close is enabled. */
herr_t async_close_wait() {
  if (get_async_close() != 1)
    return SUCCEED;
  for (; async_close_task_current->next != NULL;
       async_close_task_current = async_close_task_current->next)
    async_close_task_wait(async_close_task_current);
  return SUCCEED;
}

/* Round a size up to the next multiple of PAGESIZE.
 *
 * Sizes that are already page-aligned — or smaller than a single page —
 * are returned unchanged; only sizes of at least one page that are not
 * aligned get rounded up. */
hsize_t round_page(hsize_t s) {
  if (s % PAGESIZE != 0 && s >= PAGESIZE)
    return (s / PAGESIZE + 1) * PAGESIZE;
  return s;
}

/*-------------------------------------------------------------------------
* Function: H5VL_cache_ext_new_obj
*
Expand Down Expand Up @@ -854,8 +878,18 @@ static herr_t H5VL_cache_ext_init(hid_t vipl_id) {
int called = 0;
MPI_Initialized(&called);
if (called == 1) {
int provided = 0;
MPI_Query_thread(&provided);
MPI_Comm_size(MPI_COMM_WORLD, &NPROC);
MPI_Comm_rank(MPI_COMM_WORLD, &RANK);
if (provided != MPI_THREAD_MULTIPLE) {
if (RANK == io_node()) {
printf(" [CACHE_VOL] ERROR: cache VOL requires MPI to "
" be initialized with MPI_THREAD_MULTIPLE. "
" Please use MPI_Init_thread\n");
}
MPI_Abort(MPI_COMM_WORLD, 1);
}
} else {
int provided = 0;
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
Expand Down Expand Up @@ -1197,6 +1231,9 @@ static herr_t H5VL_cache_ext_info_to_str(const void *_info, char **str) {
return 0;
} /* end H5VL_cache_ext_info_to_str() */

/*
This is to get the info object for native vol.
*/
static herr_t native_vol_info(void **_info) {
const char *str = "under_vol=0;under_vol_info={}";
H5VL_cache_ext_info_t *info;
Expand Down Expand Up @@ -1835,10 +1872,13 @@ static hid_t dataset_get_dapl(void *dset, hid_t driver_id, hid_t dxpl_id,
return vol_cb_args.args.get_dapl.dapl_id;
}

/* So far this does not work */
static hid_t group_get_gapl(void *group, hid_t driver_id, hid_t dxpl_id,
void **req) {
H5VL_dataset_get_args_t vol_cb_args;

if (RANK==io_node())
printf(" [CACHE_VOL] **WARNING geting gapl from the group object \n"
" is not implemented yet, returnning H5P_DEFAULT\n");
/* Set up VOL callback arguments */
// vol_cb_args.op_type = H5VL_GROUP_GET_GAPL;
// vol_cb_args.args.get_dapl.dapl_id = H5I_INVALID_HID;
Expand Down Expand Up @@ -1916,12 +1956,6 @@ static void *H5VL_cache_ext_dataset_create(void *obj,
return (void *)dset;
} /* end H5VL_cache_ext_dataset_create() */

hsize_t round_page(hsize_t s) {
if (s % PAGESIZE == 0 || s < PAGESIZE)
return s;
return (s / PAGESIZE + 1) * PAGESIZE;
}

static herr_t H5VL_cache_ext_dataset_mmap_remap(void *obj) {
H5VL_cache_ext_t *dset = (H5VL_cache_ext_t *)obj;
// created a memory mapped file on the local storage. And create a MPI_win
Expand Down Expand Up @@ -2749,6 +2783,7 @@ void create_task_place_holder(void **request_list) {
t->next->req = NULL;
t->next->id = t->id + 1;
*request_list = t->next;
((task_data_t *)*request_list)->previous = t;
}

#if H5_VERSION_GE(1, 13, 3)
Expand Down Expand Up @@ -3484,6 +3519,9 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id,
#endif
if (p->async_close && o->write_cache) {
double t0 = MPI_Wtime();
void *write_req =((task_data_t *) o->H5DWMM->io->request_list->previous)->req;
if (write_req==NULL && RANK == io_node() && log_level() > 0)
printf(" [CACHE VOL] previous req NULL\n");
#if H5_VERSION_GE(1, 13, 3)
if (o->H5DWMM->io->num_fusion_requests > 0) {
merge_tasks_in_queue(&o->H5DWMM->io->flush_request,
Expand All @@ -3492,27 +3530,34 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id,
o->H5DWMM->io->flush_request, req); // flush data for current task;
o->H5DWMM->io->num_fusion_requests = 0;
o->H5DWMM->io->fusion_data_size = 0.0;
write_req = o->H5DWMM->io->flush_request->req;
o->H5DWMM->io->flush_request = o->H5DWMM->io->flush_request->next;
}
#endif
if (write_req==NULL && RANK == io_node() && log_level() > 0)
printf(" [CACHE VOL] previous req NULL\n");
p->async_close_task_list->next =
(object_close_task_t *)malloc(sizeof(object_close_task_t));
p->async_close_task_list->type = DATASET_CLOSE;
p->async_close_task_list->req = NULL;
p->async_close_task_list->obj = dset;
if (p->async_pause)
H5Pset_dxpl_pause(dxpl_id, true);
//if (p->async_pause)
// H5Pset_dxpl_pause(dxpl_id, p->async_pause);
double tt0 = MPI_Wtime();
void **tt;
ret_value = H5VLdataset_close(o->under_object, o->under_vol_id, dxpl_id,
&p->async_close_task_list->req);
//assert(p->async_close_task_list->req!=NULL);
double tt1 = MPI_Wtime();
// H5Pset_dxpl_pause(dxpl_id, false);
// It is important to add a dependency here to make sure that the merged
// dataset writes happen before the dataset close.
// if (o->H5DWMM->io->flush_request->req !=NULL &&
// o->H5LS->fusion_threshold >0.0)
// H5VL_async_set_request_dep(p->async_close_task_list->req,
// o->H5DWMM->io->flush_request->req);
/*
if (write_req !=NULL) {
printf(" set dependenace....\n");
H5VL_async_set_request_dep(p->async_close_task_list->req,
write_req);
} else {
printf(" NULL write request ....\n");
}
*/
p->async_close_task_list = p->async_close_task_list->next;
p->async_close_task_list->next = NULL;
double t1 = MPI_Wtime();
Expand Down Expand Up @@ -4305,14 +4350,17 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
H5async_start(p->req);
p = p->next;
}
/*
if (o->async_close) {
object_close_task_t *p = (object_close_task_t
*)o->async_close_task_current; while (p!=NULL && p->req!= NULL) {
object_close_task_t *p = (object_close_task_t *)o->async_close_task_current;
#ifndef NDEBUG
if (o->H5DWMM->mpi->rank == io_node() && debug_level() > 0)
printf(" [CACHE VOL] starting async close task\n");
#endif
while (p!=NULL && p->req!= NULL) {
H5async_start(p->req);
p = p->next;
}
} */
}
}
ret_value = SUCCEED;
} else if (args->op_type == H5VL_cache_file_cache_async_close_set_op_g) {
Expand All @@ -4323,6 +4371,8 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
o->async_close = true;
o->async_close_task_list =
(object_close_task_t *)malloc(sizeof(object_close_task_t));
o->async_close_task_list->req = NULL;
o->async_close_task_list->obj = NULL;
o->async_close_task_list->next = NULL;
o->async_close_task_current = o->async_close_task_list;
o->async_close_task_head = o->async_close_task_list;
Expand All @@ -4340,7 +4390,7 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
if (RANK == io_node() && debug_level() > 1)
printf(" [CACHE VOL] async close wait done\n");
#endif
while (o->async_close_task_current->next != NULL) {
while (o->async_close_task_current->next!= NULL) {
async_close_task_wait(o->async_close_task_current);
o->async_close_task_current = o->async_close_task_current->next;
}
Expand Down Expand Up @@ -4671,7 +4721,6 @@ static herr_t H5VL_cache_ext_group_close(void *grp, hid_t dxpl_id, void **req) {
ret_value = H5VLgroup_close(o->under_object, o->under_vol_id, dxpl_id,
&p->async_close_task_list->req);
// H5Pset_dxpl_pause(dxpl_id, false);

// H5async_start(p->async_close_task_list->req);
p->async_close_task_list = p->async_close_task_list->next;
p->async_close_task_list->next = NULL;
Expand Down

0 comments on commit 128d239

Please sign in to comment.