Skip to content

Commit

Permalink
clear up P S that are not closed
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghh04 committed Sep 27, 2023
1 parent 37b56d6 commit e83225f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 32 deletions.
127 changes: 96 additions & 31 deletions src/H5VLcache_ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,35 @@ herr_t H5async_start(void *request);
herr_t H5async_stop(void *request);
herr_t H5VL_async_pause();
herr_t H5VL_async_start();

#define H5Pcopy(X) \
H5Pcopy(X); \
LOG_DEBUG(-1, "H5Pcopy called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);

#define H5Scopy(X) \
H5Scopy(X); \
LOG_DEBUG(-1, "H5Scopy called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);
#define H5Screate_simple(...) \
H5Screate_simple(__VA_ARGS__); \
LOG_DEBUG(-1, "H5Screate_simple called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);

#define H5Sclose(X) \
H5Sclose(X); \
LOG_DEBUG(-1, "H5Sclose called: %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);

#define H5Pcreate(X) \
H5Pcreate(X); \
LOG_DEBUG(-1, "H5Pcreate called %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);

#define H5Pclose(X) \
H5Pclose(X); \
LOG_DEBUG(-1, "H5Pclose called %s:%d %s\n", __FILE__, __LINE__, __FUNCTION__);

/************/
/* Typedefs */
/************/


/* The cache VOL wrapper context */
typedef struct H5VL_cache_ext_wrap_ctx_t {
hid_t under_vol_id; /* VOL ID for under VOL */
Expand Down Expand Up @@ -758,7 +783,7 @@ static herr_t async_close_task_wait(object_close_task_t *task) {
#endif
return 0;
}
if (get_close_async() == 0) {
if (o->async_close) {
#ifndef NDEBUG
LOG_WARN(-1, "Close call is not async; will do nothing");
#endif
Expand Down Expand Up @@ -1058,7 +1083,7 @@ static herr_t H5VL_cache_ext_init(hid_t vipl_id) {
H5LS_stack->next = NULL;
if (!getenv("ABT_THREAD_STACKSIZE"))
setenv("ABT_THREAD_STACKSIZE", "100000", 1);
setenv("HDF5_ASYNC_DISABLE_IMPLICIT_NON_DSET_RW", "1", 1);
//setenv("HDF5_ASYNC_DISABLE_IMPLICIT_NON_DSET_RW", "1", 1);
// async_close_task_list = (object_close_task_t *)
// malloc(sizeof(object_close_task_t)); async_close_task_list->next = NULL;
// async_close_task_current = async_close_task_list;
Expand Down Expand Up @@ -2102,6 +2127,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace,
&dset->H5DRMM->dset.h5_datatype, &mspace,
&fs_cpy, plist_id, &ptr, &r->req);
r = r->next;
H5Sclose(fs_cpy);
}
if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) {
hid_t fs_cpy = H5Scopy(fspace);
Expand Down Expand Up @@ -2187,6 +2213,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace,
fs_cpy, plist_id, &p[offset], &r->req);

r = r->next;
H5Sclose(fs_cpy);
}
if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) {
hid_t fs_cpy = H5Scopy(fspace);
Expand All @@ -2208,6 +2235,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch_async(void *obj, hid_t fspace,
fs_cpy, plist_id, &p[offset], &r->req);
nblock = nblock + 1;
r = r->next;
H5Sclose(fs_cpy);
}
free(samples);
}
Expand Down Expand Up @@ -2270,7 +2298,7 @@ static void *H5VL_cache_ext_dataset_open(void *obj,
req);
H5Pclose(args->lcpl_id);
H5Tclose(args->type_id);
H5Pclose(args->space_id);
H5Sclose(args->space_id);
H5Pclose(args->dcpl_id);
H5Pclose(args->dapl_id);
H5Pclose(args->dxpl_id);
Expand Down Expand Up @@ -2381,6 +2409,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace,
dset->H5DRMM->dset.h5_datatype, mspace,
fs_cpy, plist_id, &p[offset], NULL);
#endif
H5Sclose(fs_cpy);
}
if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) {
hid_t fs_cpy = H5Scopy(fspace);
Expand All @@ -2399,6 +2428,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace,
&dset->H5DRMM->dset.h5_datatype, &mspace,
&fs_cpy, plist_id, &ptr, NULL);
nblock = nblock + 1;
H5Sclose(fs_cpy);
}
if (ret_value == 0) {
hsize_t ss = round_page(dset->H5DRMM->dset.size);
Expand Down Expand Up @@ -2476,6 +2506,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace,
ret_value = H5VLdataset_read(dset->under_object, dset->under_vol_id,
dset->H5DRMM->dset.h5_datatype, mspace,
fs_cpy, plist_id, &p[offset], NULL);
H5Sclose(fs_cpy);
}
if (dset->H5DRMM->dset.ns_loc % nsample_per_block != 0) {
hid_t fs_cpy = H5Scopy(fspace);
Expand All @@ -2493,6 +2524,7 @@ static herr_t H5VL_cache_ext_dataset_prefetch(void *obj, hid_t fspace,
dset->H5DRMM->dset.h5_datatype, mspace,
fs_cpy, plist_id, &p[offset], NULL);
nblock = nblock + 1;
H5Sclose(fs_cpy);
}
if (ret_value == 0) {
hsize_t ss = round_page(dset->H5DRMM->dset.size);
Expand Down Expand Up @@ -2815,9 +2847,15 @@ static herr_t free_cache_space_from_dataset(void *dset, hsize_t size) {
H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]);
H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]);
}
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
free(o->H5DWMM->io->current_request->mem_type_id);
free(o->H5DWMM->io->current_request->mem_space_id);
free(o->H5DWMM->io->current_request->file_space_id);
#else
H5Tclose(o->H5DWMM->io->current_request->mem_type_id);
H5Sclose(o->H5DWMM->io->current_request->mem_space_id);
H5Sclose(o->H5DWMM->io->current_request->file_space_id);
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
#endif
#ifndef NDEBUG
#if H5_VERSION_GE(1, 13, 3)
Expand Down Expand Up @@ -2860,6 +2898,7 @@ void create_task_place_holder(task_data_t **request_list) {
t->next->id = t->id + 1;
*request_list = t->next;
((task_data_t *)*request_list)->previous = t;
(*request_list)->next = NULL;
}
#endif

