From 6114d1adfb86de3dcef4e423ae2d8946d53f3745 Mon Sep 17 00:00:00 2001 From: jhendersonHDF Date: Fri, 3 May 2024 15:00:15 -0500 Subject: [PATCH] Fixes and cleanup for ph5diff (#4460) * Fixes and cleanup for ph5diff Fixes concurrency issues in ph5diff that can cause interleaved output Fixes an issue where output can sometimes be dropped if it ended up in ph5diff's output overflow file Fixes an issue where MPI_Init is called after HDF5 has been initialized, preventing the library from setting up an MPI attribute to perform cleanup on MPI_Finalize Fixes an issue in config/cmake/runTest.cmake where the CMake logic would try to access an invalid list index if the number of lines in a test's output and reference files don't match * Add release note --- config/cmake/runTest.cmake | 22 +- release_docs/RELEASE.txt | 46 ++- tools/lib/h5diff.c | 659 ++++++++++++++------------------ tools/src/h5diff/ph5diff_main.c | 60 +-- 4 files changed, 370 insertions(+), 417 deletions(-) diff --git a/config/cmake/runTest.cmake b/config/cmake/runTest.cmake index efb03e7c115..8eed074ecb6 100644 --- a/config/cmake/runTest.cmake +++ b/config/cmake/runTest.cmake @@ -265,12 +265,22 @@ if (NOT TEST_SKIP_COMPARE) endif () math (EXPR _FP_LEN "${len_ref} - 1") foreach (line RANGE 0 ${_FP_LEN}) - list (GET test_act ${line} str_act) - list (GET test_ref ${line} str_ref) - if (NOT str_act STREQUAL str_ref) - if (str_act) - set (TEST_COMPARE_RESULT 1) - message (STATUS "line = ${line}\n***ACTUAL: ${str_act}\n****REFER: ${str_ref}\n") + if (line GREATER_EQUAL len_act) + message (STATUS "COMPARE FAILED: ran out of lines in ${TEST_FOLDER}/${TEST_OUTPUT}") + set (TEST_COMPARE_RESULT 1) + break () + elseif (line GREATER_EQUAL len_ref) + message (STATUS "COMPARE FAILED: ran out of lines in ${TEST_FOLDER}/${TEST_REFERENCE}") + set (TEST_COMPARE_RESULT 1) + break () + else () + list (GET test_act ${line} str_act) + list (GET test_ref ${line} str_ref) + if (NOT str_act STREQUAL str_ref) + if (str_act) + set (TEST_COMPARE_RESULT 1) + message (STATUS "line = ${line}\n***ACTUAL: ${str_act}\n****REFER: ${str_ref}\n") + endif () endif () endif () endforeach () diff --git a/release_docs/RELEASE.txt b/release_docs/RELEASE.txt index a121ed372ac..ac032584875 100644 --- a/release_docs/RELEASE.txt +++ b/release_docs/RELEASE.txt @@ -747,20 +747,6 @@ Support for new platforms, languages and compilers Bug Fixes since HDF5-1.14.0 release =================================== - Configuration: - ------------- - - Fix Autotools -Werror cleanup - - The Autotools temporarily scrub -Werror(=whatever) from CFLAGS, etc. - so configure checks don't trip over warnings generated by configure - check programs. The sed line originally only scrubbed -Werror but not - -Werror=something, which would cause errors when the '=something' was - left behind in CFLAGS. - - The sed line has been updated to handle -Werror=something lines. - - Fixes one issue raised in #3872 - Library ------- @@ -1672,6 +1658,24 @@ Bug Fixes since HDF5-1.14.0 release Configuration ------------- + - Fixed a list index out of range issue in the runTest.cmake file + + Fixed an issue in config/cmake/runTest.cmake where the CMake logic + would try to access an invalid list index if the number of lines in + a test's output and reference files don't match + + - Fix Autotools -Werror cleanup + + The Autotools temporarily scrub -Werror(=whatever) from CFLAGS, etc. + so configure checks don't trip over warnings generated by configure + check programs. The sed line originally only scrubbed -Werror but not + -Werror=something, which would cause errors when the '=something' was + left behind in CFLAGS. + + The sed line has been updated to handle -Werror=something lines. + + Fixes one issue raised in #3872 + - Changed default of 'Error on HDF5 doxygen warnings' DOXYGEN_WARN_AS_ERROR option. The default setting of DOXYGEN_WARN_AS_ERROR to 'FAIL_ON_WARNINGS' has been changed @@ -1792,12 +1796,24 @@ Bug Fixes since HDF5-1.14.0 release Tools ----- + - Fixed several issues in ph5diff + + The parallel logic for the ph5diff tool inside the shared h5diff code was + refactored and cleaned up to fix several issues with the ph5diff tool. This + fixed: + + - several concurrency issues in ph5diff that can result in interleaved + output + - an issue where output can sometimes be dropped when it ends up in + ph5diff's output overflow file + - an issue where MPI_Init was called after HDF5 had been initialized, + preventing the library from setting up an MPI communicator attribute + to perform library cleanup on MPI_Finalize - Renamed h5fuse.sh to h5fuse Addresses Discussion #3791 - - Fixed an issue with unmatched MPI messages in ph5diff The "manager" MPI rank in ph5diff was unintentionally sending "program end" diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index bdbda6e74bc..9329872d965 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -16,6 +16,12 @@ #include "h5diff.h" #include "ph5diff.h" +#ifdef H5_HAVE_PARALLEL +static diff_err_t handle_worker_request(char *worker_tasks, int *n_busy_tasks, diff_opt_t *opts, + hsize_t *n_diffs); +static diff_err_t dispatch_diff_to_worker(struct diff_mpi_args *args, char *worker_tasks, int *n_busy_tasks); +#endif + /*------------------------------------------------------------------------- * Function: print_objname * @@ -91,35 +97,6 @@ phdiff_dismiss_workers(void) for (i = 1; i < g_nTasks; i++) MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD); } - -/*------------------------------------------------------------------------- - * Function: print_incoming_data - * - * Purpose: special function that prints any output that has been sent to the manager - * and is currently sitting in the incoming message queue - * - * Return: none - *------------------------------------------------------------------------- - */ - -static void -print_incoming_data(void) -{ - char data[PRINT_DATA_MAX_SIZE + 1]; - int incomingMessage; - MPI_Status Status; - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &incomingMessage, &Status); - if (incomingMessage) { - memset(data, 0, PRINT_DATA_MAX_SIZE + 1); - MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD, &Status); - - parallel_print("%s", data); - } - } while (incomingMessage); -} #endif /*------------------------------------------------------------------------- @@ -992,24 +969,6 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char } H5TOOLS_DEBUG("groups traversed - errstat:%d", opts->err_stat); -#ifdef H5_HAVE_PARALLEL - if (g_Parallel) { - int i; - - if ((strlen(fname1) > MAX_FILENAME) || (strlen(fname2) > MAX_FILENAME)) { - fprintf(stderr, "The parallel diff only supports path names up to %d characters\n", MAX_FILENAME); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end if */ - - strcpy(filenames[0], fname1); - strcpy(filenames[1], fname2); - - /* Alert the worker tasks that there's going to be work. */ - for (i = 1; i < g_nTasks; i++) - MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD); - } /* end if */ -#endif - H5TOOLS_DEBUG("build_match_list next - errstat:%d", opts->err_stat); /* process the objects */ build_match_list(obj1fullname, info1_lp, obj2fullname, info2_lp, &match_list, opts); @@ -1042,6 +1001,24 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char parallel_print("\n"); } /* end if */ } + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + if ((strlen(fname1) > MAX_FILENAME - 1) || (strlen(fname2) > MAX_FILENAME - 1)) { + fprintf(stderr, "The parallel diff only supports path names up to %d characters\n", + MAX_FILENAME - 1); + MPI_Abort(MPI_COMM_WORLD, 0); + } /* end if */ + + strcpy(filenames[0], fname1); + strcpy(filenames[1], fname2); + + /* Alert the worker tasks that there's going to be work. */ + for (int i = 1; i < g_nTasks; i++) + MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD); + } /* end if */ +#endif + H5TOOLS_DEBUG("diff_match next - errstat:%d", opts->err_stat); nfound = diff_match(file1_id, obj1fullname, info1_lp, file2_id, obj2fullname, info2_lp, match_list, opts); H5TOOLS_DEBUG("diff_match nfound: %d - errstat:%d", nfound, opts->err_stat); @@ -1117,11 +1094,26 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id, char *obj1_fullpath = NULL; char *obj2_fullpath = NULL; diff_args_t argdata; - size_t idx1 = 0; - size_t idx2 = 0; - diff_err_t ret_value = opts->err_stat; + size_t idx1 = 0; + size_t idx2 = 0; +#ifdef H5_HAVE_PARALLEL + char *workerTasks = NULL; + int busyTasks = 0; +#endif + diff_err_t ret_value = opts->err_stat; H5TOOLS_START_DEBUG(" - errstat:%d", opts->err_stat); + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + if (NULL == (workerTasks = malloc((size_t)(g_nTasks - 1) * sizeof(char)))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "unable to allocate worker tasks array"); + + /*set all tasks as free */ + memset(workerTasks, 1, (size_t)(g_nTasks - 1) * sizeof(char)); + } +#endif + /* * if not root, prepare object name to be pre-appended to group path to * make full path @@ -1162,339 +1154,115 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id, * do the diff for common objects *------------------------------------------------------------------------- */ -#ifdef H5_HAVE_PARALLEL - { - char *workerTasks = (char *)malloc((size_t)(g_nTasks - 1) * sizeof(char)); - int n; - int busyTasks = 0; - struct diffs_found nFoundbyWorker; - struct diff_mpi_args args; - int havePrintToken = 1; - MPI_Status Status; + for (i = 0; i < table->nobjs; i++) { + H5TOOLS_DEBUG("diff for common objects[%d] - errstat:%d", i, opts->err_stat); - /*set all tasks as free */ - memset(workerTasks, 1, (size_t)(g_nTasks - 1) * sizeof(char)); -#endif + /* Check if object is present in both files first before diffing */ + if (!(table->objs[i].flags[0] && table->objs[i].flags[1])) + continue; - for (i = 0; i < table->nobjs; i++) { - H5TOOLS_DEBUG("diff for common objects[%d] - errstat:%d", i, opts->err_stat); - if (table->objs[i].flags[0] && table->objs[i].flags[1]) { - /* make full path for obj1 */ + /* Make full paths for objects */ #ifdef H5_HAVE_ASPRINTF - /* Use the asprintf() routine, since it does what we're trying to do below */ - if (asprintf(&obj1_fullpath, "%s%s", grp1_path, table->objs[i].name) < 0) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } -#else /* H5_HAVE_ASPRINTF */ - if ((obj1_fullpath = (char *)malloc(strlen(grp1_path) + strlen(table->objs[i].name) + 1)) == - NULL) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } - else { - strcpy(obj1_fullpath, grp1_path); - strcat(obj1_fullpath, table->objs[i].name); - } -#endif /* H5_HAVE_ASPRINTF */ - H5TOOLS_DEBUG("diff_match path1 - %s", obj1_fullpath); + /* Use the asprintf() routine, since it does what we're trying to do below */ + if (asprintf(&obj1_fullpath, "%s%s", grp1_path, table->objs[i].name) < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + if (asprintf(&obj2_fullpath, "%s%s", grp2_path, table->objs[i].name) < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "name buffer allocation failed"); +#else + if (NULL == (obj1_fullpath = malloc(strlen(grp1_path) + strlen(table->objs[i].name) + 1))) + H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + else { + strcpy(obj1_fullpath, grp1_path); + strcat(obj1_fullpath, table->objs[i].name); + } + if (NULL == (obj2_fullpath = malloc(strlen(grp2_path) + strlen(table->objs[i].name) + 1))) + H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + else { + strcpy(obj2_fullpath, grp2_path); + strcat(obj2_fullpath, table->objs[i].name); + } +#endif - /* make full path for obj2 */ -#ifdef H5_HAVE_ASPRINTF - /* Use the asprintf() routine, since it does what we're trying to do below */ - if (asprintf(&obj2_fullpath, "%s%s", grp2_path, table->objs[i].name) < 0) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } -#else /* H5_HAVE_ASPRINTF */ - if ((obj2_fullpath = (char *)malloc(strlen(grp2_path) + strlen(table->objs[i].name) + 1)) == - NULL) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } - else { - strcpy(obj2_fullpath, grp2_path); - strcat(obj2_fullpath, table->objs[i].name); - } -#endif /* H5_HAVE_ASPRINTF */ - H5TOOLS_DEBUG("diff_match path2 - %s", obj2_fullpath); - - /* get index to figure out type of the object in file1 */ - while (info1->paths[idx1].path && (strcmp(obj1_fullpath, info1->paths[idx1].path) != 0)) - idx1++; - /* get index to figure out type of the object in file2 */ - while (info2->paths[idx2].path && (strcmp(obj2_fullpath, info2->paths[idx2].path) != 0)) - idx2++; - - /* Set argdata to pass other args into diff() */ - argdata.type[0] = info1->paths[idx1].type; - argdata.type[1] = info2->paths[idx2].type; - argdata.is_same_trgobj = table->objs[i].is_same_trgobj; - - opts->cmn_objs = 1; - if (!g_Parallel) { - H5TOOLS_DEBUG("diff paths - errstat:%d", opts->err_stat); - nfound += diff(file1_id, obj1_fullpath, file2_id, obj2_fullpath, opts, &argdata); - } /* end if */ -#ifdef H5_HAVE_PARALLEL - else { - int workerFound = 0; - - H5TOOLS_DEBUG("Beginning of big else block"); - /* We're in parallel mode */ - /* Since the data type of diff value is hsize_t which can - * be arbitrary large such that there is no MPI type that - * matches it, the value is passed between processes as - * an array of bytes in order to be portable. But this - * may not work in non-homogeneous MPI environments. - */ - - /*Set up args to pass to worker task. */ - if (strlen(obj1_fullpath) > 255 || strlen(obj2_fullpath) > 255) { - fprintf(stderr, - "The parallel diff only supports object names up to 255 characters\n"); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end if */ - - /* set args struct to pass */ - strcpy(args.name1, obj1_fullpath); - strcpy(args.name2, obj2_fullpath); - args.opts = *opts; - args.argdata.type[0] = info1->paths[idx1].type; - args.argdata.type[1] = info2->paths[idx2].type; - args.argdata.is_same_trgobj = table->objs[i].is_same_trgobj; - - /* if there are any outstanding print requests, let's handle one. */ - if (busyTasks > 0) { - int incomingMessage; - - /* check if any tasks freed up, and didn't need to print. */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); - - /* first block*/ - if (incomingMessage) { - workerTasks[Status.MPI_SOURCE - 1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - - /* check to see if the print token was returned. */ - if (!havePrintToken) { - /* If we don't have the token, someone is probably sending us output */ - print_incoming_data(); - - /* check incoming queue for token */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - /* incoming token implies free task. */ - if (incomingMessage) { - workerTasks[Status.MPI_SOURCE - 1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } /* end if */ - } /* end if */ - - /* check to see if anyone needs the print token. */ - if (havePrintToken) { - /* check incoming queue for print token requests */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, - &Status); - if (incomingMessage) { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, - MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, - MPI_COMM_WORLD); - havePrintToken = 0; - } /* end if */ - } /* end if */ - } /* end if */ - - /* check array of tasks to see which ones are free. - * Manager task never does work, so freeTasks[0] is really - * worker task 0. */ - for (n = 1; (n < g_nTasks) && !workerFound; n++) { - if (workerTasks[n - 1]) { - /* send file id's and names to first free worker */ - MPI_Send(&args, sizeof(args), MPI_BYTE, n, MPI_TAG_ARGS, MPI_COMM_WORLD); - - /* increment counter for total number of prints. */ - busyTasks++; - - /* mark worker as busy */ - workerTasks[n - 1] = 0; - workerFound = 1; - } /* end if */ - } /* end for */ - - if (!workerFound) { - /* if they were all busy, we've got to wait for one free up - * before we can move on. If we don't have the token, some - * task is currently printing so we'll wait for that task to - * return it. - */ - - if (!havePrintToken) { - while (!havePrintToken) { - int incomingMessage; - - print_incoming_data(); - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, - &incomingMessage, &Status); - if (incomingMessage) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, - MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - havePrintToken = 1; - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - /* send this task the work unit. */ - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end if */ - } /* end while */ - } /* end if */ - /* if we do have the token, check for task to free up, or wait for a task to request - * it */ - else { - /* But first print all the data in our incoming queue */ - print_incoming_data(); - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if (Status.MPI_TAG == MPI_TAG_DONE) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_REQUEST) { - int incomingMessage; - - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, - MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, - MPI_COMM_WORLD); - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, - &incomingMessage, &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end else-if */ - else { - fprintf(stderr, "ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - MPI_Finalize(); - } /* end else */ - } /* end else */ - } /* end if */ - } /* end else */ -#endif /* H5_HAVE_PARALLEL */ - if (obj1_fullpath) - free(obj1_fullpath); - if (obj2_fullpath) - free(obj2_fullpath); - } /* end if */ - } /* end for */ - H5TOOLS_DEBUG("done with for loop - errstat:%d", opts->err_stat); + H5TOOLS_DEBUG("diff_match path1 - %s", obj1_fullpath); + H5TOOLS_DEBUG("diff_match path2 - %s", obj2_fullpath); + + /* get index to figure out type of the object in file1 */ + while (info1->paths[idx1].path && (strcmp(obj1_fullpath, info1->paths[idx1].path) != 0)) + idx1++; + /* get index to figure out type of the object in file2 */ + while (info2->paths[idx2].path && (strcmp(obj2_fullpath, info2->paths[idx2].path) != 0)) + idx2++; + + /* Set argdata to pass other args into diff() */ + argdata.type[0] = info1->paths[idx1].type; + argdata.type[1] = info2->paths[idx2].type; + argdata.is_same_trgobj = table->objs[i].is_same_trgobj; + + opts->cmn_objs = 1; + H5TOOLS_DEBUG("diff paths - errstat:%d", opts->err_stat); + + if (!g_Parallel) + nfound += diff(file1_id, obj1_fullpath, file2_id, obj2_fullpath, opts, &argdata); #ifdef H5_HAVE_PARALLEL - if (g_Parallel) { - /* make sure all tasks are done */ - while (busyTasks > 0) { - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if (Status.MPI_TAG == MPI_TAG_DONE) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_REQUEST) { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, - &Status); - if (havePrintToken) { - int incomingMessage; - - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - /* someone else must have it...wait for them to return it, then give it to the task that - * just asked for it. */ - else { - int source = Status.MPI_SOURCE; - int incomingMessage; - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - } /* end else */ - } /* end else-if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_RETURN) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } /* end else-if */ - else if (Status.MPI_TAG == MPI_TAG_PRINT_DATA) { - char data[PRINT_DATA_MAX_SIZE + 1]; - memset(data, 0, PRINT_DATA_MAX_SIZE + 1); - - MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD, &Status); - - parallel_print("%s", data); - } /* end else-if */ - else { - fprintf(stderr, "ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end else */ - } /* end while */ - - /* Print any final data waiting in our queue */ - print_incoming_data(); - } /* end if */ - H5TOOLS_DEBUG("done with if block"); + else { + struct diff_mpi_args args; + + /* Dispatch diff requests to as many worker tasks as possible before + * handling incoming requests from worker tasks. + */ - free(workerTasks); + /* Check length of object names before handling and dispatching work */ + if (strlen(obj1_fullpath) > 255 || strlen(obj2_fullpath) > 255) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, + "parallel h5diff only supports object names up to 255 characters"); + + /* If no worker tasks are available, handle requests until one is */ + if (busyTasks == g_nTasks - 1) + if (H5DIFF_ERR == handle_worker_request(workerTasks, &busyTasks, opts, &nfound)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't handle parallel worker task request"); + + /* Set up args to pass to worker task. */ + strcpy(args.name1, obj1_fullpath); + strcpy(args.name2, obj2_fullpath); + args.opts = *opts; + args.argdata = argdata; + + /* Dispatch diff request for this object to a worker task */ + if (H5DIFF_ERR == dispatch_diff_to_worker(&args, workerTasks, &busyTasks)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't dispatch diff command to worker task"); + } +#endif + + if (obj1_fullpath) { + free(obj1_fullpath); + obj1_fullpath = NULL; + } + if (obj2_fullpath) { + free(obj2_fullpath); + obj2_fullpath = NULL; + } + } + H5TOOLS_DEBUG("done with for loop - errstat:%d", opts->err_stat); + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + /* Make sure all worker tasks are done */ + while (busyTasks > 0) { + if (H5DIFF_ERR == handle_worker_request(workerTasks, &busyTasks, opts, &nfound)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't handle parallel worker task request"); + } } #endif /* H5_HAVE_PARALLEL */ +done: + free(obj1_fullpath); + free(obj2_fullpath); + +#ifdef H5_HAVE_PARALLEL + free(workerTasks); +#endif + opts->err_stat = opts->err_stat | ret_value; free_exclude_attr_list(opts); @@ -1932,3 +1700,148 @@ diff(hid_t file1_id, const char *path1, hid_t file2_id, const char *path2, diff_ return nfound; } + +#ifdef H5_HAVE_PARALLEL +/*------------------------------------------------------------------------- + * Function: handle_worker_request + * + * Purpose: Handles MPI communication from a worker task. Returns when a + * worker task becomes free (either a MPI_TAG_DONE message is + * received from it or a MPI_TAG_TOK_RETURN message is received + * from it after processing a MPI_TAG_TOK_REQUEST message event). + * + * Return: H5DIFF_NO_ERR on success/H5DIFF_ERR on failure + *------------------------------------------------------------------------- + */ +static diff_err_t +handle_worker_request(char *worker_tasks, int *n_busy_tasks, diff_opt_t *opts, hsize_t *n_diffs) +{ + struct diffs_found ndiffs_found; + MPI_Status status; + int task_idx = 0; + int source = 0; + herr_t ret_value = H5DIFF_NO_ERR; + + /* Must have at least one busy worker task */ + assert(*n_busy_tasks > 0); + + if (MPI_SUCCESS != (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't check for message from worker task"); + source = status.MPI_SOURCE; + task_idx = source - 1; + + /* Currently, only MPI_TAG_DONE or MPI_TAG_TOK_REQUEST messages should be received + * from worker tasks. MPI_TAG_TOK_REQUEST messages begin a sequence that is handled + * "atomically" to simplify things and prevent the potential for interleaved output, + * out-of-order or unreceived messages, etc. + */ + if (status.MPI_TAG != MPI_TAG_DONE && status.MPI_TAG != MPI_TAG_TOK_REQUEST) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "invalid MPI message tag received from worker task"); + + if (status.MPI_TAG == MPI_TAG_DONE) { + if (MPI_SUCCESS != (MPI_Recv(&ndiffs_found, sizeof(ndiffs_found), MPI_BYTE, source, MPI_TAG_DONE, + MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive 'done' message from worker"); + + /* Update diff stats */ + opts->not_cmp = opts->not_cmp | ndiffs_found.not_cmp; + (*n_diffs) += ndiffs_found.nfound; + + /* Mark worker task as free */ + worker_tasks[task_idx] = 1; + (*n_busy_tasks)--; + } + else if (status.MPI_TAG == MPI_TAG_TOK_REQUEST) { + char data[PRINT_DATA_MAX_SIZE + 1]; + int incoming_output = 0; + + if (MPI_SUCCESS != + (MPI_Recv(NULL, 0, MPI_BYTE, source, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive print token request message"); + + /* Give print token to worker task */ + if (MPI_SUCCESS != (MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't send print token to worker"); + + /* Print incoming output until print token is returned */ + incoming_output = 1; + do { + if (MPI_SUCCESS != (MPI_Probe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't check for message from worker task"); + + if (status.MPI_TAG == MPI_TAG_PRINT_DATA) { + memset(data, 0, PRINT_DATA_MAX_SIZE + 1); + if (MPI_SUCCESS != (MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, source, MPI_TAG_PRINT_DATA, + MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive output from worker task"); + + parallel_print("%s", data); + } + else if (status.MPI_TAG == MPI_TAG_TOK_RETURN) { + if (MPI_SUCCESS != (MPI_Recv(&ndiffs_found, sizeof(ndiffs_found), MPI_BYTE, source, + MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive print token message from worker"); + + incoming_output = 0; + } + } while (incoming_output); + + /* Update diff stats */ + opts->not_cmp = opts->not_cmp | ndiffs_found.not_cmp; + (*n_diffs) += ndiffs_found.nfound; + + /* Mark worker task as free */ + worker_tasks[task_idx] = 1; + (*n_busy_tasks)--; + } + +done: + return ret_value; +} + +/*------------------------------------------------------------------------- + * Function: dispatch_diff_to_worker + * + * Purpose: Sends arguments to a worker task to allow it to start + * processing the differences between two objects. + * + * Return: H5DIFF_NO_ERR on success/H5DIFF_ERR on failure + *------------------------------------------------------------------------- + */ +static diff_err_t +dispatch_diff_to_worker(struct diff_mpi_args *args, char *worker_tasks, int *n_busy_tasks) +{ + int target_task = -1; + diff_err_t ret_value = H5DIFF_NO_ERR; + + /* Must have a free worker task */ + assert(*n_busy_tasks < g_nTasks - 1); + + /* Check array of tasks to see which ones are free. + * Manager task never does work, so workerTasks[0] is + * really worker task 0, or MPI rank 1. + */ + target_task = -1; + for (int n = 1; n < g_nTasks; n++) + if (worker_tasks[n - 1]) { + target_task = n - 1; + break; + } + + /* We should always find a free worker here */ + if (target_task < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't find a free worker task to dispatch diff request to"); + + /* Send diff arguments to worker */ + if (MPI_SUCCESS != (MPI_Send(args, sizeof(struct diff_mpi_args), MPI_BYTE, target_task + 1, MPI_TAG_ARGS, + MPI_COMM_WORLD))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't send diff arguments to worker task"); + + /* Mark worker task as busy */ + worker_tasks[target_task] = 0; + (*n_busy_tasks)++; + +done: + return ret_value; +} +#endif diff --git a/tools/src/h5diff/ph5diff_main.c b/tools/src/h5diff/ph5diff_main.c index 98e0c1da1b7..07fdb58bc09 100644 --- a/tools/src/h5diff/ph5diff_main.c +++ b/tools/src/h5diff/ph5diff_main.c @@ -48,6 +48,11 @@ main(int argc, char *argv[]) const char *objname2 = NULL; diff_opt_t opts; + MPI_Init(&argc, (char ***)&argv); + + MPI_Comm_rank(MPI_COMM_WORLD, &nID); + MPI_Comm_size(MPI_COMM_WORLD, &g_nTasks); + h5tools_setprogname(PROGRAMNAME); h5tools_setstatus(EXIT_SUCCESS); @@ -57,11 +62,6 @@ main(int argc, char *argv[]) outBuffOffset = 0; g_Parallel = 1; - MPI_Init(&argc, (char ***)&argv); - - MPI_Comm_rank(MPI_COMM_WORLD, &nID); - MPI_Comm_size(MPI_COMM_WORLD, &g_nTasks); - if (g_nTasks == 1) { fprintf(stderr, "Only 1 task available...doing serial diff\n"); @@ -155,6 +155,7 @@ ph5diff_worker(int nID) /* Make certain we've received the filenames and opened the files already */ if (file1_id < 0 || file2_id < 0) { printf("ph5diff_worker: ERROR: work received before/without filenames\n"); + MPI_Abort(MPI_COMM_WORLD, 0); break; } @@ -165,34 +166,44 @@ ph5diff_worker(int nID) diffs.nfound = diff(file1_id, args.name1, file2_id, args.name2, &(args.opts), &(args.argdata)); diffs.not_cmp = args.opts.not_cmp; - /* If print buffer has something in it, request print token.*/ - if (outBuffOffset > 0) { + if ((outBuffOffset == 0) && !overflow_file) + /* Nothing to print. Send diffs to manager */ + MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_DONE, MPI_COMM_WORLD); + else { + /* + * If print buffer or overflow file have something in + * them, request print token. + */ MPI_Send(NULL, 0, MPI_BYTE, 0, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD); /* Wait for print token. */ MPI_Recv(NULL, 0, MPI_BYTE, 0, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD, &Status); - /* When get token, send all of our output to the manager task and then return the token */ - for (i = 0; i < outBuffOffset; i += PRINT_DATA_MAX_SIZE) - MPI_Send(outBuff + i, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD); + if (outBuffOffset > 0) { + /* When get token, send all of our output to the manager task and then return the token */ + for (i = 0; i < outBuffOffset; i += PRINT_DATA_MAX_SIZE) + MPI_Send(outBuff + i, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, + MPI_COMM_WORLD); + } - /* An overflow file exists, so we send it's output to the manager too and then delete it */ + /* An overflow file exists, so we send its output to + * the manager too and then delete it. + */ if (overflow_file) { - char out_data[PRINT_DATA_MAX_SIZE]; + char out_data[PRINT_DATA_MAX_SIZE + 1]; int tmp; - memset(out_data, 0, PRINT_DATA_MAX_SIZE); + memset(out_data, 0, PRINT_DATA_MAX_SIZE + 1); i = 0; rewind(overflow_file); - while ((tmp = getc(overflow_file)) >= 0) { + while ((tmp = getc(overflow_file)) != EOF) { *(out_data + i++) = (char)tmp; if (i == PRINT_DATA_MAX_SIZE) { MPI_Send(out_data, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD); i = 0; - memset(out_data, 0, PRINT_DATA_MAX_SIZE); + memset(out_data, 0, PRINT_DATA_MAX_SIZE + 1); } } @@ -210,8 +221,6 @@ ph5diff_worker(int nID) MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD); } - else - MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_DONE, MPI_COMM_WORLD); } /* Check for leaving */ else if (Status.MPI_TAG == MPI_TAG_END) { @@ -220,10 +229,14 @@ ph5diff_worker(int nID) } else { printf("ph5diff_worker: ERROR: invalid tag (%d) received\n", Status.MPI_TAG); + MPI_Abort(MPI_COMM_WORLD, 0); break; } } + H5Fclose(file1_id); + H5Fclose(file2_id); + return; } @@ -241,13 +254,14 @@ void print_manager_output(void) { /* If there was something we buffered, let's print it now */ - if ((outBuffOffset > 0) && g_Parallel) { - printf("%s", outBuff); + if (g_Parallel) { + if (outBuffOffset > 0) + printf("%s", outBuff); if (overflow_file) { int tmp; rewind(overflow_file); - while ((tmp = getc(overflow_file)) >= 0) + while ((tmp = getc(overflow_file)) != EOF) putchar(tmp); fclose(overflow_file); overflow_file = NULL; @@ -257,8 +271,8 @@ print_manager_output(void) memset(outBuff, 0, OUTBUFF_SIZE); outBuffOffset = 0; } - else if ((outBuffOffset > 0) && !g_Parallel) { - fprintf(stderr, "h5diff error: outBuffOffset>0, but we're not in parallel!\n"); + else if (outBuffOffset > 0) { + fprintf(stderr, "h5diff error: outBuffOffset > 0, but we're not in parallel!\n"); } }