Skip to content

Commit

Permalink
Add MDI_Check_for_communicator function
Browse files Browse the repository at this point in the history
  • Loading branch information
taylor-a-barnes committed Jun 26, 2024
1 parent 686690a commit 3a1a0ef
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 28 deletions.
19 changes: 17 additions & 2 deletions MDI_Library/mdi.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ int MDI_Initialized(int* flag)
return 0;
}

/*! \brief Check if a new communicator is available
*
* The function returns whether there is currently a new communicator that can be accepted via MDI_Accept_communicator.
* If new communicators are available, the function returns \p 1. Otherwise, it returns \p 0.
*
*/
int MDI_Check_for_communicator(int* flag)
{
if ( codes.initialized == 0 ) {
mdi_error("MDI_Check_for_communicator called but MDI has not been initialized");
return 1;
}
return general_check_communicator(flag);
}

/*! \brief Accept a new MDI communicator
*
* The function returns an MDI_Comm that describes a connection between two codes.
Expand All @@ -294,14 +309,14 @@ int MDI_Accept_communicator(MDI_Comm* comm)
int ret;

if ( codes.initialized == 0 ) {
mdi_error("MDI_Accept_Communicator called but MDI has not been initialized");
mdi_error("MDI_Accept_communicator called but MDI has not been initialized");
return 1;
}
*comm = general_accept_communicator();

ret = mdi_debug("[MDI:MDI_Accept_communicator] Comm: %d\n", *comm);
if ( ret != 0 ) {
mdi_error("Error in MDI_Accept_Communicator: mdi_debug failed");
mdi_error("Error in MDI_Accept_communicator: mdi_debug failed");
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions MDI_Library/mdi.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ DllExport extern const int MDI_ENGINE;
// functions for handling MDI communication
DllExport int MDI_Init(int* argc, char ***argv);
DllExport int MDI_Initialized(int* flag);
DllExport int MDI_Check_for_communicator(int* flag);
DllExport int MDI_Accept_Communicator(MDI_Comm* comm);
DllExport int MDI_Accept_communicator(MDI_Comm* comm);
DllExport int MDI_Send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm comm);
Expand Down
67 changes: 46 additions & 21 deletions MDI_Library/mdi_general.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ int general_init(const char* options) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_init: get_current_code failed");
return 1;
return ret;
}

// Create method objects for each supported method
Expand Down Expand Up @@ -376,6 +376,31 @@ int general_init(const char* options) {
}


/*! \brief Check if a new communicator is available
*
* The function returns whether there is currently a new communicator that can be accepted via MDI_Accept_communicator.
* If new communicators are available, the function returns \p 1. Otherwise, it returns \p 0.
*
*/
int general_check_communicator(int* flag) {
int ret;

code* this_code;
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_check_communicator: get_current_code failed");
return ret;
}
method* selected_method;
ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method);
if ( ret != 0 ) {
mdi_error("Error in general_check_communicator: get_method failed");
return ret;
}
return selected_method->on_accept_communicator(flag);
}


