Skip to content

Commit

Permalink
Merge pull request #137 from Parallel-NetCDF/move_64M
Browse files Browse the repository at this point in the history
Update data section movement when file header extent grows
  • Loading branch information
wkliao authored Mar 20, 2024
2 parents eb65b88 + 8cc0138 commit 3013667
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 64 deletions.
2 changes: 2 additions & 0 deletions sneak_peek.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ This is essentially a placeholder for the next release note ...
programs and collapsible bullets to display their manual pages.

* Other updates:
+ When file header extent size grows, use 64 MiB per process as the move unit
size. See [PR #137](https://github.com/Parallel-NetCDF/PnetCDF/pull/137)
+ Since version 1.1.0, PnetCDF has been using file striping size, if
obtainable from hint `striping_unit` set by users or MPI-IO underneath, to
align the starting file offset of data section. This offset is also
Expand Down
70 changes: 41 additions & 29 deletions src/drivers/ncmpio/ncmpio_enddef.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,34 @@
/*----< move_file_block() >--------------------------------------------------*/
static int
move_file_block(NC *ncp,
MPI_Offset to,
MPI_Offset from,
MPI_Offset nbytes)
MPI_Offset to, /* destination file starting offset */
MPI_Offset from, /* source file starting offset */
MPI_Offset nbytes) /* amount to be moved */
{
int rank, nprocs, bufcount, mpireturn, err, status=NC_NOERR, min_st;
void *buf;
int chunk_size=1048576; /* move 1 MB per process at a time */
size_t chunk_size;
MPI_Status mpistatus;

MPI_Comm_size(ncp->comm, &nprocs);
MPI_Comm_rank(ncp->comm, &rank);

/* if the file striping unit size is known (obtained from MPI-IO), then
* we use that instead of 1 MB */
if (ncp->striping_unit > 0) chunk_size = ncp->striping_unit;
/* Divide amount nbytes among all processes. If the divided amount,
* chunk_size, is larger then MOVE_UNIT, set chunk_size to be the move unit
* size per process (make sure it is <= NC_MAX_INT, as MPI read/write APIs
* use 4-byte int in their count argument.)
*/
#define MOVE_UNIT 67108864
chunk_size = nbytes / nprocs;
if (nbytes % nprocs) chunk_size++;
if (chunk_size > MOVE_UNIT) {
/* move data in multiple rounds, MOVE_UNIT per process at a time */
chunk_size = MOVE_UNIT;
}

/* buf will be used as a temporal buffer to move data in chunks, i.e.
* read a chunk and later write to the new location */
buf = NCI_Malloc((size_t)chunk_size);
buf = NCI_Malloc(chunk_size);
if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM)

