Skip to content

Commit

Permalink
Merge pull request #156 from Parallel-NetCDF/intra_node3
Browse files Browse the repository at this point in the history
New feature: intra-node write aggregation
  • Loading branch information
wkliao authored Nov 7, 2024
2 parents c1b24df + 7b4491a commit 8981841
Show file tree
Hide file tree
Showing 40 changed files with 1,933 additions and 219 deletions.
7 changes: 7 additions & 0 deletions benchmarks/C/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
rm -f ${OUTDIR}/$i.nc4
Expand Down
7 changes: 7 additions & 0 deletions benchmarks/FLASH-IO/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do
${MPIRUN} ${NCMPIDIFF} -q ${TESTOUTDIR}/$i.ncmpi_plt_crn_0000.nc ${TESTOUTDIR}/$i.bb.ncmpi_plt_crn_0000.nc
fi
done
done
rm -f ${OUTDIR}/$i.ncmpi_chk_0000.nc
rm -f ${OUTDIR}/$i.ncmpi_plt_cnt_0000.nc
rm -f ${OUTDIR}/$i.ncmpi_plt_crn_0000.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/C/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -92,6 +98,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi

done
done
# delete output file
if test $i = get_vara ; then
Expand Down
7 changes: 7 additions & 0 deletions examples/CXX/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
if test $i = get_vara ; then
rm -f ${OUTDIR}/put_vara.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/F77/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/F90/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
# delete output file
rm -f ${OUTDIR}/$i.nc
rm -f ${OUTDIR}/$i.bb.nc
Expand Down
7 changes: 7 additions & 0 deletions examples/adios/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand All @@ -42,5 +48,6 @@ for i in ${check_PROGRAMS} ; do
echo "PASS: C parallel run on $1 processes --------------- $i"
fi
done
done
done

7 changes: 7 additions & 0 deletions examples/burst_buffer/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ unset PNETCDF_HINTS
for i in ${check_PROGRAMS} ; do
# echo "---- exec=$i"
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand All @@ -49,6 +55,7 @@ for i in ${check_PROGRAMS} ; do
${TESTSEQRUN} ${VALIDATOR} -q ${TESTOUTDIR}/$i.nc
# echo ""
done
done
# delete output files
rm -f ${OUTDIR}/$i.nc
done
Expand Down
7 changes: 7 additions & 0 deletions examples/tutorial/parallel_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ unset PNETCDF_HINTS

for i in ${check_PROGRAMS} ; do
for j in ${safe_modes} ; do
for intra_aggr in 0 1 ; do
if test "$j" = 1 ; then # test only in safe mode
export PNETCDF_HINTS="romio_no_indep_rw=true"
else
export PNETCDF_HINTS=
fi
if test "$intra_aggr" = 1 ; then
export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
fi
export PNETCDF_SAFE_MODE=$j
# echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
Expand Down Expand Up @@ -136,6 +142,7 @@ for i in ${check_PROGRAMS} ; do
# Validator does not support nc4
fi
done
done
done

