From 128d23936585a74a5773ab33491581f2d2d341ae Mon Sep 17 00:00:00 2001
From: Huihuo Zheng
Date: Fri, 15 Sep 2023 15:10:33 -0500
Subject: [PATCH] recent changes on merging

---
 benchmarks/write_cache.cpp |   7 +-
 spack/package.py           |   1 +
 src/H5LS.c                 |   2 +
 src/H5LS.h                 |   1 +
 src/H5VLcache_ext.c        | 131 +++++++++++++++++++++++++------------
 5 files changed, 98 insertions(+), 44 deletions(-)

diff --git a/benchmarks/write_cache.cpp b/benchmarks/write_cache.cpp
index 753d97f..596703d 100644
--- a/benchmarks/write_cache.cpp
+++ b/benchmarks/write_cache.cpp
@@ -166,8 +166,10 @@ int main(int argc, char **argv) {
   tt.start_clock("H5Fcreate");
   hid_t file_id = H5Fcreate(f, H5F_ACC_TRUNC, H5P_DEFAULT, plist_id);
   tt.stop_clock("H5Fcreate");
-  if (async_close)
+  if (async_close) {
+    if (debug_level() > 1 && rank == 0) printf("set close calls to be async\n");
     H5Fcache_async_close_set(file_id);
+  }
   hid_t *dset_id = new hid_t[nvars];
   hid_t *filespace = new hid_t[nvars];
   for (int it = 0; it < niter; it++) {
@@ -255,13 +257,12 @@ int main(int argc, char **argv) {
     if (rank == 0 and debug_level() > 1)
       printf("start async jobs execution\n");
 #endif
-    H5Fcache_async_op_start(file_id);
     tt.start_clock("barrier");
     if (barrier)
       MPI_Barrier(MPI_COMM_WORLD);
     tt.stop_clock("barrier");
     tt.start_clock("close");
-
+    H5Fcache_async_op_start(file_id);
     for (int i = 0; i < nvars; i++) {
       tt.start_clock("H5Dclose");
       H5Dclose(dset_id[i]);
diff --git a/spack/package.py b/spack/package.py
index a23146c..126c946 100644
--- a/spack/package.py
+++ b/spack/package.py
@@ -9,6 +9,7 @@ class Hdf5volcache(CMakePackage):
     maintainers = ['zhenghh04']
 
     version('develop', branch='develop')
+    version('1.2', tag='v1.2')
     version('1.1', tag='v1.1')
     version('1.0', tag='v1.0')
 
diff --git a/src/H5LS.c b/src/H5LS.c
index 4a65fbc..c6d5905 100644
--- a/src/H5LS.c
+++ b/src/H5LS.c
@@ -134,6 +134,8 @@ herr_t readLSConf(char *fname, cache_storage_t *LS) {
     fprintf(stderr,
             " [CACHE VOL] **ERROR: cache configure file %s does not exist.\n",
             fname);
+    MPI_Barrier(MPI_COMM_WORLD);
+    MPI_Finalize();
     exit(100);
   }
   FILE *file = fopen(fname, "r");
diff --git a/src/H5LS.h b/src/H5LS.h
index 1a0e557..7a952db 100644
--- a/src/H5LS.h
+++ b/src/H5LS.h
@@ -71,6 +71,7 @@ typedef struct _task_data_t {
   hsize_t offset; // offset in memory mapped file on SSD
   hsize_t size;
   void **buf;
+  struct _task_data_t *previous;
   struct _task_data_t *next;
 } task_data_t;
 #else
diff --git a/src/H5VLcache_ext.c b/src/H5VLcache_ext.c
index 78a4a65..43b79a7 100644
--- a/src/H5VLcache_ext.c
+++ b/src/H5VLcache_ext.c
@@ -77,7 +77,9 @@
 #define H5_REQUEST_NULL NULL
 #endif
 
+#ifndef INF
 #define INF UINT64_MAX
+#endif
 
 #ifndef STDERR
 #ifdef __APPLE__
@@ -408,8 +410,9 @@ static void *write_data_to_local_storage(void *dset, hid_t mem_type_id,
                                          hid_t mem_space_id,
                                          hid_t file_space_id, hid_t plist_id,
                                          const void *buf, void **req);
-// currently because read and write are not unified, I have to use two different
-// functions of write_data_to_cache
+// Currently because read and write are not unified for local storage, I have to use two different
+// functions of write_data_to_cache. For global storage, the two functions will be
+// the same.
 static void *write_data_to_local_storage2(void *dset, hid_t mem_type_id,
                                           hid_t mem_space_id,
                                           hid_t file_space_id, hid_t plist_id,
@@ -433,9 +436,6 @@ static void *write_data_to_global_storage(void *dset, hid_t mem_type_id,
                                           hid_t mem_space_id,
                                           hid_t file_space_id, hid_t plist_id,
                                           const void *buf, void **req);
-// currently because read and write are not unified, I have to use two different
-// functions of write_data_to_cache
-
 static herr_t read_data_from_global_storage(void *dset, hid_t mem_type_id,
                                             hid_t mem_space_id,
                                             hid_t file_space_id, hid_t plist_id,
@@ -662,7 +662,7 @@ typedef struct group_args_t {
   hid_t dxpl_id;
 } group_args_t;
 
-/* Get the cache_storage_t object for current VOL layer based on info object */
+/* Get the cache_storage_t object for the current VOL layer from the info object */
 static cache_storage_t *get_cache_storage_obj(H5VL_cache_ext_info_t *info) {
   H5LS_stack_t *p = H5LS_stack;
   while (strcmp(p->fconfig, info->fconfig) && (p != NULL)) {
@@ -693,19 +693,30 @@ static void LOG(int rank, const char *str) {
 #endif
 }
 
+/*
+  Set close calls to be asynchronous by setting CLOSE_ASYNC = 1.
+  By default, close calls are synchronous.
+*/
+//===============================================================================
 herr_t set_close_async(hbool_t t) {
   CLOSE_ASYNC = t;
   return 0;
 }
+
+hbool_t get_close_async() {
+  return CLOSE_ASYNC;
+}
+
+/* Wait for an async close task to finish */
 static herr_t async_close_task_wait(object_close_task_t *task) {
   H5VL_cache_ext_t *o = (H5VL_cache_ext_t *)task->obj;
   H5VL_request_status_t status;
   H5VL_class_value_t under_value;
   H5VLget_value(o->under_vol_id, &under_value);
-  if (under_value != H5VL_ASYNC_VALUE) {
+  if (under_value != H5VL_ASYNC_VALUE || get_close_async()) {
     if (RANK == io_node())
-      printf(" [CACHE VOL] **WARNING: Do not have Async VOL underneath it. "
-             "Close is not async.\n");
+      printf(" [CACHE VOL] **WARNING: Async VOL is not underneath it, "
+             "or close is not async.\n");
     return 0;
   }
 #ifndef NDEBUG
@@ -738,27 +749,40 @@ static herr_t async_close_task_wait(object_close_task_t *task) {
   return 0;
 }
 
+// Start all the pending async close tasks
 herr_t async_close_start() {
   herr_t ret = SUCCEED;
-  object_close_task_t *p = (object_close_task_t *)async_close_task_current;
-  if (p == NULL)
-    return ret;
-  while (p->next != NULL) {
-    if (p->req != NULL)
-      ret = H5async_start(p->req);
-    p = p->next;
+  if (get_async_close() == 1) {
+    object_close_task_t *p = (object_close_task_t *)async_close_task_current;
+    if (p == NULL)
+      return ret;
+    while (p->next != NULL) {
+      if (p->req != NULL)
+        ret = H5async_start(p->req);
+      p = p->next;
+    }
   }
   return ret;
 }
 
+// Wait for all the close tasks to finish
 herr_t async_close_wait() {
-  while (async_close_task_current->next != NULL) {
-    async_close_task_wait(async_close_task_current);
-    async_close_task_current = async_close_task_current->next;
+  if (get_async_close() == 1) {
+    while (async_close_task_current->next != NULL) {
+      async_close_task_wait(async_close_task_current);
+      async_close_task_current = async_close_task_current->next;
+    }
   }
   return SUCCEED;
 }
 
+// Utility functions
+hsize_t round_page(hsize_t s) {
+  if (s % PAGESIZE == 0 || s < PAGESIZE)
+    return s;
+  return (s / PAGESIZE + 1) * PAGESIZE;
+}
+
 /*-------------------------------------------------------------------------
  * Function:    H5VL_cache_ext_new_obj
  *
@@ -854,8 +878,18 @@ static herr_t H5VL_cache_ext_init(hid_t vipl_id) {
   int called = 0;
   MPI_Initialized(&called);
   if (called == 1) {
+    int provided = 0;
+    MPI_Query_thread(&provided);
     MPI_Comm_size(MPI_COMM_WORLD, &NPROC);
     MPI_Comm_rank(MPI_COMM_WORLD, &RANK);
+    if (provided != MPI_THREAD_MULTIPLE) {
+      if (RANK == io_node()) {
+        printf(" [CACHE_VOL] ERROR: the cache VOL requires MPI to be "
+               "initialized with MPI_THREAD_MULTIPLE. "
+               "Please use MPI_Init_thread.\n");
+      }
+      MPI_Abort(MPI_COMM_WORLD, 1);
+    }
   } else {
     int provided = 0;
     MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
@@ -1197,6 +1231,9 @@ static herr_t H5VL_cache_ext_info_to_str(const void *_info, char **str) {
   return 0;
 } /* end H5VL_cache_ext_info_to_str() */
 
+/*
+  Get the info object for the native VOL.
+*/
 static herr_t native_vol_info(void **_info) {
   const char *str = "under_vol=0;under_vol_info={}";
   H5VL_cache_ext_info_t *info;
@@ -1835,10 +1872,13 @@ static hid_t dataset_get_dapl(void *dset, hid_t driver_id, hid_t dxpl_id,
   return vol_cb_args.args.get_dapl.dapl_id;
 }
 
+/* So far this does not work */
 static hid_t group_get_gapl(void *group, hid_t driver_id, hid_t dxpl_id,
                             void **req) {
   H5VL_dataset_get_args_t vol_cb_args;
-
+  if (RANK == io_node())
+    printf(" [CACHE_VOL] **WARNING: getting the gapl from the group object\n"
+           " is not implemented yet, returning H5P_DEFAULT\n");
   /* Set up VOL callback arguments */
   // vol_cb_args.op_type = H5VL_GROUP_GET_GAPL;
   // vol_cb_args.args.get_dapl.dapl_id = H5I_INVALID_HID;
@@ -1916,12 +1956,6 @@ static void *H5VL_cache_ext_dataset_create(void *obj,
   return (void *)dset;
 } /* end H5VL_cache_ext_dataset_create() */
 
-hsize_t round_page(hsize_t s) {
-  if (s % PAGESIZE == 0 || s < PAGESIZE)
-    return s;
-  return (s / PAGESIZE + 1) * PAGESIZE;
-}
-
 static herr_t H5VL_cache_ext_dataset_mmap_remap(void *obj) {
   H5VL_cache_ext_t *dset = (H5VL_cache_ext_t *)obj;
   // created a memory mapped file on the local storage. And create a MPI_win
@@ -2749,6 +2783,7 @@ void create_task_place_holder(void **request_list) {
   t->next->req = NULL;
   t->next->id = t->id + 1;
   *request_list = t->next;
+  ((task_data_t *)*request_list)->previous = t;
 }
 
 #if H5_VERSION_GE(1, 13, 3)
@@ -3484,6 +3519,9 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id,
 #endif
   if (p->async_close && o->write_cache) {
     double t0 = MPI_Wtime();
+    void *write_req = ((task_data_t *)o->H5DWMM->io->request_list->previous)->req;
+    if (write_req == NULL && RANK == io_node() && log_level() > 0)
+      printf(" [CACHE VOL] previous req NULL\n");
 #if H5_VERSION_GE(1, 13, 3)
     if (o->H5DWMM->io->num_fusion_requests > 0) {
       merge_tasks_in_queue(&o->H5DWMM->io->flush_request,
@@ -3492,27 +3530,34 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id,
                           o->H5DWMM->io->flush_request, req); // flush data for current task;
       o->H5DWMM->io->num_fusion_requests = 0;
       o->H5DWMM->io->fusion_data_size = 0.0;
+      write_req = o->H5DWMM->io->flush_request->req;
       o->H5DWMM->io->flush_request = o->H5DWMM->io->flush_request->next;
     }
 #endif
+    if (write_req == NULL && RANK == io_node() && log_level() > 0)
+      printf(" [CACHE VOL] previous req NULL\n");
     p->async_close_task_list->next =
        (object_close_task_t *)malloc(sizeof(object_close_task_t));
    p->async_close_task_list->type = DATASET_CLOSE;
    p->async_close_task_list->req = NULL;
    p->async_close_task_list->obj = dset;
-    if (p->async_pause)
-      H5Pset_dxpl_pause(dxpl_id, true);
+    // if (p->async_pause)
+    //   H5Pset_dxpl_pause(dxpl_id, p->async_pause);
    double tt0 = MPI_Wtime();
+    void **tt;
    ret_value = H5VLdataset_close(o->under_object, o->under_vol_id, dxpl_id,
                                  &p->async_close_task_list->req);
+    // assert(p->async_close_task_list->req != NULL);
    double tt1 = MPI_Wtime();
-    // H5Pset_dxpl_pause(dxpl_id, false);
-    // Importance to add dependence here to make sure that the merged dataset
-    // writes happened before dataset close.
-    // if (o->H5DWMM->io->flush_request->req !=NULL &&
-    //     o->H5LS->fusion_threshold >0.0)
-    //   H5VL_async_set_request_dep(p->async_close_task_list->req,
-    //                              o->H5DWMM->io->flush_request->req);
+    /*
+    if (write_req != NULL) {
+      printf(" set dependence....\n");
+      H5VL_async_set_request_dep(p->async_close_task_list->req,
+                                 write_req);
+    } else {
+      printf(" NULL write request ....\n");
+    }
+    */
    p->async_close_task_list = p->async_close_task_list->next;
    p->async_close_task_list->next = NULL;
    double t1 = MPI_Wtime();
@@ -4305,14 +4350,17 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
          H5async_start(p->req);
        p = p->next;
      }
-      /* if (o->async_close) {
-        object_close_task_t *p = (object_close_task_t
-        *)o->async_close_task_current; while (p!=NULL && p->req!= NULL) {
+      if (o->async_close) {
+        object_close_task_t *p = (object_close_task_t *)o->async_close_task_current;
+#ifndef NDEBUG
+        if (o->H5DWMM->mpi->rank == io_node() && debug_level() > 0)
+          printf(" [CACHE VOL] starting async close task\n");
+#endif
+        while (p != NULL && p->req != NULL) {
          H5async_start(p->req);
          p = p->next;
        }
-      } */
+      }
    }
    ret_value = SUCCEED;
  } else if (args->op_type == H5VL_cache_file_cache_async_close_set_op_g) {
@@ -4323,6 +4371,8 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
    o->async_close = true;
    o->async_close_task_list =
        (object_close_task_t *)malloc(sizeof(object_close_task_t));
+    o->async_close_task_list->req = NULL;
+    o->async_close_task_list->obj = NULL;
    o->async_close_task_list->next = NULL;
    o->async_close_task_current = o->async_close_task_list;
    o->async_close_task_head = o->async_close_task_list;
@@ -4340,7 +4390,7 @@ static herr_t H5VL_cache_ext_file_optional(void *file,
    if (RANK == io_node() && debug_level() > 1)
      printf(" [CACHE VOL] async close wait done\n");
 #endif
-    while (o->async_close_task_current->next != NULL) {
+    while (o->async_close_task_current->next!= NULL) {
      async_close_task_wait(o->async_close_task_current);
      o->async_close_task_current = o->async_close_task_current->next;
    }
@@ -4671,7 +4721,6 @@ static herr_t H5VL_cache_ext_group_close(void *grp, hid_t dxpl_id, void **req) {
    ret_value = H5VLgroup_close(o->under_object, o->under_vol_id, dxpl_id,
                                &p->async_close_task_list->req);
    // H5Pset_dxpl_pause(dxpl_id, false);
-    // H5async_start(p->async_close_task_list->req);
    p->async_close_task_list = p->async_close_task_list->next;
    p->async_close_task_list->next = NULL;
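
Usage sketch (illustrative, not part of the patch): the pattern benchmarks/write_cache.cpp follows after this change is to enable async close right after H5Fcreate and to kick off the queued async operations just before the close phase. The sketch below assumes MPI is initialized with MPI_THREAD_MULTIPLE (which H5VL_cache_ext_init now enforces), that the cache VOL connector is loaded through the environment, and that H5Fcache_async_close_set()/H5Fcache_async_op_start() have the herr_t(hid_t) signatures implied by the benchmark; the local extern prototypes and the file name are placeholders under those assumptions.

#include <hdf5.h>
#include <mpi.h>
#include <stdio.h>

/* Assumed prototypes of the cache VOL public API used by write_cache.cpp;
 * in a real build they would come from the connector's public header. */
extern herr_t H5Fcache_async_close_set(hid_t file_id);
extern herr_t H5Fcache_async_op_start(hid_t file_id);

int main(int argc, char **argv) {
  int provided = 0;
  /* The connector now aborts unless MPI provides MPI_THREAD_MULTIPLE. */
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided != MPI_THREAD_MULTIPLE) {
    fprintf(stderr, "MPI_THREAD_MULTIPLE not available\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
  H5Pset_fapl_mpio(fapl, MPI_COMM_WORLD, MPI_INFO_NULL);

  hid_t file_id = H5Fcreate("cache_vol_demo.h5", H5F_ACC_TRUNC, H5P_DEFAULT, fapl);
  /* Queue subsequent H5Dclose/H5Fclose calls as async close tasks. */
  H5Fcache_async_close_set(file_id);

  /* ... create datasets and issue H5Dwrite calls through the cache VOL ... */

  /* Start the queued async operations right before the close phase,
   * mirroring where write_cache.cpp now places this call. */
  H5Fcache_async_op_start(file_id);

  /* ... H5Dclose each dataset ... */
  H5Fclose(file_id);
  H5Pclose(fapl);
  MPI_Finalize();
  return 0;
}

As in the modified benchmark, the point of moving H5Fcache_async_op_start() next to the close loop is to let the flushed writes and the queued close tasks be started together once all datasets have been written.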