/* make fileview entire file visible */
Expand All @@ -64,14 +73,17 @@ move_file_block(NC *ncp,
while (nbytes > 0) {
int get_size=0;

/* calculate how much to move at each time */
bufcount = chunk_size;
/* calculate how much to move at each time. chunk_size has been
* checked, must be < NC_MAX_INT
*/
bufcount = (int)chunk_size;
if (nbytes < (MPI_Offset)nprocs * chunk_size) {
/* handle the last group of chunks */
MPI_Offset rem_chunks = nbytes / chunk_size;
if (rank > rem_chunks) /* these processes do not read/write */
bufcount = 0;
else if (rank == rem_chunks) /* this process reads/writes less */
/* make bufcount < chunk_size */
bufcount = (int)(nbytes % chunk_size);
nbytes = 0;
}
Expand All @@ -93,6 +105,7 @@ move_file_block(NC *ncp,
if (mpireturn != MPI_SUCCESS) {
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all");
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EREAD)
get_size = bufcount;
}
else {
/* for zero-length read, MPI_Get_count may report incorrect result
Expand All @@ -105,6 +118,11 @@ move_file_block(NC *ncp,
* read from a file may be less than bufcount. Because we are
* moving whatever read to a new file offset, we must use the
* amount actually read to call MPI_File_write_at_all below.
*
* Update the number of bytes read since file open.
* Because each rank reads and writes no more than one chunk_size
* at a time and chunk_size is < NC_MAX_INT, it is OK to call
* MPI_Get_count, instead of MPI_Get_count_c.
*/
MPI_Get_count(&mpistatus, MPI_BYTE, &get_size);
ncp->get_size += get_size;
Expand Down Expand Up @@ -143,26 +161,24 @@ move_file_block(NC *ncp,

TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh,
to+nbytes+rank*chunk_size,
buf, get_size /* bufcount */,
buf, get_size /* NOT bufcount */,
MPI_BYTE, &mpistatus);
if (mpireturn != MPI_SUCCESS) {
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at_all");
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
}
else {
/* update the number of bytes written since file open */
#ifdef HAVE_MPI_GET_COUNT_C
MPI_Count put_size;
MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size);
ncp->put_size += put_size;
#else
/* update the number of bytes written since file open.
* Because each rank reads and writes no more than one chunk_size
* at a time and chunk_size is < NC_MAX_INT, it is OK to call
* MPI_Get_count, instead of MPI_Get_count_c.
*/
int put_size;
mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size);
if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED)
ncp->put_size += get_size; /* or bufcount */
else
ncp->put_size += put_size;
#endif
}
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm);
status = min_st;
Expand Down Expand Up @@ -501,7 +517,6 @@ write_NC(NC *ncp)
/* only rank 0's header gets written to the file */
if (rank == 0) {
void *buf=NULL;
int bufCount;
MPI_Offset remain;

#ifdef ENABLE_NULL_BYTE_HEADER_PADDING
Expand Down Expand Up @@ -536,7 +551,7 @@ write_NC(NC *ncp)
/* write the header in chunks */
remain = header_wlen;
for (i=0; i<ntimes; i++) {
bufCount = (int) MIN(remain, NC_MAX_INT);
int bufCount = (int) MIN(remain, NC_MAX_INT);
if (fIsSet(ncp->flags, NC_HCOLL))
TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, 0, buf,
bufCount, MPI_BYTE, &mpistatus);
Expand All @@ -549,21 +564,18 @@ write_NC(NC *ncp)
if (err == NC_EFILE) DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
}
else {
/* update the number of bytes written since file open */
#ifdef HAVE_MPI_GET_COUNT_C
MPI_Count put_size;
MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size);
ncp->put_size += put_size;
#else
/* Update the number of bytes read since file open.
* Because each rank writes no more than NC_MAX_INT at a time,
* it is OK to call MPI_Get_count, instead of MPI_Get_count_c.
*/
int put_size;
mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size);
if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED)
ncp->put_size += header_wlen;
ncp->put_size += bufCount;
else
ncp->put_size += put_size;
#endif
}
remain -= NC_MAX_INT;
remain -= bufCount;
}
NCI_Free(buf);
}
Expand Down
3 changes: 3 additions & 0 deletions src/drivers/ncmpio/ncmpio_header_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ hdr_fetch(bufferinfo *gbp) {
else {
/* Obtain the actual read amount. It may be smaller than readLen,
* when the remaining file size is smaller than read chunk size.
* Because each MPI File_read reads amount of readLen bytes, and
* readLen <= read chunk size which is <= NC_MAX_INT, calling
* MPI_Get_count() is sufficient. No need to call MPI_Get_count_c()
*/
int get_size;
MPI_Get_count(&mpistatus, MPI_BYTE, &get_size);
Expand Down
12 changes: 5 additions & 7 deletions src/drivers/ncmpio/ncmpio_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,17 @@ ncmpio_write_numrecs(NC *ncp,
if (err == NC_EFILE) DEBUG_RETURN_ERROR(NC_EWRITE)
}
else {
/* update the number of bytes written since file open */
#ifdef HAVE_MPI_GET_COUNT_C
MPI_Count put_size;
MPI_Get_count_c(&mpistatus, MPI_BYTE, &put_size);
ncp->put_size += put_size;
#else
/* update the number of bytes written since file open.
* Because the above MPI write writes either 4 or 8 bytes,
* calling MPI_Get_count() is sufficient. No need to call
* MPI_Get_count_c()
*/
int put_size;
mpireturn = MPI_Get_count(&mpistatus, MPI_BYTE, &put_size);
if (mpireturn != MPI_SUCCESS || put_size == MPI_UNDEFINED)
ncp->put_size += len;
else
ncp->put_size += put_size;
#endif
}
}
return NC_NOERR;
Expand Down
32 changes: 16 additions & 16 deletions test/largefile/large_reqs.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ int tst_one_var(char *filename, MPI_Comm comm)

gsize[0] = NY;
gsize[1] = NX;
lsize[0] = count[1];
lsize[1] = count[2];
lsize[0] = (int)count[1];
lsize[1] = (int)count[2];
lstart[0] = 0;
lstart[1] = 0;
MPI_Type_create_subarray(2, gsize, lsize, lstart, MPI_ORDER_C,
Expand Down Expand Up @@ -203,32 +203,32 @@ int tst_vars(char *filename, MPI_Comm comm)
buf = (int*) malloc(buf_len * sizeof(int));
for (i=0; i<buf_len; i++) buf[i] = (i + rank) % 128;

/* set subarray offset and length */
start[0] = 0;
start[1] = LEN * (rank / psize[1]);
start[2] = LEN * (rank % psize[1]);
count[0] = 1;
count[1] = LEN - gap;
count[2] = LEN - gap;

if (verbose)
printf("rank %d start=%lld %lld count=%lld %lld\n",
rank, start[1],start[2], count[1],count[2]);

/* create a subarray datatype for user buffer */
int gsize[2], lsize[2], lstart[2];
MPI_Datatype buftype;

gsize[0] = LEN;
gsize[1] = LEN;
lsize[0] = count[1];
lsize[1] = count[2];
lsize[0] = LEN - gap;
lsize[1] = LEN - gap;
lstart[0] = 0;
lstart[1] = 0;
MPI_Type_create_subarray(2, gsize, lsize, lstart, MPI_ORDER_C,
MPI_INT, &buftype);
MPI_Type_commit(&buftype);

/* set subarray offset and length */
start[0] = 0;
start[1] = LEN * (rank / psize[1]);
start[2] = LEN * (rank % psize[1]);
count[0] = 1;
count[1] = lsize[0];
count[2] = lsize[1];

if (verbose)
printf("rank %d start=%lld %lld count=%lld %lld\n",
rank, start[1],start[2], count[1],count[2]);

if (verbose)
printf("%d: nonblocking write total amount = %.1f GiB\n",
rank, (float)count[1]*count[2]*NVARS*sizeof(int)/1073741824);
Expand Down
11 changes: 6 additions & 5 deletions test/largefile/large_var.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ swapn(void *buf,
int main(int argc, char** argv)
{
char filename[256];
int i, j, rank, nprocs, err, nerrs=0, bufsize, expected;
size_t bufsize;
int i, j, rank, nprocs, err, nerrs=0, expected;
int ncid, cmode, varid, dimid[3], req[3], st[3], *buf, *buf_ptr;
MPI_Offset offset, var_offset, start[3], count[3];
MPI_File fh;
Expand Down Expand Up @@ -436,7 +437,7 @@ int main(int argc, char** argv)
for (i=0; i<count[0]; i++) {
for (j=0; j<count[1]; j++) {
offset = var_offset + ((start[0] + i) * NY * NX + (start[1] + j) * NX + start[2]) * sizeof(int);
MPI_File_read_at(fh, offset, buf_ptr, count[2], MPI_INT, &status);
MPI_File_read_at(fh, offset, buf_ptr, (int)count[2], MPI_INT, &status);
#ifndef WORDS_BIGENDIAN
swapn(buf_ptr, count[2]);
#endif
Expand Down Expand Up @@ -466,7 +467,7 @@ int main(int argc, char** argv)
for (i=0; i<count[0]; i++) {
for (j=0; j<count[1]; j++) {
offset = var_offset + ((start[0] + i) * NY * NX + (start[1] + j) * NX + start[2]) * sizeof(int);
MPI_File_read_at(fh, offset, buf_ptr, count[2], MPI_INT, &status);
MPI_File_read_at(fh, offset, buf_ptr, (int)count[2], MPI_INT, &status);
#ifndef WORDS_BIGENDIAN
swapn(buf_ptr, count[2]);
#endif
Expand Down Expand Up @@ -495,7 +496,7 @@ int main(int argc, char** argv)
for (i=0; i<count[0]; i++) {
for (j=0; j<count[1]; j++) {
offset = var_offset + ((start[0] + i) * NY * NX + (start[1] + j) * NX + start[2]) * sizeof(int);
MPI_File_read_at(fh, offset, buf_ptr, count[2], MPI_INT, &status);
MPI_File_read_at(fh, offset, buf_ptr, (int)count[2], MPI_INT, &status);
#ifndef WORDS_BIGENDIAN
swapn(buf_ptr, count[2]);
#endif
Expand Down Expand Up @@ -523,7 +524,7 @@ int main(int argc, char** argv)
for (i=0; i<count[0]; i++) {
for (j=0; j<count[1]; j++) {
offset = var_offset + ((start[0] + i) * NY * NX + (start[1] + j) * NX + start[2]) * sizeof(int);
MPI_File_read_at(fh, offset, buf_ptr, count[2], MPI_INT, &status);
MPI_File_read_at(fh, offset, buf_ptr, (int)count[2], MPI_INT, &status);
#ifndef WORDS_BIGENDIAN
swapn(buf_ptr, count[2]);
#endif
Expand Down
Loading

0 comments on commit 3013667

Please sign in to comment.