From 3a1a0ef55ae6188e852e8627a37ee88b45f289b8 Mon Sep 17 00:00:00 2001 From: Taylor Barnes Date: Wed, 26 Jun 2024 19:56:43 +0000 Subject: [PATCH] Add MDI_Check_for_communicator function --- MDI_Library/mdi.c | 19 ++++++++- MDI_Library/mdi.h | 1 + MDI_Library/mdi_general.c | 67 +++++++++++++++++++++---------- MDI_Library/mdi_general.h | 1 + MDI_Library/mdi_global.h | 1 + MDI_Library/mdi_lib.c | 33 ++++++++++++++- MDI_Library/mdi_lib.h | 1 + MDI_Library/mdi_mpi.c | 25 +++++++++++- MDI_Library/mdi_mpi.h | 1 + MDI_Library/mdi_tcp.c | 84 ++++++++++++++++++++++++++++++++++++++- MDI_Library/mdi_tcp.h | 2 + MDI_Library/mdi_test.c | 25 +++++++++++- MDI_Library/mdi_test.h | 1 + 13 files changed, 233 insertions(+), 28 deletions(-) mode change 100644 => 100755 MDI_Library/mdi.c mode change 100644 => 100755 MDI_Library/mdi.h mode change 100644 => 100755 MDI_Library/mdi_general.c mode change 100644 => 100755 MDI_Library/mdi_general.h mode change 100644 => 100755 MDI_Library/mdi_global.h mode change 100644 => 100755 MDI_Library/mdi_lib.c mode change 100644 => 100755 MDI_Library/mdi_lib.h mode change 100644 => 100755 MDI_Library/mdi_mpi.c mode change 100644 => 100755 MDI_Library/mdi_mpi.h mode change 100644 => 100755 MDI_Library/mdi_tcp.c mode change 100644 => 100755 MDI_Library/mdi_tcp.h mode change 100644 => 100755 MDI_Library/mdi_test.c mode change 100644 => 100755 MDI_Library/mdi_test.h diff --git a/MDI_Library/mdi.c b/MDI_Library/mdi.c old mode 100644 new mode 100755 index 7366152b..604a5c20 --- a/MDI_Library/mdi.c +++ b/MDI_Library/mdi.c @@ -271,6 +271,21 @@ int MDI_Initialized(int* flag) return 0; } +/*! \brief Check if a new communicator is available + * + * The function reports, via \p flag, whether there is currently a new communicator that can be accepted via MDI_Accept_communicator. + * If new communicators are available, \p flag is set to \p 1. Otherwise, it is set to \p 0. 
+ * \param [out] flag Set to \p 1 if a new communicator is available and to \p 0 otherwise; the return value is \p 0 on success and nonzero on error (including when MDI has not been initialized). + */ +int MDI_Check_for_communicator(int* flag) +{ + if ( codes.initialized == 0 ) { + mdi_error("MDI_Check_for_communicator called but MDI has not been initialized"); + return 1; + } + return general_check_communicator(flag); +} + /*! \brief Accept a new MDI communicator * * The function returns an MDI_Comm that describes a connection between two codes. @@ -294,14 +309,14 @@ int MDI_Accept_communicator(MDI_Comm* comm) int ret; if ( codes.initialized == 0 ) { - mdi_error("MDI_Accept_Communicator called but MDI has not been initialized"); + mdi_error("MDI_Accept_communicator called but MDI has not been initialized"); return 1; } *comm = general_accept_communicator(); ret = mdi_debug("[MDI:MDI_Accept_communicator] Comm: %d\n", *comm); if ( ret != 0 ) { - mdi_error("Error in MDI_Accept_Communicator: mdi_debug failed"); + mdi_error("Error in MDI_Accept_communicator: mdi_debug failed"); return ret; } diff --git a/MDI_Library/mdi.h b/MDI_Library/mdi.h old mode 100644 new mode 100755 index 17b64ee4..4fd487df --- a/MDI_Library/mdi.h +++ b/MDI_Library/mdi.h @@ -86,6 +86,7 @@ DllExport extern const int MDI_ENGINE; // functions for handling MDI communication DllExport int MDI_Init(int* argc, char ***argv); DllExport int MDI_Initialized(int* flag); +DllExport int MDI_Check_for_communicator(int* flag); DllExport int MDI_Accept_Communicator(MDI_Comm* comm); DllExport int MDI_Accept_communicator(MDI_Comm* comm); DllExport int MDI_Send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm comm); diff --git a/MDI_Library/mdi_general.c b/MDI_Library/mdi_general.c old mode 100644 new mode 100755 index 49745809..6921a85b --- a/MDI_Library/mdi_general.c +++ b/MDI_Library/mdi_general.c @@ -61,7 +61,7 @@ int general_init(const char* options) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_init: get_current_code failed"); - return 1; + return ret; } // Create method objects for each supported method @@ -376,6 +376,31 @@ int general_init(const 
char* options) { } +/*! \brief Check if a new communicator is available + * + * The function reports, via \p flag, whether there is currently a new communicator that can be accepted via MDI_Accept_communicator. + * If new communicators are available, \p flag is set to \p 1. Otherwise, it is set to \p 0. + * The check is delegated to the on_check_communicator callback of the currently selected method; the return value is \p 0 on success and nonzero on error. + */ +int general_check_communicator(int* flag) { + int ret; + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in general_check_communicator: get_current_code failed"); + return ret; + } + method* selected_method; + ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method); + if ( ret != 0 ) { + mdi_error("Error in general_check_communicator: get_method failed"); + return ret; + } + return selected_method->on_check_communicator(flag); +} + + /*! \brief Accept a new MDI communicator * * The function returns an MDI_Comm that describes a connection between two codes. @@ -389,7 +414,7 @@ int general_accept_communicator() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_accept_communicator: get_current_code failed"); - return 1; + return ret; } method* selected_method; ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method); @@ -422,7 +447,7 @@ int general_send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm com ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_send: get_current_code failed"); - return 1; + return ret; } communicator* this; ret = get_communicator(codes.current_key, comm, &this); @@ -486,7 +511,7 @@ int general_recv(void* buf, int count, MDI_Datatype datatype, MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_recv: get_current_code failed"); - return 1; + return ret; } communicator* this; ret = get_communicator(codes.current_key, comm, &this); @@ -597,7 +622,7 @@ int general_send_command(const char* buf, MDI_Comm comm) { ret = 
get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_send_command: get_current_code failed"); - return 1; + return ret; } method* selected_method; ret = get_method(codes.current_key, this_code->selected_method_id, &selected_method); @@ -676,7 +701,7 @@ int general_builtin_command(const char* buf, MDI_Comm comm, int* flag) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_builtin_command: get_current_code failed"); - return 1; + return ret; } // check if this command corresponds to one of MDI's standard built-in commands @@ -756,7 +781,7 @@ int general_recv_command(char* buf, MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_recv_command: get_current_code failed"); - return 1; + return ret; } communicator* this; ret = get_communicator(codes.current_key, comm, &this); @@ -782,7 +807,7 @@ int general_recv_command(char* buf, MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in general_recv_command: second get_current_code failed"); - return 1; + return ret; } // only receive on rank 0 @@ -841,7 +866,7 @@ int register_node(vector* node_vec, const char* node_name) ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in register_node: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 0 ) { return 0; @@ -858,7 +883,7 @@ int register_node(vector* node_vec, const char* node_name) ret = get_node_index(node_vec, node_name, &node_index); if ( ret != 0 ) { mdi_error("Error in register_node: get_node_index failed"); - return 1; + return ret; } if ( node_index != -1 ) { mdi_error("This node is already registered"); @@ -902,7 +927,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in register_command: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 
0 ) { return 0; @@ -925,7 +950,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman ret = get_node_index(node_vec, node_name, &node_index); if ( ret != 0 ) { mdi_error("Error in register_command: get_node_index failed"); - return 1; + return ret; } if ( node_index == -1 ) { mdi_error("Attempting to register a command on an unregistered node"); @@ -939,7 +964,7 @@ int register_command(vector* node_vec, const char* node_name, const char* comman ret = get_command_index(target_node, command_name, &command_index); if ( ret != 0 ) { mdi_error("Error in register_command: get_command_index failed"); - return 1; + return ret; } if ( command_index != -1 ) { mdi_error("This command is already registered for this node"); @@ -979,7 +1004,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in register_callback: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 0 ) { return 0; @@ -1002,7 +1027,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb ret = get_node_index(node_vec, node_name, &node_index); if ( ret != 0 ) { mdi_error("Error in register_callback: get_node_index failed"); - return 1; + return ret; } if ( node_index == -1 ) { mdi_error("Attempting to register a callback on an unregistered node"); @@ -1016,7 +1041,7 @@ int register_callback(vector* node_vec, const char* node_name, const char* callb ret = get_callback_index(target_node, callback_name, &callback_index); if ( ret != 0 ) { mdi_error("Error in register_callback: get_callback_index failed"); - return 1; + return ret; } if ( callback_index != -1 ) { mdi_error("This callback is already registered for this node"); @@ -1051,7 +1076,7 @@ int send_command_list(MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in send_command_list: get_current_code failed"); - return 1; + return 
ret; } communicator* this_comm; ret = get_communicator(codes.current_key, comm, &this_comm); @@ -1247,7 +1272,7 @@ int send_node_list(MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in send_node_list: get_current_code failed"); - return 1; + return ret; } communicator* this_comm; ret = get_communicator(codes.current_key, comm, &this_comm); @@ -1315,7 +1340,7 @@ int send_ncommands(MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in send_ncommands: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 0 ) { mdi_error("Attempting to send command information from the incorrect rank"); @@ -1352,7 +1377,7 @@ int send_ncallbacks(MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in send_ncallbacks: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 0 ) { mdi_error("Attempting to send callback information from the incorrect rank"); @@ -1389,7 +1414,7 @@ int send_nnodes(MDI_Comm comm) { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in send_nnodes: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank != 0 ) { mdi_error("Attempting to send callback information from the incorrect rank"); diff --git a/MDI_Library/mdi_general.h b/MDI_Library/mdi_general.h old mode 100644 new mode 100755 index dc3ded7c..234f3c5e --- a/MDI_Library/mdi_general.h +++ b/MDI_Library/mdi_general.h @@ -11,6 +11,7 @@ int general_init_code(); int general_init(const char* options); +int general_check_communicator(int* flag); int general_accept_communicator(); int general_send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm comm); int general_recv(void* buf, int count, MDI_Datatype datatype, MDI_Comm comm); diff --git a/MDI_Library/mdi_global.h b/MDI_Library/mdi_global.h old mode 100644 new mode 100755 index 833cf3a2..b070aeee --- a/MDI_Library/mdi_global.h +++ 
b/MDI_Library/mdi_global.h @@ -119,6 +119,7 @@ typedef struct element_struct { typedef struct method_struct { /*! \brief Function pointer for method initialization work */ int (*on_selection)(); + int (*on_check_communicator)(int* flag); int (*on_accept_communicator)(); int (*on_send_command)(const char*, MDI_Comm_Type, int* skip_flag); int (*after_send_command)(const char*, MDI_Comm_Type); diff --git a/MDI_Library/mdi_lib.c b/MDI_Library/mdi_lib.c old mode 100644 new mode 100755 index 0f5319f3..915291d3 --- a/MDI_Library/mdi_lib.c +++ b/MDI_Library/mdi_lib.c @@ -37,6 +37,7 @@ int enable_plug_support( int code_id ) { return ret; } this_method->on_selection = plug_on_selection; + this_method->on_check_communicator = plug_on_check_communicator; this_method->on_accept_communicator = plug_on_accept_communicator; this_method->on_send_command = plug_on_send_command; this_method->after_send_command = plug_after_send_command; @@ -68,6 +69,36 @@ int plug_on_selection() { +/*! \brief Callback when the PLUG method must check whether a new communicator exists; sets \p flag to 1 or 0 and returns 0 on success */ +int plug_on_check_communicator(int* flag) { + int ret; + + ret = mdi_debug("[MDI:plug_on_check_communicator] Start\n"); + if ( ret != 0 ) { + mdi_error("Error in plug_on_check_communicator: mdi_debug failed"); + return ret; + } + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in plug_on_check_communicator: get_current_code failed"); + return ret; + } + + // If MDI holds communicators that have not yet been returned, report that one is available + if ( this_code->returned_comms < this_code->next_comm - 1 ) { + *flag = 1; + } + else { + *flag = 0; + } + + return 0; +} + + + /*! 
\brief Callback when the PLUG method must accept a communicator */ int plug_on_accept_communicator() { int ret; @@ -82,7 +113,7 @@ int plug_on_accept_communicator() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in plug_on_accept_communicator: get_current_code failed"); - return 1; + return ret; } // Give the library method an opportunity to update the current code diff --git a/MDI_Library/mdi_lib.h b/MDI_Library/mdi_lib.h old mode 100644 new mode 100755 index 5a51a8ab..96a9163b --- a/MDI_Library/mdi_lib.h +++ b/MDI_Library/mdi_lib.h @@ -118,6 +118,7 @@ typedef struct library_data_struct { int enable_plug_support(int code_id); int plug_on_selection(); +int plug_on_check_communicator(int* flag); int plug_on_accept_communicator(); int plug_on_send_command(const char* command, MDI_Comm comm, int* skip_flag); int plug_after_send_command(const char* command, MDI_Comm comm); diff --git a/MDI_Library/mdi_mpi.c b/MDI_Library/mdi_mpi.c old mode 100644 new mode 100755 index 83273989..edc9d97e --- a/MDI_Library/mdi_mpi.c +++ b/MDI_Library/mdi_mpi.c @@ -70,6 +70,7 @@ int enable_mpi_support(int code_id) { return ret; } this_method->on_selection = mpi_on_selection; + this_method->on_check_communicator = mpi_on_check_communicator; this_method->on_accept_communicator = mpi_on_accept_communicator; this_method->on_send_command = mpi_on_send_command; this_method->after_send_command = mpi_after_send_command; @@ -178,6 +179,28 @@ int mpi_on_selection() { } +/*! \brief Callback when the MPI method must check whether there is a new communicator; sets \p flag to 1 or 0 and returns 0 on success */ +int mpi_on_check_communicator(int* flag) { + int ret; + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in mpi_on_check_communicator: get_current_code failed"); + return ret; + } + + // If MDI holds communicators that have not yet been returned, report that one is available + if ( this_code->returned_comms < this_code->next_comm - 1 ) { + *flag = 1; + } + else { + *flag = 0; + } + + return 0; +} + /*! 
\brief Callback when the MPI method must accept a communicator */ int mpi_on_accept_communicator() { @@ -187,7 +210,7 @@ int mpi_on_accept_communicator() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in mpi_on_accept_communicator: get_current_code failed"); - return 1; + return ret; } // If MDI hasn't returned some connections, do that now diff --git a/MDI_Library/mdi_mpi.h b/MDI_Library/mdi_mpi.h old mode 100644 new mode 100755 index fb09417d..c8045037 --- a/MDI_Library/mdi_mpi.h +++ b/MDI_Library/mdi_mpi.h @@ -23,6 +23,7 @@ int set_world_rank(int world_rank_in); int enable_mpi_support(int code_id); int mpi_on_selection(); +int mpi_on_check_communicator(int* flag); int mpi_on_accept_communicator(); int mpi_on_send_command(const char* command, MDI_Comm comm, int* skip_flag); int mpi_after_send_command(const char* command, MDI_Comm comm); diff --git a/MDI_Library/mdi_tcp.c b/MDI_Library/mdi_tcp.c old mode 100644 new mode 100755 index eec1cdea..a6a5ab16 --- a/MDI_Library/mdi_tcp.c +++ b/MDI_Library/mdi_tcp.c @@ -16,6 +16,7 @@ #include #include #include +#include <poll.h> #include "mdi.h" #include "mdi_tcp.h" #include "mdi_global.h" @@ -55,6 +56,7 @@ int enable_tcp_support(int code_id) { return ret; } this_method->on_selection = tcp_on_selection; + this_method->on_check_communicator = tcp_on_check_communicator; this_method->on_accept_communicator = tcp_on_accept_communicator; this_method->on_send_command = tcp_on_send_command; this_method->after_send_command = tcp_after_send_command; @@ -123,6 +125,47 @@ int tcp_on_selection() { +/*! 
\brief Callback when the TCP method must check whether a new communicator exists; sets \p flag to 1 or 0 and returns 0 on success */ +int tcp_on_check_communicator(int* flag) { + int ret; + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in tcp_on_check_communicator: get_current_code failed"); + return ret; + } + + // If MDI holds communicators that have not yet been returned, report that one is available + if ( this_code->returned_comms < this_code->next_comm - 1 ) { + *flag = 1; + return 0; + } + + // Check for any production codes connecting via TCP + if ( this_code->tcp_socket > 0 ) { + /* + // Accept a connection via TCP + // NOTE: If this is not intra_rank==0, this will always create a dummy communicator + int size_before = this_code->comms->size; + tcp_accept_connection(); + int size_after = this_code->comms->size; + + // if MDI hasn't returned some connections, return true + if ( size_before < size_after ) { + *flag = 1; + return 0; + } + */ + *flag = 0; // default in case the helper does not poll on this rank + return tcp_check_for_connection(flag); // propagate polling errors instead of discarding them + } + + *flag = 0; + return 0; +} + + /*! \brief Callback when the TCP method must accept a communicator */ int tcp_on_accept_communicator() { int ret; @@ -131,7 +174,7 @@ int tcp_on_accept_communicator() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in tcp_on_accept_communicator: get_current_code failed"); - return 1; + return ret; } // If MDI hasn't returned some connections, do that now @@ -413,6 +456,43 @@ int tcp_request_connection(int port_in, char* hostname_ptr) { } +/*! 
\brief Check, without blocking, whether a TCP connection request is pending; sets \p flag to 1 or 0 and returns 0 on success + */ +int tcp_check_for_connection(int* flag) { + int ret; + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in tcp_check_for_connection: get_current_code failed"); + return ret; + } + + struct pollfd poll_args; + poll_args.fd = this_code->tcp_socket; + poll_args.events = POLLIN; + *flag = 0; // default, so ranks other than 0 (which never poll) still report a defined value + if ( this_code->intra_rank == 0 ) { // Running on rank 0 + + ret = poll(&poll_args, 1, 0); // zero timeout: return immediately + if (ret > 0) { + if ( poll_args.revents & POLLIN ) { + *flag = 1; + return 0; + } + } + else if ( ret == -1 ) { + mdi_error("Error in tcp_check_for_connection: poll failed"); + return ret; + } + *flag = 0; + + } + + return 0; +} + + /*! \brief Accept a TCP connection request */ int tcp_accept_connection() { @@ -423,7 +503,7 @@ int tcp_accept_connection() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in tcp_accept_connection: get_current_code failed"); - return 1; + return ret; } if ( this_code->intra_rank == 0 ) { // Running on rank 0 diff --git a/MDI_Library/mdi_tcp.h b/MDI_Library/mdi_tcp.h old mode 100644 new mode 100755 index e406a5b7..dc466562 --- a/MDI_Library/mdi_tcp.h +++ b/MDI_Library/mdi_tcp.h @@ -13,6 +13,7 @@ void sigint_handler(int dummy); int enable_tcp_support(int code_id); int tcp_on_selection(); +int tcp_on_check_communicator(int* flag); int tcp_on_accept_communicator(); int tcp_on_send_command(const char* command, MDI_Comm comm, int* skip_flag); int tcp_after_send_command(const char* command, MDI_Comm comm); @@ -21,6 +22,7 @@ int tcp_on_recv_command(MDI_Comm comm); int tcp_listen(int port_in); int tcp_request_connection(int port_in, char* hostname_ptr); +int tcp_check_for_connection(int* flag); int tcp_accept_connection(); int tcp_send(const void* buf, int count, MDI_Datatype datatype, MDI_Comm comm, int msg_flag); int tcp_recv(void* buf, int count, MDI_Datatype datatype, MDI_Comm comm, int msg_flag); diff --git a/MDI_Library/mdi_test.c b/MDI_Library/mdi_test.c old 
mode 100644 new mode 100755 index 90ff476f..e2a574e4 --- a/MDI_Library/mdi_test.c +++ b/MDI_Library/mdi_test.c @@ -28,6 +28,7 @@ int enable_test_support(int code_id) { return ret; } this_method->on_selection = test_on_selection; + this_method->on_check_communicator = test_on_check_communicator; this_method->on_accept_communicator = test_on_accept_communicator; this_method->on_send_command = test_on_send_command; this_method->after_send_command = test_after_send_command; @@ -60,6 +61,28 @@ int test_on_selection() { } +/*! \brief Callback when the TEST method must check whether a new communicator exists; sets \p flag to 1 or 0 and returns 0 on success */ +int test_on_check_communicator(int* flag) { + int ret; + + code* this_code; + ret = get_current_code(&this_code); + if ( ret != 0 ) { + mdi_error("Error in test_on_check_communicator: get_current_code failed"); + return ret; + } + + // If MDI holds communicators that have not yet been returned, report that one is available + if ( this_code->returned_comms < this_code->next_comm - 1 ) { + *flag = 1; + } + else { + *flag = 0; + } + + return 0; +} + /*! \brief Callback when the TEST method must accept a communicator */ int test_on_accept_communicator() { @@ -69,7 +92,7 @@ int test_on_accept_communicator() { ret = get_current_code(&this_code); if ( ret != 0 ) { mdi_error("Error in test_on_accept_communicator: get_current_code failed"); - return 1; + return ret; } // If MDI hasn't returned some connections, do that now diff --git a/MDI_Library/mdi_test.h b/MDI_Library/mdi_test.h old mode 100644 new mode 100755 index 2bff0d81..e1f12658 --- a/MDI_Library/mdi_test.h +++ b/MDI_Library/mdi_test.h @@ -10,6 +10,7 @@ int enable_test_support(int code_id); int test_on_selection(); +int test_on_check_communicator(int* flag); int test_on_accept_communicator(); int test_on_send_command(const char* command, MDI_Comm comm, int* skip_flag); int test_after_send_command(const char* command, MDI_Comm comm);