/*! \brief Accept a new MDI communicator
*
* The function returns an MDI_Comm that describes a connection between two codes.
Expand All @@ -389,7 +414,7 @@ int general_accept_communicator() {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_accept_communicator: get_current_code failed");
return 1;
return ret;
}
method* selected_method;
ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method);
Expand Down Expand Up @@ -422,7 +447,7 @@ int general_send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm com
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_send: get_current_code failed");
return 1;
return ret;
}
communicator* this;
ret = get_communicator(codes.current_key, comm, &this);
Expand Down Expand Up @@ -486,7 +511,7 @@ int general_recv(void* buf, int count, MDI_Datatype datatype, MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_recv: get_current_code failed");
return 1;
return ret;
}
communicator* this;
ret = get_communicator(codes.current_key, comm, &this);
Expand Down Expand Up @@ -597,7 +622,7 @@ int general_send_command(const char* buf, MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_send_command: get_current_code failed");
return 1;
return ret;
}
method* selected_method;
ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method);
Expand Down Expand Up @@ -676,7 +701,7 @@ int general_builtin_command(const char* buf, MDI_Comm comm, int* flag) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_builtin_command: get_current_code failed");
return 1;
return ret;
}

// check if this command corresponds to one of MDI's standard built-in commands
Expand Down Expand Up @@ -756,7 +781,7 @@ int general_recv_command(char* buf, MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_recv_command: get_current_code failed");
return 1;
return ret;
}
communicator* this;
ret = get_communicator(codes.current_key, comm, &this);
Expand All @@ -782,7 +807,7 @@ int general_recv_command(char* buf, MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in general_recv_command: second get_current_code failed");
return 1;
return ret;
}

// only receive on rank 0
Expand Down Expand Up @@ -841,7 +866,7 @@ int register_node(vector* node_vec, const char* node_name)
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in register_node: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
return 0;
Expand All @@ -858,7 +883,7 @@ int register_node(vector* node_vec, const char* node_name)
ret = get_node_index(node_vec, node_name, &node_index);
if ( ret != 0 ) {
mdi_error("Error in register_node: get_node_index failed");
return 1;
return ret;
}
if ( node_index != -1 ) {
mdi_error("This node is already registered");
Expand Down Expand Up @@ -902,7 +927,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in register_command: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
return 0;
Expand All @@ -925,7 +950,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman
ret = get_node_index(node_vec, node_name, &node_index);
if ( ret != 0 ) {
mdi_error("Error in register_command: get_node_index failed");
return 1;
return ret;
}
if ( node_index == -1 ) {
mdi_error("Attempting to register a command on an unregistered node");
Expand All @@ -939,7 +964,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman
ret = get_command_index(target_node, command_name, &command_index);
if ( ret != 0 ) {
mdi_error("Error in register_command: get_command_index failed");
return 1;
return ret;
}
if ( command_index != -1 ) {
mdi_error("This command is already registered for this node");
Expand Down Expand Up @@ -979,7 +1004,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in register_callback: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
return 0;
Expand All @@ -1002,7 +1027,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb
ret = get_node_index(node_vec, node_name, &node_index);
if ( ret != 0 ) {
mdi_error("Error in register_callback: get_node_index failed");
return 1;
return ret;
}
if ( node_index == -1 ) {
mdi_error("Attempting to register a callback on an unregistered node");
Expand All @@ -1016,7 +1041,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb
ret = get_callback_index(target_node, callback_name, &callback_index);
if ( ret != 0 ) {
mdi_error("Error in register_callback: get_callback_index failed");
return 1;
return ret;
}
if ( callback_index != -1 ) {
mdi_error("This callback is already registered for this node");
Expand Down Expand Up @@ -1051,7 +1076,7 @@ int send_command_list(MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in send_command_list: get_current_code failed");
return 1;
return ret;
}
communicator* this_comm;
ret = get_communicator(codes.current_key, comm, &this_comm);
Expand Down Expand Up @@ -1247,7 +1272,7 @@ int send_node_list(MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in send_node_list: get_current_code failed");
return 1;
return ret;
}
communicator* this_comm;
ret = get_communicator(codes.current_key, comm, &this_comm);
Expand Down Expand Up @@ -1315,7 +1340,7 @@ int send_ncommands(MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in send_ncommands: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
mdi_error("Attempting to send command information from the incorrect rank");
Expand Down Expand Up @@ -1352,7 +1377,7 @@ int send_ncallbacks(MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in send_ncallbacks: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
mdi_error("Attempting to send callback information from the incorrect rank");
Expand Down Expand Up @@ -1389,7 +1414,7 @@ int send_nnodes(MDI_Comm comm) {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in send_nnodes: get_current_code failed");
return 1;
return ret;
}
if ( this_code->intra_rank != 0 ) {
mdi_error("Attempting to send callback information from the incorrect rank");
Expand Down
1 change: 1 addition & 0 deletions MDI_Library/mdi_general.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

int general_init_code();
int general_init(const char* options);
int general_check_communicator(int* flag);
int general_accept_communicator();
int general_send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm comm);
int general_recv(void* buf, int count, MDI_Datatype datatype, MDI_Comm comm);
Expand Down
1 change: 1 addition & 0 deletions MDI_Library/mdi_global.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ typedef struct element_struct {
typedef struct method_struct {
/*! \brief Function pointer for method initialization work */
int (*on_selection)();
int (*on_check_communicator)(int* flag);
int (*on_accept_communicator)();
int (*on_send_command)(const char*, MDI_Comm_Type, int* skip_flag);
int (*after_send_command)(const char*, MDI_Comm_Type);
Expand Down
33 changes: 32 additions & 1 deletion MDI_Library/mdi_lib.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int enable_plug_support( int code_id ) {
return ret;
}
this_method->on_selection = plug_on_selection;
this_method->on_check_communicator = plug_on_check_communicator;
this_method->on_accept_communicator = plug_on_accept_communicator;
this_method->on_send_command = plug_on_send_command;
this_method->after_send_command = plug_after_send_command;
Expand Down Expand Up @@ -68,6 +69,36 @@ int plug_on_selection() {



/*! \brief Callback when the PLUG method must check whether a new communicator exists */
int plug_on_check_communicator(int* flag) {
int ret;

ret = mdi_debug("[MDI:plug_on_accept_communicator] Start\n");
if ( ret != 0 ) {
mdi_error("Error in plug_on_accept_communicator: mdi_debug failed");
return ret;
}

code* this_code;
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in plug_on_accept_communicator: get_current_code failed");
return ret;
}

// If MDI hasn't returned some connections, do that now
if ( this_code->returned_comms < this_code->next_comm - 1 ) {
*flag = 1;
}
else {
*flag = 0;
}

return 0;
}



