diff --git a/config/cmake/runTest.cmake b/config/cmake/runTest.cmake index ff9732e0821..5823337b8d0 100644 --- a/config/cmake/runTest.cmake +++ b/config/cmake/runTest.cmake @@ -266,12 +266,22 @@ if (NOT TEST_SKIP_COMPARE) endif () math (EXPR _FP_LEN "${len_ref} - 1") foreach (line RANGE 0 ${_FP_LEN}) - list (GET test_act ${line} str_act) - list (GET test_ref ${line} str_ref) - if (NOT str_act STREQUAL str_ref) - if (str_act) - set (TEST_COMPARE_RESULT 1) - message (STATUS "line = ${line}\n***ACTUAL: ${str_act}\n****REFER: ${str_ref}\n") + if (line GREATER_EQUAL len_act) + message (STATUS "COMPARE FAILED: ran out of lines in ${TEST_FOLDER}/${TEST_OUTPUT}") + set (TEST_COMPARE_RESULT 1) + break () + elseif (line GREATER_EQUAL len_ref) + message (STATUS "COMPARE FAILED: ran out of lines in ${TEST_FOLDER}/${TEST_REFERENCE}") + set (TEST_COMPARE_RESULT 1) + break () + else () + list (GET test_act ${line} str_act) + list (GET test_ref ${line} str_ref) + if (NOT str_act STREQUAL str_ref) + if (str_act) + set (TEST_COMPARE_RESULT 1) + message (STATUS "line = ${line}\n***ACTUAL: ${str_act}\n****REFER: ${str_ref}\n") + endif () endif () endif () endforeach () diff --git a/tools/lib/h5diff.c b/tools/lib/h5diff.c index bdbda6e74bc..9329872d965 100644 --- a/tools/lib/h5diff.c +++ b/tools/lib/h5diff.c @@ -16,6 +16,12 @@ #include "h5diff.h" #include "ph5diff.h" +#ifdef H5_HAVE_PARALLEL +static diff_err_t handle_worker_request(char *worker_tasks, int *n_busy_tasks, diff_opt_t *opts, + hsize_t *n_diffs); +static diff_err_t dispatch_diff_to_worker(struct diff_mpi_args *args, char *worker_tasks, int *n_busy_tasks); +#endif + /*------------------------------------------------------------------------- * Function: print_objname * @@ -91,35 +97,6 @@ phdiff_dismiss_workers(void) for (i = 1; i < g_nTasks; i++) MPI_Send(NULL, 0, MPI_BYTE, i, MPI_TAG_END, MPI_COMM_WORLD); } - -/*------------------------------------------------------------------------- - * Function: print_incoming_data - * - * Purpose: special function that prints any output that has been sent to the manager - * and is currently sitting in the incoming message queue - * - * Return: none - *------------------------------------------------------------------------- - */ - -static void -print_incoming_data(void) -{ - char data[PRINT_DATA_MAX_SIZE + 1]; - int incomingMessage; - MPI_Status Status; - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD, &incomingMessage, &Status); - if (incomingMessage) { - memset(data, 0, PRINT_DATA_MAX_SIZE + 1); - MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD, &Status); - - parallel_print("%s", data); - } - } while (incomingMessage); -} #endif /*------------------------------------------------------------------------- @@ -992,24 +969,6 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char } H5TOOLS_DEBUG("groups traversed - errstat:%d", opts->err_stat); -#ifdef H5_HAVE_PARALLEL - if (g_Parallel) { - int i; - - if ((strlen(fname1) > MAX_FILENAME) || (strlen(fname2) > MAX_FILENAME)) { - fprintf(stderr, "The parallel diff only supports path names up to %d characters\n", MAX_FILENAME); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end if */ - - strcpy(filenames[0], fname1); - strcpy(filenames[1], fname2); - - /* Alert the worker tasks that there's going to be work. */ - for (i = 1; i < g_nTasks; i++) - MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD); - } /* end if */ -#endif - H5TOOLS_DEBUG("build_match_list next - errstat:%d", opts->err_stat); /* process the objects */ build_match_list(obj1fullname, info1_lp, obj2fullname, info2_lp, &match_list, opts); @@ -1042,6 +1001,24 @@ h5diff(const char *fname1, const char *fname2, const char *objname1, const char parallel_print("\n"); } /* end if */ } + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + if ((strlen(fname1) > MAX_FILENAME - 1) || (strlen(fname2) > MAX_FILENAME - 1)) { + fprintf(stderr, "The parallel diff only supports path names up to %d characters\n", + MAX_FILENAME - 1); + MPI_Abort(MPI_COMM_WORLD, 0); + } /* end if */ + + strcpy(filenames[0], fname1); + strcpy(filenames[1], fname2); + + /* Alert the worker tasks that there's going to be work. */ + for (int i = 1; i < g_nTasks; i++) + MPI_Send(filenames, (MAX_FILENAME * 2), MPI_CHAR, i, MPI_TAG_PARALLEL, MPI_COMM_WORLD); + } /* end if */ +#endif + H5TOOLS_DEBUG("diff_match next - errstat:%d", opts->err_stat); nfound = diff_match(file1_id, obj1fullname, info1_lp, file2_id, obj2fullname, info2_lp, match_list, opts); H5TOOLS_DEBUG("diff_match nfound: %d - errstat:%d", nfound, opts->err_stat); @@ -1117,11 +1094,26 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id, char *obj1_fullpath = NULL; char *obj2_fullpath = NULL; diff_args_t argdata; - size_t idx1 = 0; - size_t idx2 = 0; - diff_err_t ret_value = opts->err_stat; + size_t idx1 = 0; + size_t idx2 = 0; +#ifdef H5_HAVE_PARALLEL + char *workerTasks = NULL; + int busyTasks = 0; +#endif + diff_err_t ret_value = opts->err_stat; H5TOOLS_START_DEBUG(" - errstat:%d", opts->err_stat); + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + if (NULL == (workerTasks = malloc((size_t)(g_nTasks - 1) * sizeof(char)))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "unable to allocate worker tasks array"); + + /*set all tasks as free */ + memset(workerTasks, 1, (size_t)(g_nTasks - 1) * sizeof(char)); + } +#endif + /* * if not root, prepare object name to be pre-appended to group path to * make full path @@ -1162,339 +1154,115 @@ diff_match(hid_t file1_id, const char *grp1, trav_info_t *info1, hid_t file2_id, * do the diff for common objects *------------------------------------------------------------------------- */ -#ifdef H5_HAVE_PARALLEL - { - char *workerTasks = (char *)malloc((size_t)(g_nTasks - 1) * sizeof(char)); - int n; - int busyTasks = 0; - struct diffs_found nFoundbyWorker; - struct diff_mpi_args args; - int havePrintToken = 1; - MPI_Status Status; + for (i = 0; i < table->nobjs; i++) { + H5TOOLS_DEBUG("diff for common objects[%d] - errstat:%d", i, opts->err_stat); - /*set all tasks as free */ - memset(workerTasks, 1, (size_t)(g_nTasks - 1) * sizeof(char)); -#endif + /* Check if object is present in both files first before diffing */ + if (!(table->objs[i].flags[0] && table->objs[i].flags[1])) + continue; - for (i = 0; i < table->nobjs; i++) { - H5TOOLS_DEBUG("diff for common objects[%d] - errstat:%d", i, opts->err_stat); - if (table->objs[i].flags[0] && table->objs[i].flags[1]) { - /* make full path for obj1 */ + /* Make full paths for objects */ #ifdef H5_HAVE_ASPRINTF - /* Use the asprintf() routine, since it does what we're trying to do below */ - if (asprintf(&obj1_fullpath, "%s%s", grp1_path, table->objs[i].name) < 0) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } -#else /* H5_HAVE_ASPRINTF */ - if ((obj1_fullpath = (char *)malloc(strlen(grp1_path) + strlen(table->objs[i].name) + 1)) == - NULL) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } - else { - strcpy(obj1_fullpath, grp1_path); - strcat(obj1_fullpath, table->objs[i].name); - } -#endif /* H5_HAVE_ASPRINTF */ - H5TOOLS_DEBUG("diff_match path1 - %s", obj1_fullpath); + /* Use the asprintf() routine, since it does what we're trying to do below */ + if (asprintf(&obj1_fullpath, "%s%s", grp1_path, table->objs[i].name) < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + if (asprintf(&obj2_fullpath, "%s%s", grp2_path, table->objs[i].name) < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "name buffer allocation failed"); +#else + if (NULL == (obj1_fullpath = malloc(strlen(grp1_path) + strlen(table->objs[i].name) + 1))) + H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + else { + strcpy(obj1_fullpath, grp1_path); + strcat(obj1_fullpath, table->objs[i].name); + } + if (NULL == (obj2_fullpath = malloc(strlen(grp2_path) + strlen(table->objs[i].name) + 1))) + H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); + else { + strcpy(obj2_fullpath, grp2_path); + strcat(obj2_fullpath, table->objs[i].name); + } +#endif - /* make full path for obj2 */ -#ifdef H5_HAVE_ASPRINTF - /* Use the asprintf() routine, since it does what we're trying to do below */ - if (asprintf(&obj2_fullpath, "%s%s", grp2_path, table->objs[i].name) < 0) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } -#else /* H5_HAVE_ASPRINTF */ - if ((obj2_fullpath = (char *)malloc(strlen(grp2_path) + strlen(table->objs[i].name) + 1)) == - NULL) { - H5TOOLS_ERROR(H5DIFF_ERR, "name buffer allocation failed"); - } - else { - strcpy(obj2_fullpath, grp2_path); - strcat(obj2_fullpath, table->objs[i].name); - } -#endif /* H5_HAVE_ASPRINTF */ - H5TOOLS_DEBUG("diff_match path2 - %s", obj2_fullpath); - - /* get index to figure out type of the object in file1 */ - while (info1->paths[idx1].path && (strcmp(obj1_fullpath, info1->paths[idx1].path) != 0)) - idx1++; - /* get index to figure out type of the object in file2 */ - while (info2->paths[idx2].path && (strcmp(obj2_fullpath, info2->paths[idx2].path) != 0)) - idx2++; - - /* Set argdata to pass other args into diff() */ - argdata.type[0] = info1->paths[idx1].type; - argdata.type[1] = info2->paths[idx2].type; - argdata.is_same_trgobj = table->objs[i].is_same_trgobj; - - opts->cmn_objs = 1; - if (!g_Parallel) { - H5TOOLS_DEBUG("diff paths - errstat:%d", opts->err_stat); - nfound += diff(file1_id, obj1_fullpath, file2_id, obj2_fullpath, opts, &argdata); - } /* end if */ -#ifdef H5_HAVE_PARALLEL - else { - int workerFound = 0; - - H5TOOLS_DEBUG("Beginning of big else block"); - /* We're in parallel mode */ - /* Since the data type of diff value is hsize_t which can - * be arbitrary large such that there is no MPI type that - * matches it, the value is passed between processes as - * an array of bytes in order to be portable. But this - * may not work in non-homogeneous MPI environments. - */ - - /*Set up args to pass to worker task. */ - if (strlen(obj1_fullpath) > 255 || strlen(obj2_fullpath) > 255) { - fprintf(stderr, - "The parallel diff only supports object names up to 255 characters\n"); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end if */ - - /* set args struct to pass */ - strcpy(args.name1, obj1_fullpath); - strcpy(args.name2, obj2_fullpath); - args.opts = *opts; - args.argdata.type[0] = info1->paths[idx1].type; - args.argdata.type[1] = info2->paths[idx2].type; - args.argdata.is_same_trgobj = table->objs[i].is_same_trgobj; - - /* if there are any outstanding print requests, let's handle one. */ - if (busyTasks > 0) { - int incomingMessage; - - /* check if any tasks freed up, and didn't need to print. */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_DONE, MPI_COMM_WORLD, &incomingMessage, &Status); - - /* first block*/ - if (incomingMessage) { - workerTasks[Status.MPI_SOURCE - 1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - - /* check to see if the print token was returned. */ - if (!havePrintToken) { - /* If we don't have the token, someone is probably sending us output */ - print_incoming_data(); - - /* check incoming queue for token */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - /* incoming token implies free task. */ - if (incomingMessage) { - workerTasks[Status.MPI_SOURCE - 1] = 1; - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } /* end if */ - } /* end if */ - - /* check to see if anyone needs the print token. */ - if (havePrintToken) { - /* check incoming queue for print token requests */ - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &incomingMessage, - &Status); - if (incomingMessage) { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, - MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, - MPI_COMM_WORLD); - havePrintToken = 0; - } /* end if */ - } /* end if */ - } /* end if */ - - /* check array of tasks to see which ones are free. - * Manager task never does work, so freeTasks[0] is really - * worker task 0. */ - for (n = 1; (n < g_nTasks) && !workerFound; n++) { - if (workerTasks[n - 1]) { - /* send file id's and names to first free worker */ - MPI_Send(&args, sizeof(args), MPI_BYTE, n, MPI_TAG_ARGS, MPI_COMM_WORLD); - - /* increment counter for total number of prints. */ - busyTasks++; - - /* mark worker as busy */ - workerTasks[n - 1] = 0; - workerFound = 1; - } /* end if */ - } /* end for */ - - if (!workerFound) { - /* if they were all busy, we've got to wait for one free up - * before we can move on. If we don't have the token, some - * task is currently printing so we'll wait for that task to - * return it. - */ - - if (!havePrintToken) { - while (!havePrintToken) { - int incomingMessage; - - print_incoming_data(); - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, - &incomingMessage, &Status); - if (incomingMessage) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, - MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - havePrintToken = 1; - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - /* send this task the work unit. */ - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end if */ - } /* end while */ - } /* end if */ - /* if we do have the token, check for task to free up, or wait for a task to request - * it */ - else { - /* But first print all the data in our incoming queue */ - print_incoming_data(); - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if (Status.MPI_TAG == MPI_TAG_DONE) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_REQUEST) { - int incomingMessage; - - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, - MPI_COMM_WORLD, &Status); - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, - MPI_COMM_WORLD); - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, - &incomingMessage, &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - MPI_Send(&args, sizeof(args), MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_ARGS, - MPI_COMM_WORLD); - } /* end else-if */ - else { - fprintf(stderr, "ERROR: Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - MPI_Finalize(); - } /* end else */ - } /* end else */ - } /* end if */ - } /* end else */ -#endif /* H5_HAVE_PARALLEL */ - if (obj1_fullpath) - free(obj1_fullpath); - if (obj2_fullpath) - free(obj2_fullpath); - } /* end if */ - } /* end for */ - H5TOOLS_DEBUG("done with for loop - errstat:%d", opts->err_stat); + H5TOOLS_DEBUG("diff_match path1 - %s", obj1_fullpath); + H5TOOLS_DEBUG("diff_match path2 - %s", obj2_fullpath); + + /* get index to figure out type of the object in file1 */ + while (info1->paths[idx1].path && (strcmp(obj1_fullpath, info1->paths[idx1].path) != 0)) + idx1++; + /* get index to figure out type of the object in file2 */ + while (info2->paths[idx2].path && (strcmp(obj2_fullpath, info2->paths[idx2].path) != 0)) + idx2++; + + /* Set argdata to pass other args into diff() */ + argdata.type[0] = info1->paths[idx1].type; + argdata.type[1] = info2->paths[idx2].type; + argdata.is_same_trgobj = table->objs[i].is_same_trgobj; + + opts->cmn_objs = 1; + H5TOOLS_DEBUG("diff paths - errstat:%d", opts->err_stat); + + if (!g_Parallel) + nfound += diff(file1_id, obj1_fullpath, file2_id, obj2_fullpath, opts, &argdata); #ifdef H5_HAVE_PARALLEL - if (g_Parallel) { - /* make sure all tasks are done */ - while (busyTasks > 0) { - MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &Status); - if (Status.MPI_TAG == MPI_TAG_DONE) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_DONE, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_REQUEST) { - MPI_Recv(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, - &Status); - if (havePrintToken) { - int incomingMessage; - - MPI_Send(NULL, 0, MPI_BYTE, Status.MPI_SOURCE, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - } /* end if */ - /* someone else must have it...wait for them to return it, then give it to the task that - * just asked for it. */ - else { - int source = Status.MPI_SOURCE; - int incomingMessage; - - do { - MPI_Iprobe(MPI_ANY_SOURCE, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &incomingMessage, - &Status); - - print_incoming_data(); - } while (!incomingMessage); - - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, MPI_ANY_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD); - } /* end else */ - } /* end else-if */ - else if (Status.MPI_TAG == MPI_TAG_TOK_RETURN) { - MPI_Recv(&nFoundbyWorker, sizeof(nFoundbyWorker), MPI_BYTE, Status.MPI_SOURCE, - MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &Status); - nfound += nFoundbyWorker.nfound; - opts->not_cmp = opts->not_cmp | nFoundbyWorker.not_cmp; - busyTasks--; - havePrintToken = 1; - } /* end else-if */ - else if (Status.MPI_TAG == MPI_TAG_PRINT_DATA) { - char data[PRINT_DATA_MAX_SIZE + 1]; - memset(data, 0, PRINT_DATA_MAX_SIZE + 1); - - MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, Status.MPI_SOURCE, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD, &Status); - - parallel_print("%s", data); - } /* end else-if */ - else { - fprintf(stderr, "ph5diff-manager: ERROR!! Invalid tag (%d) received \n", Status.MPI_TAG); - MPI_Abort(MPI_COMM_WORLD, 0); - } /* end else */ - } /* end while */ - - /* Print any final data waiting in our queue */ - print_incoming_data(); - } /* end if */ - H5TOOLS_DEBUG("done with if block"); + else { + struct diff_mpi_args args; + + /* Dispatch diff requests to as many worker tasks as possible before + * handling incoming requests from worker tasks. + */ - free(workerTasks); + /* Check length of object names before handling and dispatching work */ + if (strlen(obj1_fullpath) > 255 || strlen(obj2_fullpath) > 255) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, + "parallel h5diff only supports object names up to 255 characters"); + + /* If no worker tasks are available, handle requests until one is */ + if (busyTasks == g_nTasks - 1) + if (H5DIFF_ERR == handle_worker_request(workerTasks, &busyTasks, opts, &nfound)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't handle parallel worker task request"); + + /* Set up args to pass to worker task. */ + strcpy(args.name1, obj1_fullpath); + strcpy(args.name2, obj2_fullpath); + args.opts = *opts; + args.argdata = argdata; + + /* Dispatch diff request for this object to a worker task */ + if (H5DIFF_ERR == dispatch_diff_to_worker(&args, workerTasks, &busyTasks)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't dispatch diff command to worker task"); + } +#endif + + if (obj1_fullpath) { + free(obj1_fullpath); + obj1_fullpath = NULL; + } + if (obj2_fullpath) { + free(obj2_fullpath); + obj2_fullpath = NULL; + } + } + H5TOOLS_DEBUG("done with for loop - errstat:%d", opts->err_stat); + +#ifdef H5_HAVE_PARALLEL + if (g_Parallel) { + /* Make sure all worker tasks are done */ + while (busyTasks > 0) { + if (H5DIFF_ERR == handle_worker_request(workerTasks, &busyTasks, opts, &nfound)) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't handle parallel worker task request"); + } } #endif /* H5_HAVE_PARALLEL */ +done: + free(obj1_fullpath); + free(obj2_fullpath); + +#ifdef H5_HAVE_PARALLEL + free(workerTasks); +#endif + opts->err_stat = opts->err_stat | ret_value; free_exclude_attr_list(opts); @@ -1932,3 +1700,148 @@ diff(hid_t file1_id, const char *path1, hid_t file2_id, const char *path2, diff_ return nfound; } + +#ifdef H5_HAVE_PARALLEL +/*------------------------------------------------------------------------- + * Function: handle_worker_request + * + * Purpose: Handles MPI communication from a worker task. Returns when a + * worker task becomes free (either a MPI_TAG_DONE message is + * received from it or a MPI_TAG_TOK_RETURN message is received + * from it after processing a MPI_TAG_TOK_REQUEST message event). + * + * Return: H5DIFF_NO_ERR on success/H5DIFF_ERR on failure + *------------------------------------------------------------------------- + */ +static diff_err_t +handle_worker_request(char *worker_tasks, int *n_busy_tasks, diff_opt_t *opts, hsize_t *n_diffs) +{ + struct diffs_found ndiffs_found; + MPI_Status status; + int task_idx = 0; + int source = 0; + herr_t ret_value = H5DIFF_NO_ERR; + + /* Must have at least one busy worker task */ + assert(*n_busy_tasks > 0); + + if (MPI_SUCCESS != (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't check for message from worker task"); + source = status.MPI_SOURCE; + task_idx = source - 1; + + /* Currently, only MPI_TAG_DONE or MPI_TAG_TOK_REQUEST messages should be received + * from worker tasks. MPI_TAG_TOK_REQUEST messages begin a sequence that is handled + * "atomically" to simplify things and prevent the potential for interleaved output, + * out-of-order or unreceived messages, etc. + */ + if (status.MPI_TAG != MPI_TAG_DONE && status.MPI_TAG != MPI_TAG_TOK_REQUEST) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "invalid MPI message tag received from worker task"); + + if (status.MPI_TAG == MPI_TAG_DONE) { + if (MPI_SUCCESS != (MPI_Recv(&ndiffs_found, sizeof(ndiffs_found), MPI_BYTE, source, MPI_TAG_DONE, + MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive 'done' message from worker"); + + /* Update diff stats */ + opts->not_cmp = opts->not_cmp | ndiffs_found.not_cmp; + (*n_diffs) += ndiffs_found.nfound; + + /* Mark worker task as free */ + worker_tasks[task_idx] = 1; + (*n_busy_tasks)--; + } + else if (status.MPI_TAG == MPI_TAG_TOK_REQUEST) { + char data[PRINT_DATA_MAX_SIZE + 1]; + int incoming_output = 0; + + if (MPI_SUCCESS != + (MPI_Recv(NULL, 0, MPI_BYTE, source, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive print token request message"); + + /* Give print token to worker task */ + if (MPI_SUCCESS != (MPI_Send(NULL, 0, MPI_BYTE, source, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't send print token to worker"); + + /* Print incoming output until print token is returned */ + incoming_output = 1; + do { + if (MPI_SUCCESS != (MPI_Probe(source, MPI_ANY_TAG, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't check for message from worker task"); + + if (status.MPI_TAG == MPI_TAG_PRINT_DATA) { + memset(data, 0, PRINT_DATA_MAX_SIZE + 1); + if (MPI_SUCCESS != (MPI_Recv(data, PRINT_DATA_MAX_SIZE, MPI_CHAR, source, MPI_TAG_PRINT_DATA, + MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive output from worker task"); + + parallel_print("%s", data); + } + else if (status.MPI_TAG == MPI_TAG_TOK_RETURN) { + if (MPI_SUCCESS != (MPI_Recv(&ndiffs_found, sizeof(ndiffs_found), MPI_BYTE, source, + MPI_TAG_TOK_RETURN, MPI_COMM_WORLD, &status))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't receive print token message from worker"); + + incoming_output = 0; + } + } while (incoming_output); + + /* Update diff stats */ + opts->not_cmp = opts->not_cmp | ndiffs_found.not_cmp; + (*n_diffs) += ndiffs_found.nfound; + + /* Mark worker task as free */ + worker_tasks[task_idx] = 1; + (*n_busy_tasks)--; + } + +done: + return ret_value; +} + +/*------------------------------------------------------------------------- + * Function: dispatch_diff_to_worker + * + * Purpose: Sends arguments to a worker task to allow it to start + * processing the differences between two objects. + * + * Return: H5DIFF_NO_ERR on success/H5DIFF_ERR on failure + *------------------------------------------------------------------------- + */ +static diff_err_t +dispatch_diff_to_worker(struct diff_mpi_args *args, char *worker_tasks, int *n_busy_tasks) +{ + int target_task = -1; + diff_err_t ret_value = H5DIFF_NO_ERR; + + /* Must have a free worker task */ + assert(*n_busy_tasks < g_nTasks - 1); + + /* Check array of tasks to see which ones are free. + * Manager task never does work, so workerTasks[0] is + * really worker task 0, or MPI rank 1. + */ + target_task = -1; + for (int n = 1; n < g_nTasks; n++) + if (worker_tasks[n - 1]) { + target_task = n - 1; + break; + } + + /* We should always find a free worker here */ + if (target_task < 0) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't find a free worker task to dispatch diff request to"); + + /* Send diff arguments to worker */ + if (MPI_SUCCESS != (MPI_Send(args, sizeof(struct diff_mpi_args), MPI_BYTE, target_task + 1, MPI_TAG_ARGS, + MPI_COMM_WORLD))) + H5TOOLS_GOTO_ERROR(H5DIFF_ERR, "couldn't send diff arguments to worker task"); + + /* Mark worker task as busy */ + worker_tasks[target_task] = 0; + (*n_busy_tasks)++; + +done: + return ret_value; +} +#endif diff --git a/tools/src/h5diff/ph5diff_main.c b/tools/src/h5diff/ph5diff_main.c index 98e0c1da1b7..07fdb58bc09 100644 --- a/tools/src/h5diff/ph5diff_main.c +++ b/tools/src/h5diff/ph5diff_main.c @@ -48,6 +48,11 @@ main(int argc, char *argv[]) const char *objname2 = NULL; diff_opt_t opts; + MPI_Init(&argc, (char ***)&argv); + + MPI_Comm_rank(MPI_COMM_WORLD, &nID); + MPI_Comm_size(MPI_COMM_WORLD, &g_nTasks); + h5tools_setprogname(PROGRAMNAME); h5tools_setstatus(EXIT_SUCCESS); @@ -57,11 +62,6 @@ main(int argc, char *argv[]) outBuffOffset = 0; g_Parallel = 1; - MPI_Init(&argc, (char ***)&argv); - - MPI_Comm_rank(MPI_COMM_WORLD, &nID); - MPI_Comm_size(MPI_COMM_WORLD, &g_nTasks); - if (g_nTasks == 1) { fprintf(stderr, "Only 1 task available...doing serial diff\n"); @@ -155,6 +155,7 @@ ph5diff_worker(int nID) /* Make certain we've received the filenames and opened the files already */ if (file1_id < 0 || file2_id < 0) { printf("ph5diff_worker: ERROR: work received before/without filenames\n"); + MPI_Abort(MPI_COMM_WORLD, 0); break; } @@ -165,34 +166,44 @@ ph5diff_worker(int nID) diffs.nfound = diff(file1_id, args.name1, file2_id, args.name2, &(args.opts), &(args.argdata)); diffs.not_cmp = args.opts.not_cmp; - /* If print buffer has something in it, request print token.*/ - if (outBuffOffset > 0) { + if ((outBuffOffset == 0) && !overflow_file) + /* Nothing to print. Send diffs to manager */ + MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_DONE, MPI_COMM_WORLD); + else { + /* + * If print buffer or overflow file have something in + * them, request print token. + */ MPI_Send(NULL, 0, MPI_BYTE, 0, MPI_TAG_TOK_REQUEST, MPI_COMM_WORLD); /* Wait for print token. */ MPI_Recv(NULL, 0, MPI_BYTE, 0, MPI_TAG_PRINT_TOK, MPI_COMM_WORLD, &Status); - /* When get token, send all of our output to the manager task and then return the token */ - for (i = 0; i < outBuffOffset; i += PRINT_DATA_MAX_SIZE) - MPI_Send(outBuff + i, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, - MPI_COMM_WORLD); + if (outBuffOffset > 0) { + /* When get token, send all of our output to the manager task and then return the token */ + for (i = 0; i < outBuffOffset; i += PRINT_DATA_MAX_SIZE) + MPI_Send(outBuff + i, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, + MPI_COMM_WORLD); + } - /* An overflow file exists, so we send it's output to the manager too and then delete it */ + /* An overflow file exists, so we send its output to + * the manager too and then delete it. + */ if (overflow_file) { - char out_data[PRINT_DATA_MAX_SIZE]; + char out_data[PRINT_DATA_MAX_SIZE + 1]; int tmp; - memset(out_data, 0, PRINT_DATA_MAX_SIZE); + memset(out_data, 0, PRINT_DATA_MAX_SIZE + 1); i = 0; rewind(overflow_file); - while ((tmp = getc(overflow_file)) >= 0) { + while ((tmp = getc(overflow_file)) != EOF) { *(out_data + i++) = (char)tmp; if (i == PRINT_DATA_MAX_SIZE) { MPI_Send(out_data, PRINT_DATA_MAX_SIZE, MPI_CHAR, 0, MPI_TAG_PRINT_DATA, MPI_COMM_WORLD); i = 0; - memset(out_data, 0, PRINT_DATA_MAX_SIZE); + memset(out_data, 0, PRINT_DATA_MAX_SIZE + 1); } } @@ -210,8 +221,6 @@ ph5diff_worker(int nID) MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_TOK_RETURN, MPI_COMM_WORLD); } - else - MPI_Send(&diffs, sizeof(diffs), MPI_BYTE, 0, MPI_TAG_DONE, MPI_COMM_WORLD); } /* Check for leaving */ else if (Status.MPI_TAG == MPI_TAG_END) { @@ -220,10 +229,14 @@ ph5diff_worker(int nID) } else { printf("ph5diff_worker: ERROR: invalid tag (%d) received\n", Status.MPI_TAG); + MPI_Abort(MPI_COMM_WORLD, 0); break; } } + H5Fclose(file1_id); + H5Fclose(file2_id); + return; } @@ -241,13 +254,14 @@ void print_manager_output(void) { /* If there was something we buffered, let's print it now */ - if ((outBuffOffset > 0) && g_Parallel) { - printf("%s", outBuff); + if (g_Parallel) { + if (outBuffOffset > 0) + printf("%s", outBuff); if (overflow_file) { int tmp; rewind(overflow_file); - while ((tmp = getc(overflow_file)) >= 0) + while ((tmp = getc(overflow_file)) != EOF) putchar(tmp); fclose(overflow_file); overflow_file = NULL; @@ -257,8 +271,8 @@ print_manager_output(void) memset(outBuff, 0, OUTBUFF_SIZE); outBuffOffset = 0; } - else if ((outBuffOffset > 0) && !g_Parallel) { - fprintf(stderr, "h5diff error: outBuffOffset>0, but we're not in parallel!\n"); + else if (outBuffOffset > 0) { + fprintf(stderr, "h5diff error: outBuffOffset > 0, but we're not in parallel!\n"); } }