From e83225f7fb21968c422bfc01d2f8680d6b094092 Mon Sep 17 00:00:00 2001 From: Huihuo Zheng Date: Wed, 27 Sep 2023 17:50:31 -0500 Subject: [PATCH] clear up P S that are not closed --- src/H5VLcache_ext.c | 127 +++++++++++++++++++++++++--------- tests/test_local_ssd_async.sh | 2 +- 2 files changed, 97 insertions(+), 32 deletions(-) diff --git a/src/H5VLcache_ext.c b/src/H5VLcache_ext.c index 7e33f3e..e78ff31 100644 --- a/src/H5VLcache_ext.c +++ b/src/H5VLcache_ext.c @@ -103,10 +103,35 @@ herr_t H5async_start(void *request); herr_t H5async_stop(void *request); herr_t H5VL_async_pause(); herr_t H5VL_async_start(); + +#define H5Pcopy(X) \ + H5Pcopy(X); \ + LOG_DEBUG(-1, "H5Pcopy called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); + +#define H5Scopy(X) \ + H5Scopy(X); \ + LOG_DEBUG(-1, "H5Scopy called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); +#define H5Screate_simple(...) \ + H5Screate_simple(__VA_ARGS__); \ + LOG_DEBUG(-1, "H5Screate_simple called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); + +#define H5Sclose(X) \ + H5Sclose(X); \ + LOG_DEBUG(-1, "H5Sclose called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); + +#define H5Pcreate(X) \ + H5Pcreate(X); \ + LOG_DEBUG(-1, "H5Pcreate called %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); + +#define H5Pclose(X) \ + H5Pclose(X); \ + LOG_DEBUG(-1, "H5Pclose called %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__); + /************/ /* Typedefs */ /************/ + /* The cache VOL wrapper context */ typedef struct H5VL_cache_ext_wrap_ctx_t { hid_t under_vol_id; /* VOL ID for under VOL */ @@ -758,7 +783,7 @@ static herr_t async_close_task_wait(object_close_task_t *task) { #endif return 0; } - if (get_close_async() == 0) { + if (o->async_close) { #ifndef NDEBUG LOG_WARN(-1, "Close call is not async; will do nothing"); #endif @@ -1058,7 +1083,7 @@ static herr_t H5VL_cache_ext_init(hid_t vipl_id) { H5LS_stack->next = NULL; if (!getenv("ABT_THREAD_STACKSIZE")) setenv("ABT_THREAD_STACKSIZE", "100000", 1); - setenv("HDF5_ASYNC_DISABLE_IMPLICIT_NON_DSET_RW", "1", 1); + //setenv("HDF5_ASYNC_DISABLE_IMPLICIT_NON_DSET_RW", "1", 1); // async_close_task_list = (object_close_task_t *) // malloc(sizeof(object_close_task_t)); async_close_task_list->next = NULL; // async_close_task_current = async_close_task_list; @@ -2102,6 +2127,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace, &dset->H5DRMM->dset.h5_datatype, &mspace, &fs_cpy, plist_id, &ptr, &r->req); r = r->next; + H5Sclose(fs_cpy); } if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) { hid_t fs_cpy = H5Scopy(fspace); @@ -2187,6 +2213,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace, fs_cpy, plist_id, &p[offset], &r->req); r = r->next; + H5Sclose(fs_cpy); } if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) { hid_t fs_cpy = H5Scopy(fspace); @@ -2208,6 +2235,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace, fs_cpy, plist_id, &p[offset], &r->req); nblock = nblock + 1; r = r->next; + H5Sclose(fs_cpy); } free(samples); } @@ -2270,7 +2298,7 @@ static void *H5VL_cache_ext_dataset_open(void *obj, req); H5Pclose(args->lcpl_id); H5Tclose(args->type_id); - H5Pclose(args->space_id); + H5Sclose(args->space_id); H5Pclose(args->dcpl_id); H5Pclose(args->dapl_id); H5Pclose(args->dxpl_id); @@ -2381,6 +2409,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace, dset->H5DRMM->dset.h5_datatype, mspace, fs_cpy, plist_id, &p[offset], NULL); #endif + H5Sclose(fs_cpy); } if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) { hid_t fs_cpy = H5Scopy(fspace); @@ -2399,6 +2428,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace, &dset->H5DRMM->dset.h5_datatype, &mspace, &fs_cpy, plist_id, &ptr, NULL); nblock = nblock + 1; + H5Sclose(fs_cpy); } if (ret_value == 0) { hsize_t ss = round_page(dset->H5DRMM->dset.size); @@ -2476,6 +2506,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace, ret_value = H5VLdataset_read(dset->under_object, dset->under_vol_id, dset->H5DRMM->dset.h5_datatype, mspace, fs_cpy, plist_id, &p[offset], NULL); + H5Sclose(fs_cpy); } if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) { hid_t fs_cpy = H5Scopy(fspace); @@ -2493,6 +2524,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace, dset->H5DRMM->dset.h5_datatype, mspace, fs_cpy, plist_id, &p[offset], NULL); nblock = nblock + 1; + H5Sclose(fs_cpy); } if (ret_value == 0) { hsize_t ss = round_page(dset->H5DRMM->dset.size); @@ -2815,9 +2847,15 @@ static herr_t free_cache_space_from_dataset(void *dset, hsize_t size) { H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]); H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]); } + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); free(o->H5DWMM->io->current_request->mem_type_id); free(o->H5DWMM->io->current_request->mem_space_id); free(o->H5DWMM->io->current_request->file_space_id); +#else + H5Tclose(o->H5DWMM->io->current_request->mem_type_id); + H5Sclose(o->H5DWMM->io->current_request->mem_space_id); + H5Sclose(o->H5DWMM->io->current_request->file_space_id); + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); #endif #ifndef NDEBUG #if H5_VERSION_GE(1, 13, 3) @@ -2860,6 +2898,7 @@ void create_task_place_holder(task_data_t **request_list) { t->next->id = t->id + 1; *request_list = t->next; ((task_data_t *)*request_list)->previous = t; + (*request_list)->next = NULL; } #endif @@ -2906,7 +2945,7 @@ static herr_t merge_tasks_in_queue(task_data_t **task_list, int ntasks) { #endif int off = 0; - t_com->xfer_plist_id = r->xfer_plist_id; + t_com->xfer_plist_id = H5Pcopy(r->xfer_plist_id); for (int i = 0; i < ntasks; i++) { for (int j = 0; j < r->count; j++) { t_com->dataset_obj[off + j] = r->dataset_obj[j]; @@ -2986,18 +3025,15 @@ add_current_write_task_to_queue(size_t count, void *dset[], hid_t mem_type_id[], r->mem_space_id = (hid_t *)calloc(count, sizeof(hid_t)); r->file_space_id = (hid_t *)calloc(count, sizeof(hid_t)); if (plist_id > 0) - r->xfer_plist_id = plist_id; + r->xfer_plist_id = H5Pcopy(plist_id); for (i = 0; i < count; i++) { r->dataset_obj[i] = dset[i]; - if (mem_type_id[i] > 0) + //if (mem_type_id[i] > 0) r->mem_type_id[i] = H5Tcopy(mem_type_id[i]); - if (mem_space_id[i] > 0) + //if (mem_space_id[i] > 0) r->mem_space_id[i] = H5Scopy(mem_space_id[i]); - if (file_space_id[i] > 0) + //if (file_space_id[i] > 0) r->file_space_id[i] = H5Scopy(file_space_id[i]); - // H5Tclose(mem_type_id[i]); - // H5Sclose(mem_space_id[i]); - // H5Sclose(file_space_id[i]); } /* set whether to pause async execution */ H5VL_cache_ext_t *p = (H5VL_cache_ext_t *)o->parent; @@ -3384,6 +3420,8 @@ static herr_t H5VL_cache_ext_dataset_optional(void *obj, // dset_args.loc_params = loc_params; ret_value = o->H5LS->cache_io_cls->create_cache(obj, &dset_args, req); + H5Pclose(dset_args.dxpl_id); + H5Pclose(dset_args.lcpl_id); } else if (args->op_type == H5VL_cache_dataset_cache_async_op_pause_op_g) { if (o->write_cache || o->read_cache) { o->async_pause = true; @@ -3438,7 +3476,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) { double available = o->H5DWMM->cache->mspace_per_rank_left; H5VL_request_status_t status; while ((o->num_request_dataset > 0) && - (o->H5DWMM->io->current_request != NULL)) { + (o->H5DWMM->io->current_request != NULL && o->H5DWMM->io->current_request->req != NULL)) { double t0 = MPI_Wtime(); assert(o->H5DWMM->io->current_request->req != NULL); #ifndef NDEBUG @@ -3451,7 +3489,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) { INF, &status); #endif if (o->H5DWMM->io->current_request->buf != NULL && - !(strcmp(o->H5LS->scope, "GLOBAL"))) { + (strcmp(o->H5LS->scope, "GLOBAL"))) { free(o->H5DWMM->io->current_request->buf); o->H5DWMM->io->current_request->buf = NULL; #if H5_VERSION_GE(1, 13, 3) @@ -3460,6 +3498,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) { H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]); H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]); } + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); free(o->H5DWMM->io->current_request->mem_type_id); free(o->H5DWMM->io->current_request->mem_space_id); free(o->H5DWMM->io->current_request->file_space_id); @@ -3467,6 +3506,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) { H5Tclose(o->H5DWMM->io->current_request->mem_type_id); H5Sclose(o->H5DWMM->io->current_request->mem_space_id); H5Sclose(o->H5DWMM->io->current_request->file_space_id); + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); #endif } double t1 = MPI_Wtime(); @@ -3551,7 +3591,24 @@ static herr_t H5VL_cache_ext_file_wait(void *file) { H5async_start(o->H5DWMM->io->current_request->req); H5VLrequest_wait(o->H5DWMM->io->current_request->req, o->under_vol_id, INF, &status); + free(o->H5DWMM->io->current_request->buf); +#if H5_VERSION_GE(1, 13, 3) + for (int i = 0; i < o->H5DWMM->io->current_request->count; i++) { + H5Tclose(o->H5DWMM->io->current_request->mem_type_id[i]); + H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]); + H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]); + } + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); + free(o->H5DWMM->io->current_request->mem_type_id); + free(o->H5DWMM->io->current_request->mem_space_id); + free(o->H5DWMM->io->current_request->file_space_id); +#else + H5Tclose(o->H5DWMM->io->current_request->mem_type_id); + H5Sclose(o->H5DWMM->io->current_request->mem_space_id); + H5Sclose(o->H5DWMM->io->current_request->file_space_id); + H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id); +#endif #ifndef NDEBUG #if H5_VERSION_GE(1, 13, 3) LOG_DEBUG(-1, "Task %d (%lu merged) finished", @@ -3605,8 +3662,6 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id, double t0 = MPI_Wtime(); #if H5_VERSION_GE(1, 13, 3) - void *write_req = - ((task_data_t *)o->H5DWMM->io->request_list->previous)->req; if (o->H5DWMM->io->num_fusion_requests > 0) { merge_tasks_in_queue(&o->H5DWMM->io->flush_request, o->H5DWMM->io->num_fusion_requests); @@ -3614,7 +3669,6 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id, o->H5DWMM->io->flush_request, req); // flush data for current task; o->H5DWMM->io->num_fusion_requests = 0; o->H5DWMM->io->fusion_data_size = 0.0; - write_req = o->H5DWMM->io->flush_request->req; o->H5DWMM->io->flush_request = o->H5DWMM->io->flush_request->next; } #endif @@ -4079,7 +4133,9 @@ static void *H5VL_cache_ext_file_create(const char *name, unsigned flags, /* Set file cache information */ set_file_cache((void *)file, (void *)args, req); } - + H5Pclose(args->fapl_id); + H5Pclose(args->fcpl_id); + H5Pclose(args->dxpl_id); free(args); /* Close underlying FAPL */ @@ -4152,7 +4208,9 @@ static void *H5VL_cache_ext_file_open(const char *name, unsigned flags, free(args); /* Close underlying FAPL */ H5Pclose(under_fapl_id); - + H5Pclose(args->fapl_id); + H5Pclose(args->fcpl_id); + H5Pclose(args->dxpl_id); H5VL_cache_ext_info_free(info); return (void *)file; } /* end H5VL_cache_ext_file_open() */ @@ -4650,6 +4708,7 @@ static void *H5VL_cache_ext_group_open(void *obj, group->obj_type = H5I_GROUP; group->H5LS->cache_io_cls->create_cache((void *)group, (void *)args, req); + H5Pclose(args->lcpl_id); H5Pclose(args->gcpl_id); H5Pclose(args->gapl_id); H5Pclose(args->dxpl_id); @@ -6578,7 +6637,7 @@ static herr_t create_file_cache_on_global_storage(void *obj, void *file_args, args->fcpl_id, fapl_id_default); #ifndef NDEBUG LOG_DEBUG(-1, " file under_vol_id: %0lx(map), %0lx", async_vol_id, - file->under_vol_id); + file->under_vol_id); #endif file->H5DWMM->io->request_list = (task_data_t *)malloc(sizeof(task_data_t)); file->H5DWMM->io->request_list->req = NULL; @@ -6611,8 +6670,8 @@ static herr_t create_group_cache_on_global_storage(void *obj, void *group_args, #ifndef NDEBUG LOG_INFO(-1, "Create Group Cache on global storage"); #endif - o->hd_glob = H5Gcreate(p->hd_glob, args->name, args->lcpl_id, args->gcpl_id, - args->gapl_id); + o->hd_glob = H5Gcreate_async(p->hd_glob, args->name, args->lcpl_id, args->gcpl_id, + args->gapl_id, H5ES_NONE); return SUCCEED; } @@ -6621,7 +6680,7 @@ static herr_t remove_group_cache_on_global_storage(void *obj, void **req) { LOG_INFO(-1, "VOL group cache remove on global storage "); #endif H5VL_cache_ext_t *o = (H5VL_cache_ext_t *)obj; - H5Gclose(o->hd_glob); + H5Gclose_async(o->hd_glob, H5ES_NONE); free(o->H5DWMM->mmap); free(o->H5DWMM); o->H5DWMM = NULL; @@ -6728,8 +6787,8 @@ static herr_t create_dataset_cache_on_global_storage(void *obj, void *dset_args, LOG_DEBUG(-1, "Create dataset in parent group"); #endif dset->hd_glob = - H5Dcreate(p->hd_glob, args->name, args->type_id, args->space_id, - args->lcpl_id, args->dcpl_id, args->dapl_id); + H5Dcreate_async(p->hd_glob, args->name, args->type_id, args->space_id, + args->lcpl_id, args->dcpl_id, args->dapl_id, H5ES_NONE); #ifndef NDEBUG LOG_DEBUG(-1, "Create dataset in parent group done"); #endif @@ -6765,6 +6824,7 @@ static void *write_data_to_global_storage(void *dset, hid_t mem_type_id, H5Pset_dxpl_disable_async_implicit(dxpl_id, TRUE); H5Dwrite_async(d->hd_glob, mem_type_id, mem_space_id, file_space_id, dxpl_id, buf, H5ES_NONE); + H5Pclose(dxpl_id); H5LSrecord_cache_access(d->H5DWMM->cache); return NULL; } @@ -6788,10 +6848,11 @@ static herr_t read_data_from_global_storage(void *dset, hid_t mem_type_id, LOG_INFO(-1, "VOL DATASET Read from cache"); #endif LOG_DEBUG(o->H5DWMM->mpi->rank, "dataset_read_from_cache"); - hid_t dxpl_id = H5Pcopy(plist_id); - H5Pset_dxpl_disable_async_implicit(dxpl_id, TRUE); +// H5Dread_async(o->hd_glob, mem_type_id, mem_space_id, file_space_id, plist_id, +// buf, H5ES_NONE); + H5Dread_async(o->hd_glob, mem_type_id, mem_space_id, file_space_id, plist_id, - buf, H5ES_NONE); + buf, H5ES_NONE); H5LSrecord_cache_access(o->H5DWMM->cache); return SUCCEED; } /* end */ @@ -6852,14 +6913,16 @@ static herr_t flush_data_from_global_storage(void *current_request, p = (H5VL_cache_ext_t *)p->parent; H5Pset_dxpl_pause(dxpl_id, p->async_pause); // temparally fix + H5Dread_multi_async(task->count, task->dataset_id, task->mem_type_id, + task->mem_space_id, task->file_space_id, dxpl_id, + task->buf, o->es_id); + ret_value = H5VLdataset_write(count, obj, o->under_vol_id, task->mem_type_id, task->mem_space_id, task->file_space_id, dxpl_id, (const void **)task->buf, &task->req); assert(task->req != NULL); - H5Dread_multi_async(task->count, task->dataset_id, task->mem_type_id, - task->mem_space_id, task->file_space_id, dxpl_id, - task->buf, o->es_id); + H5Pset_dxpl_pause(dxpl_id, true); #ifndef NDEBUG @@ -6903,6 +6966,7 @@ static herr_t flush_data_from_global_storage(void *current_request, o->H5DWMM->io->num_request++; if (obj != &obj_local) free(obj); + H5Pclose(dxpl_id); return ret_value; } #else @@ -6968,6 +7032,7 @@ static herr_t flush_data_from_global_storage(void *current_request, // record the total number of request o->H5DWMM->io->num_request++; o->num_request_dataset++; + H5Pclose(dxpl_id); return ret_value; } #endif @@ -6989,7 +7054,7 @@ static herr_t remove_dataset_cache_on_global_storage(void *dset, void **req) { if (o->write_cache) H5VL_cache_ext_dataset_wait(dset); if (o->write_cache || o->read_cache) { - H5Dclose(o->hd_glob); + H5Dclose_async(o->hd_glob, H5ES_NONE); free(o->H5DWMM->cache); free(o->H5DWMM->mmap); free(o->H5DWMM); diff --git a/tests/test_local_ssd_async.sh b/tests/test_local_ssd_async.sh index 575b97b..435f587 100755 --- a/tests/test_local_ssd_async.sh +++ b/tests/test_local_ssd_async.sh @@ -9,6 +9,6 @@ HDF5_CACHE_STORAGE_SCOPE: LOCAL HDF5_CACHE_STORAGE_SIZE: 1287558138880 HDF5_CACHE_WRITE_BUFFER_SIZE: 10485760000 " > cache_1.cfg -HDF5_CACHE_WR=yes mpirun -np 2 write_cache.exe --async_close --sleep 0.25 +HDF5_CACHE_WR=yes mpirun -np 1 write_cache.exe --async_close --sleep 0.25 --niter 2 --nvars 1