rm -f ${OUTDIR}/pnetcdf-*.nc
Expand Down
16 changes: 12 additions & 4 deletions src/drivers/common/mem_alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ void ncmpii_add_mem_entry(void *buf,
/*----< ncmpii_del_mem_entry() >---------------------------------------------*/
/* delete a malloc entry from the table */
static
void ncmpii_del_mem_entry(void *buf)
int ncmpii_del_mem_entry(void *buf)
{
int err=0;

#ifdef ENABLE_THREAD_SAFE
pthread_mutex_lock(&lock);
#endif
Expand All @@ -146,6 +148,7 @@ void ncmpii_del_mem_entry(void *buf)
if (ret == NULL) {
fprintf(stderr, "Error at line %d file %s: tfind() buf=%p\n",
__LINE__,__FILE__,buf);
err = 1;
goto fn_exit;
}
/* free space for func and filename */
Expand All @@ -159,6 +162,7 @@ void ncmpii_del_mem_entry(void *buf)
if (ret == NULL) {
fprintf(stderr, "Error at line %d file %s: tdelete() buf=%p\n",
__LINE__,__FILE__,buf);
err = 1;
goto fn_exit;
}
free(tmp);
Expand All @@ -170,7 +174,7 @@ void ncmpii_del_mem_entry(void *buf)
#ifdef ENABLE_THREAD_SAFE
pthread_mutex_unlock(&lock);
#endif
return;
return err;
}
#endif

Expand Down Expand Up @@ -246,7 +250,9 @@ void *NCI_Realloc_fn(void *ptr,
}

#ifdef PNC_MALLOC_TRACE
ncmpii_del_mem_entry(ptr);
if (ncmpii_del_mem_entry(ptr) != 0)
fprintf(stderr, "realloc failed in file %s func %s line %d\n",
filename, func, lineno);
#endif
void *buf = (void *) realloc(ptr, size);
#ifdef PNETCDF_DEBUG
Expand Down Expand Up @@ -275,7 +281,9 @@ void NCI_Free_fn(void *ptr,
{
if (ptr == NULL) return;
#ifdef PNC_MALLOC_TRACE
ncmpii_del_mem_entry(ptr);
if (ncmpii_del_mem_entry(ptr) != 0)
fprintf(stderr, "free failed in file %s func %s line %d\n",
filename, func, lineno);
#endif
free(ptr);
}
Expand Down
3 changes: 2 additions & 1 deletion src/drivers/ncmpio/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ C_SRCS = ncmpio_driver.c \
ncmpio_fill.c \
ncmpio_util.c \
ncmpio_hash_func.c \
ncmpio_file_io.c
ncmpio_file_io.c \
ncmpio_intra_node.c

$(M4_SRCS:.m4=.c): Makefile

Expand Down
24 changes: 24 additions & 0 deletions src/drivers/ncmpio/ncmpio_NC.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,17 @@ struct NC {

char *path; /* file name */
struct NC *old; /* contains the previous NC during redef. */

/* Below are used for intra-node aggregation */
int num_aggrs_per_node; /* number of aggregators per compute node. Set
through a user hint. 0 to disable the
intra-node aggregation, -1 to let PnetCDF
decide. This value must be the same among all
processes.
*/
int my_aggr; /* rank ID of my aggregator */
int num_nonaggrs; /* number of non-aggregators assigned */
int *nonaggr_ranks; /* ranks of assigned non-aggregators */
};

#define NC_readonly(ncp) fIsSet((ncp)->flags, NC_MODE_RDONLY)
Expand Down Expand Up @@ -631,4 +642,17 @@ ncmpio_read_write(NC *ncp, int rw_flag, int coll_indep, MPI_Offset offset,
MPI_Offset buf_count, MPI_Datatype buf_type, void *buf,
int buftype_is_contig);

/* Begin defined in ncmpio_intra_node.c -------------------------------------*/
/* Sets up the intra-node aggregation metadata kept in ncp. Callers treat any
 * return value other than NC_NOERR as an error.
 */
extern int
ncmpio_intra_node_aggr_init(NC *ncp);

/* NOTE(review): presumably the intra-node aggregation path for a list of
 * num_reqs pending put requests (put_list), with newnumrecs being the updated
 * number of records -- confirm against the definition.
 */
extern int
ncmpio_intra_node_aggregation_nreqs(NC *ncp, int num_reqs, NC_req *put_list,
MPI_Offset newnumrecs);
/* NOTE(review): presumably the intra-node aggregation path for a single
 * request on variable varp, described by start/count/stride and a user buffer
 * of bufCount elements of type bufType -- confirm against the definition.
 */
extern int
ncmpio_intra_node_aggregation(NC *ncp, NC_var *varp, const MPI_Offset *start,
const MPI_Offset *count,
const MPI_Offset *stride, MPI_Offset bufCount,
MPI_Datatype bufType, void *buf);

#endif /* H_NC */
17 changes: 7 additions & 10 deletions src/drivers/ncmpio/ncmpio_close.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ ncmpio_free_NC(NC *ncp)
*/
if (ncp->mpiinfo != MPI_INFO_NULL) MPI_Info_free(&ncp->mpiinfo);

if (ncp->get_list != NULL) NCI_Free(ncp->get_list);
if (ncp->put_list != NULL) NCI_Free(ncp->put_list);
if (ncp->abuf != NULL) NCI_Free(ncp->abuf);
if (ncp->path != NULL) NCI_Free(ncp->path);
if (ncp->get_list != NULL) NCI_Free(ncp->get_list);
if (ncp->put_list != NULL) NCI_Free(ncp->put_list);
if (ncp->abuf != NULL) NCI_Free(ncp->abuf);
if (ncp->path != NULL) NCI_Free(ncp->path);
if (ncp->nonaggr_ranks != NULL) NCI_Free(ncp->nonaggr_ranks);

NCI_Free(ncp);
}
Expand Down Expand Up @@ -144,17 +145,13 @@ ncmpio_close(void *ncdp)
}
#else
if (ncp->numLeadGetReqs > 0) {
int rank;
MPI_Comm_rank(ncp->comm, &rank);
printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,rank);
printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,ncp->rank);
err = ncmpio_cancel(ncp, NC_GET_REQ_ALL, NULL, NULL);
if (status == NC_NOERR) status = err;
if (status == NC_NOERR) status = NC_EPENDING;
}
if (ncp->numLeadPutReqs > 0) {
int rank;
MPI_Comm_rank(ncp->comm, &rank);
printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,rank);
printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,ncp->rank);
err = ncmpio_cancel(ncp, NC_PUT_REQ_ALL, NULL, NULL);
if (status == NC_NOERR) status = err;
if (status == NC_NOERR) status = NC_EPENDING;
Expand Down
13 changes: 13 additions & 0 deletions src/drivers/ncmpio/ncmpio_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,19 @@ ncmpio_create(MPI_Comm comm,
* be '\0' (null character). In this case, safe_mode is enabled */
}

/* Determine whether to enable intra-node aggregation and, if so, set up all
 * intra-node aggregation metadata.
 * ncp->num_aggrs_per_node: zero means this feature is disabled and non-zero
 * means it is enabled, globally for all processes.
 * ncp->my_aggr: -1 means aggregation is not in effect for the aggregation
 * group of this process; a value >= 0 means it is.
 */
ncp->my_aggr = -1;
if (ncp->num_aggrs_per_node != 0) {
err = ncmpio_intra_node_aggr_init(ncp);
if (err != NC_NOERR) return err;
}

*ncpp = (void*)ncp;

return NC_NOERR;
Expand Down
Loading

0 comments on commit 8981841

Please sign in to comment.