Skip to content

Commit

Permalink
Merge pull request #149 from Parallel-NetCDF/serial
Browse files Browse the repository at this point in the history
Use MPI independent I/O when number of processes is 1
  • Loading branch information
wkliao authored Aug 28, 2024
2 parents 485fcd9 + 275d223 commit 9f9cf3b
Show file tree
Hide file tree
Showing 16 changed files with 199 additions and 147 deletions.
4 changes: 3 additions & 1 deletion src/drivers/ncmpio/ncmpio_NC.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ struct NC {
MPI_Offset get_size; /* amount of reads committed so far in bytes */

MPI_Comm comm; /* MPI communicator */
int rank; /* MPI rank of this process */
int nprocs; /* number of MPI processes */
MPI_Info mpiinfo; /* used MPI info object */
MPI_File collective_fh; /* file handle for collective mode */
MPI_File independent_fh; /* file handle for independent mode */
Expand Down Expand Up @@ -474,7 +476,7 @@ typedef struct bufferinfo {
int chunk; /* chunk size for reading the header */
int version; /* 1, 2, and 5 for CDF-1, 2, and 5 respectively */
int safe_mode;/* 0: disabled, 1: enabled */
int rw_mode; /* 0: independent, 1: collective */
int coll_mode;/* 0: independent, 1: collective */
char *base; /* beginning of read/write buffer */
char *pos; /* current position in buffer */
char *end; /* end position of buffer */
Expand Down
9 changes: 5 additions & 4 deletions src/drivers/ncmpio/ncmpio_attr.m4
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ ncmpio_rename_att(void *ncdp,

err_check:
if (nname != NULL) NCI_Free(nname);
if (ncp->safe_mode) {
if (ncp->safe_mode && ncp->nprocs > 1) {
int minE, mpireturn;

/* check error code across processes */
Expand Down Expand Up @@ -597,7 +597,7 @@ ncmpio_copy_att(void *ncdp_in,
}

err_check:
if (ncp_out->safe_mode) {
if (ncp_out->safe_mode && ncp_out->nprocs > 1) {
int minE, mpireturn;

/* check the error code across processes */
Expand Down Expand Up @@ -710,7 +710,7 @@ ncmpio_del_att(void *ncdp,

err_check:
if (nname != NULL) NCI_Free(nname);
if (ncp->safe_mode) {
if (ncp->safe_mode && ncp->nprocs > 1) {
int minE, mpireturn;

/* find min error code across processes */
Expand Down Expand Up @@ -1044,7 +1044,8 @@ ncmpio_put_att(void *ncdp,
}

err_check:
if (ncp->safe_mode) { /* check the error code across processes */
if (ncp->safe_mode && ncp->nprocs > 1) {
/* check the error code across processes */
int minE, mpireturn;

TRACE_COMM(MPI_Allreduce)(&err, &minE, 1, MPI_INT, MPI_MIN, ncp->comm);
Expand Down
21 changes: 11 additions & 10 deletions src/drivers/ncmpio/ncmpio_close.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ncmpio_close_files(NC *ncp, int doUnlink) {
return ncmpii_error_mpi2nc(mpireturn, "MPI_File_close");
}

if (ncp->collective_fh != MPI_FILE_NULL) {
if (ncp->nprocs > 1 && ncp->collective_fh != MPI_FILE_NULL) {
TRACE_IO(MPI_File_close)(&ncp->collective_fh);
if (mpireturn != MPI_SUCCESS)
return ncmpii_error_mpi2nc(mpireturn, "MPI_File_close");
Expand All @@ -78,9 +78,13 @@ ncmpio_close_files(NC *ncp, int doUnlink) {
if (doUnlink) {
/* called from ncmpi_abort, if the file is being created and is still
* in define mode, the file is deleted */
TRACE_IO(MPI_File_delete)((char *)ncp->path, ncp->mpiinfo);
if (mpireturn != MPI_SUCCESS)
return ncmpii_error_mpi2nc(mpireturn, "MPI_File_delete");
if (ncp->rank == 0) {
TRACE_IO(MPI_File_delete)((char *)ncp->path, ncp->mpiinfo);
if (mpireturn != MPI_SUCCESS)
return ncmpii_error_mpi2nc(mpireturn, "MPI_File_delete");
}
if (ncp->nprocs > 1)
MPI_Barrier(ncp->comm);
}
return NC_NOERR;
}
Expand Down Expand Up @@ -163,13 +167,10 @@ ncmpio_close(void *ncdp)

/* file is open for write and no variable has been defined */
if (!NC_readonly(ncp) && ncp->vars.ndefined == 0) {
int rank;

/* wait until all processes close the file */
MPI_Barrier(ncp->comm);
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);

MPI_Comm_rank(ncp->comm, &rank);
if (rank == 0) {
if (ncp->rank == 0) {
/* ignore all errors, as unexpected file size if not a fatal error */
#ifdef HAVE_TRUNCATE
/* when calling POSIX I/O, remove file type prefix from file name */
Expand Down Expand Up @@ -222,7 +223,7 @@ ncmpio_close(void *ncdp)
}
#endif
}
MPI_Barrier(ncp->comm);
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
}

/* free up space occupied by the header metadata */
Expand Down
4 changes: 3 additions & 1 deletion src/drivers/ncmpio/ncmpio_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,10 @@ ncmpio_create(MPI_Comm comm,
ncp->comm = comm; /* reuse comm duplicated in dispatch layer */
ncp->mpiinfo = info_used; /* is not MPI_INFO_NULL */
ncp->mpiomode = mpiomode;
ncp->rank = rank;
ncp->nprocs = nprocs;
ncp->collective_fh = fh;
ncp->independent_fh = MPI_FILE_NULL;
ncp->independent_fh = (nprocs > 1) ? MPI_FILE_NULL : fh;
ncp->path = (char*) NCI_Malloc(strlen(path) + 1);
strcpy(ncp->path, path);

Expand Down
4 changes: 2 additions & 2 deletions src/drivers/ncmpio/ncmpio_dim.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,10 @@ ncmpio_rename_dim(void *ncdp,
#endif

err_check:
if (ncp->safe_mode) {
if (ncp->safe_mode && ncp->nprocs > 1) {
/* check the error so far across processes */
int status, mpireturn;

/* check the error so far across processes */
TRACE_COMM(MPI_Allreduce)(&err, &status, 1, MPI_INT, MPI_MIN,ncp->comm);
if (mpireturn != MPI_SUCCESS) {
NCI_Free(nnewname);
Expand Down
79 changes: 47 additions & 32 deletions src/drivers/ncmpio/ncmpio_enddef.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,25 @@ move_file_block(NC *ncp,
MPI_Offset from, /* source file starting offset */
MPI_Offset nbytes) /* amount to be moved */
{
int rank, nprocs, bufcount, mpireturn, err, status=NC_NOERR, min_st;
int rank, bufcount, mpireturn, err, status=NC_NOERR, min_st;
void *buf;
size_t chunk_size;
MPI_Status mpistatus;
MPI_File fh;

MPI_Comm_size(ncp->comm, &nprocs);
MPI_Comm_rank(ncp->comm, &rank);
rank = ncp->rank;

/* moving file blocks must be done in collective mode, ignoring NC_HCOLL */
fh = ncp->collective_fh;

/* Divide amount nbytes among all processes. If the divided amount,
* chunk_size, is larger then MOVE_UNIT, set chunk_size to be the move unit
* size per process (make sure it is <= NC_MAX_INT, as MPI read/write APIs
* use 4-byte int in their count argument.)
*/
#define MOVE_UNIT 67108864
chunk_size = nbytes / nprocs;
if (nbytes % nprocs) chunk_size++;
chunk_size = nbytes / ncp->nprocs;
if (nbytes % ncp->nprocs) chunk_size++;
if (chunk_size > MOVE_UNIT) {
/* move data in multiple rounds, MOVE_UNIT per process at a time */
chunk_size = MOVE_UNIT;
Expand All @@ -66,8 +69,8 @@ move_file_block(NC *ncp,
if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM)

/* make fileview entire file visible */
TRACE_IO(MPI_File_set_view)(ncp->collective_fh, 0, MPI_BYTE, MPI_BYTE,
"native", MPI_INFO_NULL);
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
MPI_INFO_NULL);

/* move the variable starting from its tail toward its beginning */
while (nbytes > 0) {
Expand All @@ -77,7 +80,7 @@ move_file_block(NC *ncp,
* checked, must be < NC_MAX_INT
*/
bufcount = (int)chunk_size;
if (nbytes < (MPI_Offset)nprocs * chunk_size) {
if (nbytes < (MPI_Offset)ncp->nprocs * chunk_size) {
/* handle the last group of chunks */
MPI_Offset rem_chunks = nbytes / chunk_size;
if (rank > rem_chunks) /* these processes do not read/write */
Expand All @@ -88,7 +91,7 @@ move_file_block(NC *ncp,
nbytes = 0;
}
else {
nbytes -= chunk_size*nprocs;
nbytes -= chunk_size*ncp->nprocs;
}

/* explicitly initialize mpistatus object to 0. For zero-length read,
Expand All @@ -99,8 +102,7 @@ move_file_block(NC *ncp,
memset(&mpistatus, 0, sizeof(MPI_Status));

/* read the original data @ from+nbytes+rank*chunk_size */
TRACE_IO(MPI_File_read_at_all)(ncp->collective_fh,
from+nbytes+rank*chunk_size,
TRACE_IO(MPI_File_read_at_all)(fh, from+nbytes+rank*chunk_size,
buf, bufcount, MPI_BYTE, &mpistatus);
if (mpireturn != MPI_SUCCESS) {
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_read_at_all");
Expand Down Expand Up @@ -128,11 +130,13 @@ move_file_block(NC *ncp,
ncp->get_size += get_size;
}

/* MPI_Barrier(ncp->comm); */
/* important, in case new region overlaps old region */
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
ncp->comm);
status = min_st;
if (ncp->nprocs > 1) {
/* MPI_Barrier(ncp->comm); */
/* important, in case new region overlaps old region */
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN,
ncp->comm);
status = min_st;
}
if (status != NC_NOERR) break;

/* write to new location @ to+nbytes+rank*chunk_size
Expand All @@ -159,8 +163,7 @@ move_file_block(NC *ncp,
*/
memset(&mpistatus, 0, sizeof(MPI_Status));

TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh,
to+nbytes+rank*chunk_size,
TRACE_IO(MPI_File_write_at_all)(fh, to+nbytes+rank*chunk_size,
buf, get_size /* NOT bufcount */,
MPI_BYTE, &mpistatus);
if (mpireturn != MPI_SUCCESS) {
Expand All @@ -180,8 +183,10 @@ move_file_block(NC *ncp,
else
ncp->put_size += put_size;
}
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm);
status = min_st;
if (ncp->nprocs > 1) {
TRACE_COMM(MPI_Allreduce)(&status, &min_st, 1, MPI_INT, MPI_MIN, ncp->comm);
status = min_st;
}
if (status != NC_NOERR) break;
}
NCI_Free(buf);
Expand Down Expand Up @@ -269,8 +274,10 @@ NC_begins(NC *ncp)
MPI_Comm_rank(ncp->comm, &rank);
ncp->xsz = ncmpio_hdr_len_NC(ncp);

if (ncp->safe_mode) { /* this consistency check is redundant as metadata is
kept consistent at all time when safe mode is on */
if (ncp->safe_mode && ncp->nprocs > 1) {
/* this consistency check is redundant as metadata is kept consistent
* at all time when safe mode is on
*/
int err, status;
MPI_Offset root_xsz = ncp->xsz;

Expand Down Expand Up @@ -477,14 +484,22 @@ NC_begins(NC *ncp)
static int
write_NC(NC *ncp)
{
int status=NC_NOERR, mpireturn, err, rank;
int status=NC_NOERR, mpireturn, err, rank, is_coll;
MPI_Offset i, header_wlen, ntimes;
MPI_File fh;
MPI_Status mpistatus;

assert(!NC_readonly(ncp));

MPI_Comm_rank(ncp->comm, &rank);

/* Depending on whether NC_HCOLL is set, writing file header can be done
* through either MPI collective or independent write call.
* When * ncp->nprocs == 1, ncp->collective_fh == ncp->independent_fh
*/
is_coll = (ncp->nprocs > 1 && fIsSet(ncp->flags, NC_HCOLL)) ? 1 : 0;
fh = ncp->collective_fh;

/* In NC_begins(), root's ncp->xsz and ncp->begin_var, root's header
* size and extent, have been broadcast (sync-ed) among processes.
*/
Expand Down Expand Up @@ -554,12 +569,12 @@ write_NC(NC *ncp)
buf_ptr = buf;
for (i=0; i<ntimes; i++) {
int bufCount = (int) MIN(remain, NC_MAX_INT);
if (fIsSet(ncp->flags, NC_HCOLL))
TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, offset, buf_ptr,
bufCount, MPI_BYTE, &mpistatus);
if (is_coll)
TRACE_IO(MPI_File_write_at_all)(fh, offset, buf_ptr, bufCount,
MPI_BYTE, &mpistatus);
else
TRACE_IO(MPI_File_write_at)(ncp->collective_fh, offset, buf_ptr,
bufCount, MPI_BYTE, &mpistatus);
TRACE_IO(MPI_File_write_at)(fh, offset, buf_ptr, bufCount,
MPI_BYTE, &mpistatus);
if (mpireturn != MPI_SUCCESS) {
err = ncmpii_error_mpi2nc(mpireturn, "MPI_File_write_at");
/* write has failed, which is more serious than inconsistency */
Expand All @@ -586,12 +601,12 @@ write_NC(NC *ncp)
else if (fIsSet(ncp->flags, NC_HCOLL)) {
/* other processes participate the collective call */
for (i=0; i<ntimes; i++)
TRACE_IO(MPI_File_write_at_all)(ncp->collective_fh, 0, NULL,
0, MPI_BYTE, &mpistatus);
TRACE_IO(MPI_File_write_at_all)(fh, 0, NULL, 0, MPI_BYTE,
&mpistatus);
}

fn_exit:
if (ncp->safe_mode == 1) {
if (ncp->safe_mode == 1 && ncp->nprocs > 1) {
/* broadcast root's status, because only root writes to the file */
int root_status = status;
TRACE_COMM(MPI_Bcast)(&root_status, 1, MPI_INT, 0, ncp->comm);
Expand All @@ -611,7 +626,7 @@ write_NC(NC *ncp)
* do not get error and proceed to the next subroutine call.
*/
#define CHECK_ERROR(err) { \
if (ncp->safe_mode == 1) { \
if (ncp->safe_mode == 1 && ncp->nprocs > 1) { \
int status; \
TRACE_COMM(MPI_Allreduce)(&err, &status, 1, MPI_INT, MPI_MIN, \
ncp->comm); \
Expand Down
4 changes: 2 additions & 2 deletions src/drivers/ncmpio/ncmpio_file_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ ncmpio_read_write(NC *ncp,
xbuf = NCI_Malloc((size_t)req_size);
}

if (coll_indep == NC_REQ_COLL) {
if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) {
TRACE_IO(MPI_File_read_at_all)(fh, offset, xbuf, xlen, xbuf_type,
&mpistatus);
if (mpireturn != MPI_SUCCESS) {
Expand Down Expand Up @@ -274,7 +274,7 @@ ncmpio_read_write(NC *ncp,
}
}

if (coll_indep == NC_REQ_COLL) {
if (ncp->nprocs > 1 && coll_indep == NC_REQ_COLL) {
TRACE_IO(MPI_File_write_at_all)(fh, offset, xbuf, xlen, xbuf_type,
&mpistatus);
if (mpireturn != MPI_SUCCESS) {
Expand Down
Loading

0 comments on commit 9f9cf3b

Please sign in to comment.