Skip to content

Commit

Permalink
Fix some issues with interleaved output from ph5diff
Browse files Browse the repository at this point in the history
  • Loading branch information
jhendersonHDF committed Apr 5, 2024
1 parent 90a1bae commit 4c34bd1
Showing 1 changed file with 70 additions and 41 deletions.
111 changes: 70 additions & 41 deletions tools/lib/h5diff.c
Original file line number Diff line number Diff line change
Expand Up @@ -1265,38 +1265,38 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,
int incomingMessage;

/* check if any tasks freed up, and didn't need to print. */
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status);

/* first block*/
if (incomingMessage) {
workerTasks[Status.MPI_SOURCE - 1] = 1;
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
} /* end if */

/* check to see if the print token was returned. */
if (!havePrintToken) {
/* If we don't have the token, someone is probably sending us output */
print_incoming_data();

/* check incoming queue for token */
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage,
do {
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage,
&Status);

/* incoming token implies free task. */
if (incomingMessage) {
workerTasks[Status.MPI_SOURCE - 1] = 1;
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
havePrintToken = 1;
} /* end if */
} /* end if */
}
} while(incomingMessage);

/* check to see if the print token was returned. */
if (!havePrintToken) {
/* If we don't have the token, someone is probably sending us output */
do {
/* check incoming queue for token */
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage,
&Status);

print_incoming_data();
} while (!incomingMessage);

workerTasks[Status.MPI_SOURCE - 1] = 1;
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
havePrintToken = 1;
} /* end if */

/* check to see if anyone needs the print token. */
if (havePrintToken) {
Expand Down Expand Up @@ -1332,7 +1332,7 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,

if (!workerFound) {
/* if they were all busy, we've got to wait for one free up
* before we can move on. If we don't have the token, some
* before we can move on. If we don't have the token, some
* task is currently printing so we'll wait for that task to
* return it.
*/
Expand All @@ -1359,8 +1359,6 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,
/* if we do have the token, check for task to free up, or wait for a task to request
* it */
else {
/* But first print all the data in our incoming queue */
print_incoming_data();
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
if (Status.MPI_TAG == MPI_TAG_DONE) {
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
Expand Down Expand Up @@ -1413,13 +1411,16 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,
if (g_Parallel) {
/* make sure all tasks are done */
while (busyTasks > 0) {
int task = 0;

MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status);
task = Status.MPI_SOURCE - 1;

if (Status.MPI_TAG == MPI_TAG_DONE) {
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_DONE, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
} /* end if */
else if (Status.MPI_TAG == MPI_TAG_TOK_REQUEST) {
MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD,
Expand All @@ -1440,14 +1441,17 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
} /* end if */
/* someone else must have it...wait for them to return it, then give it to the task that
* just asked for it. */
else {
int source = Status.MPI_SOURCE;
int incomingMessage;

/*
* Another worker task must have the print token. Wait for that task to
* return it, then give the token to the task that just asked for it and
* wait until that task returns it.
*/

do {
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage,
&Status);
Expand All @@ -1459,35 +1463,60 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
/* Mark previously busy worker task as free */
workerTasks[Status.MPI_SOURCE - 1] = 1;
busyTasks--;

MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD);

do {
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD,
&incomingMessage, &Status);

print_incoming_data();
} while (!incomingMessage);

MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;

havePrintToken = 1;
} /* end else */
} /* end else-if */
else if (Status.MPI_TAG == MPI_TAG_TOK_RETURN) {
MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;
busyTasks--;
havePrintToken = 1;
} /* end else-if */
else if (Status.MPI_TAG == MPI_TAG_PRINT_DATA) {
char data[PRINT_DATA_MAX_SIZE + 1];
memset(data, 0, PRINT_DATA_MAX_SIZE + 1);
int incomingMessage;

MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA,
MPI_COMM_WORLD, &Status);
do {
MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD,
&incomingMessage, &Status);

parallel_print("%s", data);
print_incoming_data();
} while (!incomingMessage);

MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE,
MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status);
nfound += nFoundbyWorker.nfound;
opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp;

havePrintToken = 1;
} /* end else-if */
else {
fprintf(stderr, "ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG);
MPI_Abort(MPI_COMM_WORLD, 0);
} /* end else */
} /* end while */

/* Print any final data waiting in our queue */
print_incoming_data();
/* Mark worker task as free */
workerTasks[task] = 1;
busyTasks--;
} /* end while */
} /* end if */
H5TOOLS_DEBUG("done with if block");

Expand Down

0 comments on commit 4c34bd1

Please sign in to comment.