/*! \brief Callback when the PLUG method must accept a communicator */
int plug_on_accept_communicator() {
int ret;
Expand All @@ -82,7 +113,7 @@ int plug_on_accept_communicator() {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in plug_on_accept_communicator: get_current_code failed");
return 1;
return ret;
}

// Give the library method an opportunity to update the current code
Expand Down
1 change: 1 addition & 0 deletions MDI_Library/mdi_lib.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ typedef struct library_data_struct {

int enable_plug_support(int code_id);
int plug_on_selection();
int plug_on_check_communicator(int* flag);
int plug_on_accept_communicator();
int plug_on_send_command(const char* command, MDI_Comm comm, int* skip_flag);
int plug_after_send_command(const char* command, MDI_Comm comm);
Expand Down
25 changes: 24 additions & 1 deletion MDI_Library/mdi_mpi.c
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ int enable_mpi_support(int code_id) {
return ret;
}
this_method->on_selection = mpi_on_selection;
this_method->on_check_communicator = mpi_on_check_communicator;
this_method->on_accept_communicator = mpi_on_accept_communicator;
this_method->on_send_command = mpi_on_send_command;
this_method->after_send_command = mpi_after_send_command;
Expand Down Expand Up @@ -178,6 +179,28 @@ int mpi_on_selection() {
}


/*! \brief Callback when the MPI method must check whether there is a new communicator */
int mpi_on_check_communicator(int* flag) {
int ret;

code* this_code;
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in mpi_on_accept_communicator: get_current_code failed");
return ret;
}

// If MDI hasn't returned some connections, return true
if ( this_code->returned_comms < this_code->next_comm - 1 ) {
*flag = 1;
}
else {
*flag = 0;
}

return 0;
}


/*! \brief Callback when the MPI method must accept a communicator */
int mpi_on_accept_communicator() {
Expand All @@ -187,7 +210,7 @@ int mpi_on_accept_communicator() {
ret = get_current_code(&this_code);
if ( ret != 0 ) {
mdi_error("Error in mpi_on_accept_communicator: get_current_code failed");
return 1;
return ret;
}

// If MDI hasn't returned some connections, do that now
Expand Down
1 change: 1 addition & 0 deletions MDI_Library/mdi_mpi.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ int set_world_rank(int world_rank_in);

int enable_mpi_support(int code_id);
int mpi_on_selection();
int mpi_on_check_communicator(int* flag);
int mpi_on_accept_communicator();
int mpi_on_send_command(const char* command, MDI_Comm comm, int* skip_flag);
int mpi_after_send_command(const char* command, MDI_Comm comm);
Expand Down
Loading

0 comments on commit 3a1a0ef

Please sign in to comment.