diff --git a/sneak_peek.md b/sneak_peek.md index 12b7c7566..d8cd5ff0b 100644 --- a/sneak_peek.md +++ b/sneak_peek.md @@ -140,6 +140,8 @@ This is essentially a placeholder for the next release note ... programs and collapsible bullets to display their manual pages. * Other updates: + + When file header extent size grows, use 64 MiB per process as the move unit + size. See [PR #137](https://github.com/Parallel-NetCDF/PnetCDF/pull/137) + Since version 1.1.0, PnetCDF has been using file striping size, if obtainable from hint `striping_unit` set by users or MPI-IO underneath, to align the starting file offset of data section. This offset is also diff --git a/src/drivers/ncmpio/ncmpio_enddef.c b/src/drivers/ncmpio/ncmpio_enddef.c index eb1804ec3..50610b7ac 100644 --- a/src/drivers/ncmpio/ncmpio_enddef.c +++ b/src/drivers/ncmpio/ncmpio_enddef.c @@ -35,25 +35,34 @@ /*----< move_file_block() >--------------------------------------------------*/ static int move_file_block(NC *ncp, - MPI_Offset to, - MPI_Offset from, - MPI_Offset nbytes) + MPI_Offset to, /* destination file starting offset */ + MPI_Offset from, /* source file starting offset */ + MPI_Offset nbytes) /* amount to be moved */ { int rank, nprocs, bufcount, mpireturn, err, status=NC_NOERR, min_st; void *buf; - int chunk_size=1048576; /* move 1 MB per process at a time */ + size_t chunk_size; MPI_Status mpistatus; MPI_Comm_size(ncp->comm, &nprocs); MPI_Comm_rank(ncp->comm, &rank); - /* if the file striping unit size is known (obtained from MPI-IO), then - * we use that instead of 1 MB */ - if (ncp->striping_unit > 0) chunk_size = ncp->striping_unit; + /* Divide amount nbytes among all processes. If the divided amount, + * chunk_size, is larger than MOVE_UNIT, set chunk_size to be the move unit + * size per process (make sure it is <= NC_MAX_INT, as MPI read/write APIs + * use 4-byte int in their count argument.) 
+ */ +#define MOVE_UNIT 67108864 + chunk_size = nbytes / nprocs; + if (nbytes % nprocs) chunk_size++; + if (chunk_size > MOVE_UNIT) { + /* move data in multiple rounds, MOVE_UNIT per process at a time */ + chunk_size = MOVE_UNIT; + } /* buf will be used as a temporal buffer to move data in chunks, i.e. * read a chunk and later write to the new location */ - buf = NCI_Malloc((size_t)chunk_size); + buf = NCI_Malloc(chunk_size); if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM) /* make fileview entire file visible */ @@ -64,14 +73,17 @@ move_file_block(NC *ncp, while (nbytes > 0) { int get_size=0; - /* calculate how much to move at each time */ - bufcount = chunk_size; + /* calculate how much to move at each time. chunk_size has been + * checked, must be < NC_MAX_INT + */ + bufcount = (int)chunk_size; if (nbytes < (MPI_Offset)nprocs * chunk_size) { /* handle the last group of chunks */ MPI_Offset rem_chunks = nbytes / chunk_size; if (rank > rem_chunks) /* these processes do not read/write */ bufcount = 0; else if (rank == rem_chunks) /* this process reads/writes less */ + /* make bufcount < chunk_size */ bufcount = (int)(nbytes % chunk_size); nbytes = 0; } @@ -93,6 +105,7 @@ move_file_block(NC *ncp, if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all"); if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EREAD) + get_size = bufcount; } else { /* for zero-length read, MPI_Get_count may report incorrect result @@ -105,6 +118,11 @@ move_file_block(NC *ncp, * read from a file may be less than bufcount. Because we are * moving whatever read to a new file offset, we must use the * amount actually read to call MPI_File_write_at_all below. + * + * Update the number of bytes read since file open. + * Because each rank reads and writes no more than one chunk_size + * at a time and chunk_size is < NC_MAX_INT, it is OK to call + * MPI_Get_count, instead of MPI_Get_count_c. 
*/ MPI_Get_count(&mpistatus, MPI_BYTE, &get_size); ncp->get_size += get_size; @@ -143,26 +161,24 @@ move_file_block(NC *ncp, TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, to+nbytes+rank*chunk_size, - buf, get_size /* bufcount */, + buf, get_size /* NOT bufcount */, MPI_BYTE, &mpistatus); if (mpireturn != MPI_SUCCESS) { err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at_all"); if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE) } else { - /* update the number of bytes written since file open */ -#ifdef HAVE_MPI_GET_COUNT_C - MPI_Count put_size; - MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size); - ncp->put_size += put_size; -#else + /* update the number of bytes written since file open. + * Because each rank reads and writes no more than one chunk_size + * at a time and chunk_size is < NC_MAX_INT, it is OK to call + * MPI_Get_count, instead of MPI_Get_count_c. + */ int put_size; mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size); if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED) ncp->put_size += get_size; /* or bufcount */ else ncp->put_size += put_size; -#endif } TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm); status = min_st; @@ -501,7 +517,6 @@ write_NC(NC *ncp) /* only rank 0's header gets written to the file */ if (rank == 0) { void *buf=NULL; - int bufCount; MPI_Offset remain; #ifdef ENABLE_NULL_BYTE_HEADER_PADDING @@ -536,7 +551,7 @@ write_NC(NC *ncp) /* write the header in chunks */ remain = header_wlen; for (i=0; iflags, NC_HCOLL)) TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, 0, buf, bufCount, MPI_BYTE, &mpistatus); @@ -549,21 +564,18 @@ write_NC(NC *ncp) if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE) } else { - /* update the number of bytes written since file open */ -#ifdef HAVE_MPI_GET_COUNT_C - MPI_Count put_size; - MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size); - ncp->put_size += put_size; -#else + /* Update the number of bytes written since file open. 
+ * Because each rank writes no more than NC_MAX_INT at a time, + * it is OK to call MPI_Get_count, instead of MPI_Get_count_c. + */ int put_size; mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size); if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED) - ncp->put_size += header_wlen; + ncp->put_size += bufCount; else ncp->put_size += put_size; -#endif } - remain -= NC_MAX_INT; + remain -= bufCount; } NCI_Free(buf); } diff --git a/src/drivers/ncmpio/ncmpio_header_get.c b/src/drivers/ncmpio/ncmpio_header_get.c index a09e89282..4067fd642 100644 --- a/src/drivers/ncmpio/ncmpio_header_get.c +++ b/src/drivers/ncmpio/ncmpio_header_get.c @@ -371,6 +371,9 @@ hdr_fetch(bufferinfo *gbp) { else { /* Obtain the actual read amount. It may be smaller than readLen, * when the remaining file size is smaller than read chunk size. + * Because each MPI File_read reads amount of readLen bytes, and + * readLen <= read chunk size which is <= NC_MAX_INT, calling + * MPI_Get_count() is sufficient. No need to call MPI_Get_count_c() */ int get_size; MPI_Get_count(&mpistatus, MPI_BYTE, &get_size); diff --git a/src/drivers/ncmpio/ncmpio_sync.c b/src/drivers/ncmpio/ncmpio_sync.c index eae730b38..c6fe19464 100644 --- a/src/drivers/ncmpio/ncmpio_sync.c +++ b/src/drivers/ncmpio/ncmpio_sync.c @@ -126,19 +126,17 @@ ncmpio_write_numrecs(NC *ncp, if (err == NC_EFILE) DEBUG_RETURN_ERROR(NC_EWRITE) } else { - /* update the number of bytes written since file open */ -#ifdef HAVE_MPI_GET_COUNT_C - MPI_Count put_size; - MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size); - ncp->put_size += put_size; -#else + /* update the number of bytes written since file open. + * Because the above MPI write writes either 4 or 8 bytes, + * calling MPI_Get_count() is sufficient. 
No need to call + * MPI_Get_count_c() + */ int put_size; mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size); if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED) ncp->put_size += len; else ncp->put_size += put_size; -#endif } } return NC_NOERR; diff --git a/test/largefile/large_reqs.c b/test/largefile/large_reqs.c index 5a32d6227..19c32965e 100644 --- a/test/largefile/large_reqs.c +++ b/test/largefile/large_reqs.c @@ -117,8 +117,8 @@ int tst_one_var(char *filename, MPI_Comm comm) gsize[0] = NY; gsize[1] = NX; - lsize[0] = count[1]; - lsize[1] = count[2]; + lsize[0] = (int)count[1]; + lsize[1] = (int)count[2]; lstart[0] = 0; lstart[1] = 0; MPI_Type_create_subarray(2, gsize, lsize, lstart, MPI_ORDER_C, @@ -203,32 +203,32 @@ int tst_vars(char *filename, MPI_Comm comm) buf = (int*) malloc(buf_len * sizeof(int)); for (i=0; i 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + err = ncmpi_close(ncid); CHECK_ERR /* reopen the file and check file header size and extent */ err = ncmpi_open(comm, filename, NC_WRITE, MPI_INFO_NULL, &ncid); CHECK_ERR + err = ncmpi_inq_varid(ncid, "fa", &varid[0]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "fb", &varid[1]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "ta", &varid[2]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "tb", &varid[3]); CHECK_ERR + err = ncmpi_inq_header_size(ncid, &hsize); CHECK_ERR err = ncmpi_inq_header_extent(ncid, &extent); CHECK_ERR if (verbose) @@ -103,6 +201,11 @@ tst_fmt(char *filename, int cmode) __LINE__,__FILE__, extent, old_extent); } + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -131,6 +234,11 @@ tst_fmt(char *filename, int cmode) __LINE__,__FILE__, extent, old_extent + minfree); } + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += 
check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -159,6 +267,11 @@ tst_fmt(char *filename, int cmode) __LINE__,__FILE__, extent); } + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + err = ncmpi_close(ncid); CHECK_ERR @@ -167,6 +280,11 @@ tst_fmt(char *filename, int cmode) /* reopen the file and check file header size and extent */ err = ncmpi_open(comm, filename, NC_WRITE, MPI_INFO_NULL, &ncid); CHECK_ERR + err = ncmpi_inq_varid(ncid, "fa", &varid[0]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "fb", &varid[1]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "ta", &varid[2]); CHECK_ERR + err = ncmpi_inq_varid(ncid, "tb", &varid[3]); CHECK_ERR + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -198,6 +316,11 @@ tst_fmt(char *filename, int cmode) unsetenv("PNETCDF_HINTS"); + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -229,6 +352,11 @@ tst_fmt(char *filename, int cmode) /* obtain 1st record variable's file offset */ err = ncmpi_inq_varoffset(ncid, varid[2], &old_var_off); CHECK_ERR + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -245,12 +373,21 @@ tst_fmt(char *filename, int cmode) __LINE__,__FILE__, var_off, old_var_off+400); } + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + #if 0 err = ncmpi_close(ncid); CHECK_ERR /* reopen the file and set r_align */ err = 
ncmpi_open(comm, filename, NC_WRITE, MPI_INFO_NULL, &ncid); CHECK_ERR #endif + + /* obtained the old offset of 1st record variable */ + err = ncmpi_inq_varoffset(ncid, varid[2], &old_var_off); CHECK_ERR + /* enter redefine mode and add nothing */ err = ncmpi_redef(ncid); CHECK_ERR @@ -261,13 +398,21 @@ tst_fmt(char *filename, int cmode) /* obtain 1st record variable's file offset */ err = ncmpi_inq_varoffset(ncid, varid[2], &var_off); CHECK_ERR + /* round up to r_align */ + exp_var_off = NUM_RNDUP(old_var_off, r_align); + /* var_off should grows into 1500 bytes */ - if (var_off != r_align) { + if (var_off != exp_var_off) { nerrs++; printf("Error at line %d in %s: 1st record variable offset %lld (expecting %lld)\n", - __LINE__,__FILE__, var_off, r_align); + __LINE__,__FILE__, var_off, exp_var_off); } + nerrs += check_fix_vars(comm, ncid, varid); + if (nerrs > 0) goto err_out; + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + err = ncmpi_close(ncid); CHECK_ERR unsetenv("PNETCDF_HINTS"); @@ -278,7 +423,7 @@ tst_fmt(char *filename, int cmode) /* define only record variables */ err = ncmpi_def_dim(ncid, "time", NC_UNLIMITED, &dimid[0]); CHECK_ERR - err = ncmpi_def_dim(ncid, "dim", 25, &dimid[1]); CHECK_ERR + err = ncmpi_def_dim(ncid, "dim", LEN*nprocs, &dimid[1]); CHECK_ERR err = ncmpi_def_var(ncid, "ta", NC_INT, 2, dimid, &varid[2]); CHECK_ERR err = ncmpi_def_var(ncid, "tb", NC_INT, 2, dimid, &varid[3]); CHECK_ERR @@ -289,6 +434,14 @@ tst_fmt(char *filename, int cmode) r_align = 512; err = ncmpi__enddef(ncid, 0, v_align, 0, r_align); CHECK_ERR + start[0] = 0; start[1] = rank * LEN; + count[0] = 2; count[1] = LEN; + + for (i=0; i<2*LEN; i++) buf[i] = rank + i + 1000; + err = ncmpi_put_vara_int_all(ncid, varid[2], start, count, buf); CHECK_ERR + for (i=0; i<2*LEN; i++) buf[i] = rank + i + 10000; + err = ncmpi_put_vara_int_all(ncid, varid[3], start, count, buf); CHECK_ERR + err = ncmpi_inq_header_size(ncid, &hsize); CHECK_ERR err = 
ncmpi_inq_header_extent(ncid, &extent); CHECK_ERR if (verbose) @@ -309,7 +462,12 @@ tst_fmt(char *filename, int cmode) __LINE__,__FILE__, extent, r_align); } + nerrs += check_rec_vars(comm, ncid, varid+2); + if (nerrs > 0) goto err_out; + +err_out: err = ncmpi_close(ncid); CHECK_ERR + free(buf); return nerrs; }