Expand Down Expand Up @@ -2906,7 +2945,7 @@ static herr_t merge_tasks_in_queue(task_data_t **task_list, int ntasks) {

#endif
int off = 0;
t_com->xfer_plist_id = r->xfer_plist_id;
t_com->xfer_plist_id = H5Pcopy(r->xfer_plist_id);
for (int i = 0; i < ntasks; i++) {
for (int j = 0; j < r->count; j++) {
t_com->dataset_obj[off + j] = r->dataset_obj[j];
Expand Down Expand Up @@ -2986,18 +3025,15 @@ add_current_write_task_to_queue(size_t count, void *dset[], hid_t mem_type_id[],
r->mem_space_id = (hid_t *)calloc(count, sizeof(hid_t));
r->file_space_id = (hid_t *)calloc(count, sizeof(hid_t));
if (plist_id > 0)
r->xfer_plist_id = plist_id;
r->xfer_plist_id = H5Pcopy(plist_id);
for (i = 0; i < count; i++) {
r->dataset_obj[i] = dset[i];
if (mem_type_id[i] > 0)
//if (mem_type_id[i] > 0)
r->mem_type_id[i] = H5Tcopy(mem_type_id[i]);
if (mem_space_id[i] > 0)
//if (mem_space_id[i] > 0)
r->mem_space_id[i] = H5Scopy(mem_space_id[i]);
if (file_space_id[i] > 0)
//if (file_space_id[i] > 0)
r->file_space_id[i] = H5Scopy(file_space_id[i]);
// H5Tclose(mem_type_id[i]);
// H5Sclose(mem_space_id[i]);
// H5Sclose(file_space_id[i]);
}
/* set whether to pause async execution */
H5VL_cache_ext_t *p = (H5VL_cache_ext_t *)o->parent;
Expand Down Expand Up @@ -3384,6 +3420,8 @@ static herr_t H5VL_cache_ext_dataset_optional(void *obj,
// dset_args.loc_params = loc_params;
ret_value =
o->H5LS->cache_io_cls->create_cache(obj, &dset_args, req);
H5Pclose(dset_args.dxpl_id);
H5Pclose(dset_args.lcpl_id);
} else if (args->op_type == H5VL_cache_dataset_cache_async_op_pause_op_g) {
if (o->write_cache || o->read_cache) {
o->async_pause = true;
Expand Down Expand Up @@ -3438,7 +3476,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) {
double available = o->H5DWMM->cache->mspace_per_rank_left;
H5VL_request_status_t status;
while ((o->num_request_dataset > 0) &&
(o->H5DWMM->io->current_request != NULL)) {
(o->H5DWMM->io->current_request != NULL && o->H5DWMM->io->current_request->req != NULL)) {
double t0 = MPI_Wtime();
assert(o->H5DWMM->io->current_request->req != NULL);
#ifndef NDEBUG
Expand All @@ -3451,7 +3489,7 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) {
INF, &status);
#endif
if (o->H5DWMM->io->current_request->buf != NULL &&
!(strcmp(o->H5LS->scope, "GLOBAL"))) {
(strcmp(o->H5LS->scope, "GLOBAL"))) {
free(o->H5DWMM->io->current_request->buf);
o->H5DWMM->io->current_request->buf = NULL;
#if H5_VERSION_GE(1, 13, 3)
Expand All @@ -3460,13 +3498,15 @@ static herr_t H5VL_cache_ext_dataset_wait(void *dset) {
H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]);
H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]);
}
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
free(o->H5DWMM->io->current_request->mem_type_id);
free(o->H5DWMM->io->current_request->mem_space_id);
free(o->H5DWMM->io->current_request->file_space_id);
#else
H5Tclose(o->H5DWMM->io->current_request->mem_type_id);
H5Sclose(o->H5DWMM->io->current_request->mem_space_id);
H5Sclose(o->H5DWMM->io->current_request->file_space_id);
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
#endif
}
double t1 = MPI_Wtime();
Expand Down Expand Up @@ -3551,7 +3591,24 @@ static herr_t H5VL_cache_ext_file_wait(void *file) {
H5async_start(o->H5DWMM->io->current_request->req);
H5VLrequest_wait(o->H5DWMM->io->current_request->req, o->under_vol_id,
INF, &status);

free(o->H5DWMM->io->current_request->buf);
#if H5_VERSION_GE(1, 13, 3)
for (int i = 0; i < o->H5DWMM->io->current_request->count; i++) {
H5Tclose(o->H5DWMM->io->current_request->mem_type_id[i]);
H5Sclose(o->H5DWMM->io->current_request->mem_space_id[i]);
H5Sclose(o->H5DWMM->io->current_request->file_space_id[i]);
}
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
free(o->H5DWMM->io->current_request->mem_type_id);
free(o->H5DWMM->io->current_request->mem_space_id);
free(o->H5DWMM->io->current_request->file_space_id);
#else
H5Tclose(o->H5DWMM->io->current_request->mem_type_id);
H5Sclose(o->H5DWMM->io->current_request->mem_space_id);
H5Sclose(o->H5DWMM->io->current_request->file_space_id);
H5Pclose(o->H5DWMM->io->current_request->xfer_plist_id);
#endif
#ifndef NDEBUG
#if H5_VERSION_GE(1, 13, 3)
LOG_DEBUG(-1, "Task %d (%lu merged) finished",
Expand Down Expand Up @@ -3605,16 +3662,13 @@ static herr_t H5VL_cache_ext_dataset_close(void *dset, hid_t dxpl_id,
double t0 = MPI_Wtime();

#if H5_VERSION_GE(1, 13, 3)
void *write_req =
((task_data_t *)o->H5DWMM->io->request_list->previous)->req;
if (o->H5DWMM->io->num_fusion_requests > 0) {
merge_tasks_in_queue(&o->H5DWMM->io->flush_request,
o->H5DWMM->io->num_fusion_requests);
o->H5LS->cache_io_cls->flush_data_from_cache(
o->H5DWMM->io->flush_request, req); // flush data for current task;
o->H5DWMM->io->num_fusion_requests = 0;
o->H5DWMM->io->fusion_data_size = 0.0;
write_req = o->H5DWMM->io->flush_request->req;
o->H5DWMM->io->flush_request = o->H5DWMM->io->flush_request->next;
}
#endif
Expand Down Expand Up @@ -4079,7 +4133,9 @@ static void *H5VL_cache_ext_file_create(const char *name, unsigned flags,
/* Set file cache information */
set_file_cache((void *)file, (void *)args, req);
}

H5Pclose(args->fapl_id);
H5Pclose(args->fcpl_id);
H5Pclose(args->dxpl_id);
free(args);

/* Close underlying FAPL */
Expand Down Expand Up @@ -4152,7 +4208,9 @@ static void *H5VL_cache_ext_file_open(const char *name, unsigned flags,
free(args);
/* Close underlying FAPL */
H5Pclose(under_fapl_id);

H5Pclose(args->fapl_id);
H5Pclose(args->fcpl_id);
H5Pclose(args->dxpl_id);
H5VL_cache_ext_info_free(info);
return (void *)file;
} /* end H5VL_cache_ext_file_open() */
Expand Down Expand Up @@ -4650,6 +4708,7 @@ static void *H5VL_cache_ext_group_open(void *obj,
group->obj_type = H5I_GROUP;
group->H5LS->cache_io_cls->create_cache((void *)group, (void *)args,
req);
H5Pclose(args->lcpl_id);
H5Pclose(args->gcpl_id);
H5Pclose(args->gapl_id);
H5Pclose(args->dxpl_id);
Expand Down Expand Up @@ -6578,7 +6637,7 @@ static herr_t create_file_cache_on_global_storage(void *obj, void *file_args,
args->fcpl_id, fapl_id_default);
#ifndef NDEBUG
LOG_DEBUG(-1, " file under_vol_id: %0lx(map), %0lx", async_vol_id,
file->under_vol_id);
file->under_vol_id);
#endif
file->H5DWMM->io->request_list = (task_data_t *)malloc(sizeof(task_data_t));
file->H5DWMM->io->request_list->req = NULL;
Expand Down Expand Up @@ -6611,8 +6670,8 @@ static herr_t create_group_cache_on_global_storage(void *obj, void *group_args,
#ifndef NDEBUG
LOG_INFO(-1, "Create Group Cache on global storage");
#endif
o->hd_glob = H5Gcreate(p->hd_glob, args->name, args->lcpl_id, args->gcpl_id,
args->gapl_id);
o->hd_glob = H5Gcreate_async(p->hd_glob, args->name, args->lcpl_id, args->gcpl_id,
args->gapl_id, H5ES_NONE);
return SUCCEED;
}

Expand All @@ -6621,7 +6680,7 @@ static herr_t remove_group_cache_on_global_storage(void *obj, void **req) {
LOG_INFO(-1, "VOL group cache remove on global storage ");
#endif
H5VL_cache_ext_t *o = (H5VL_cache_ext_t *)obj;
H5Gclose(o->hd_glob);
H5Gclose_async(o->hd_glob, H5ES_NONE);
free(o->H5DWMM->mmap);
free(o->H5DWMM);
o->H5DWMM = NULL;
Expand Down Expand Up @@ -6728,8 +6787,8 @@ static herr_t create_dataset_cache_on_global_storage(void *obj, void *dset_args,
LOG_DEBUG(-1, "Create dataset in parent group");
#endif
dset->hd_glob =
H5Dcreate(p->hd_glob, args->name, args->type_id, args->space_id,
args->lcpl_id, args->dcpl_id, args->dapl_id);
H5Dcreate_async(p->hd_glob, args->name, args->type_id, args->space_id,
args->lcpl_id, args->dcpl_id, args->dapl_id, H5ES_NONE);
#ifndef NDEBUG
LOG_DEBUG(-1, "Create dataset in parent group done");
#endif
Expand Down Expand Up @@ -6765,6 +6824,7 @@ static void *write_data_to_global_storage(void *dset, hid_t mem_type_id,
H5Pset_dxpl_disable_async_implicit(dxpl_id, TRUE);
H5Dwrite_async(d->hd_glob, mem_type_id, mem_space_id, file_space_id, dxpl_id,
buf, H5ES_NONE);
H5Pclose(dxpl_id);
H5LSrecord_cache_access(d->H5DWMM->cache);
return NULL;
}
Expand All @@ -6788,10 +6848,11 @@ static herr_t read_data_from_global_storage(void *dset, hid_t mem_type_id,
LOG_INFO(-1, "VOL DATASET Read from cache");
#endif
LOG_DEBUG(o->H5DWMM->mpi->rank, "dataset_read_from_cache");
hid_t dxpl_id = H5Pcopy(plist_id);
H5Pset_dxpl_disable_async_implicit(dxpl_id, TRUE);
// H5Dread_async(o->hd_glob, mem_type_id, mem_space_id, file_space_id, plist_id,
// buf, H5ES_NONE);

H5Dread_async(o->hd_glob, mem_type_id, mem_space_id, file_space_id, plist_id,
buf, H5ES_NONE);
buf, H5ES_NONE);
H5LSrecord_cache_access(o->H5DWMM->cache);
return SUCCEED;
} /* end */
Expand Down Expand Up @@ -6852,14 +6913,16 @@ static herr_t flush_data_from_global_storage(void *current_request,
p = (H5VL_cache_ext_t *)p->parent;
H5Pset_dxpl_pause(dxpl_id, p->async_pause);
// temparally fix
H5Dread_multi_async(task->count, task->dataset_id, task->mem_type_id,
task->mem_space_id, task->file_space_id, dxpl_id,
task->buf, o->es_id);

ret_value = H5VLdataset_write(count, obj, o->under_vol_id, task->mem_type_id,
task->mem_space_id, task->file_space_id,
dxpl_id, (const void **)task->buf, &task->req);
assert(task->req != NULL);

H5Dread_multi_async(task->count, task->dataset_id, task->mem_type_id,
task->mem_space_id, task->file_space_id, dxpl_id,
task->buf, o->es_id);

H5Pset_dxpl_pause(dxpl_id, true);
#ifndef NDEBUG

Expand Down Expand Up @@ -6903,6 +6966,7 @@ static herr_t flush_data_from_global_storage(void *current_request,
o->H5DWMM->io->num_request++;
if (obj != &obj_local)
free(obj);
H5Pclose(dxpl_id);
return ret_value;
}
#else
Expand Down Expand Up @@ -6968,6 +7032,7 @@ static herr_t flush_data_from_global_storage(void *current_request,
// record the total number of request
o->H5DWMM->io->num_request++;
o->num_request_dataset++;
H5Pclose(dxpl_id);
return ret_value;
}
#endif
Expand All @@ -6989,7 +7054,7 @@ static herr_t remove_dataset_cache_on_global_storage(void *dset, void **req) {
if (o->write_cache)
H5VL_cache_ext_dataset_wait(dset);
if (o->write_cache || o->read_cache) {
H5Dclose(o->hd_glob);
H5Dclose_async(o->hd_glob, H5ES_NONE);
free(o->H5DWMM->cache);
free(o->H5DWMM->mmap);
free(o->H5DWMM);
Expand Down
Loading

0 comments on commit e83225f

Please sign in to comment.