diff --git a/benchmarks/C/parallel_run.sh b/benchmarks/C/parallel_run.sh index 06996c66c..75aa106a0 100755 --- a/benchmarks/C/parallel_run.sh +++ b/benchmarks/C/parallel_run.sh @@ -29,8 +29,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi done + done rm -f ${OUTDIR}/$i.nc rm -f ${OUTDIR}/$i.bb.nc rm -f ${OUTDIR}/$i.nc4 diff --git a/benchmarks/FLASH-IO/parallel_run.sh b/benchmarks/FLASH-IO/parallel_run.sh index 414caf1b8..01077bb5f 100755 --- a/benchmarks/FLASH-IO/parallel_run.sh +++ b/benchmarks/FLASH-IO/parallel_run.sh @@ -29,8 +29,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -72,6 +78,7 @@ for i in ${check_PROGRAMS} ; do ${MPIRUN} ${NCMPIDIFF} -q ${TESTOUTDIR}/$i.ncmpi_plt_crn_0000.nc ${TESTOUTDIR}/$i.bb.ncmpi_plt_crn_0000.nc fi done + done rm -f ${OUTDIR}/$i.ncmpi_chk_0000.nc rm -f ${OUTDIR}/$i.ncmpi_plt_cnt_0000.nc rm -f ${OUTDIR}/$i.ncmpi_plt_crn_0000.nc diff --git a/examples/C/parallel_run.sh b/examples/C/parallel_run.sh index d4bd55898..b8de2d2d0 100755 --- a/examples/C/parallel_run.sh +++ b/examples/C/parallel_run.sh @@ -32,8 +32,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -92,6 +98,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi + done done # delete output file if test $i = get_vara ; then diff --git a/examples/CXX/parallel_run.sh b/examples/CXX/parallel_run.sh index 99d382727..d656fa31d 100755 --- a/examples/CXX/parallel_run.sh +++ b/examples/CXX/parallel_run.sh @@ -32,8 +32,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi done + done # delete output file if test $i = get_vara ; then rm -f ${OUTDIR}/put_vara.nc diff --git a/examples/F77/parallel_run.sh b/examples/F77/parallel_run.sh index 307565ce6..2dfd97c01 100755 --- a/examples/F77/parallel_run.sh +++ b/examples/F77/parallel_run.sh @@ -32,8 +32,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} 
; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi done + done # delete output file rm -f ${OUTDIR}/$i.nc rm -f ${OUTDIR}/$i.bb.nc diff --git a/examples/F90/parallel_run.sh b/examples/F90/parallel_run.sh index 1d04709d3..aaa393307 100755 --- a/examples/F90/parallel_run.sh +++ b/examples/F90/parallel_run.sh @@ -32,8 +32,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -90,6 +96,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi done + done # delete output file rm -f ${OUTDIR}/$i.nc rm -f ${OUTDIR}/$i.bb.nc diff --git a/examples/adios/parallel_run.sh b/examples/adios/parallel_run.sh index 2f07ffb05..234eed593 100755 --- a/examples/adios/parallel_run.sh +++ b/examples/adios/parallel_run.sh @@ -28,8 +28,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -42,5 +48,6 @@ for i in ${check_PROGRAMS} ; do echo "PASS: C parallel run on $1 processes --------------- $i" fi done + done done diff --git a/examples/burst_buffer/parallel_run.sh b/examples/burst_buffer/parallel_run.sh index cfad91cf4..2e3312b09 100755 --- a/examples/burst_buffer/parallel_run.sh +++ b/examples/burst_buffer/parallel_run.sh @@ -33,8 +33,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do # echo "---- exec=$i" for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ -49,6 +55,7 @@ for i in ${check_PROGRAMS} ; do ${TESTSEQRUN} ${VALIDATOR} -q ${TESTOUTDIR}/$i.nc # echo "" done + done # delete output files rm -f ${OUTDIR}/$i.nc done diff --git a/examples/tutorial/parallel_run.sh b/examples/tutorial/parallel_run.sh index 0717303f2..b913f0f94 100755 --- a/examples/tutorial/parallel_run.sh +++ b/examples/tutorial/parallel_run.sh @@ -32,8 +32,14 @@ unset PNETCDF_HINTS for i in ${check_PROGRAMS} ; do for j in ${safe_modes} ; do + for intra_aggr in 0 1 ; do if test "$j" = 1 ; then # test only in safe mode export PNETCDF_HINTS="romio_no_indep_rw=true" + else + export PNETCDF_HINTS= + fi + if test "$intra_aggr" = 1 ; then + export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2" fi export PNETCDF_SAFE_MODE=$j # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}" @@ 
-136,6 +142,7 @@ for i in ${check_PROGRAMS} ; do # Validator does not support nc4 fi done + done done rm -f ${OUTDIR}/pnetcdf-*.nc diff --git a/src/drivers/common/mem_alloc.c b/src/drivers/common/mem_alloc.c index 5e11700ef..8c433c9a3 100644 --- a/src/drivers/common/mem_alloc.c +++ b/src/drivers/common/mem_alloc.c @@ -132,8 +132,10 @@ void ncmpii_add_mem_entry(void *buf, /*----< ncmpii_del_mem_entry() >---------------------------------------------*/ /* delete a malloc entry from the table */ static -void ncmpii_del_mem_entry(void *buf) +int ncmpii_del_mem_entry(void *buf) { + int err=0; + #ifdef ENABLE_THREAD_SAFE pthread_mutex_lock(&lock); #endif @@ -146,6 +148,7 @@ void ncmpii_del_mem_entry(void *buf) if (ret == NULL) { fprintf(stderr, "Error at line %d file %s: tfind() buf=%p\n", __LINE__,__FILE__,buf); + err = 1; goto fn_exit; } /* free space for func and filename */ @@ -159,6 +162,7 @@ void ncmpii_del_mem_entry(void *buf) if (ret == NULL) { fprintf(stderr, "Error at line %d file %s: tdelete() buf=%p\n", __LINE__,__FILE__,buf); + err = 1; goto fn_exit; } free(tmp); @@ -170,7 +174,7 @@ void ncmpii_del_mem_entry(void *buf) #ifdef ENABLE_THREAD_SAFE pthread_mutex_unlock(&lock); #endif - return; + return err; } #endif @@ -246,7 +250,9 @@ void *NCI_Realloc_fn(void *ptr, } #ifdef PNC_MALLOC_TRACE - ncmpii_del_mem_entry(ptr); + if (ncmpii_del_mem_entry(ptr) != 0) + fprintf(stderr, "realloc failed in file %s func %s line %d\n", + filename, func, lineno); #endif void *buf = (void *) realloc(ptr, size); #ifdef PNETCDF_DEBUG @@ -275,7 +281,9 @@ void NCI_Free_fn(void *ptr, { if (ptr == NULL) return; #ifdef PNC_MALLOC_TRACE - ncmpii_del_mem_entry(ptr); + if (ncmpii_del_mem_entry(ptr) != 0) + fprintf(stderr, "free failed in file %s func %s line %d\n", + filename, func, lineno); #endif free(ptr); } diff --git a/src/drivers/ncmpio/Makefile.am b/src/drivers/ncmpio/Makefile.am index a63197d8c..c1afe76c1 100644 --- a/src/drivers/ncmpio/Makefile.am +++ b/src/drivers/ncmpio/Makefile.am @@ -52,7 +52,8 @@ C_SRCS = ncmpio_driver.c \ ncmpio_fill.c \ ncmpio_util.c \ ncmpio_hash_func.c \ - ncmpio_file_io.c + ncmpio_file_io.c \ + ncmpio_intra_node.c $(M4_SRCS:.m4=.c): Makefile diff --git a/src/drivers/ncmpio/ncmpio_NC.h b/src/drivers/ncmpio/ncmpio_NC.h index e0b03345f..14a2e9129 100644 --- a/src/drivers/ncmpio/ncmpio_NC.h +++ b/src/drivers/ncmpio/ncmpio_NC.h @@ -435,6 +435,17 @@ struct NC { char *path; /* file name */ struct NC *old; /* contains the previous NC during redef. */ + + /* Below are used for intra-node aggregation */ + int num_aggrs_per_node; /* number of aggregators per compute node. Set + through a user hint. 0 to disable the + intra-node aggregation, -1 to let PnetCDF to + decide. This value must be the same among all + processes. 
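The new hint is the same one the test scripts above exercise through the PNETCDF_HINTS environment variable (nc_num_aggrs_per_node=2). For reference, a minimal sketch of passing that hint programmatically through an MPI info object is shown below; the file name, create mode, and hint value are illustrative only.

    #include <mpi.h>
    #include <pnetcdf.h>

    /* Sketch: request 2 intra-node aggregators per compute node via an
     * MPI info hint, then create a file. Error checking omitted. */
    int main(int argc, char **argv) {
        int ncid;
        MPI_Info info;
        MPI_Init(&argc, &argv);
        MPI_Info_create(&info);
        MPI_Info_set(info, "nc_num_aggrs_per_node", "2");
        ncmpi_create(MPI_COMM_WORLD, "testfile.nc", NC_CLOBBER, info, &ncid);
        MPI_Info_free(&info);
        /* ... define dimensions/variables, ncmpi_enddef(), collective writes ... */
        ncmpi_close(ncid);
        MPI_Finalize();
        return 0;
    }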
+ */ + int my_aggr; /* rank ID of my aggregator */ + int num_nonaggrs; /* number of non-aggregators assigned */ + int *nonaggr_ranks; /* ranks of assigned non-aggregators */ }; #define NC_readonly(ncp) fIsSet((ncp)->flags, NC_MODE_RDONLY) @@ -631,4 +642,17 @@ ncmpio_read_write(NC *ncp, int rw_flag, int coll_indep, MPI_Offset offset, MPI_Offset buf_count, MPI_Datatype buf_type, void *buf, int buftype_is_contig); +/* Begin defined in ncmpio_intranode.c --------------------------------------*/ +extern int +ncmpio_intra_node_aggr_init(NC *ncp); + +extern int +ncmpio_intra_node_aggregation_nreqs(NC *ncp, int num_reqs, NC_req *put_list, + MPI_Offset newnumrecs); +extern int +ncmpio_intra_node_aggregation(NC *ncp, NC_var *varp, const MPI_Offset *start, + const MPI_Offset *count, + const MPI_Offset *stride, MPI_Offset bufCount, + MPI_Datatype bufType, void *buf); + #endif /* H_NC */ diff --git a/src/drivers/ncmpio/ncmpio_close.c b/src/drivers/ncmpio/ncmpio_close.c index d62b7c08c..0f2dff772 100644 --- a/src/drivers/ncmpio/ncmpio_close.c +++ b/src/drivers/ncmpio/ncmpio_close.c @@ -48,10 +48,11 @@ ncmpio_free_NC(NC *ncp) */ if (ncp->mpiinfo != MPI_INFO_NULL) MPI_Info_free(&ncp->mpiinfo); - if (ncp->get_list != NULL) NCI_Free(ncp->get_list); - if (ncp->put_list != NULL) NCI_Free(ncp->put_list); - if (ncp->abuf != NULL) NCI_Free(ncp->abuf); - if (ncp->path != NULL) NCI_Free(ncp->path); + if (ncp->get_list != NULL) NCI_Free(ncp->get_list); + if (ncp->put_list != NULL) NCI_Free(ncp->put_list); + if (ncp->abuf != NULL) NCI_Free(ncp->abuf); + if (ncp->path != NULL) NCI_Free(ncp->path); + if (ncp->nonaggr_ranks != NULL) NCI_Free(ncp->nonaggr_ranks); NCI_Free(ncp); } @@ -144,17 +145,13 @@ ncmpio_close(void *ncdp) } #else if (ncp->numLeadGetReqs > 0) { - int rank; - MPI_Comm_rank(ncp->comm, &rank); - printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,rank); + printf("PnetCDF warning: %d nonblocking get requests still pending on process %d. Cancelling ...\n",ncp->numLeadGetReqs,ncp->rank); err = ncmpio_cancel(ncp, NC_GET_REQ_ALL, NULL, NULL); if (status == NC_NOERR) status = err; if (status == NC_NOERR) status = NC_EPENDING; } if (ncp->numLeadPutReqs > 0) { - int rank; - MPI_Comm_rank(ncp->comm, &rank); - printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,rank); + printf("PnetCDF warning: %d nonblocking put requests still pending on process %d. Cancelling ...\n",ncp->numLeadPutReqs,ncp->rank); err = ncmpio_cancel(ncp, NC_PUT_REQ_ALL, NULL, NULL); if (status == NC_NOERR) status = err; if (status == NC_NOERR) status = NC_EPENDING; diff --git a/src/drivers/ncmpio/ncmpio_create.c b/src/drivers/ncmpio/ncmpio_create.c index 02375e97f..a705ed642 100644 --- a/src/drivers/ncmpio/ncmpio_create.c +++ b/src/drivers/ncmpio/ncmpio_create.c @@ -315,6 +315,19 @@ ncmpio_create(MPI_Comm comm, * be '\0' (null character). In this case, safe_mode is enabled */ } + /* determine whether to enable intra-node aggregation and set up all + * intra-node aggregation metadata. + * ncp->num_aggrs_per_node = 0, or non-zero indicates whether this feature + * is enabled globally for all processes. + * ncp->my_aggr = -1 or >= 0 indicates whether aggregation is effectively + * enabled for the aggregation group of this process. 
+ */ + ncp->my_aggr = -1; + if (ncp->num_aggrs_per_node != 0) { + err = ncmpio_intra_node_aggr_init(ncp); + if (err != NC_NOERR) return err; + } + *ncpp = (void*)ncp; return NC_NOERR; diff --git a/src/drivers/ncmpio/ncmpio_enddef.c b/src/drivers/ncmpio/ncmpio_enddef.c index f6d719831..4eca5f6ab 100644 --- a/src/drivers/ncmpio/ncmpio_enddef.c +++ b/src/drivers/ncmpio/ncmpio_enddef.c @@ -266,7 +266,7 @@ move_record_vars(NC *ncp, NC *old) { static int NC_begins(NC *ncp) { - int i, j, rank, mpireturn; + int i, j, mpireturn; MPI_Offset end_var=0; NC_var *last = NULL; NC_var *first_var = NULL; /* first "non-record" var */ @@ -276,7 +276,6 @@ NC_begins(NC *ncp) */ /* get the true header size (not header extent) */ - MPI_Comm_rank(ncp->comm, &rank); ncp->xsz = ncmpio_hdr_len_NC(ncp); if (ncp->safe_mode && ncp->nprocs > 1) { @@ -489,15 +488,13 @@ NC_begins(NC *ncp) static int write_NC(NC *ncp) { - int status=NC_NOERR, mpireturn, err, rank, is_coll; + int status=NC_NOERR, mpireturn, err, is_coll; MPI_Offset i, header_wlen, ntimes; MPI_File fh; MPI_Status mpistatus; assert(!NC_readonly(ncp)); - MPI_Comm_rank(ncp->comm, &rank); - /* Depending on whether NC_HCOLL is set, writing file header can be done * through either MPI collective or independent write call. * When * ncp->nprocs == 1, ncp->collective_fh == ncp->independent_fh @@ -535,7 +532,7 @@ write_NC(NC *ncp) if (header_wlen % NC_MAX_INT) ntimes++; /* only rank 0's header gets written to the file */ - if (rank == 0) { + if (ncp->rank == 0) { char *buf=NULL, *buf_ptr; MPI_Offset offset, remain; diff --git a/src/drivers/ncmpio/ncmpio_file_io.c b/src/drivers/ncmpio/ncmpio_file_io.c index 4b4a30632..c1b732378 100644 --- a/src/drivers/ncmpio/ncmpio_file_io.c +++ b/src/drivers/ncmpio/ncmpio_file_io.c @@ -50,8 +50,13 @@ ncmpio_read_write(NC *ncp, /* return the first encountered error if there is any */ err = (err == NC_EFILE) ? 
NC_EREAD : err; } - else if (btype_size == MPI_UNDEFINED) + else if (btype_size == MPI_UNDEFINED) { +#ifdef PNETCDF_DEBUG + fprintf(stderr,"%d: %s line %d: btype_size MPI_UNDEFINED buf_count=%lld\n", + ncp->rank, __func__,__LINE__,buf_count); +#endif DEBUG_ASSIGN_ERROR(err, NC_EINTOVERFLOW) + } if (err != NC_NOERR) { if (coll_indep == NC_REQ_COLL) { @@ -96,8 +101,15 @@ ncmpio_read_write(NC *ncp, MPI_Type_commit(&xbuf_type); xlen = 1; #else - if (coll_indep == NC_REQ_COLL) + if (coll_indep == NC_REQ_COLL) { +#ifdef PNETCDF_DEBUG + fprintf(stderr,"%d: %s line %d: NC_EINTOVERFLOW buf_count=%lld\n", + ncp->rank, __func__,__LINE__,buf_count); +#endif DEBUG_ASSIGN_ERROR(status, NC_EINTOVERFLOW) + /* write nothing, but participate the collective call */ + xlen = 0; + } else DEBUG_RETURN_ERROR(NC_EINTOVERFLOW) #endif @@ -215,8 +227,15 @@ ncmpio_read_write(NC *ncp, MPI_Type_commit(&xbuf_type); xlen = 1; #else - if (coll_indep == NC_REQ_COLL) + if (coll_indep == NC_REQ_COLL) { +#ifdef PNETCDF_DEBUG + fprintf(stderr,"%d: %s line %d: NC_EINTOVERFLOW buf_count=%lld\n", + ncp->rank, __func__,__LINE__,buf_count); +#endif DEBUG_ASSIGN_ERROR(status, NC_EINTOVERFLOW) + /* write nothing, but participate the collective call */ + xlen = 0; + } else DEBUG_RETURN_ERROR(NC_EINTOVERFLOW) #endif @@ -244,8 +263,10 @@ ncmpio_read_write(NC *ncp, mpireturn = MPI_Type_contiguous_c((MPI_Count)req_size, MPI_BYTE, &xbuf_type); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_Type_contiguous_c"); - if (coll_indep == NC_REQ_COLL) + if (coll_indep == NC_REQ_COLL) { DEBUG_ASSIGN_ERROR(status, err) + xlen = 0; + } else DEBUG_RETURN_ERROR(err) } diff --git a/src/drivers/ncmpio/ncmpio_file_misc.c b/src/drivers/ncmpio/ncmpio_file_misc.c index ef5122fe9..25671c9d6 100644 --- a/src/drivers/ncmpio/ncmpio_file_misc.c +++ b/src/drivers/ncmpio/ncmpio_file_misc.c @@ -59,6 +59,12 @@ dup_NC(const NC *ref) return NULL; } + if (ref->nonaggr_ranks != NULL) { + size_t len = sizeof(int) * ncp->num_nonaggrs; + ncp->nonaggr_ranks = (int*) NCI_Malloc(len); + memcpy(ncp->nonaggr_ranks, ref->nonaggr_ranks, len); + } + /* fields below should not copied from ref */ ncp->comm = MPI_COMM_NULL; ncp->mpiinfo = MPI_INFO_NULL; diff --git a/src/drivers/ncmpio/ncmpio_filetype.c b/src/drivers/ncmpio/ncmpio_filetype.c index 17e0109af..589a08dc5 100644 --- a/src/drivers/ncmpio/ncmpio_filetype.c +++ b/src/drivers/ncmpio/ncmpio_filetype.c @@ -620,7 +620,7 @@ ncmpio_file_set_view(const NC *ncp, MPI_Offset *offset, /* IN/OUT */ MPI_Datatype filetype) { - int rank, err, mpireturn, status=NC_NOERR; + int err, mpireturn, status=NC_NOERR; if (filetype == MPI_BYTE) { /* filetype is a contiguous space, make the whole file visible */ @@ -629,8 +629,7 @@ ncmpio_file_set_view(const NC *ncp, return NC_NOERR; } - MPI_Comm_rank(ncp->comm, &rank); - if (rank == 0) { + if (ncp->rank == 0) { /* prepend the whole file header to filetype */ MPI_Datatype root_filetype=MPI_BYTE, ftypes[2]; #ifdef HAVE_MPI_LARGE_COUNT diff --git a/src/drivers/ncmpio/ncmpio_fill.c b/src/drivers/ncmpio/ncmpio_fill.c index 06391f15a..d96eb9c25 100644 --- a/src/drivers/ncmpio/ncmpio_fill.c +++ b/src/drivers/ncmpio/ncmpio_fill.c @@ -144,16 +144,13 @@ fill_var_rec(NC *ncp, NC_var *varp, MPI_Offset recno) /* record number */ { - int err, status=NC_NOERR, mpireturn, rank, nprocs; + int err, status=NC_NOERR, mpireturn; void *buf; MPI_Offset var_len, start, count, offset; MPI_File fh; MPI_Status mpistatus; MPI_Datatype bufType; - MPI_Comm_rank(ncp->comm, &rank); - 
MPI_Comm_size(ncp->comm, &nprocs); - if (varp->ndims == 0) /* scalar variable */ var_len = 1; else if (varp->ndims == 1 && IS_RECVAR(varp)) @@ -164,14 +161,14 @@ fill_var_rec(NC *ncp, var_len = varp->dsizes[0]; /* divide total number of elements of this variable among all processes */ - count = var_len / nprocs; - start = count * rank; - if (rank < var_len % nprocs) { - start += rank; + count = var_len / ncp->nprocs; + start = count * ncp->rank; + if (ncp->rank < var_len % ncp->nprocs) { + start += ncp->rank; count++; } else { - start += var_len % nprocs; + start += var_len % ncp->nprocs; } /* allocate buffer space */ @@ -191,7 +188,7 @@ fill_var_rec(NC *ncp, offset += ncp->recsize * recno; offset += start * varp->xsz; - /* when nprocs == 1, we keep I/O mode in independent mode at all time */ + /* when ncp->nprocs == 1, we keep I/O mode in independent mode at all time */ fh = ncp->collective_fh; /* make the entire file visible */ @@ -360,7 +357,7 @@ fill_added_recs(NC *ncp, NC *old_ncp) static int fillerup_aggregate(NC *ncp, NC *old_ncp) { - int i, j, k, rank, nprocs, mpireturn, err, status=NC_NOERR; + int i, j, k, mpireturn, err, status=NC_NOERR; int start_vid, recno, nVarsFill; char *buf_ptr, *noFill; void *buf; @@ -378,9 +375,6 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) MPI_Aint *offset; #endif - MPI_Comm_rank(ncp->comm, &rank); - MPI_Comm_size(ncp->comm, &nprocs); - /* find the starting vid for newly added variables */ start_vid = 0; nrecs = 0; /* the current number of records */ @@ -446,14 +440,14 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) else var_len = varp->dsizes[0]; /* divide evenly total number of variable's elements among processes */ - count[j] = var_len / nprocs; - start = count[j] * rank; - if (rank < var_len % nprocs) { - start += rank; + count[j] = var_len / ncp->nprocs; + start = count[j] * ncp->rank; + if (ncp->rank < var_len % ncp->nprocs) { + start += ncp->rank; count[j]++; } else - start += var_len % nprocs; + start += var_len % ncp->nprocs; /* calculate the starting file offset */ start *= varp->xsz; @@ -483,14 +477,14 @@ fillerup_aggregate(NC *ncp, NC *old_ncp) else var_len = varp->dsizes[1]; /* divide total number of variable's elements among all processes */ - count[j] = var_len / nprocs; - start = count[j] * rank; - if (rank < var_len % nprocs) { - start += rank; + count[j] = var_len / ncp->nprocs; + start = count[j] * ncp->rank; + if (ncp->rank < var_len % ncp->nprocs) { + start += ncp->rank; count[j]++; } else - start += var_len % nprocs; + start += var_len % ncp->nprocs; /* calculate the starting file offset */ start *= varp->xsz; diff --git a/src/drivers/ncmpio/ncmpio_getput.m4 b/src/drivers/ncmpio/ncmpio_getput.m4 index e0a083c61..5fa8aecea 100644 --- a/src/drivers/ncmpio/ncmpio_getput.m4 +++ b/src/drivers/ncmpio/ncmpio_getput.m4 @@ -117,7 +117,7 @@ put_varm(NC *ncp, { void *xbuf=NULL; int mpireturn, err=NC_NOERR, status=NC_NOERR, buftype_is_contig; - int el_size, need_convert, need_swap, need_swap_back_buf=0; + int el_size, need_convert=0, need_swap=0, need_swap_back_buf=0; int coll_indep, xtype_is_contig=1, can_swap_in_place; MPI_Offset nelems=0, bnelems=0, nbytes=0, offset=0; MPI_Datatype itype, xtype=MPI_BYTE, imaptype, filetype=MPI_BYTE; @@ -135,10 +135,19 @@ put_varm(NC *ncp, * el_size: byte size of itype * buftype_is_contig: whether buftype is contiguous */ - err = ncmpii_buftype_decode(varp->ndims, varp->xtype, count, bufcount, - buftype, &itype, &el_size, &bnelems, - &nbytes, &buftype_is_contig); - if (err != NC_NOERR) goto err_check; + if (varp 
== NULL) { /* zero-sized request */ + itype = MPI_BYTE; + el_size = 0; + bnelems = 0; + nbytes = 0; + buftype_is_contig = 0; + } + else { + err = ncmpii_buftype_decode(varp->ndims, varp->xtype, count, bufcount, + buftype, &itype, &el_size, &bnelems, + &nbytes, &buftype_is_contig); + if (err != NC_NOERR) goto err_check; + } xtype_is_contig = buftype_is_contig; if (buftype == MPI_DATATYPE_NULL) { /* buftype and bufcount are ignored */ @@ -165,8 +174,10 @@ put_varm(NC *ncp, goto err_check; /* check if type conversion and Endianness byte swap is needed */ - need_convert = ncmpii_need_convert(ncp->format, varp->xtype, itype); - need_swap = NEED_BYTE_SWAP(varp->xtype, itype); + if (varp != NULL) { /* non-zero-sized request */ + need_convert = ncmpii_need_convert(ncp->format, varp->xtype, itype); + need_swap = NEED_BYTE_SWAP(varp->xtype, itype); + } /* check if in-place byte swap can be enabled */ can_swap_in_place = 1; @@ -182,8 +193,11 @@ put_varm(NC *ncp, /* check whether this is a true varm call, if yes, imaptype will be a * newly created MPI derived data type, otherwise MPI_DATATYPE_NULL */ - err = ncmpii_create_imaptype(varp->ndims, count, imap, itype, &imaptype); - if (err != NC_NOERR) goto err_check; + imaptype = MPI_DATATYPE_NULL; + if (varp != NULL) { /* non-zero-sized request */ + err = ncmpii_create_imaptype(varp->ndims, count, imap, itype, &imaptype); + if (err != NC_NOERR) goto err_check; + } if (!need_convert && imaptype == MPI_DATATYPE_NULL && (!need_swap || (can_swap_in_place && buftype_is_contig))) { @@ -194,7 +208,7 @@ put_varm(NC *ncp, need_swap_back_buf = 1; } } - else { + else if (varp != NULL) { xbuf = NCI_Malloc((size_t)nbytes); if (xbuf == NULL) { DEBUG_ASSIGN_ERROR(err, NC_ENOMEM) @@ -217,7 +231,7 @@ put_varm(NC *ncp, } /* Set nelems and xtype which will be used in MPI read/write */ - if (buf != xbuf) { + if (buf != xbuf && varp != NULL) { /* xbuf is a contiguous buffer */ xtype = ncmpii_nc2mpitype(varp->xtype); nelems = bnelems; @@ -244,50 +258,59 @@ err_check: filetype = MPI_BYTE; xtype = MPI_BYTE; } + + if (fIsSet(reqMode, NC_REQ_COLL) && ncp->my_aggr >= 0 && ncp->nprocs > 1) { + /* intra-node write aggregation must be in collective mode */ + void *wbuf = (nbytes == 0) ? NULL : xbuf; + err = ncmpio_intra_node_aggregation(ncp, varp, start, count, stride, nelems, xtype, wbuf); + if (status == NC_NOERR) status = err; + } else { - /* Create the filetype for this request and calculate the beginning - * file offset for this request. If this request is contiguous in file, - * then set filetype == MPI_BYTE. Otherwise filetype will be an MPI - * derived data type. + if (nbytes > 0) { + /* Create the filetype for this request and calculate the beginning + * file offset for this request. If this request is contiguous in + * file, then set filetype == MPI_BYTE. Otherwise filetype will + * be an MPI derived data type. + */ + err = ncmpio_filetype_create_vars(ncp, varp, start, count, stride, + &offset, &filetype, NULL); + if (err != NC_NOERR) { + nbytes = 0; + nelems = 0; + filetype = MPI_BYTE; + xtype = MPI_BYTE; + if (status == NC_NOERR) status = err; + } + } + + /* TODO: if record variables are too big to store the stride between + * records in an MPI_Aint, then we will have to process this one record + * at a time. 
*/ - err = ncmpio_filetype_create_vars(ncp, varp, start, count, stride, - &offset, &filetype, NULL); + + fh = ncp->independent_fh; + coll_indep = NC_REQ_INDEP; + if (ncp->nprocs > 1 && fIsSet(reqMode, NC_REQ_COLL)) { + fh = ncp->collective_fh; + coll_indep = NC_REQ_COLL; + } + + /* MPI_File_set_view is collective */ + err = ncmpio_file_set_view(ncp, fh, &offset, filetype); if (err != NC_NOERR) { - nbytes = 0; - nelems = 0; - filetype = MPI_BYTE; - xtype = MPI_BYTE; + nelems = 0; /* skip this request */ if (status == NC_NOERR) status = err; } - } - - /* TODO: if record variables are too big (so big that we cannot store the - * stride between records in an MPI_Aint, for example) then we will - * have to process this one record at a time. - */ + if (filetype != MPI_BYTE) MPI_Type_free(&filetype); - fh = ncp->independent_fh; - coll_indep = NC_REQ_INDEP; - if (ncp->nprocs > 1 && fIsSet(reqMode, NC_REQ_COLL)) { - fh = ncp->collective_fh; - coll_indep = NC_REQ_COLL; - } - - /* MPI_File_set_view is collective */ - err = ncmpio_file_set_view(ncp, fh, &offset, filetype); - if (err != NC_NOERR) { - nelems = 0; /* skip this request */ + /* xtype is the element data type (MPI primitive type) in xbuf to be + * written to the variable defined in file. Note data stored in xbuf + * is in the external data type, ready to be written to file. + */ + err = ncmpio_read_write(ncp, NC_REQ_WR, coll_indep, offset, nelems, + xtype, xbuf, xtype_is_contig); if (status == NC_NOERR) status = err; } - if (filetype != MPI_BYTE) MPI_Type_free(&filetype); - - /* xtype is the element data type (MPI primitive type) in xbuf to be - * written to the variable defined in file. Note data stored in xbuf is in - * the external data type, ready to be written to file. - */ - err = ncmpio_read_write(ncp, NC_REQ_WR, coll_indep, offset, nelems, xtype, - xbuf, xtype_is_contig); - if (status == NC_NOERR) status = err; /* done with xbuf */ if (xbuf != NULL && xbuf != buf) NCI_Free(xbuf); @@ -296,12 +319,12 @@ err_check: ncmpii_in_swapn(buf, bnelems, varp->xsz); /* for record variable, update number of records */ - if (IS_RECVAR(varp)) { + if (varp != NULL && IS_RECVAR(varp)) { /* update header's number of records in memory */ MPI_Offset new_numrecs = ncp->numrecs; /* calculate the max record ID written by this request */ - if (status == NC_NOERR || status == NC_ERANGE) { + if (nelems > 0 && (status == NC_NOERR || status == NC_ERANGE)) { if (stride == NULL) new_numrecs = start[0] + count[0]; else @@ -571,9 +594,17 @@ ncmpio_$1_var(void *ncdp, /* sanity check has been done at dispatchers */ - if (fIsSet(reqMode, NC_REQ_ZERO) && fIsSet(reqMode, NC_REQ_COLL)) + if (fIsSet(reqMode, NC_REQ_ZERO) && fIsSet(reqMode, NC_REQ_COLL)) { + /* In case some processes in an aggregation group have nothing to + * write, they still need to participate the communication part of the + * intra-node aggregation operation. + */ + ifelse(`$1',`put',`if (ncp->my_aggr >= 0) + return $1_varm(ncp, NULL, NULL, NULL, NULL, imap, NULL, 0, buftype, reqMode);') + /* this collective API has a zero-length request */ return ncmpio_getput_zero_req(ncp, reqMode); + } /* obtain NC_var object pointer, varp. 
Note sanity check for ncdp and * varid has been done in dispatchers */ diff --git a/src/drivers/ncmpio/ncmpio_intra_node.c b/src/drivers/ncmpio/ncmpio_intra_node.c new file mode 100644 index 000000000..058fd642e --- /dev/null +++ b/src/drivers/ncmpio/ncmpio_intra_node.c @@ -0,0 +1,1339 @@ +/* + * Copyright (C) 2024, Northwestern University and Argonne National Laboratory + * See COPYRIGHT notice in top-level directory. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include /* strcmp() strdup() */ +#include +#include +#include + +#include +#include +#include "ncmpio_NC.h" + +#ifdef HAVE_MPI_LARGE_COUNT +#define SWAP(offsets, lengths, bufAddr, x, y) { \ + MPI_Count aint; \ + MPI_Count cint; \ + MPI_Count d0 = (x) - offsets; \ + MPI_Count d1 = (y) - offsets; \ + if (d0 != d1) { \ + cint = *(x) ; *(x) = *(y) ; *(y) = cint ; \ + cint = lengths[d0] ; lengths[d0] = lengths[d1] ; lengths[d1] = cint ; \ + aint = bufAddr[d0] ; bufAddr[d0] = bufAddr[d1] ; bufAddr[d1] = aint ; \ + } \ +} +#else +#define SWAP(offsets, lengths, bufAddr, x, y) { \ + int int4; \ + MPI_Aint aint; \ + MPI_Aint d0 = (x) - offsets; \ + MPI_Aint d1 = (y) - offsets; \ + if (d0 != d1) { \ + aint = *(x) ; *(x) = *(y) ; *(y) = aint ; \ + int4 = lengths[d0] ; lengths[d0] = lengths[d1] ; lengths[d1] = int4 ; \ + aint = bufAddr[d0] ; bufAddr[d0] = bufAddr[d1] ; bufAddr[d1] = aint ; \ + } \ +} +#endif + +#define MEDIAN(a,b,c) ((*(a) < *(b)) ? \ + ((*(b) < *(c)) ? (b) : ((*(a) < *(c)) ? (c) : (a))) : \ + ((*(b) > *(c)) ? (b) : ((*(a) < *(c)) ? (a) : (c)))) + +static void +qsort_off_len_buf(MPI_Aint num, +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count *offsets, + MPI_Count *lengths, +#else + MPI_Aint *offsets, + int *lengths, +#endif + MPI_Aint *bufAddr) +{ +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count *pa, *pb, *pc, *pd, *pl, *pm, *pn, cmp_result, swap_cnt; +#else + MPI_Aint *pa, *pb, *pc, *pd, *pl, *pm, *pn, cmp_result, swap_cnt; +#endif + MPI_Aint i, r; + + while (1) { + swap_cnt = 0; + pm = offsets + (num / 2); + if (num > 7) { + pl = offsets; + pn = offsets + (num - 1); + if (num > 40) { + size_t d = (num / 8); + pl = MEDIAN(pl, pl + d, pl + 2 * d); + pm = MEDIAN(pm - d, pm, pm + d); + pn = MEDIAN(pn - 2 * d, pn - d, pn); + } + pm = MEDIAN(pl, pm, pn); + } + SWAP(offsets, lengths, bufAddr, offsets, pm); + pa = pb = offsets; + + pc = pd = offsets + (num - 1); + for (;;) { + while (pb <= pc && (cmp_result = (*pb - *offsets)) <= 0) { + if (cmp_result == 0) { + swap_cnt = 1; + SWAP(offsets, lengths, bufAddr, pa, pb); + pa++; + } + pb++; + } + while (pb <= pc && (cmp_result = (*pc - *offsets)) >= 0) { + if (cmp_result == 0) { + swap_cnt = 1; + SWAP(offsets, lengths, bufAddr, pc, pd); + pd--; + } + pc--; + } + if (pb > pc) + break; + SWAP(offsets, lengths, bufAddr, pb, pc); + swap_cnt = 1; + pb++; + pc--; + } + if (swap_cnt == 0) { /* Switch to insertion sort */ + for (pm = offsets; pm < offsets + num; pm++) + for (pl = pm; pl > offsets && (*(pl-1) > *pl); pl--) + SWAP(offsets, lengths, bufAddr, pl, pl-1); + return; + } + + pn = offsets + num; + r = MIN(pa - offsets, pb - pa); + for (i=0; i 1) + qsort_off_len_buf(r, offsets, lengths, bufAddr); + if ((r = pd - pc) > 1) { + /* Iterate rather than recurse to save stack space */ + lengths = lengths + (num - r); + bufAddr = bufAddr + (num - r); + offsets = pn - r; + num = r; + } + else + break; + } +} + +/*----< ncmpio_init_intra_node_aggr() >--------------------------------------*/ +/* When intra-node write aggregation is enabled, processes on the same node + * will be divided 
into groups. The number of groups is the number of + * aggregators on that node. The rank IDs of each group must be established. + * + * 1. Find information about MPI processes and their affinity to compute node. + * 2. Determine whether self process is an intra-node aggregator. + * 3. For an aggregator, find the number of non-aggregators assigned to it and + * construct rank IDs of assigned non-aggregators. + * 4. For a non-aggregator, find the rank ID of its assigned aggregator. + */ +int +ncmpio_intra_node_aggr_init(NC *ncp) +{ + char my_procname[MPI_MAX_PROCESSOR_NAME], **all_procnames=NULL; + int i, j, k, my_procname_len, num_nodes, root=0; + int *node_ids=NULL, *all_procname_lens=NULL, *nprocs_per_node; + int naggrs_my_node, num_nonaggrs; + int my_rank_index, *ranks_my_node, my_node_id, nprocs_my_node; + + /* initialize parameters of local-node aggregation */ + ncp->my_aggr = -1; /* rank ID of my aggregator */ + ncp->num_nonaggrs = 0; /* number of non-aggregators assigned */ + ncp->nonaggr_ranks = NULL; /* ranks of assigned non-aggregators */ + + if (ncp->num_aggrs_per_node == 0 || ncp->num_aggrs_per_node == ncp->nprocs) + /* disable intra-node aggregation */ + return NC_NOERR; + + /* allocate space for storing the rank IDs of non-aggregators assigned to + * this rank. Note ncp->nonaggr_ranks[] will be freed when closing the + * file, if allocated. + */ + num_nonaggrs = ncp->nprocs / ncp->num_aggrs_per_node + 1; + ncp->nonaggr_ranks = (int*) NCI_Malloc(sizeof(int) * num_nonaggrs); + + /* Collect info about compute nodes in order to select I/O aggregators. + * Note my_procname is null character terminated, but my_procname_len + * does not include the null character. + */ + MPI_Get_processor_name(my_procname, &my_procname_len); + my_procname_len++; /* to include terminate null character */ + + if (ncp->rank == root) { + /* root collects all procnames */ + all_procnames = (char **) NCI_Malloc(sizeof(char*) * ncp->nprocs); + if (all_procnames == NULL) + DEBUG_RETURN_ERROR(NC_ENOMEM) + + all_procname_lens = (int *) NCI_Malloc(sizeof(int) * ncp->nprocs); + if (all_procname_lens == NULL) { + NCI_Free(all_procnames); + DEBUG_RETURN_ERROR(NC_ENOMEM) + } + } + /* gather process name lengths from all processes first */ + MPI_Gather(&my_procname_len, 1, MPI_INT, all_procname_lens, 1, MPI_INT, + root, ncp->comm); + + if (ncp->rank == root) { + int *disp; + size_t alloc_size = 0; + + for (i=0; inprocs; i++) + alloc_size += all_procname_lens[i]; + + all_procnames[0] = (char *) NCI_Malloc(alloc_size); + if (all_procnames[0] == NULL) { + NCI_Free(all_procname_lens); + NCI_Free(all_procnames); + DEBUG_RETURN_ERROR(NC_ENOMEM) + } + + /* Construct displacement array for the MPI_Gatherv, as each process + * may have a different length for its process name. 
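The gather of variable-length processor names follows the usual two-step MPI recipe: gather the lengths first, then build displacements from those lengths for a Gatherv. A stand-alone sketch of that recipe, separate from the driver code and assuming MPI_COMM_WORLD:

    /* Sketch: root gathers one variable-length, NUL-terminated string
     * from every rank. malloc() results are assumed to succeed. */
    char name[MPI_MAX_PROCESSOR_NAME], *all = NULL;
    int  i, len, rank, nprocs, *lens = NULL, *disp = NULL;

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Get_processor_name(name, &len);
    len++;                                  /* include the '\0' terminator */

    if (rank == 0) lens = (int*) malloc(sizeof(int) * nprocs);
    MPI_Gather(&len, 1, MPI_INT, lens, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        disp = (int*) malloc(sizeof(int) * nprocs);
        disp[0] = 0;                        /* byte offset of each name */
        for (i=1; i<nprocs; i++) disp[i] = disp[i-1] + lens[i-1];
        all = (char*) malloc(disp[nprocs-1] + lens[nprocs-1]);
    }
    MPI_Gatherv(name, len, MPI_CHAR, all, lens, disp, MPI_CHAR,
                0, MPI_COMM_WORLD);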
+ */ + disp = (int *) NCI_Malloc(sizeof(int) * ncp->nprocs); + disp[0] = 0; + for (i=1; inprocs; i++) { + all_procnames[i] = all_procnames[i - 1] + all_procname_lens[i - 1]; + disp[i] = disp[i - 1] + all_procname_lens[i - 1]; + } + + /* gather all process names */ + MPI_Gatherv(my_procname, my_procname_len, MPI_CHAR, + all_procnames[0], all_procname_lens, disp, MPI_CHAR, + root, ncp->comm); + + NCI_Free(disp); + NCI_Free(all_procname_lens); + } else + /* send process name to root */ + MPI_Gatherv(my_procname, my_procname_len, MPI_CHAR, + NULL, NULL, NULL, MPI_CHAR, root, ncp->comm); + + /* each MPI process's compute node ID */ + node_ids = (int *) NCI_Malloc(sizeof(int) * ncp->nprocs); + + if (ncp->rank == root) { + /* all_procnames[] can tell us the number of nodes and number of + * processes per node. + */ + char **node_names; + int last; + + /* array of pointers pointing to unique host names (compute nodes) */ + node_names = (char **) NCI_Malloc(sizeof(char*) * ncp->nprocs); + + /* number of MPI processes running on each node */ + nprocs_per_node = (int *) NCI_Malloc(sizeof(int) * ncp->nprocs); + + /* calculate nprocs_per_node[] and node_ids[] */ + last = 0; + num_nodes = 0; /* number of unique compute nodes */ + for (i=0; inprocs; i++) { + k = last; + for (j=0; jnprocs, MPI_INT, root, ncp->comm); + + /* my_node_id is this rank's node ID */ + my_node_id = node_ids[ncp->rank]; + + /* nprocs_my_node: the number of processes in my nodes + * ranks_my_node[]: rank IDs of all processes in my node. + * my_rank_index points to ranks_my_node[] where + * ranks_my_node[my_rank_index] == ncp->rank + */ + ranks_my_node = (int*) NCI_Malloc(sizeof(int) * ncp->nprocs); + my_rank_index = -1; + nprocs_my_node = 0; + for (i=0; inprocs; i++) { + if (node_ids[i] == my_node_id) { + if (i == ncp->rank) + my_rank_index = nprocs_my_node; + ranks_my_node[nprocs_my_node] = i; + nprocs_my_node++; + } + } + assert(my_rank_index >= 0); + + /* Now, ranks_my_node[my_rank_index] == ncp->rank */ + + NCI_Free(node_ids); + + /* make sure number of aggregators in my node <= nprocs_my_node */ + naggrs_my_node = MIN(ncp->num_aggrs_per_node, nprocs_my_node); + + /* calculate the number of non-aggregators assigned to an aggregator. + * Note num_nonaggrs includes self. + */ + num_nonaggrs = nprocs_my_node / naggrs_my_node; + if (nprocs_my_node % naggrs_my_node) num_nonaggrs++; + + if (num_nonaggrs == 1) + /* disable aggregation if the number of non-aggregators assigned to + * this aggregator is 1. Note num_nonaggrs includes self. It is + * possible for aggregation enabled or disabled on different nodes and + * even different aggregation groups on the same node. + * + * Use whether ncp->my_aggr < 0 to tell if aggregation is disabled or + * enabled. + */ + ncp->my_aggr = -1; + else { + /* find the rank ID of aggregator assigned to this rank */ + ncp->my_aggr = ranks_my_node[my_rank_index - my_rank_index % num_nonaggrs]; + + if (ncp->my_aggr == ncp->rank) { /* this rank is an aggregator */ + /* Set the number of non-aggregators assigned to this rank. For the + * last group, make sure it does not go beyond nprocs_my_node. 
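A concrete illustration of the grouping arithmetic above, with hypothetical numbers: 7 processes on a node and num_aggrs_per_node = 2 give num_nonaggrs = ceil(7/2) = 4, so local indices 0-3 form one group led by index 0 and indices 4-6 form a smaller trailing group led by index 4.

    /* Sketch: leader (aggregator) local index of the group containing
     * local index r, when each group spans g consecutive local ranks. */
    static int group_leader(int r, int g) { return r - r % g; }
    /* g = 4:  r = 0,1,2,3 -> 0    r = 4,5,6 -> 4 */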
+ */ + ncp->num_nonaggrs = MIN(num_nonaggrs, nprocs_my_node - my_rank_index); + if (ncp->num_nonaggrs == 1) + /* disable aggregation, as this aggregation group contains only + * self rank + */ + ncp->my_aggr = -1; + else + /* copy the rank IDs over to ncp->nonaggr_ranks[] */ + memcpy(ncp->nonaggr_ranks, + ranks_my_node + my_rank_index, + sizeof(int) * num_nonaggrs); + } + } + NCI_Free(ranks_my_node); + + if (ncp->my_aggr < 0) { + /* free ncp->nonaggr_ranks if aggregation is not enabled */ + NCI_Free(ncp->nonaggr_ranks); + ncp->nonaggr_ranks = NULL; + } + + /* TODO: For automatically determine Whether to enable intra-node write + * aggregation, this should be done right before each collective write + * call. + * 1. obtain hint cb_noddes, and striping_unit + * 2. calculate aggregate access region + * In each round of two-phase I/O, when the number of senders to each + * cb_nodes is very large, then intra-node aggregation should be enabled. + * Average of all nprocs_per_node may be a factor for determining whether + * to enable intra-node aggregation. It indicates whether the high number + * of processes are allocated on the same node. + */ + + return NC_NOERR; +} + +/*----< flatten_subarray() >-------------------------------------------------*/ +/* flatten a subarray request into a list of offset-length pairs */ +static int +flatten_subarray(int ndim, /* number of dimensions */ + int el_size, /* array element size */ + MPI_Offset var_begin, /* starting file offset */ + const MPI_Offset *dimlen, /* [ndim] dimension lengths */ + const MPI_Offset *start, /* [ndim] starts of subarray */ + const MPI_Offset *count, /* [ndim] counts of subarray */ + const MPI_Offset *stride, /* [ndim] strides of subarray */ + MPI_Aint *npairs, /* OUT: num of off-len pairs */ +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count *offsets, /* OUT: array of offsets */ + MPI_Count *lengths /* OUT: array of lengths */ +#else + MPI_Aint *offsets, /* OUT: array of offsets */ + int *lengths /* OUT: array of lengths */ +#endif + ) +{ + int i, j; + MPI_Offset length, nstride, array_len, off, subarray_len; + size_t idx=0, idx0; + + *npairs = 0; + if (ndim < 0) return NC_NOERR; + + if (ndim == 0) { /* scalar record variable */ + *npairs = 1; + offsets[0] = var_begin; + lengths[0] = el_size; + return NC_NOERR; + } + + /* TODO: check if all stride[] >= 1 + Q: Is it legal if any stride[] <= 0 ? */ + + /* calculate the number of offset-length pairs */ + *npairs = (stride[ndim-1] == 1) ? 1 : count[ndim-1]; + for (i=0; i 0) { + /* array_len is global array size from lowest up to ndim */ + array_len *= dimlen[ndim]; + + /* off is the global array offset for this dimension, ndim-1 */ + off = start[ndim-1] * array_len * el_size; + + /* update all offsets from lowest up to dimension ndim-1 */ + idx0 = 0; + for (j=0; j-----------------------------------------------------*/ +/* flatten one write request into offset-length pairs. 
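To make the pair layout concrete, here is a hypothetical flattening of a small 2-D fixed-size variable (shape {4,10}, 4-byte elements, starting file offset var_begin) with start={1,2}, count={2,3}, stride=NULL:

    /* One pair per (contiguous) row of the subarray:
     *   offsets[0] = var_begin + (1*10 + 2)*4 = var_begin + 48,  lengths[0] = 3*4 = 12
     *   offsets[1] = var_begin + (2*10 + 2)*4 = var_begin + 88,  lengths[1] = 12
     * num_pairs = count[0] = 2
     */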
+ * offsets and lengths are allocated here and need to be freed by the caller + */ +static int +flatten_req(NC *ncp, + NC_var *varp, + const MPI_Offset *start, + const MPI_Offset *count, + const MPI_Offset *stride, + MPI_Aint *num_pairs, /* OUT: number of off-len pairs */ +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count **offsets, /* OUT: array of flattened offsets */ + MPI_Count **lengths /* OUT: array of flattened lengths */ +#else + MPI_Aint **offsets, /* OUT: array of flattened offsets */ + int **lengths /* OUT: array of flattened lengths */ +#endif + ) +{ + int j, err=NC_NOERR, ndims, count0; + MPI_Aint num, idx; + MPI_Offset var_begin, *shape, *ones=NULL; + + *num_pairs = 0; /* total number of offset-length pairs */ + + /* Count the number off-len pairs, so we can malloc a contiguous memory + * space for storing off-len pairs + */ + if (varp->ndims == 0) { /* scalar variable */ +#ifdef HAVE_MPI_LARGE_COUNT + *offsets = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count)); + *lengths = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count)); +#else + *offsets = (MPI_Aint*)NCI_Malloc(sizeof(MPI_Aint)); + *lengths = (int*) NCI_Malloc(sizeof(int)); +#endif + (*offsets)[0] = varp->begin; + (*lengths)[0] = varp->xsz; + *num_pairs = 1; + return NC_NOERR; + } + else if (varp->ndims == 1 && IS_RECVAR(varp)) { /* scalar variable */ + num = count[0]; + } + else { + num = 1; + if (stride != NULL && stride[varp->ndims-1] > 1) + num = count[varp->ndims-1]; /* count of last dimension */ + for (j=0; jndims-1; j++) + num *= count[j]; /* all count[] except the last dimension */ + } + *num_pairs = num; + +#ifdef HAVE_MPI_LARGE_COUNT + *offsets = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count) * num); + *lengths = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count) * num); +#else + *offsets = (MPI_Aint*)NCI_Malloc(sizeof(MPI_Aint) * num); + *lengths = (int*) NCI_Malloc(sizeof(int) * num); +#endif + + if (stride == NULL) { /* equivalent to {1, 1, ..., 1} */ + ones = (MPI_Offset*) NCI_Malloc(sizeof(MPI_Offset) * varp->ndims); + for (j=0; jndims; j++) ones[j] = 1; + } + + ndims = varp->ndims; + var_begin = varp->begin; + shape = varp->shape; + if (IS_RECVAR(varp)) { + count0 = count[0]; + var_begin += start[0] * ncp->recsize; + ndims--; + start++; + count++; + shape++; + if (stride != NULL) stride++; + } + else + count0 = 1; + + idx = 0; + for (j=0; jxsz, var_begin, shape, + start, count, (stride == NULL) ? ones : stride, + &num, /* OUT: num of off-len pairs */ + *offsets + idx, /* OUT: array of offsets */ + *lengths + idx); /* OUT: array of lengths */ + idx += num; +assert(idx <= *num_pairs); + + if (IS_RECVAR(varp)) + var_begin += ncp->recsize; + } + if (ones != NULL) + NCI_Free(ones); + + return err; +} + +/*----< flatten_reqs() >-----------------------------------------------------*/ +/* flatten all write requests into offset-length pairs. 
+ * offsets and lengths are allocated here and need to be freed by the caller + */ +static int +flatten_reqs(NC *ncp, + int num_reqs, /* IN: # requests */ + const NC_req *reqs, /* [num_reqs] requests */ + MPI_Aint *num_pairs, /* OUT: total number of off-len pairs */ +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count **offsets, /* OUT: array of flattened offsets */ + MPI_Count **lengths /* OUT: array of flattened lengths */ +#else + MPI_Aint **offsets, /* OUT: array of flattened offsets */ + int **lengths /* OUT: array of flattened lengths */ +#endif + ) +{ + int i, j, status=NC_NOERR, ndims, max_ndims=0; + MPI_Aint num, idx; + MPI_Offset *start, *count, *shape, *stride, *ones; + + *num_pairs = 0; /* total number of offset-length pairs */ + + /* Count the number off-len pairs from reqs[], so we can malloc a + * contiguous memory space for storing off-len pairs + */ + for (i=0; iput_lead_list + reqs[i].lead_off; + ndims = lead->varp->ndims; + max_ndims = MAX(max_ndims, ndims); + if (ndims > 0) { + start = reqs[i].start; + count = start + ndims; + stride = count + ndims; + } + else + start = count = stride = NULL; + + /* for record variable, each reqs[] is within a record */ + if (IS_RECVAR(lead->varp)) { + ndims--; + start++; + count++; + stride++; + } + if (fIsSet(lead->flag, NC_REQ_STRIDE_NULL)) stride = NULL; + + if (ndims < 0) continue; + if (ndims == 0) { /* 1D record variable */ + (*num_pairs)++; + continue; + } + num = 1; + if (stride != NULL && stride[ndims-1] > 1) + num = count[ndims-1]; /* count of last dimension */ + for (j=0; jput_lead_list + reqs[i].lead_off; + + ndims = lead->varp->ndims; + if (ndims > 0) { + start = reqs[i].start; + count = start + ndims; + stride = count + ndims; + } + else + start = count = stride = NULL; + + shape = lead->varp->shape; + + /* find the starting file offset for this variable */ + var_begin = lead->varp->begin; + + /* for record variable, each reqs[] is within a record */ + if (IS_RECVAR(lead->varp)) { + ndims--; + start++; + count++; + stride++; + shape++; + /* find the starting file offset for this record */ + var_begin += reqs[i].start[0] * ncp->recsize; + } + + if (fIsSet(lead->flag, NC_REQ_STRIDE_NULL)) stride = NULL; + + /* flatten each request into a list of offset-length pairs and + * append to the end of offsets and lengths + */ + flatten_subarray(ndims, lead->varp->xsz, var_begin, shape, + start, count, (stride == NULL) ? ones : stride, + &num, /* OUT: number of off-len pairs */ + *offsets + idx, /* OUT: array of offsets */ + *lengths + idx); /* OUT: array of lengths */ + idx += num; + } + NCI_Free(ones); + + for (i=0; iput_lead_list + reqs[i].lead_off; + if (fIsSet(lead->flag, NC_REQ_TO_FREE)) { + NCI_Free(lead->start); + lead->start = NULL; + } + } + + return status; +} + +/*----< construct_buf_type() >-----------------------------------------------*/ +/* construct an MPI derived datatype for I/O buffers from the request list, by + * concatenate all buffers. 
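The concatenation relies on an hindexed byte datatype over absolute buffer addresses, so the request buffers need not be copied. A generic sketch of that idea (bufA, bufB and the byte counts are made-up placeholders):

    /* Sketch: view two separate buffers as one MPI send buffer. */
    int blocklens[2] = { 100, 200 };        /* sizes in bytes */
    MPI_Aint disps[2];
    MPI_Datatype bufType;
    MPI_Get_address(bufA, &disps[0]);
    MPI_Get_address(bufB, &disps[1]);
    MPI_Type_create_hindexed(2, blocklens, disps, MPI_BYTE, &bufType);
    MPI_Type_commit(&bufType);
    /* use (MPI_BOTTOM, 1, bufType) as the send buffer,
     * then MPI_Type_free(&bufType) when done */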
+ */ +static int +construct_buf_type(const NC *ncp, + int num_reqs, /* IN: # requests */ + const NC_req *reqs, /* [num_reqs] requests */ + MPI_Aint *bufLen, /* OUT: buffer size in bytes */ + MPI_Datatype *bufType) /* OUT: buffer datatype */ +{ + int i, err, mpireturn, status=NC_NOERR; + NC_lead_req *lead; + +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count *blocklens = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count) * num_reqs); + MPI_Count *disps = (MPI_Count*)NCI_Malloc(sizeof(MPI_Count) * num_reqs); +#else + int *blocklens = (int*) NCI_Malloc(sizeof(int) * num_reqs); + MPI_Aint *disps = (MPI_Aint*) NCI_Malloc(sizeof(MPI_Aint) * num_reqs); +#endif + + *bufLen = 0; + for (i=0; iput_lead_list + reqs[i].lead_off; + blocklens[i] = reqs[i].nelems * lead->varp->xsz; + + *bufLen += blocklens[i]; + } + + /* construct buffer derived datatype */ +#ifdef HAVE_MPI_LARGE_COUNT + mpireturn = MPI_Type_create_hindexed_c(num_reqs, blocklens, disps, + MPI_BYTE, bufType); +#else + mpireturn = MPI_Type_create_hindexed(num_reqs, blocklens, disps, + MPI_BYTE, bufType); +#endif + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn, "MPI_Type_create_hindexed"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + + *bufType = MPI_DATATYPE_NULL; + } + else { + MPI_Type_commit(bufType); +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count typeSize; + MPI_Type_size_c(*bufType, &typeSize); +#else + int typeSize; + MPI_Type_size(*bufType, &typeSize); +#endif + assert(typeSize == *bufLen); + } + + NCI_Free(blocklens); + NCI_Free(disps); + + return status; +} + +/*----< intra_node_aggregation() >-------------------------------------------*/ +/* This is a collective call */ +static int +intra_node_aggregation(NC *ncp, + MPI_Aint num_pairs, +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count *offsets, + MPI_Count *lengths, +#else + MPI_Aint *offsets, + int *lengths, +#endif + MPI_Offset bufCount, + MPI_Datatype bufType, + void *buf) +{ + int i, j, err, mpireturn, status=NC_NOERR, nreqs; + char *recv_buf=NULL, *wr_buf = NULL; + MPI_Aint npairs, *msg; + MPI_Offset offset=0, buf_count; + MPI_Datatype recvTypes, fileType=MPI_BYTE; + MPI_File fh; + MPI_Request *req=NULL; + +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count bufLen; + MPI_Type_size_c(bufType, &bufLen); +#else + int bufLen; + MPI_Type_size(bufType, &bufLen); +#endif + bufLen *= bufCount; + + /* First, tell aggregator how much to receive by sending: + * (num_pairs and bufLen). 
The message size to be sent by this rank + * is num_pairs * 2 * sizeof(MPI_Offset) + bufLen + */ + if (ncp->rank == ncp->my_aggr) + msg = (MPI_Aint*) NCI_Malloc(sizeof(MPI_Aint) * ncp->num_nonaggrs * 2); + else + msg = (MPI_Aint*) NCI_Malloc(sizeof(MPI_Aint) * 2); + + msg[0] = num_pairs; + msg[1] = bufLen; + + /* Aggregator collects each non-aggregator's num_pairs and bufLen */ + if (ncp->rank == ncp->my_aggr) { + req = (MPI_Request*)NCI_Malloc(sizeof(MPI_Request) * ncp->num_nonaggrs); + nreqs = 0; + for (i=1; inum_nonaggrs; i++) + MPI_Irecv(msg + i*2, 2, MPI_AINT, ncp->nonaggr_ranks[i], 0, + ncp->comm, &req[nreqs++]); + + mpireturn = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Waitall"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + else { /* non-aggregator */ + MPI_Send(msg, 2, MPI_AINT, ncp->my_aggr, 0, ncp->comm); + if (num_pairs == 0) + NCI_Free(msg); + } + + /* Aggregator collects offset-length pairs from non-aggregators */ + if (ncp->rank == ncp->my_aggr) { + /* calculate the total number of offset-length pairs */ + npairs = num_pairs; + for (i=1; inum_nonaggrs; i++) npairs += msg[i*2]; + +#ifdef HAVE_MPI_LARGE_COUNT + if (npairs > num_pairs) { + /* realloc to store all pairs in a contiguous buffer */ + offsets = (MPI_Count*) NCI_Realloc(offsets, sizeof(MPI_Count) * npairs); + lengths = (MPI_Count*) NCI_Realloc(lengths, sizeof(MPI_Count) * npairs); + } +#else + if (npairs > num_pairs) { + /* realloc to store all pairs in a contiguous buffer */ + offsets = (MPI_Aint*) NCI_Realloc(offsets, sizeof(MPI_Aint) * npairs); + lengths = (int*) NCI_Realloc(lengths, sizeof(int) * npairs); + } +#endif + + nreqs = 0; +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Aint aint; + MPI_Count bklens[2]; + MPI_Count disps[2]; + + MPI_Get_address(offsets, &aint); + disps[0] = MPI_Aint_add(aint, sizeof(MPI_Count) * msg[0]); + MPI_Get_address(lengths, &aint); + disps[1] = MPI_Aint_add(aint, sizeof(MPI_Count) * msg[0]); + for (i=1; inum_nonaggrs; i++) { + if (msg[i*2] == 0) continue; + bklens[0] = msg[i*2] * sizeof(MPI_Count); + bklens[1] = msg[i*2] * sizeof(MPI_Count); + mpireturn = MPI_Type_create_hindexed_c(2, bklens, disps, MPI_BYTE, + &recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_create_hindexed_c"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + else { + mpireturn = MPI_Type_commit(&recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_commit"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + /* post to receive offset-length pairs from non-aggregators */ + MPI_Irecv_c(MPI_BOTTOM, 1, recvTypes, ncp->nonaggr_ranks[i], + 0, ncp->comm, &req[nreqs]); + MPI_Type_free(&recvTypes); + + disps[0] = MPI_Aint_add(disps[0], bklens[0]); + disps[1] = MPI_Aint_add(disps[1], bklens[1]); + nreqs++; + } +#else + int bklens[2]; + MPI_Aint aint, disps[2]; + + MPI_Get_address(offsets, &aint); + disps[0] = MPI_Aint_add(aint, sizeof(MPI_Aint) * msg[0]); + MPI_Get_address(lengths, &aint); + disps[1] = MPI_Aint_add(aint, sizeof(int) * msg[0]); + for (i=1; inum_nonaggrs; i++) { + if (msg[i*2] == 0) continue; + bklens[0] = msg[i*2] * sizeof(MPI_Aint); + bklens[1] = msg[i*2] * sizeof(int); + mpireturn = MPI_Type_create_hindexed(2, bklens, disps, MPI_BYTE, + &recvTypes); + if (mpireturn != 
MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_create_hindexed"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + else { + mpireturn = MPI_Type_commit(&recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_commit"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + /* post to receive offset-length pairs from non-aggregators */ + MPI_Irecv(MPI_BOTTOM, 1, recvTypes, ncp->nonaggr_ranks[i], + 0, ncp->comm, &req[nreqs]); + MPI_Type_free(&recvTypes); + + disps[0] = MPI_Aint_add(disps[0], bklens[0]); + disps[1] = MPI_Aint_add(disps[1], bklens[1]); + nreqs++; + } +#endif + mpireturn = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Waitall"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + else if (num_pairs > 0) { /* non-aggregator */ + /* send offset-length pairs data to the aggregator */ +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Aint aint; + MPI_Count bklens[2]; + MPI_Count disps[2]; + + bklens[0] = msg[0] * sizeof(MPI_Count); + bklens[1] = bklens[0]; + MPI_Get_address(offsets, &aint); + disps[0] = aint; + MPI_Get_address(lengths, &aint); + disps[1] = aint; + mpireturn = MPI_Type_create_hindexed_c(2, bklens, disps, MPI_BYTE, + &recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_create_hindexed_c"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + else { + mpireturn = MPI_Type_commit(&recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_commit"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + MPI_Send_c(MPI_BOTTOM, 1, recvTypes, ncp->my_aggr, 0, ncp->comm); + MPI_Type_free(&recvTypes); +#else + int bklens[2]; + MPI_Aint aint, disps[2]; + + bklens[0] = msg[0] * sizeof(MPI_Aint); + bklens[1] = msg[0] * sizeof(int); + MPI_Get_address(offsets, &disps[0]); + MPI_Get_address(lengths, &disps[1]); + mpireturn = MPI_Type_create_hindexed(2, bklens, disps, MPI_BYTE, + &recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_create_hindexed"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + else { + mpireturn = MPI_Type_commit(&recvTypes); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Type_commit"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + } + MPI_Send(MPI_BOTTOM, 1, recvTypes, ncp->my_aggr, 0, ncp->comm); + MPI_Type_free(&recvTypes); +#endif + NCI_Free(msg); + } + + /* + * TODO, define a datatype to combine sends of offset-length pairs with the + * write data into a single send call. + */ + nreqs = 0; + if (ncp->rank == ncp->my_aggr) { + /* calculate the total write account */ + buf_count = bufLen; + for (i=1; inum_nonaggrs; i++) buf_count += msg[i*2 + 1]; + + /* Allocate receive buffer, which will be sorted into an increasing + * order based on the file offsets. Thus, after sorting pack recv_buf + * to wr_buf to avoid creating another buffer datatype. 
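The overall exchange is a size-then-payload handshake: each non-aggregator first sends {num_pairs, bufLen}, and the aggregator uses those counts to size its receives. A simplified stand-alone sketch for a single non-aggregator (rank, aggr, peer, comm, num_pairs, nbytes, and data are placeholders; error checks omitted):

    MPI_Aint meta[2];                       /* {num_pairs, nbytes} */
    if (rank == aggr) {
        MPI_Recv(meta, 2, MPI_AINT, peer, 0, comm, MPI_STATUS_IGNORE);
        char *payload = (char*) malloc((size_t)meta[1]);
        MPI_Recv(payload, (int)meta[1], MPI_BYTE, peer, 0, comm,
                 MPI_STATUS_IGNORE);
    } else {
        meta[0] = num_pairs;  meta[1] = nbytes;
        MPI_Send(meta, 2, MPI_AINT, aggr, 0, comm);
        MPI_Send(data, (int)nbytes, MPI_BYTE, aggr, 0, comm);
    }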
+ */ + if (buf_count > 0) { + recv_buf = (char*) NCI_Malloc(buf_count); + wr_buf = (char*) NCI_Malloc(buf_count); + } + + /* First, pack self write data into front of the recv_buf */ + if (bufLen > 0) { + if (bufType == MPI_BYTE) + memcpy(recv_buf, buf, bufLen); + else { + void *inbuf = (buf == NULL) ? MPI_BOTTOM : buf; +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count position=0; + MPI_Count incount = (buf == NULL) ? 1 : bufCount; + MPI_Pack_c(inbuf, incount, bufType, recv_buf, bufLen, &position, + MPI_COMM_SELF); +#else + int position=0; + int incount = (buf == NULL) ? 1 : bufCount; + MPI_Pack(inbuf, incount, bufType, recv_buf, bufLen, &position, + MPI_COMM_SELF); +#endif + } + } + + /* post requests to receive write data from non-aggregators */ + char *ptr = recv_buf + bufLen; + for (i=1; inum_nonaggrs; i++) { + if (msg[i*2 + 1] == 0) continue; +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Irecv_c(ptr, msg[i*2 + 1], MPI_BYTE, ncp->nonaggr_ranks[i], 0, + ncp->comm, &req[nreqs++]); +#else + MPI_Irecv(ptr, msg[i*2 + 1], MPI_BYTE, ncp->nonaggr_ranks[i], 0, + ncp->comm, &req[nreqs++]); +#endif + ptr += msg[i*2 + 1]; + } + mpireturn = MPI_Waitall(nreqs, req, MPI_STATUSES_IGNORE); + if (mpireturn != MPI_SUCCESS) { + err = ncmpii_error_mpi2nc(mpireturn,"MPI_Waitall"); + /* return the first encountered error if there is any */ + if (status == NC_NOERR) status = err; + } + NCI_Free(req); + NCI_Free(msg); + } + else if (bufLen > 0) { + /* send write data to the aggregator */ + void *buf_ptr = (buf == NULL) ? MPI_BOTTOM : buf; +#ifdef HAVE_MPI_LARGE_COUNT + MPI_Count num = (buf == NULL) ? 1 : bufCount; + MPI_Send_c(buf_ptr, num, bufType, ncp->my_aggr, 0, ncp->comm); +#else + int num = (buf == NULL) ? 1 : bufCount; + MPI_Send(buf_ptr, num, bufType, ncp->my_aggr, 0, ncp->comm); +#endif + NCI_Free(offsets); + NCI_Free(lengths); + } + + /* aggregator sorts the offset-length pairs, along with the buffer */ + if (ncp->rank == ncp->my_aggr && npairs > 0) { + + /* construct array of buffer addresses */ + MPI_Aint *bufAddr = (MPI_Aint*)NCI_Malloc(sizeof(MPI_Aint) * npairs); + bufAddr[0] = 0; + for (i=1; i= offsets[j] + lengths[j]) + /* segment i completely covers segment j, skip j */ + continue; + + MPI_Offset gap = offsets[i] + lengths[i] - offsets[j]; + if (gap >= 0) { /* segments i and j overlaps */ + if (bufAddr[i] + lengths[i] == bufAddr[j] + gap) { + /* buffers i and j are contiguous, merge j to i */ + lengths[i] = MPI_Aint_add(lengths[i], lengths[j] - gap); + } + else { /* buffers are not contiguous, reduce j's len */ + offsets[i+1] = offsets[j] + gap; + lengths[i+1] = lengths[j] - gap; + bufAddr[i+1] = bufAddr[j] + gap; + i++; + } + } + else { /* i and j do not overlap */ + i++; + if (i < j) { + offsets[i] = offsets[j]; + lengths[i] = lengths[j]; + bufAddr[i] = bufAddr[j]; + } + } + } + /* update number of pairs, now all off-len pairs are not overlapped */ + npairs = i+1; + + /* pack recv_buf, data received from non-aggregators, into wr_buf, a + * contiguous buffer, wr_buf, which will later be used in a call to + * MPI_File_write_all() + */ + char *ptr = wr_buf; + buf_count = 0; + if (npairs > 0) { + memcpy(ptr, recv_buf + bufAddr[0], lengths[0]); + ptr += lengths[0]; + buf_count = lengths[0]; + } + for (i=0, j=1; jrank != ncp->my_aggr) /* non-aggregator writes nothing */ + buf_count = 0; + + /* Only aggregators writes non-zero sized of data to the file. The + * non-aggregators participate the collective write call with zero-length + * write requests. 
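At the MPI-IO level, that zero-length participation is simply a collective write with a count of 0, along the lines of the following sketch (fh and offset are placeholders):

    /* Sketch: join the collective write without contributing any bytes. */
    MPI_Status st;
    MPI_File_set_view(fh, 0, MPI_BYTE, MPI_BYTE, "native", MPI_INFO_NULL);
    MPI_File_write_at_all(fh, offset, NULL, 0, MPI_BYTE, &st);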
+
+    /* Only aggregators write data of nonzero size to the file. The
+     * non-aggregators participate in the collective write call with
+     * zero-length requests.
+     */
+    fh = ncp->collective_fh;
+
+    /* set the MPI-IO fileview, this is a collective call */
+    err = ncmpio_file_set_view(ncp, fh, &offset, fileType);
+    if (fileType != MPI_BYTE) MPI_Type_free(&fileType);
+    if (err != NC_NOERR) {
+        if (status == NC_NOERR) status = err;
+        buf_count = 0;
+    }
+
+    /* call MPI_File_write_at_all */
+    err = ncmpio_read_write(ncp, NC_REQ_WR, NC_REQ_COLL, offset, buf_count,
+                            MPI_BYTE, wr_buf, 1);
+    if (status == NC_NOERR) status = err;
+
+    if (wr_buf != NULL) NCI_Free(wr_buf);
+
+    return status;
+}
+
+/*----< ncmpio_intra_node_aggregation_nreqs() >------------------------------*/
+/* This is a collective call */
+int
+ncmpio_intra_node_aggregation_nreqs(NC         *ncp,
+                                    int         num_reqs,
+                                    NC_req     *put_list,
+                                    MPI_Offset  newnumrecs)
+{
+    int err, status=NC_NOERR;
+    MPI_Aint bufLen, num_pairs;
+#ifdef HAVE_MPI_LARGE_COUNT
+    MPI_Count *offsets=NULL, *lengths=NULL;
+#else
+    MPI_Aint *offsets=NULL;
+    int *lengths=NULL;
+#endif
+    MPI_Datatype bufType=MPI_BYTE;
+
+    assert(ncp->my_aggr >= 0);
+
+    /* construct file offset-length pairs
+     *     num_pairs: total number of off-len pairs
+     *     offsets:   array of flattened offsets
+     *     lengths:   array of flattened lengths
+     */
+    if (num_reqs > 0)
+        flatten_reqs(ncp, num_reqs, put_list, &num_pairs, &offsets, &lengths);
+    else
+        num_pairs = 0;
+
+    /* construct the write buffer datatype, bufType.
+     * bufLen is the buffer size in bytes
+     */
+    if (num_reqs > 0) {
+        construct_buf_type(ncp, num_reqs, put_list, &bufLen, &bufType);
+        bufLen = 1;
+    }
+    else
+        bufLen = 0;
+
+    if (put_list != NULL)
+        NCI_Free(put_list);
+
+    err = intra_node_aggregation(ncp, num_pairs, offsets, lengths, bufLen,
+                                 bufType, NULL);
+    if (status == NC_NOERR) status = err;
+
+    /* free and reset bufType */
+    if (bufType != MPI_BYTE && bufType != MPI_DATATYPE_NULL)
+        MPI_Type_free(&bufType);
+
+    /* Update the number of records if new records have been created.
+     * For nonblocking APIs, there is no way for a process to know whether
+     * others write to a record variable or not. Note newnumrecs has been
+     * sync-ed and is always >= ncp->numrecs.
+     */
+    if (newnumrecs > ncp->numrecs) {
+        /* Update the new record number in the file. Note newnumrecs is
+         * already sync-ed among all processes and, in collective mode,
+         * ncp->numrecs is always sync-ed in memory among processes, thus
+         * there is no need for another MPI_Allreduce to sync it.
+         */
+        err = ncmpio_write_numrecs(ncp, newnumrecs);
+        /* retain the first error if there is any */
+        if (status == NC_NOERR) status = err;
+        if (ncp->numrecs < newnumrecs) ncp->numrecs = newnumrecs;
+    }
+
+    return status;
+}
+
+/*----< ncmpio_intra_node_aggregation() >------------------------------------*/
+/* This is a collective call */
+int
+ncmpio_intra_node_aggregation(NC               *ncp,
+                              NC_var           *varp,
+                              const MPI_Offset *start,
+                              const MPI_Offset *count,
+                              const MPI_Offset *stride,
+                              MPI_Offset        bufCount,
+                              MPI_Datatype      bufType,
+                              void             *buf)
+{
+    int i, err, status=NC_NOERR;
+    MPI_Aint bufLen, num_pairs;
+#ifdef HAVE_MPI_LARGE_COUNT
+    MPI_Count *offsets=NULL, *lengths=NULL;
+#else
+    MPI_Aint *offsets=NULL;
+    int *lengths=NULL;
+#endif
+
+    if (buf == NULL) /* zero-length request */
+        return intra_node_aggregation(ncp, 0, NULL, NULL, 0, MPI_BYTE, NULL);
+
+    /* construct file offset-length pairs
+     *     num_pairs: total number of off-len pairs
+     *     offsets:   array of flattened offsets
+     *     lengths:   array of flattened lengths
+     */
+    err = flatten_req(ncp, varp, start, count, stride, &num_pairs, &offsets,
+                      &lengths);
+    if (err != NC_NOERR) {
+        bufLen = 0;
+        num_pairs = 0;
+        if (offsets != NULL)
+            NCI_Free(offsets);
+        offsets = NULL;
+    }
+    else {
+        bufLen = varp->xsz;
+        for (i=0; i<varp->ndims; i++)
+            bufLen *= count[i];
+    }
+    if (status == NC_NOERR) status = err;
+
+    err = intra_node_aggregation(ncp, num_pairs, offsets, lengths, bufCount,
+                                 bufType, buf);
+    if (status == NC_NOERR) status = err;
+
+    return status;
+}
+
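Aside (not part of the patch): flatten_req() and flatten_reqs() above convert a subarray access into a list of (file offset, length) pairs. The standalone sketch below shows that arithmetic for a simple 2-D case; the variable shape, element size, starting file offset, and the request itself are all made-up values, and stride handling is omitted.

/* flatten_2d_sketch.c: illustrative sketch only, not PnetCDF code */
#include <stdio.h>

int main(void) {
    /* hypothetical variable: a 4 x 6 array of 4-byte elements stored
     * row-major, starting at file offset 1024 */
    long long dimlen[2] = {4, 6}, var_begin = 1024;
    int el_size = 4;

    /* hypothetical subarray request: start = {1,2}, count = {2,3} */
    long long start[2] = {1, 2}, count[2] = {2, 3};

    /* each row of the subarray is one contiguous segment in the file,
     * so the request flattens into count[0] offset-length pairs */
    for (long long r = 0; r < count[0]; r++) {
        long long offset = var_begin +
                           ((start[0] + r) * dimlen[1] + start[1]) * el_size;
        long long length = count[1] * el_size;
        printf("pair %lld: offset=%lld length=%lld\n", r, offset, length);
    }
    return 0;
}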
diff --git a/src/drivers/ncmpio/ncmpio_open.c b/src/drivers/ncmpio/ncmpio_open.c
index a350a56d5..cdbfbbdc6 100644
--- a/src/drivers/ncmpio/ncmpio_open.c
+++ b/src/drivers/ncmpio/ncmpio_open.c
@@ -191,6 +191,19 @@ ncmpio_open(MPI_Comm comm,
         ncp->vars.value[i]->attrs.hash_size = ncp->hash_size_attr;
 #endif
 
+    /* determine whether to enable intra-node aggregation and set up all
+     * intra-node aggregation metadata.
+     * ncp->num_aggrs_per_node = 0, or non-zero, indicates whether this
+     * feature is enabled globally for all processes.
+     * ncp->my_aggr = -1 or >= 0 indicates whether aggregation is effectively
+     * enabled for the aggregation group of this process.
+     */
+    ncp->my_aggr = -1;
+    if (ncp->num_aggrs_per_node != 0) {
+        err = ncmpio_intra_node_aggr_init(ncp);
+        if (err != NC_NOERR) return err;
+    }
+
     *ncpp = (void*)ncp;
 
     return status;
diff --git a/src/drivers/ncmpio/ncmpio_sync.c b/src/drivers/ncmpio/ncmpio_sync.c
index 0090fd6bc..096d9fe9b 100644
--- a/src/drivers/ncmpio/ncmpio_sync.c
+++ b/src/drivers/ncmpio/ncmpio_sync.c
@@ -68,12 +68,11 @@ int
 ncmpio_write_numrecs(NC         *ncp,
                      MPI_Offset  new_numrecs)
 {
-    int rank, mpireturn, err;
+    int mpireturn, err;
     MPI_File fh;
     MPI_Status mpistatus;
 
-    MPI_Comm_rank(ncp->comm, &rank);
-    if (!fIsSet(ncp->flags, NC_HCOLL) && rank > 0)
+    if (!fIsSet(ncp->flags, NC_HCOLL) && ncp->rank > 0)
         /* Only root process writes numrecs in file */
         return NC_NOERR;
 
@@ -84,7 +83,7 @@ ncmpio_write_numrecs(NC         *ncp,
     if (ncp->nprocs > 1 && !NC_indep(ncp))
         fh = ncp->collective_fh;
 
-    if (rank > 0 && fIsSet(ncp->flags, NC_HCOLL)) {
+    if (ncp->rank > 0 && fIsSet(ncp->flags, NC_HCOLL)) {
         /* other processes participate the collective call */
         TRACE_IO(MPI_File_write_at_all)(fh, 0, NULL, 0, MPI_BYTE, &mpistatus);
         return NC_NOERR;
diff --git a/src/drivers/ncmpio/ncmpio_util.c b/src/drivers/ncmpio/ncmpio_util.c
index 82697b5f8..38febc957 100644
--- a/src/drivers/ncmpio/ncmpio_util.c
+++ b/src/drivers/ncmpio/ncmpio_util.c
@@ -52,10 +52,12 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->env_h_align = strtoll(value, NULL, 10);
             if (errno != 0) ncp->env_h_align = 0;
             else if (ncp->env_h_align < 0) ncp->env_h_align = 0;
-            sprintf(value, "%lld", ncp->env_h_align);
         }
     }
-    if (!flag) sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    if (ncp->env_h_align == 0)
+        sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    else
+        sprintf(value, "%lld", ncp->env_h_align);
     MPI_Info_set(info_used, "nc_header_align_size", value);
 
     ncp->env_v_align = 0;
@@ -68,10 +70,12 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->env_v_align = strtoll(value, NULL, 10);
             if (errno != 0) ncp->env_v_align = 0;
             else if (ncp->env_v_align < 0) ncp->env_v_align = 0;
-            sprintf(value, "%lld", ncp->env_v_align);
         }
     }
-    if (!flag) sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    if (ncp->env_v_align == 0)
+        sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    else
+        sprintf(value, "%lld", ncp->env_v_align);
     MPI_Info_set(info_used, "nc_var_align_size", value);
 
     ncp->env_r_align = 0;
@@ -84,12 +88,15 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->env_r_align = strtoll(value, NULL, 10);
             if (errno != 0) ncp->env_r_align = 0;
             else if (ncp->env_r_align < 0) ncp->env_r_align = 0;
-            sprintf(value, "%lld", ncp->env_r_align);
         }
     }
-    if (!flag) sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    if (ncp->env_r_align == 0)
+        sprintf(value, "%d", FILE_ALIGNMENT_DEFAULT);
+    else
+        sprintf(value, "%lld", ncp->env_r_align);
     MPI_Info_set(info_used, "nc_record_align_size", value);
 
+    ncp->chunk = PNC_DEFAULT_CHUNKSIZE;
     if (user_info != MPI_INFO_NULL) {
         /* header reading chunk size */
         MPI_Info_get(user_info, "nc_header_read_chunk_size", MPI_MAX_INFO_VAL-1,
@@ -103,12 +110,12 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
                 ncp->chunk = 0;
             else if (chunk > NC_MAX_INT) /* limit to NC_MAX_INT */
                 ncp->chunk = NC_MAX_INT;
-            sprintf(value, "%d", ncp->chunk);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_DEFAULT_CHUNKSIZE);
+    sprintf(value, "%d", ncp->chunk);
     MPI_Info_set(info_used, "nc_header_read_chunk_size", value);
 
+    strcpy(value, "auto");
     if (user_info != MPI_INFO_NULL) {
         /* setting in-place byte swap (matters only for Little Endian) */
         MPI_Info_get(user_info, "nc_in_place_swap", MPI_MAX_INFO_VAL-1, value, &flag);
@@ -127,9 +134,9 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             }
         }
     }
-    if (!flag) strcpy(value, "auto");
     MPI_Info_set(info_used, "nc_in_place_swap", value);
 
+    ncp->ibuf_size = PNC_DEFAULT_IBUF_SIZE;
     if (user_info != MPI_INFO_NULL) {
         /* temporal buffer size used to pack noncontiguous aggregated user
          * buffers when calling ncmpi_wait/wait_all, Default 16 MiB
@@ -141,13 +148,13 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             errno = 0;  /* errno must set to zero before calling strtoll */
             ibuf_size = strtoll(value, NULL, 10);
             if (errno == 0 && ncp->ibuf_size > 0) ncp->ibuf_size = ibuf_size;
-            sprintf(value, "%lld", ncp->ibuf_size);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_DEFAULT_IBUF_SIZE);
+    sprintf(value, "%lld", ncp->ibuf_size);
     MPI_Info_set(info_used, "nc_ibuf_size", value);
 
 #ifdef ENABLE_SUBFILING
+    ncp->subfile_mode = 0;
     if (user_info != MPI_INFO_NULL) {
         MPI_Info_get(user_info, "pnetcdf_subfiling", MPI_MAX_INFO_VAL-1,
                      value, &flag);
@@ -156,9 +163,12 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->subfile_mode = 1;
         }
     }
-    if (!flag) strcpy(value, "disable");
-    MPI_Info_set(info_used, "pnetcdf_subfiling", value);
+    if (ncp->subfile_mode)
+        MPI_Info_set(info_used, "pnetcdf_subfiling", "enable");
+    else
+        MPI_Info_set(info_used, "pnetcdf_subfiling", "disable");
 
+    ncp->num_subfiles = 0;
     if (user_info != MPI_INFO_NULL) {
         MPI_Info_get(user_info, "nc_num_subfiles", MPI_MAX_INFO_VAL-1,
                      value, &flag);
@@ -167,10 +177,9 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->num_subfiles = atoi(value);
             if (errno != 0) ncp->num_subfiles = 0;
             else if (ncp->num_subfiles < 0) ncp->num_subfiles = 0;
-            sprintf(value, "%d", ncp->num_subfiles);
         }
     }
-    if (!flag) strcpy(value, "0");
+    sprintf(value, "%d", ncp->num_subfiles);
     MPI_Info_set(info_used, "nc_num_subfiles", value);
 
     if (ncp->subfile_mode == 0) ncp->num_subfiles = 0;
@@ -202,10 +211,9 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->dims.hash_size = atoi(value);
             if (errno != 0 || ncp->dims.hash_size < 0)
                 ncp->dims.hash_size = PNC_HSIZE_DIM;
-            sprintf(value, "%d", ncp->dims.hash_size);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_HSIZE_DIM);
+    sprintf(value, "%d", ncp->dims.hash_size);
     MPI_Info_set(info_used, "nc_hash_size_dim", value);
 
     ncp->vars.hash_size = PNC_HSIZE_VAR;
@@ -218,10 +226,9 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->vars.hash_size = atoi(value);
             if (errno != 0 || ncp->vars.hash_size < 0)
                 ncp->vars.hash_size = PNC_HSIZE_VAR;
-            sprintf(value, "%d", ncp->vars.hash_size);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_HSIZE_VAR);
+    sprintf(value, "%d", ncp->vars.hash_size);
     MPI_Info_set(info_used, "nc_hash_size_var", value);
 
     ncp->attrs.hash_size = PNC_HSIZE_GATTR;
@@ -234,10 +241,9 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->attrs.hash_size = atoi(value);
             if (errno != 0 || ncp->attrs.hash_size < 0)
                 ncp->attrs.hash_size = PNC_HSIZE_GATTR;
-            sprintf(value, "%d", ncp->attrs.hash_size);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_HSIZE_GATTR);
+    sprintf(value, "%d", ncp->attrs.hash_size);
     MPI_Info_set(info_used, "nc_hash_size_gattr", value);
 
     ncp->hash_size_attr = PNC_HSIZE_VATTR;
@@ -250,11 +256,25 @@ void ncmpio_set_pnetcdf_hints(NC *ncp,
             ncp->hash_size_attr = atoi(value);
             if (errno != 0 || ncp->hash_size_attr < 0)
                 ncp->hash_size_attr = PNC_HSIZE_VATTR;
-            sprintf(value, "%d", ncp->hash_size_attr);
         }
     }
-    if (!flag) sprintf(value, "%d", PNC_HSIZE_VATTR);
+    sprintf(value, "%d", ncp->hash_size_attr);
     MPI_Info_set(info_used, "nc_hash_size_vattr", value);
+
+    ncp->num_aggrs_per_node = 0;
+    if (user_info != MPI_INFO_NULL) {
+        /* number of intra-node write aggregators per compute node */
+        MPI_Info_get(user_info, "nc_num_aggrs_per_node", MPI_MAX_INFO_VAL-1,
+                     value, &flag);
+        if (flag) {
+            errno = 0;  /* errno must set to zero before calling atoi */
+            ncp->num_aggrs_per_node = atoi(value);
+            if (errno != 0 || ncp->num_aggrs_per_node < 0)
+                ncp->num_aggrs_per_node = 0;
+        }
+    }
+    sprintf(value, "%d", ncp->num_aggrs_per_node);
+    MPI_Info_set(info_used, "nc_num_aggrs_per_node", value);
 }
 
 /*----< ncmpio_first_offset() >-----------------------------------------------*/
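Aside (not part of the patch): the new nc_num_aggrs_per_node hint is supplied by the application through an MPI info object, while the test scripts below pass the same hint via the PNETCDF_HINTS environment variable. A minimal usage sketch follows; the file name is arbitrary and error checking is abbreviated.

/* set_aggr_hint.c: illustrative sketch only, not part of the patch */
#include <stdio.h>
#include <mpi.h>
#include <pnetcdf.h>

int main(int argc, char **argv) {
    int err, ncid, flag;
    char value[MPI_MAX_INFO_VAL];
    MPI_Info info, info_used;

    MPI_Init(&argc, &argv);

    /* request 2 intra-node aggregators per compute node */
    MPI_Info_create(&info);
    MPI_Info_set(info, "nc_num_aggrs_per_node", "2");

    err = ncmpi_create(MPI_COMM_WORLD, "testfile.nc", NC_CLOBBER, info, &ncid);
    if (err == NC_NOERR) {
        /* read back the value PnetCDF actually uses */
        ncmpi_inq_file_info(ncid, &info_used);
        MPI_Info_get(info_used, "nc_num_aggrs_per_node", MPI_MAX_INFO_VAL-1,
                     value, &flag);
        if (flag) printf("nc_num_aggrs_per_node = %s\n", value);
        MPI_Info_free(&info_used);
        ncmpi_close(ncid);
    }
    MPI_Info_free(&info);
    MPI_Finalize();
    return 0;
}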
diff --git a/src/drivers/ncmpio/ncmpio_wait.c b/src/drivers/ncmpio/ncmpio_wait.c
index bd5055c1a..cd7473d86 100644
--- a/src/drivers/ncmpio/ncmpio_wait.c
+++ b/src/drivers/ncmpio/ncmpio_wait.c
@@ -984,9 +984,15 @@ req_commit(NC *ncp,
     }
 
     /* carry out writes and reads separately (writes first) */
+
     if (do_write > 0) {
-        err = wait_getput(ncp, num_w_reqs, put_list, NC_REQ_WR, coll_indep,
-                          newnumrecs);
+
+        if (ncp->my_aggr >= 0 && coll_indep == NC_REQ_COLL && ncp->nprocs > 1)
+            /* intra-node write aggregation must be in collective mode */
+            err = ncmpio_intra_node_aggregation_nreqs(ncp, num_w_reqs, put_list, newnumrecs);
+        else
+            err = wait_getput(ncp, num_w_reqs, put_list, NC_REQ_WR, coll_indep,
+                              newnumrecs);
 
         put_list = NULL; /* has been freed in wait_getput() */
     }
@@ -1258,8 +1264,8 @@ off_compare(const void *a, const void *b)
 static MPI_Offset
 vars_flatten(int          ndim,    /* number of dimensions */
              int          el_size, /* array element size */
-             MPI_Offset  *dimlen,  /* [ndim] dimension lengths */
              MPI_Offset   offset,  /* starting file offset of variable */
+             MPI_Offset  *dimlen,  /* [ndim] dimension lengths */
              MPI_Aint     buf_addr,/* starting buffer address */
             MPI_Offset  *start,   /* [ndim] starts of subarray */
              MPI_Offset  *count,   /* [ndim] counts of subarray */
@@ -1357,6 +1363,9 @@ vars_flatten(int          ndim,    /* number of dimensions */
 }
 
 /*----< merge_requests() >---------------------------------------------------*/
+/* flatten all requests into offset-length pairs, sort them into an increasing
+ * order, and resolve the overlapped offset-length pairs.
+ */
 static int
 merge_requests(NC          *ncp,
                NC_lead_req *lead_list,
@@ -1457,7 +1466,7 @@ merge_requests(NC          *ncp,
         if (fIsSet(lead->flag, NC_REQ_STRIDE_NULL)) stride = NULL;
 
         /* flatten each request to a list of offset-length pairs */
-        vars_flatten(ndims, lead->varp->xsz, shape, var_begin,
+        vars_flatten(ndims, lead->varp->xsz, var_begin, shape,
                      addr, start, count, stride,
                      &nseg,    /* OUT: number of offset-length pairs */
                      seg_ptr); /* OUT: array of offset-length pairs */
@@ -1759,7 +1768,7 @@ req_aggregation(NC *ncp,
      * fileview must contain only monotonic non-decreasing file offsets. Thus
      * if the nonblocking requests interleave with each other (although not
      * overlap), then we cannot simply concatenate the filetypes of individual
-     * requests. This approach flattens the requests of "interleaved" groups
+     * requests. The code below flattens the requests of "interleaved" groups
      * into offset-length pairs, sorts, and merges them into an aggregated
      * filetype. Similar for building an aggregated I/O buffer type.
      */
@@ -1913,28 +1922,32 @@ req_aggregation(NC *ncp,
         else { /* this group is interleaved */
             /* flatten the interleaved requests in this group, so interleaved
              * requests can be sorted and merged into a monotonically
-             * non-decreasing filetype. For example, multiple nonblocking
-             * requests each accessing a single column of a 2D array, that each
-             * produces a filetype interleaving with others'.
+             * non-decreasing order for constructing the filetype. For example,
+             * multiple nonblocking requests, each writing/reading a single
+             * column of a 2D array, will produce an interleaved filetype
+             * among all processes.
              *
              * The pitfall of this flattening is the additional memory
              * requirement, as it will have to break down each request into a
-             * list of offset-length pairs, and merge all lists into a sorted
-             * list based on their offsets into an increasing order.
+             * list of offset-length pairs, and then merge all lists into a
+             * sorted list based on their offsets into an increasing order.
              *
             * Be warned! The additional memory requirement for this merging can
-             * be more than the I/O data itself. For example, each nonblocking
-             * request access a single column of a 2D array of 4-byte integer
-             * type. Each off-len pair represents only a 4-byte integer, but the
-             * off-len pair itself takes 24 bytes. Additional memory is also
-             * required for MPI arguments of displacements and blocklengths.
+             * be more than the I/O data itself. For example, in a
+             * column-wise data partitioning pattern, a process makes a
+             * nonblocking request for accessing a single column of a 2D array
+             * of 4-byte integer type. However, each element of the column is
+             * flattened into an off-len pair, and the off-len pair itself
+             * takes 24 bytes, sizeof(struct off_len). Additional memory is
+             * also required for the MPI arguments of displacements and
+             * blocklengths when constructing the filetype.
              */
             MPI_Offset nsegs=0;   /* number of merged offset-length pairs */
             off_len *segs=NULL;   /* array of the offset-length pairs */
             void *merged_buf;
 
-            /* merge all requests into sorted offset-length pairs. Note
-             * g_reqs[].offset_start and offset_end are relative to the
+            /* flatten and merge all requests into sorted offset-length pairs.
+             * Note g_reqs[].offset_start and offset_end are relative to the
              * beginning of file */
             err = merge_requests(ncp, lead_list, g_num_reqs, g_reqs,
                                  &merged_buf, &nsegs, &segs);
diff --git a/test/C/parallel_run.sh b/test/C/parallel_run.sh
index 657df8292..42ef148bb 100755
--- a/test/C/parallel_run.sh
+++ b/test/C/parallel_run.sh
@@ -27,8 +27,14 @@ fi
 unset PNETCDF_HINTS
 
 for j in ${safe_modes} ; do
+for intra_aggr in 0 1 ; do
    if test "$j" = 1 ; then # test only in safe mode
       export PNETCDF_HINTS="romio_no_indep_rw=true"
+   else
+      export PNETCDF_HINTS=
+   fi
+   if test "$intra_aggr" = 1 ; then
+      export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
    fi
    export PNETCDF_SAFE_MODE=$j
    # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -60,6 +66,7 @@ for j in ${safe_modes} ; do
       # Validator does not support nc4
    fi
 done
+done
 
 rm -f ${OUTDIR}/*.nc
 rm -f ${OUTDIR}/*.nc4
diff --git a/test/CXX/parallel_run.sh b/test/CXX/parallel_run.sh
index b7e1b7719..4ccd6f06a 100755
--- a/test/CXX/parallel_run.sh
+++ b/test/CXX/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${TESTPROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
       if test "$j" = 1 ; then # test only in safe mode
          export PNETCDF_HINTS="romio_no_indep_rw=true"
+      else
+         export PNETCDF_HINTS=
+      fi
+      if test "$intra_aggr" = 1 ; then
+         export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
       fi
       export PNETCDF_SAFE_MODE=$j
       # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -60,6 +66,7 @@ for i in ${TESTPROGRAMS} ; do
          # Validator does not support NetCDF-4 format
       fi
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.bb.nc
 done
diff --git a/test/F90/parallel_run.sh b/test/F90/parallel_run.sh
index 10a9b8ffb..c2d53ea8b 100755
--- a/test/F90/parallel_run.sh
+++ b/test/F90/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${PARALLEL_PROGS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -62,6 +68,7 @@ for i in ${PARALLEL_PROGS} ; do
         # Validator does not support nc4
      fi
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.bb.nc
 done
diff --git a/test/adios/parallel_run.sh b/test/adios/parallel_run.sh
index 559cc65ee..7c691c751 100755
--- a/test/adios/parallel_run.sh
+++ b/test/adios/parallel_run.sh
@@ -26,8 +26,14 @@ unset PNETCDF_HINTS
 for i in ${check_PROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -44,5 +50,6 @@ for i in ${check_PROGRAMS} ; do
         ${MPIRUN} ./$i ${srcdir}/arrays.bp
      fi
    done
+   done
 done
diff --git a/test/burst_buffer/parallel_run.sh b/test/burst_buffer/parallel_run.sh
index 2804595bd..efde144d9 100755
--- a/test/burst_buffer/parallel_run.sh
+++ b/test/burst_buffer/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${TESTPROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -52,6 +58,7 @@ for i in ${TESTPROGRAMS} ; do
      # echo "--- validating file ${TESTOUTDIR}/$i.nc"
      ${TESTSEQRUN} ${VALIDATOR} -q ${TESTOUTDIR}/$i.nc
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.nc*.data
    rm -f ${OUTDIR}/$i.nc*.meta
diff --git a/test/cdf_format/parallel_run.sh b/test/cdf_format/parallel_run.sh
index 8a8c31b13..2b6bb5e12 100755
--- a/test/cdf_format/parallel_run.sh
+++ b/test/cdf_format/parallel_run.sh
@@ -29,8 +29,14 @@ fi
 unset PNETCDF_HINTS
 
 for j in ${safe_modes} ; do
+for intra_aggr in 0 1 ; do
    if test "$j" = 1 ; then # test only in safe mode
       export PNETCDF_HINTS="romio_no_indep_rw=true"
+   else
+      export PNETCDF_HINTS=
+   fi
+   if test "$intra_aggr" = 1 ; then
+      export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
    fi
    export PNETCDF_SAFE_MODE=$j
    # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -63,6 +69,7 @@ for j in ${safe_modes} ; do
      # ${MPIRUN} ${NCMPIDIFF} -q ${TESTOUTDIR}/dim_cdf12.nc ${TESTOUTDIR}/dim_cdf12.bb.nc
    fi
 done
+done
 
 rm -f ${OUTDIR}/dim_cdf12.nc
 rm -f ${OUTDIR}/cdf_type.nc
diff --git a/test/header/parallel_run.sh b/test/header/parallel_run.sh
index a69ea71c3..9f090bca7 100755
--- a/test/header/parallel_run.sh
+++ b/test/header/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${check_PROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -61,6 +67,7 @@ for i in ${check_PROGRAMS} ; do
         # Validator does not support nc4
      fi
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.bb.nc
 done
diff --git a/test/nc4/parallel_run.sh b/test/nc4/parallel_run.sh
index 26fa37bdb..e616c3cbf 100755
--- a/test/nc4/parallel_run.sh
+++ b/test/nc4/parallel_run.sh
@@ -26,13 +26,20 @@ unset PNETCDF_HINTS
 for i in ${check_PROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
      ${MPIRUN} ./$i ${TESTOUTDIR}/$i.nc
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.nc.cdf4
 done
diff --git a/test/nonblocking/parallel_run.sh b/test/nonblocking/parallel_run.sh
index 1c4fffae0..5bdc4d932 100755
--- a/test/nonblocking/parallel_run.sh
+++ b/test/nonblocking/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${check_PROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -69,6 +75,7 @@ for i in ${check_PROGRAMS} ; do
         # Validator does not support nc4
      fi
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.bb.nc
    rm -f ${OUTDIR}/$i.nc.*
diff --git a/test/subfile/parallel_run.sh b/test/subfile/parallel_run.sh
index 6f8139742..83aa22d9f 100755
--- a/test/subfile/parallel_run.sh
+++ b/test/subfile/parallel_run.sh
@@ -29,8 +29,14 @@ unset PNETCDF_HINTS
 for i in ${check_PROGRAMS} ; do
    for j in ${safe_modes} ; do
+   for intra_aggr in 0 1 ; do
      if test "$j" = 1 ; then # test only in safe mode
         export PNETCDF_HINTS="romio_no_indep_rw=true"
+     else
+        export PNETCDF_HINTS=
+     fi
+     if test "$intra_aggr" = 1 ; then
+        export PNETCDF_HINTS="${PNETCDF_HINTS};nc_num_aggrs_per_node=2"
      fi
      export PNETCDF_SAFE_MODE=$j
      # echo "set PNETCDF_SAFE_MODE ${PNETCDF_SAFE_MODE}"
@@ -69,6 +75,7 @@ for i in ${check_PROGRAMS} ; do
         ${MPIRUN} ${NCMPIDIFF} -q ${TESTOUTDIR}/$i.nc.subfile_1.nc ${TESTOUTDIR}/$i.bb.nc.subfile_1.nc
      fi
    done
+   done
    rm -f ${OUTDIR}/$i.nc
    rm -f ${OUTDIR}/$i.nc.subfile_0.nc
    rm -f ${OUTDIR}/$i.nc.subfile_1.nc
diff --git a/test/testcases/Makefile.am b/test/testcases/Makefile.am
index c679c1f5f..9e8de6ce3 100644
--- a/test/testcases/Makefile.am
+++ b/test/testcases/Makefile.am
@@ -54,7 +54,8 @@ if NAGFORT
   AM_FCFLAGS += -w=uparam
 endif
 
-TESTPROGRAMS = ncmpi_vars_null_stride \
+TESTPROGRAMS = file_create_open \
+               ncmpi_vars_null_stride \
                vectors \
                collective_error \
                test_varm \
diff --git a/test/testcases/file_create_open.c b/test/testcases/file_create_open.c
new file mode 100644
index 000000000..e6383650c
--- /dev/null
+++ b/test/testcases/file_create_open.c
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2024, Northwestern University and Argonne National Laboratory
+ * See COPYRIGHT notice in top-level directory.
+ */
+
+/* This program tests ncmpi_create(), ncmpi_open(), and ncmpi_close().
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <libgen.h> /* basename() */
+
+#include <mpi.h>
+#include <pnetcdf.h>
+#include <testutils.h>
+
+int main(int argc, char **argv)
+{
+    char filename[512];
+    int i, err, nprocs, rank, nerrs=0, ncid;
+    int format[3] = {0, NC_64BIT_OFFSET, NC_64BIT_DATA};
+
+    MPI_Init(&argc, &argv);
+    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+    if (argc > 2) {
+        if (!rank) printf("Usage: %s [filename]\n",argv[0]);
+        MPI_Finalize();
+        return 1;
+    }
+    if (argc == 2) snprintf(filename, 512, "%s", argv[1]);
+    else           sprintf(filename, "%s.nc", argv[0]);
+
+    if (rank == 0) {
+        char *cmd_str = (char *)malloc(strlen(argv[0]) + 256);
+        sprintf(cmd_str, "*** TESTING C   %s for file create", basename(argv[0]));
+        printf("%-66s ------ ", cmd_str);
+        free(cmd_str);
+    }
+
+    for (i=0; i<3; i++) {
+        /* Create a new file */
+        int cmode = NC_CLOBBER | format[i];
+        err = ncmpi_create(MPI_COMM_WORLD, filename, cmode, MPI_INFO_NULL, &ncid);
+        CHECK_ERR
+
+        /* Close the file. */
+        err = ncmpi_close(ncid);
+        CHECK_ERR
+
+        /* Open the file */
+        err = ncmpi_open(MPI_COMM_WORLD, filename, NC_WRITE, MPI_INFO_NULL, &ncid);
+        CHECK_ERR
+
+        /* Close the file. */
+        err = ncmpi_close(ncid);
+        CHECK_ERR
+    }
+
+    /* check if there is any malloc residue */
+    MPI_Offset malloc_size, sum_size;
+    err = ncmpi_inq_malloc_size(&malloc_size);
+    if (err == NC_NOERR) {
+        MPI_Reduce(&malloc_size, &sum_size, 1, MPI_OFFSET, MPI_SUM, 0, MPI_COMM_WORLD);
+        if (rank == 0 && sum_size > 0)
+            printf("heap memory allocated by PnetCDF internally has %lld bytes yet to be freed\n",
+                   sum_size);
+    }
+
+    MPI_Allreduce(MPI_IN_PLACE, &nerrs, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
+    if (rank == 0) {
+        if (nerrs) printf(FAIL_STR, nerrs);
+        else       printf(PASS_STR);
+    }
+
+    MPI_Finalize();
+
+    return (nerrs > 0);
+}
diff --git a/test/testcases/flexible.c b/test/testcases/flexible.c
index 81ba03cfd..2e5cf9e9b 100644
--- a/test/testcases/flexible.c
+++ b/test/testcases/flexible.c
@@ -109,7 +109,7 @@ int main(int argc, char **argv) {
 #endif
 
     /* initialize the contents of the array */
-    for (j=0; j